From b1b0c988bae4d953a9e2953bfe9a7e48b22cf513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <pmaslanka@smok.co> Date: Fri, 10 Dec 2021 18:24:51 +0100 Subject: [PATCH] added parallel_construct --- .travis.yml | 2 +- CHANGELOG.md | 2 + docs/coding/concurrent.rst | 5 +++ satella/__init__.py | 2 +- satella/coding/concurrent/__init__.py | 3 +- satella/coding/concurrent/list_processor.py | 44 +++++++++++++++++++++ tests/test_coding/test_concurrent.py | 17 +++++++- 7 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 satella/coding/concurrent/list_processor.py diff --git a/.travis.yml b/.travis.yml index 8718f661..5af14fde 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ before_script: - chmod +x ./cc-test-reporter - ./cc-test-reporter before-build - pip install -r requirements.txt - - pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py + - pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py opentracing - python setup.py install jobs: include: diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef665fe..ce613408 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,3 @@ # v2.18.10 + +* added parallel_construct diff --git a/docs/coding/concurrent.rst b/docs/coding/concurrent.rst index e36c8741..ebb65b89 100644 --- a/docs/coding/concurrent.rst +++ b/docs/coding/concurrent.rst @@ -14,6 +14,11 @@ CallNoOftenThan .. autoclass:: satella.coding.concurrent.CallNoOftenThan :members: +parallel_construct +------------------ + +.. autofunction:: satella.coding.concurrent.parallel_construct + CancellableCallback ------------------- diff --git a/satella/__init__.py b/satella/__init__.py index 8f93d001..717ae9b8 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1,2 +1,2 @@ -__version__ = '2.18.10a1' +__version__ = '2.18.10rc1' diff --git a/satella/coding/concurrent/__init__.py b/satella/coding/concurrent/__init__.py index 71b96832..7f4840a9 100644 --- a/satella/coding/concurrent/__init__.py +++ b/satella/coding/concurrent/__init__.py @@ -12,12 +12,13 @@ from .thread import TerminableThread, Condition, SingleStartThread, call_in_sepa from .timer import Timer from .thread_collection import ThreadCollection from .queue import PeekableQueue +from .list_processor import parallel_construct __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableThread', 'MonitorDict', 'MonitorList', 'Condition', 'LockedStructure', 'AtomicNumber', 'CallNoOftenThan', 'SingleStartThread', 'IDAllocator', 'call_in_separate_thread', 'BogusTerminableThread', 'Timer', 'parallel_execute', 'run_as_future', 'sync_threadpool', 'IntervalTerminableThread', 'Future', 'MonitorSet', - 'WrappingFuture', 'InvalidStateError', 'PeekableQueue', + 'WrappingFuture', 'InvalidStateError', 'PeekableQueue', 'parallel_construct', 'CancellableCallback', 'ThreadCollection', 'FutureCollection', 'SequentialIssuer'] diff --git a/satella/coding/concurrent/list_processor.py b/satella/coding/concurrent/list_processor.py new file mode 100644 index 00000000..b251a42f --- /dev/null +++ b/satella/coding/concurrent/list_processor.py @@ -0,0 +1,44 @@ +import typing as tp +from concurrent.futures.thread import ThreadPoolExecutor + +from satella.coding.typing import V, U + +try: + import opentracing +except ImportError: + opentracing = None + + +def parallel_construct(iterable: tp.Iterable[V], + function: tp.Callable[[V], tp.Optional[U]], + thread_pool: ThreadPoolExecutor) -> tp.List[U]: + """ + Construct a list from executing given function in a thread pool executor. + + If opentracing is installed, and tracing is enabled, current span will be passed to child threads. + + :param iterable: iterable to apply + :param function: function to apply. If that function returns None, no element will be added + :param thread_pool: thread pool to execute + :return: list that is the result of parallel application of function on each element + """ + wrap_iterable = None + + if opentracing is not None: + tracer = opentracing.global_tracer() + span = tracer.active_span + if span is not None: + current_span = tracer.active_span + + def wrap_iterable(arg, *args, **kwargs): + tracer.scope_manager.activate(current_span, finish_on_close=False) + return function(arg, *args, **kwargs) + + if wrap_iterable is None: + wrap_iterable = function + + result = [] + for item in thread_pool.map(wrap_iterable, iterable): + if item is not None: + result.append(item) + return result diff --git a/tests/test_coding/test_concurrent.py b/tests/test_coding/test_concurrent.py index aa913113..98a9fb19 100644 --- a/tests/test_coding/test_concurrent.py +++ b/tests/test_coding/test_concurrent.py @@ -7,11 +7,14 @@ import time import unittest from concurrent.futures import ThreadPoolExecutor, Future as PythonFuture +from opentracing.mocktracer import MockTracer +from opentracing import set_global_tracer + from satella.coding.concurrent import TerminableThread, CallableGroup, Condition, MonitorList, \ LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \ parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread, Future, \ WrappingFuture, PeekableQueue, SequentialIssuer, CancellableCallback, ThreadCollection, \ - BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet + BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet, parallel_construct from satella.coding.concurrent.futures import call_in_future, ExecutorWrapper from satella.coding.sequences import unique from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty @@ -19,6 +22,18 @@ from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty class TestConcurrent(unittest.TestCase): + def test_parallel_construct(self): + mock = MockTracer() + set_global_tracer(mock) + + def mult2(x): + return x*2 + + tpe = ThreadPoolExecutor(max_workers=2) + + ret = parallel_construct([1, 2, 3], mult2, tpe) + self.assertEqual(ret, [2, 4, 6]) + def test_monitor_set(self): ms = MonitorSet([1, 2, 3]) self.assertFalse(ms.insert_and_check(2)) -- GitLab