diff --git a/satella/coding/concurrent/list_processor.py b/satella/coding/concurrent/list_processor.py index 9b385d0a032f031d346b665e5f4f163f233b600d..fafcedadd2aa76f2b79d584372f4449fcfed76f3 100644 --- a/satella/coding/concurrent/list_processor.py +++ b/satella/coding/concurrent/list_processor.py @@ -11,7 +11,8 @@ except ImportError: def parallel_construct(iterable: tp.Iterable[V], function: tp.Callable[[V], tp.Optional[U]], - thread_pool: ThreadPoolExecutor) -> tp.List[U]: + thread_pool: ThreadPoolExecutor, + span_title: tp.Optional[str] = None) -> tp.List[U]: """ Construct a list from executing given function in a thread pool executor. @@ -20,6 +21,7 @@ def parallel_construct(iterable: tp.Iterable[V], :param iterable: iterable to apply :param function: function to apply. If that function returns None, no element will be added :param thread_pool: thread pool to execute + :param span_title: span title to create. For each execution a child span will be returned :return: list that is the result of parallel application of function on each element """ wrap_iterable = None @@ -28,13 +30,10 @@ def parallel_construct(iterable: tp.Iterable[V], tracer = opentracing.global_tracer() span = tracer.active_span if span is not None: - current_span = tracer.active_span def wrap_iterable(arg, *args, **kwargs): - scope = tracer.scope_manager.activate(current_span, finish_on_close=False) - v = function(arg, *args, **kwargs) - scope.close() - return v + with tracer.start_active_span(span_title or 'New span', child_of=span): + return function(arg, *args, **kwargs) if wrap_iterable is None: wrap_iterable = function