From 56cbc83b50ddb1c626b41fbb91e7d755b675197c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 26 Oct 2020 23:03:12 +0100 Subject: [PATCH] add typing --- CHANGELOG.md | 3 +- docs/coding/typing.rst | 11 +++++ docs/index.rst | 1 + satella/coding/concurrent/callablegroup.py | 2 +- satella/coding/concurrent/functions.py | 2 +- satella/coding/concurrent/locked_structure.py | 2 +- satella/coding/concurrent/monitor.py | 4 +- satella/coding/decorators/arguments.py | 2 +- satella/coding/decorators/decorators.py | 5 +-- satella/coding/decorators/flow_control.py | 7 ++- satella/coding/decorators/preconditions.py | 2 +- satella/coding/deleters.py | 3 +- satella/coding/recast_exceptions.py | 14 +++--- satella/coding/sequences/choose.py | 6 +-- satella/coding/sequences/iterators.py | 43 +++++++++---------- satella/coding/sequences/sequences.py | 2 +- .../structures/dictionaries/cache_dict.py | 3 +- .../structures/dictionaries/dict_object.py | 2 +- .../structures/dictionaries/expiring.py | 4 +- .../coding/structures/dictionaries/objects.py | 4 +- .../dictionaries/writeback_cache.py | 4 +- satella/coding/structures/heaps/base.py | 2 +- satella/coding/structures/heaps/time.py | 2 +- satella/coding/structures/proxy.py | 2 +- satella/coding/structures/queues.py | 2 +- satella/coding/structures/ranking.py | 2 +- satella/coding/structures/sorted_list.py | 2 +- satella/coding/structures/sparse_matrix.py | 2 +- satella/coding/transforms/__init__.py | 4 +- satella/coding/typing.py | 12 ++++++ tests/test_coding/test_typing.py | 14 ++++++ 31 files changed, 97 insertions(+), 73 deletions(-) create mode 100644 docs/coding/typing.rst create mode 100644 satella/coding/typing.py create mode 100644 tests/test_coding/test_typing.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fe0e0c5..c98b72ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ -# v2.11.36 +# v2.12 * added `intify` +* added `satella.coding.typing` diff --git a/docs/coding/typing.rst b/docs/coding/typing.rst new file mode 100644 index 00000000..2051e7cc --- /dev/null +++ b/docs/coding/typing.rst @@ -0,0 +1,11 @@ +Typing +====== + +Satella contains some expressions to help you with typing. +You import them from `satella.coding.typing`. +They are as follows: + +* `ExceptionClassType` - base type of exception class +* `Number` - an amalgam of int and float +* `T`, `U`, `K`, `V` - type vars to use +* `Iteratable` - a generic iterator or an iterable of `T` diff --git a/docs/index.rst b/docs/index.rst index 7e77d41f..1b1a469c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,6 +19,7 @@ Visit the project's page at GitHub_! coding/concurrent coding/sequences coding/transforms + coding/typing instrumentation/traceback instrumentation/memory instrumentation/metrics diff --git a/satella/coding/concurrent/callablegroup.py b/satella/coding/concurrent/callablegroup.py index ceb48f0f..1325c407 100644 --- a/satella/coding/concurrent/callablegroup.py +++ b/satella/coding/concurrent/callablegroup.py @@ -3,7 +3,7 @@ import copy import time import typing as tp -T = tp.TypeVar('T') +from satella.coding.typing import T class CallableGroup(tp.Generic[T]): diff --git a/satella/coding/concurrent/functions.py b/satella/coding/concurrent/functions.py index 2f31679c..cb1f9176 100644 --- a/satella/coding/concurrent/functions.py +++ b/satella/coding/concurrent/functions.py @@ -6,7 +6,7 @@ from satella.coding.decorators.decorators import wraps from satella.coding.sequences.sequences import infinite_iterator -T = tp.TypeVar('T') +from satella.coding.typing import T def run_as_future(fun): diff --git a/satella/coding/concurrent/locked_structure.py b/satella/coding/concurrent/locked_structure.py index 7ba6359a..2fb16eb3 100644 --- a/satella/coding/concurrent/locked_structure.py +++ b/satella/coding/concurrent/locked_structure.py @@ -3,7 +3,7 @@ import typing as tp from ..structures.proxy import Proxy -T = tp.TypeVar('T') +from satella.coding.typing import T class LockedStructure(Proxy, tp.Generic[T]): diff --git a/satella/coding/concurrent/monitor.py b/satella/coding/concurrent/monitor.py index 099e05e4..e7d2c641 100644 --- a/satella/coding/concurrent/monitor.py +++ b/satella/coding/concurrent/monitor.py @@ -9,9 +9,7 @@ __all__ = [ 'Monitor', 'RMonitor', 'MonitorDict', 'MonitorList' ] -K = tp.TypeVar('K') -V = tp.TypeVar('V') -T = tp.TypeVar('T') +from ..typing import K, V, T class Monitor: diff --git a/satella/coding/decorators/arguments.py b/satella/coding/decorators/arguments.py index 96233a2b..9a1c2090 100644 --- a/satella/coding/decorators/arguments.py +++ b/satella/coding/decorators/arguments.py @@ -7,7 +7,7 @@ from inspect import Parameter from .decorators import wraps from ..misc import source_to_function, get_arguments, call_with_arguments, _get_arguments -T = tp.TypeVar('T') +from satella.coding.typing import T U = tp.TypeVar('U') diff --git a/satella/coding/decorators/decorators.py b/satella/coding/decorators/decorators.py index b888ff99..900420f1 100644 --- a/satella/coding/decorators/decorators.py +++ b/satella/coding/decorators/decorators.py @@ -2,12 +2,11 @@ import inspect import typing as tp import warnings +from satella.coding.typing import T, U from satella.exceptions import PreconditionError -T = tp.TypeVar('T') -U = tp.TypeVar('U') Expression = tp.NewType('Expression', str) -ExcType = tp.Type[Exception] + # noinspection PyPep8Naming diff --git a/satella/coding/decorators/flow_control.py b/satella/coding/decorators/flow_control.py index e2e31291..d305df2f 100644 --- a/satella/coding/decorators/flow_control.py +++ b/satella/coding/decorators/flow_control.py @@ -1,15 +1,14 @@ -import inspect import typing as tp import queue -from .decorators import wraps, ExcType - +from .decorators import wraps +from ..typing import ExceptionClassType Queue = tp.TypeVar('Queue') def queue_get(queue_getter: tp.Union[str, tp.Callable[[object], Queue]], timeout: tp.Optional[float] = None, - exception_empty: tp.Union[ExcType, tp.Tuple[ExcType, ...]] = queue.Empty, + exception_empty: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]] = queue.Empty, queue_get_method: tp.Callable[[Queue, tp.Optional[float]], tp.Any] = lambda x, timeout: x.get( timeout=timeout), diff --git a/satella/coding/decorators/preconditions.py b/satella/coding/decorators/preconditions.py index 27d2eedd..73fa0572 100644 --- a/satella/coding/decorators/preconditions.py +++ b/satella/coding/decorators/preconditions.py @@ -5,7 +5,7 @@ from satella.exceptions import PreconditionError from .decorators import wraps from ..misc import source_to_function -T = tp.TypeVar('T') +from satella.coding.typing import T Expression = tp.NewType('Expression', str) Condition = tp.Union[tp.Callable[[T], bool], Expression] diff --git a/satella/coding/deleters.py b/satella/coding/deleters.py index 138e5fa2..28fdaf5f 100644 --- a/satella/coding/deleters.py +++ b/satella/coding/deleters.py @@ -2,10 +2,11 @@ import copy import typing as tp import collections +from satella.coding.typing import T + ITER_KEYS = 0 ITER_VALUES = 1 ITER_ITEMS = 2 -T = tp.TypeVar('T') class DictDeleter: diff --git a/satella/coding/recast_exceptions.py b/satella/coding/recast_exceptions.py index 8cedeb4f..cd51490d 100644 --- a/satella/coding/recast_exceptions.py +++ b/satella/coding/recast_exceptions.py @@ -4,12 +4,10 @@ import threading import typing as tp from .decorators.decorators import wraps +from .typing import ExceptionClassType, T -ExcType = tp.Type[Exception] -T = tp.TypeVar('T') - -def silence_excs(*exc_types: ExcType, returns=None, +def silence_excs(*exc_types: ExceptionClassType, returns=None, returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None): """ Silence given exception types. @@ -68,7 +66,7 @@ class log_exceptions: severity: int = logging.ERROR, format_string: str = '{e}', locals_: tp.Optional[tp.Dict] = None, - exc_types: tp.Union[ExcType, tp.Sequence[ExcType]] = Exception, + exc_types: tp.Union[ExceptionClassType, tp.Sequence[ExceptionClassType]] = Exception, swallow_exception: bool = False): self.logger = logger self.swallow_exception = swallow_exception @@ -162,7 +160,7 @@ class rethrow_as: __slots__ = ('mapping', 'exception_preprocessor', 'returns', '__exception_remapped', 'returns_factory') - def __init__(self, *pairs: tp.Union[ExcType, tp.Tuple[ExcType, ...]], + def __init__(self, *pairs: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]], exception_preprocessor: tp.Optional[tp.Callable[[Exception], str]] = repr, returns=None, returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None): @@ -221,7 +219,7 @@ class rethrow_as: raise to(self.exception_preprocessor(exc_val)) -def raises_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], +def raises_exception(exc_class: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]], clb: tp.Callable[[], None]) -> bool: """ Does the callable raise a given exception? @@ -234,7 +232,7 @@ def raises_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], return False -def catch_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], +def catch_exception(exc_class: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]], clb: tp.Callable[[], tp.Optional[T]], return_instead: tp.Optional[T] = None, return_value_on_no_exception: bool = False) -> tp.Union[Exception, T]: diff --git a/satella/coding/sequences/choose.py b/satella/coding/sequences/choose.py index 2b4a4291..0eea3a37 100644 --- a/satella/coding/sequences/choose.py +++ b/satella/coding/sequences/choose.py @@ -1,9 +1,9 @@ import typing as tp -IteratorOrIterable = tp.Union[tp.Iterator, tp.Iterable] +from satella.coding.typing import Iteratable -def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIterable) -> tp.Any: +def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: Iteratable) -> tp.Any: """ Syntactic sugar for @@ -19,7 +19,7 @@ def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIter return choose(filter_fun, iterable, True) -def choose(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIterable, +def choose(filter_fun: tp.Callable[[tp.Any], bool], iterable: Iteratable, check_multiple: bool = False) -> tp.Any: """ Return a single value that exists in given iterable. diff --git a/satella/coding/sequences/iterators.py b/satella/coding/sequences/iterators.py index a79aaf31..73fc7ff6 100644 --- a/satella/coding/sequences/iterators.py +++ b/satella/coding/sequences/iterators.py @@ -5,13 +5,10 @@ import warnings from ..recast_exceptions import rethrow_as, silence_excs from ..decorators import for_argument, wraps +from ..typing import Iteratable, T, U -T = tp.TypeVar('T') -U = tp.TypeVar('U') -IteratorOrIterable = tp.Union[tp.Iterator[T], tp.Iterable[T]] - -def length(iterator: IteratorOrIterable) -> int: +def length(iterator: Iteratable) -> int: """ Return the length of an iterator, exhausting it by the way """ @@ -114,7 +111,7 @@ class ConstruableIterator: return len(self.entries) -def unique(lst: IteratorOrIterable) -> tp.Iterator[T]: +def unique(lst: Iteratable) -> tp.Iterator[T]: """ Return each element from lst, but return every element only once. @@ -134,7 +131,7 @@ def unique(lst: IteratorOrIterable) -> tp.Iterator[T]: @for_argument(iter) -def even(sq: IteratorOrIterable) -> tp.Iterator[T]: +def even(sq: Iteratable) -> tp.Iterator[T]: """ Return only elements with even indices in this iterable (first element will be returned, as indices are counted from 0) @@ -149,7 +146,7 @@ def even(sq: IteratorOrIterable) -> tp.Iterator[T]: @silence_excs(StopIteration) @for_argument(iter) -def odd(sq: IteratorOrIterable) -> tp.Iterator[T]: +def odd(sq: Iteratable) -> tp.Iterator[T]: """ Return only elements with odd indices in this iterable. """ @@ -161,7 +158,7 @@ def odd(sq: IteratorOrIterable) -> tp.Iterator[T]: return -def count(sq: IteratorOrIterable, start: tp.Optional[int] = None, step: int = 1, +def count(sq: Iteratable, start: tp.Optional[int] = None, step: int = 1, start_at: tp.Optional[int] = None) -> tp.Iterator[int]: """ Return a sequence of integers, for each entry in the sequence with provided step. @@ -222,8 +219,8 @@ def is_instance(classes: tp.Union[tp.Tuple[type, ...], type]) -> tp.Callable[[ob @for_argument(iter, iter) -def other_sequence_no_longer_than(base_sequence: IteratorOrIterable, - other_sequence: IteratorOrIterable) -> tp.Iterator[T]: +def other_sequence_no_longer_than(base_sequence: Iteratable, + other_sequence: Iteratable) -> tp.Iterator[T]: """ Return every item in other_sequence, but limit it's p_len to that of base_sequence. @@ -240,7 +237,7 @@ def other_sequence_no_longer_than(base_sequence: IteratorOrIterable, return -def shift(iterable_: tp.Union[tp.Reversible[T], IteratorOrIterable], +def shift(iterable_: tp.Union[tp.Reversible[T], Iteratable], shift_factor: int) -> tp.Iterator[T]: """ Return this sequence, but shifted by factor elements, so that elements will appear @@ -277,7 +274,7 @@ def shift(iterable_: tp.Union[tp.Reversible[T], IteratorOrIterable], @silence_excs(StopIteration) -def zip_shifted(*args: tp.Union[IteratorOrIterable, tp.Tuple[IteratorOrIterable, int]]) -> \ +def zip_shifted(*args: tp.Union[Iteratable, tp.Tuple[Iteratable, int]]) -> \ tp.Iterator[tp.Tuple[T, ...]]: """ Construct an iterator, just like zip but first by cycling it's elements by it's shift factor. @@ -314,7 +311,7 @@ def zip_shifted(*args: tp.Union[IteratorOrIterable, tp.Tuple[IteratorOrIterable, @for_argument(iter) @silence_excs(StopIteration) -def skip_first(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: +def skip_first(iterator: Iteratable, n: int) -> tp.Iterator[T]: """ Skip first n elements from given iterator. @@ -364,7 +361,7 @@ class ListWrapperIterator(tp.Generic[T]): """ __slots__ = ('iterator', 'exhausted', 'list') - def __init__(self, iterator: IteratorOrIterable): + def __init__(self, iterator: Iteratable): self.iterator = iter(iterator) self.exhausted = False self.list = [] @@ -424,7 +421,7 @@ class ListWrapperIterator(tp.Generic[T]): @silence_excs(StopIteration) @for_argument(iter) -def stop_after(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: +def stop_after(iterator: Iteratable, n: int) -> tp.Iterator[T]: """ Stop this iterator after returning n elements, even if it's longer than that. @@ -441,7 +438,7 @@ def stop_after(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: @for_argument(iter) -def n_th(iterator: IteratorOrIterable, n: int = 0) -> T: +def n_th(iterator: Iteratable, n: int = 0) -> T: """ Obtain n-th element (counting from 0) of an iterable @@ -510,7 +507,7 @@ class IteratorListAdapter: @silence_excs(StopIteration, returns=True) -def is_empty(iterable: IteratorOrIterable, exhaust: bool = True) -> bool: +def is_empty(iterable: Iteratable, exhaust: bool = True) -> bool: """ Checks whether an iterator is empty. @@ -532,7 +529,7 @@ def is_empty(iterable: IteratorOrIterable, exhaust: bool = True) -> bool: return False -def map_list(fun: tp.Callable, iterable: IteratorOrIterable) -> tp.List: +def map_list(fun: tp.Callable, iterable: Iteratable) -> tp.List: """ A syntactic sugar for @@ -568,7 +565,7 @@ def to_iterator(fun): return inner -def smart_zip(*iterators: IteratorOrIterable) -> tp.Iterator[tp.Tuple[T, ...]]: +def smart_zip(*iterators: Iteratable) -> tp.Iterator[tp.Tuple[T, ...]]: """ Zip in such a way that resulted tuples are automatically expanded. @@ -593,7 +590,7 @@ def smart_zip(*iterators: IteratorOrIterable) -> tp.Iterator[tp.Tuple[T, ...]]: yield tuple(a) -def enumerate2(iterable: IteratorOrIterable, start: int = 0, +def enumerate2(iterable: Iteratable, start: int = 0, step: int = 1) -> tp.Iterator[tp.Tuple[int, T]]: """ Enumerate with a custom step @@ -608,7 +605,7 @@ def enumerate2(iterable: IteratorOrIterable, start: int = 0, v += step -def smart_enumerate(iterator: IteratorOrIterable, start: int = 0, +def smart_enumerate(iterator: Iteratable, start: int = 0, step: int = 1) -> tp.Iterator[tp.Tuple]: """ An enumerate that talks pretty with lists of tuples. Consider @@ -640,7 +637,7 @@ def smart_enumerate(iterator: IteratorOrIterable, start: int = 0, @for_argument(iter) -def take_n(iterator: IteratorOrIterable, n: int, skip: int = 0) -> tp.List[T]: +def take_n(iterator: Iteratable, n: int, skip: int = 0) -> tp.List[T]: """ Take (first) n elements of an iterator, or the entire iterator, whichever comes first diff --git a/satella/coding/sequences/sequences.py b/satella/coding/sequences/sequences.py index b39765f5..47ddaade 100644 --- a/satella/coding/sequences/sequences.py +++ b/satella/coding/sequences/sequences.py @@ -1 +1 @@ -import copy import typing as tp from satella.coding.decorators.decorators import wraps from .iterators import n_th from satella.coding.recast_exceptions import rethrow_as T = tp.TypeVar('T') U = tp.TypeVar('U') IteratorOrIterable = tp.Union[tp.Iterator[T], tp.Iterable[T]] def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[tp.Callable[[], T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: IteratorOrIterable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: IteratorOrIterable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: IteratorOrIterable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: tp.Callable[[T], bool], seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i +=1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file +import copy import typing as tp from satella.coding.decorators.decorators import wraps from .iterators import n_th from satella.coding.recast_exceptions import rethrow_as from ..typing import T, Iteratable def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[tp.Callable[[], T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: tp.Callable[[T], bool], seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i +=1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file diff --git a/satella/coding/structures/dictionaries/cache_dict.py b/satella/coding/structures/dictionaries/cache_dict.py index df3d3aa0..7804ee35 100644 --- a/satella/coding/structures/dictionaries/cache_dict.py +++ b/satella/coding/structures/dictionaries/cache_dict.py @@ -4,10 +4,9 @@ import typing as tp from concurrent.futures import ThreadPoolExecutor, Executor, Future from satella.coding.recast_exceptions import silence_excs +from satella.coding.typing import K, V logger = logging.getLogger(__name__) -K = tp.TypeVar('K') -V = tp.TypeVar('V') class CacheDict(tp.Mapping[K, V]): diff --git a/satella/coding/structures/dictionaries/dict_object.py b/satella/coding/structures/dictionaries/dict_object.py index e1d1cf36..f662d6ef 100644 --- a/satella/coding/structures/dictionaries/dict_object.py +++ b/satella/coding/structures/dictionaries/dict_object.py @@ -5,7 +5,7 @@ from satella.coding.recast_exceptions import rethrow_as from satella.configuration.schema import Descriptor, descriptor_from_dict from satella.exceptions import ConfigurationValidationError -T = tp.TypeVar('T') +from satella.coding.typing import T class DictObject(dict, tp.MutableMapping[tp.Hashable, T]): diff --git a/satella/coding/structures/dictionaries/expiring.py b/satella/coding/structures/dictionaries/expiring.py index 64a6691f..c3ac0776 100644 --- a/satella/coding/structures/dictionaries/expiring.py +++ b/satella/coding/structures/dictionaries/expiring.py @@ -8,9 +8,7 @@ from ..heaps import TimeBasedSetHeap from ..singleton import Singleton from ...concurrent.monitor import Monitor from ...recast_exceptions import rethrow_as, silence_excs - -K = tp.TypeVar('K') -V = tp.TypeVar('V') +from ...typing import K, V class Cleanupable(metaclass=ABCMeta): diff --git a/satella/coding/structures/dictionaries/objects.py b/satella/coding/structures/dictionaries/objects.py index e276401c..d1dacbfb 100644 --- a/satella/coding/structures/dictionaries/objects.py +++ b/satella/coding/structures/dictionaries/objects.py @@ -1,9 +1,7 @@ import copy import typing as tp -K = tp.TypeVar('K') -V = tp.TypeVar('V') -T = tp.TypeVar('T') +from satella.coding.typing import T, K, V class DirtyDict(tp.MutableMapping[K, V]): diff --git a/satella/coding/structures/dictionaries/writeback_cache.py b/satella/coding/structures/dictionaries/writeback_cache.py index 674482d6..6d5a1693 100644 --- a/satella/coding/structures/dictionaries/writeback_cache.py +++ b/satella/coding/structures/dictionaries/writeback_cache.py @@ -5,9 +5,7 @@ from concurrent.futures import Executor, ThreadPoolExecutor, wait, ProcessPoolEx from satella.coding.concurrent.monitor import Monitor from satella.coding.recast_exceptions import silence_excs - -K = tp.TypeVar('K') -V = tp.TypeVar('V') +from satella.coding.typing import V, K class ExclusiveWritebackCache(tp.Generic[K, V]): diff --git a/satella/coding/structures/heaps/base.py b/satella/coding/structures/heaps/base.py index c734dfa8..f89b5947 100644 --- a/satella/coding/structures/heaps/base.py +++ b/satella/coding/structures/heaps/base.py @@ -5,7 +5,7 @@ import typing as tp from satella.coding.decorators import wraps -T = tp.TypeVar('T') +from satella.coding.typing import T def _extras_to_one(fun): diff --git a/satella/coding/structures/heaps/time.py b/satella/coding/structures/heaps/time.py index 1354d1af..a1b02021 100644 --- a/satella/coding/structures/heaps/time.py +++ b/satella/coding/structures/heaps/time.py @@ -6,7 +6,7 @@ from satella.coding.recast_exceptions import rethrow_as from .base import Heap Number = tp.Union[int, float] -T = tp.TypeVar('T') +from satella.coding.typing import T class TimeBasedHeap(Heap): diff --git a/satella/coding/structures/proxy.py b/satella/coding/structures/proxy.py index 2d5a3ac1..8ead232b 100644 --- a/satella/coding/structures/proxy.py +++ b/satella/coding/structures/proxy.py @@ -5,7 +5,7 @@ import typing as tp from satella.coding.decorators import wraps from satella.coding.recast_exceptions import rethrow_as -T = tp.TypeVar('T') +from satella.coding.typing import T logger = logging.getLogger(__name__) _SETTABLE_KEYS = {'_Proxy__obj', '_Proxy__wrap_operations'} diff --git a/satella/coding/structures/queues.py b/satella/coding/structures/queues.py index 74e29a65..69c47bec 100644 --- a/satella/coding/structures/queues.py +++ b/satella/coding/structures/queues.py @@ -4,7 +4,7 @@ import queue from satella.coding.recast_exceptions import silence_excs from satella.coding.concurrent.monitor import Monitor -T = tp.TypeVar('T') +from satella.coding.typing import T class Subqueue(tp.Generic[T]): diff --git a/satella/coding/structures/ranking.py b/satella/coding/structures/ranking.py index 1cec577f..704c8b85 100644 --- a/satella/coding/structures/ranking.py +++ b/satella/coding/structures/ranking.py @@ -3,7 +3,7 @@ import typing as tp from .sorted_list import SortedList -T = tp.TypeVar('T') +from satella.coding.typing import T class Ranking(tp.Generic[T]): diff --git a/satella/coding/structures/sorted_list.py b/satella/coding/structures/sorted_list.py index 380665cb..dd2589c9 100644 --- a/satella/coding/structures/sorted_list.py +++ b/satella/coding/structures/sorted_list.py @@ -1,7 +1,7 @@ import collections import typing as tp -T = tp.TypeVar('T') +from satella.coding.typing import T class SortedList(tp.Generic[T]): diff --git a/satella/coding/structures/sparse_matrix.py b/satella/coding/structures/sparse_matrix.py index 786e4869..39de91d7 100644 --- a/satella/coding/structures/sparse_matrix.py +++ b/satella/coding/structures/sparse_matrix.py @@ -3,7 +3,7 @@ import typing as tp from satella.coding.recast_exceptions import silence_excs -T = tp.TypeVar('T') +from satella.coding.typing import T KeyArg = tp.Tuple[tp.Union[int, slice], tp.Union[int, slice]] diff --git a/satella/coding/transforms/__init__.py b/satella/coding/transforms/__init__.py index e1300dac..6645324e 100644 --- a/satella/coding/transforms/__init__.py +++ b/satella/coding/transforms/__init__.py @@ -10,7 +10,7 @@ __all__ = ['stringify', 'split_shuffle_and_join', 'one_tuple', 'merge_series', 'pad_to_multiple_of_length', 'clip', 'jsonify', 'intify'] -T = tp.TypeVar('T') +from satella.coding.typing import T Number = tp.Union[int, float] @@ -68,7 +68,7 @@ def _stringify_none(str_none, stringifier): return None -T = tp.TypeVar('T') +from satella.coding.typing import T def one_tuple(x: tp.Iterable[T]) -> tp.Iterator[tp.Tuple[T]]: diff --git a/satella/coding/typing.py b/satella/coding/typing.py new file mode 100644 index 00000000..9fe6b808 --- /dev/null +++ b/satella/coding/typing.py @@ -0,0 +1,12 @@ +import typing as tp + +T = tp.TypeVar('T') +Iteratable = tp.Union[tp.Iterator[T], tp.Iterable[T]] +U = tp.TypeVar('U') +V = tp.TypeVar('V') +K = tp.TypeVar('K') +Number = tp.Union[int, float] + +ExceptionClassType = tp.Type[Exception] + +__all__ = ['Iteratable', 'T', 'U', 'V', 'K', 'Number', 'ExceptionClassType'] diff --git a/tests/test_coding/test_typing.py b/tests/test_coding/test_typing.py new file mode 100644 index 00000000..63b028af --- /dev/null +++ b/tests/test_coding/test_typing.py @@ -0,0 +1,14 @@ +import unittest + +from satella.coding.typing import ExceptionClassType + + +class TestTyping(unittest.TestCase): + def test_exception_type(self): + def catch_exception(e) -> ExceptionClassType: + try: + e() + except Exception as e: + return e.__class__ + + a = catch_exception(lambda: hello) -- GitLab