diff --git a/satella/coding/concurrent/sync.py b/satella/coding/concurrent/sync.py index 8be256015f06df0cf26e2e68171471922a872084..9dda7e2ce929b27e9c167ffc5cef1670e527fa62 100644 --- a/satella/coding/concurrent/sync.py +++ b/satella/coding/concurrent/sync.py @@ -9,6 +9,16 @@ from satella.exceptions import WouldWaitMore from satella.time.measure import measure +def _while_sync_threadpool(tpe, max_wait, measurement, futures): + while tpe._work_queue.qsize() > 0: # pylint: disable=protected-access + if max_wait is not None: + if measurement() > max_wait: + for future in futures: + future.cancel() + raise WouldWaitMore('timeout exceeded') + time.sleep(0.5) + + def sync_threadpool(tpe: tp.Union[ExecutorWrapper, ThreadPoolExecutor], max_wait: tp.Optional[float] = None) -> None: """ @@ -27,8 +37,7 @@ def sync_threadpool(tpe: tp.Union[ExecutorWrapper, ThreadPoolExecutor], assert isinstance(tpe, ThreadPoolExecutor), 'Must be a ThreadPoolExecutor!' with measure(timeout=max_wait) as measurement: - # noinspection PyProtectedMember - workers = tpe._max_workers + workers = tpe._max_workers # pylint: disable=protected-access atm_n = AtomicNumber(workers) cond = Condition() @@ -40,14 +49,7 @@ def sync_threadpool(tpe: tp.Union[ExecutorWrapper, ThreadPoolExecutor], futures = [tpe.submit(decrease_atm) for _ in range(workers)] # wait for all currently scheduled jobs to be picked up - # noinspection PyProtectedMember - while tpe._work_queue.qsize() > 0: - if max_wait is not None: - if measurement() > max_wait: - for future in futures: - future.cancel() - raise WouldWaitMore('timeout exceeded') - time.sleep(0.5) + _while_sync_threadpool(tpe, max_wait, measurement, futures) if max_wait is None: atm_n.wait_until_equal(0) diff --git a/satella/coding/sequences/sequences.py b/satella/coding/sequences/sequences.py index 22482187be40928174d1ff2829439eb78ec10c21..99be8c89f9e94de804f381514ddb83c92cb32c86 100644 --- a/satella/coding/sequences/sequences.py +++ b/satella/coding/sequences/sequences.py @@ -1 +1 @@ -import copy import typing as tp from satella.coding.decorators.decorators import wraps from satella.coding.recast_exceptions import rethrow_as from .iterators import n_th from ..typing import T, Iteratable, NoArgCallable, Predicate def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[NoArgCallable[T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: Predicate, seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i += 1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = 'sequence', 'dont_return_list' def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file +import copy import typing as tp from satella.coding.decorators.decorators import wraps from satella.coding.recast_exceptions import rethrow_as from .iterators import n_th from ..typing import T, Iteratable, NoArgCallable, Predicate def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[NoArgCallable[T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: yield None if return_factory is None else return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: Predicate, seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i += 1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = 'sequence', 'dont_return_list' def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file