diff --git a/.travis.yml b/.travis.yml index 8718f66168f854b8858c7315be2ce6ebf49a94cd..5af14fdec701f63bc7a26607c01e7c943dadc942 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ before_script: - chmod +x ./cc-test-reporter - ./cc-test-reporter before-build - pip install -r requirements.txt - - pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py + - pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py opentracing - python setup.py install jobs: include: diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef665fe397f9cbbf56ae0443a58f9e4bcfa6ef5..ce61340830949e93029725c7dcab5bb7c1e0335f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,3 @@ # v2.18.10 + +* added parallel_construct diff --git a/docs/coding/concurrent.rst b/docs/coding/concurrent.rst index e36c87411e1cb4e35fd7a5b15cbfe0860894e98c..ebb65b89b4bdfeacc25f3394606b03f13599c024 100644 --- a/docs/coding/concurrent.rst +++ b/docs/coding/concurrent.rst @@ -14,6 +14,11 @@ CallNoOftenThan .. autoclass:: satella.coding.concurrent.CallNoOftenThan :members: +parallel_construct +------------------ + +.. autofunction:: satella.coding.concurrent.parallel_construct + CancellableCallback ------------------- diff --git a/satella/__init__.py b/satella/__init__.py index 8f93d0014a83e58f0d15b54be92a984ac7695df2..717ae9b8fa9d03483d0e46fea99a2134055695f5 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1,2 +1,2 @@ -__version__ = '2.18.10a1' +__version__ = '2.18.10rc1' diff --git a/satella/coding/concurrent/__init__.py b/satella/coding/concurrent/__init__.py index 71b96832138e72ddcdd228fd8a032f9c3708be68..7f4840a98b144c6887b1fa60904253b369256c7d 100644 --- a/satella/coding/concurrent/__init__.py +++ b/satella/coding/concurrent/__init__.py @@ -12,12 +12,13 @@ from .thread import TerminableThread, Condition, SingleStartThread, call_in_sepa from .timer import Timer from .thread_collection import ThreadCollection from .queue import PeekableQueue +from .list_processor import parallel_construct __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableThread', 'MonitorDict', 'MonitorList', 'Condition', 'LockedStructure', 'AtomicNumber', 'CallNoOftenThan', 'SingleStartThread', 'IDAllocator', 'call_in_separate_thread', 'BogusTerminableThread', 'Timer', 'parallel_execute', 'run_as_future', 'sync_threadpool', 'IntervalTerminableThread', 'Future', 'MonitorSet', - 'WrappingFuture', 'InvalidStateError', 'PeekableQueue', + 'WrappingFuture', 'InvalidStateError', 'PeekableQueue', 'parallel_construct', 'CancellableCallback', 'ThreadCollection', 'FutureCollection', 'SequentialIssuer'] diff --git a/satella/coding/concurrent/list_processor.py b/satella/coding/concurrent/list_processor.py new file mode 100644 index 0000000000000000000000000000000000000000..b251a42ff880578795d134f6eded93b66fe661bc --- /dev/null +++ b/satella/coding/concurrent/list_processor.py @@ -0,0 +1,44 @@ +import typing as tp +from concurrent.futures.thread import ThreadPoolExecutor + +from satella.coding.typing import V, U + +try: + import opentracing +except ImportError: + opentracing = None + + +def parallel_construct(iterable: tp.Iterable[V], + function: tp.Callable[[V], tp.Optional[U]], + thread_pool: ThreadPoolExecutor) -> tp.List[U]: + """ + Construct a list from executing given function in a thread pool executor. + + If opentracing is installed, and tracing is enabled, current span will be passed to child threads. + + :param iterable: iterable to apply + :param function: function to apply. If that function returns None, no element will be added + :param thread_pool: thread pool to execute + :return: list that is the result of parallel application of function on each element + """ + wrap_iterable = None + + if opentracing is not None: + tracer = opentracing.global_tracer() + span = tracer.active_span + if span is not None: + current_span = tracer.active_span + + def wrap_iterable(arg, *args, **kwargs): + tracer.scope_manager.activate(current_span, finish_on_close=False) + return function(arg, *args, **kwargs) + + if wrap_iterable is None: + wrap_iterable = function + + result = [] + for item in thread_pool.map(wrap_iterable, iterable): + if item is not None: + result.append(item) + return result diff --git a/tests/test_coding/test_concurrent.py b/tests/test_coding/test_concurrent.py index aa913113645e471a0918abdedc70b225066e4ebd..98a9fb19fff0bc3822a22de0c4e3551bc96a5871 100644 --- a/tests/test_coding/test_concurrent.py +++ b/tests/test_coding/test_concurrent.py @@ -7,11 +7,14 @@ import time import unittest from concurrent.futures import ThreadPoolExecutor, Future as PythonFuture +from opentracing.mocktracer import MockTracer +from opentracing import set_global_tracer + from satella.coding.concurrent import TerminableThread, CallableGroup, Condition, MonitorList, \ LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \ parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread, Future, \ WrappingFuture, PeekableQueue, SequentialIssuer, CancellableCallback, ThreadCollection, \ - BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet + BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet, parallel_construct from satella.coding.concurrent.futures import call_in_future, ExecutorWrapper from satella.coding.sequences import unique from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty @@ -19,6 +22,18 @@ from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty class TestConcurrent(unittest.TestCase): + def test_parallel_construct(self): + mock = MockTracer() + set_global_tracer(mock) + + def mult2(x): + return x*2 + + tpe = ThreadPoolExecutor(max_workers=2) + + ret = parallel_construct([1, 2, 3], mult2, tpe) + self.assertEqual(ret, [2, 4, 6]) + def test_monitor_set(self): ms = MonitorSet([1, 2, 3]) self.assertFalse(ms.insert_and_check(2))