Skip to content
Snippets Groups Projects
Commit 56cbc83b authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

add typing

parent 0d501267
No related branches found
No related tags found
No related merge requests found
Showing
with 61 additions and 61 deletions
# v2.11.36 # v2.12
* added `intify` * added `intify`
* added `satella.coding.typing`
Typing
======
Satella contains some expressions to help you with typing.
You import them from `satella.coding.typing`.
They are as follows:
* `ExceptionClassType` - base type of exception class
* `Number` - an amalgam of int and float
* `T`, `U`, `K`, `V` - type vars to use
* `Iteratable` - a generic iterator or an iterable of `T`
...@@ -19,6 +19,7 @@ Visit the project's page at GitHub_! ...@@ -19,6 +19,7 @@ Visit the project's page at GitHub_!
coding/concurrent coding/concurrent
coding/sequences coding/sequences
coding/transforms coding/transforms
coding/typing
instrumentation/traceback instrumentation/traceback
instrumentation/memory instrumentation/memory
instrumentation/metrics instrumentation/metrics
......
...@@ -3,7 +3,7 @@ import copy ...@@ -3,7 +3,7 @@ import copy
import time import time
import typing as tp import typing as tp
T = tp.TypeVar('T') from satella.coding.typing import T
class CallableGroup(tp.Generic[T]): class CallableGroup(tp.Generic[T]):
......
...@@ -6,7 +6,7 @@ from satella.coding.decorators.decorators import wraps ...@@ -6,7 +6,7 @@ from satella.coding.decorators.decorators import wraps
from satella.coding.sequences.sequences import infinite_iterator from satella.coding.sequences.sequences import infinite_iterator
T = tp.TypeVar('T') from satella.coding.typing import T
def run_as_future(fun): def run_as_future(fun):
......
...@@ -3,7 +3,7 @@ import typing as tp ...@@ -3,7 +3,7 @@ import typing as tp
from ..structures.proxy import Proxy from ..structures.proxy import Proxy
T = tp.TypeVar('T') from satella.coding.typing import T
class LockedStructure(Proxy, tp.Generic[T]): class LockedStructure(Proxy, tp.Generic[T]):
......
...@@ -9,9 +9,7 @@ __all__ = [ ...@@ -9,9 +9,7 @@ __all__ = [
'Monitor', 'RMonitor', 'MonitorDict', 'MonitorList' 'Monitor', 'RMonitor', 'MonitorDict', 'MonitorList'
] ]
K = tp.TypeVar('K') from ..typing import K, V, T
V = tp.TypeVar('V')
T = tp.TypeVar('T')
class Monitor: class Monitor:
......
...@@ -7,7 +7,7 @@ from inspect import Parameter ...@@ -7,7 +7,7 @@ from inspect import Parameter
from .decorators import wraps from .decorators import wraps
from ..misc import source_to_function, get_arguments, call_with_arguments, _get_arguments from ..misc import source_to_function, get_arguments, call_with_arguments, _get_arguments
T = tp.TypeVar('T') from satella.coding.typing import T
U = tp.TypeVar('U') U = tp.TypeVar('U')
......
...@@ -2,12 +2,11 @@ import inspect ...@@ -2,12 +2,11 @@ import inspect
import typing as tp import typing as tp
import warnings import warnings
from satella.coding.typing import T, U
from satella.exceptions import PreconditionError from satella.exceptions import PreconditionError
T = tp.TypeVar('T')
U = tp.TypeVar('U')
Expression = tp.NewType('Expression', str) Expression = tp.NewType('Expression', str)
ExcType = tp.Type[Exception]
# noinspection PyPep8Naming # noinspection PyPep8Naming
......
import inspect
import typing as tp import typing as tp
import queue import queue
from .decorators import wraps, ExcType from .decorators import wraps
from ..typing import ExceptionClassType
Queue = tp.TypeVar('Queue') Queue = tp.TypeVar('Queue')
def queue_get(queue_getter: tp.Union[str, tp.Callable[[object], Queue]], def queue_get(queue_getter: tp.Union[str, tp.Callable[[object], Queue]],
timeout: tp.Optional[float] = None, timeout: tp.Optional[float] = None,
exception_empty: tp.Union[ExcType, tp.Tuple[ExcType, ...]] = queue.Empty, exception_empty: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]] = queue.Empty,
queue_get_method: tp.Callable[[Queue, tp.Optional[float]], tp.Any] = queue_get_method: tp.Callable[[Queue, tp.Optional[float]], tp.Any] =
lambda x, timeout: x.get( lambda x, timeout: x.get(
timeout=timeout), timeout=timeout),
......
...@@ -5,7 +5,7 @@ from satella.exceptions import PreconditionError ...@@ -5,7 +5,7 @@ from satella.exceptions import PreconditionError
from .decorators import wraps from .decorators import wraps
from ..misc import source_to_function from ..misc import source_to_function
T = tp.TypeVar('T') from satella.coding.typing import T
Expression = tp.NewType('Expression', str) Expression = tp.NewType('Expression', str)
Condition = tp.Union[tp.Callable[[T], bool], Expression] Condition = tp.Union[tp.Callable[[T], bool], Expression]
......
...@@ -2,10 +2,11 @@ import copy ...@@ -2,10 +2,11 @@ import copy
import typing as tp import typing as tp
import collections import collections
from satella.coding.typing import T
ITER_KEYS = 0 ITER_KEYS = 0
ITER_VALUES = 1 ITER_VALUES = 1
ITER_ITEMS = 2 ITER_ITEMS = 2
T = tp.TypeVar('T')
class DictDeleter: class DictDeleter:
......
...@@ -4,12 +4,10 @@ import threading ...@@ -4,12 +4,10 @@ import threading
import typing as tp import typing as tp
from .decorators.decorators import wraps from .decorators.decorators import wraps
from .typing import ExceptionClassType, T
ExcType = tp.Type[Exception]
T = tp.TypeVar('T')
def silence_excs(*exc_types: ExceptionClassType, returns=None,
def silence_excs(*exc_types: ExcType, returns=None,
returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None): returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None):
""" """
Silence given exception types. Silence given exception types.
...@@ -68,7 +66,7 @@ class log_exceptions: ...@@ -68,7 +66,7 @@ class log_exceptions:
severity: int = logging.ERROR, severity: int = logging.ERROR,
format_string: str = '{e}', format_string: str = '{e}',
locals_: tp.Optional[tp.Dict] = None, locals_: tp.Optional[tp.Dict] = None,
exc_types: tp.Union[ExcType, tp.Sequence[ExcType]] = Exception, exc_types: tp.Union[ExceptionClassType, tp.Sequence[ExceptionClassType]] = Exception,
swallow_exception: bool = False): swallow_exception: bool = False):
self.logger = logger self.logger = logger
self.swallow_exception = swallow_exception self.swallow_exception = swallow_exception
...@@ -162,7 +160,7 @@ class rethrow_as: ...@@ -162,7 +160,7 @@ class rethrow_as:
__slots__ = ('mapping', 'exception_preprocessor', 'returns', '__exception_remapped', __slots__ = ('mapping', 'exception_preprocessor', 'returns', '__exception_remapped',
'returns_factory') 'returns_factory')
def __init__(self, *pairs: tp.Union[ExcType, tp.Tuple[ExcType, ...]], def __init__(self, *pairs: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]],
exception_preprocessor: tp.Optional[tp.Callable[[Exception], str]] = repr, exception_preprocessor: tp.Optional[tp.Callable[[Exception], str]] = repr,
returns=None, returns=None,
returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None): returns_factory: tp.Optional[tp.Callable[[], tp.Any]] = None):
...@@ -221,7 +219,7 @@ class rethrow_as: ...@@ -221,7 +219,7 @@ class rethrow_as:
raise to(self.exception_preprocessor(exc_val)) raise to(self.exception_preprocessor(exc_val))
def raises_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], def raises_exception(exc_class: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]],
clb: tp.Callable[[], None]) -> bool: clb: tp.Callable[[], None]) -> bool:
""" """
Does the callable raise a given exception? Does the callable raise a given exception?
...@@ -234,7 +232,7 @@ def raises_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], ...@@ -234,7 +232,7 @@ def raises_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]],
return False return False
def catch_exception(exc_class: tp.Union[ExcType, tp.Tuple[ExcType, ...]], def catch_exception(exc_class: tp.Union[ExceptionClassType, tp.Tuple[ExceptionClassType, ...]],
clb: tp.Callable[[], tp.Optional[T]], clb: tp.Callable[[], tp.Optional[T]],
return_instead: tp.Optional[T] = None, return_instead: tp.Optional[T] = None,
return_value_on_no_exception: bool = False) -> tp.Union[Exception, T]: return_value_on_no_exception: bool = False) -> tp.Union[Exception, T]:
......
import typing as tp import typing as tp
IteratorOrIterable = tp.Union[tp.Iterator, tp.Iterable] from satella.coding.typing import Iteratable
def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIterable) -> tp.Any: def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: Iteratable) -> tp.Any:
""" """
Syntactic sugar for Syntactic sugar for
...@@ -19,7 +19,7 @@ def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIter ...@@ -19,7 +19,7 @@ def choose_one(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIter
return choose(filter_fun, iterable, True) return choose(filter_fun, iterable, True)
def choose(filter_fun: tp.Callable[[tp.Any], bool], iterable: IteratorOrIterable, def choose(filter_fun: tp.Callable[[tp.Any], bool], iterable: Iteratable,
check_multiple: bool = False) -> tp.Any: check_multiple: bool = False) -> tp.Any:
""" """
Return a single value that exists in given iterable. Return a single value that exists in given iterable.
......
...@@ -5,13 +5,10 @@ import warnings ...@@ -5,13 +5,10 @@ import warnings
from ..recast_exceptions import rethrow_as, silence_excs from ..recast_exceptions import rethrow_as, silence_excs
from ..decorators import for_argument, wraps from ..decorators import for_argument, wraps
from ..typing import Iteratable, T, U
T = tp.TypeVar('T')
U = tp.TypeVar('U')
IteratorOrIterable = tp.Union[tp.Iterator[T], tp.Iterable[T]]
def length(iterator: Iteratable) -> int:
def length(iterator: IteratorOrIterable) -> int:
""" """
Return the length of an iterator, exhausting it by the way Return the length of an iterator, exhausting it by the way
""" """
...@@ -114,7 +111,7 @@ class ConstruableIterator: ...@@ -114,7 +111,7 @@ class ConstruableIterator:
return len(self.entries) return len(self.entries)
def unique(lst: IteratorOrIterable) -> tp.Iterator[T]: def unique(lst: Iteratable) -> tp.Iterator[T]:
""" """
Return each element from lst, but return every element only once. Return each element from lst, but return every element only once.
...@@ -134,7 +131,7 @@ def unique(lst: IteratorOrIterable) -> tp.Iterator[T]: ...@@ -134,7 +131,7 @@ def unique(lst: IteratorOrIterable) -> tp.Iterator[T]:
@for_argument(iter) @for_argument(iter)
def even(sq: IteratorOrIterable) -> tp.Iterator[T]: def even(sq: Iteratable) -> tp.Iterator[T]:
""" """
Return only elements with even indices in this iterable (first element will be returned, Return only elements with even indices in this iterable (first element will be returned,
as indices are counted from 0) as indices are counted from 0)
...@@ -149,7 +146,7 @@ def even(sq: IteratorOrIterable) -> tp.Iterator[T]: ...@@ -149,7 +146,7 @@ def even(sq: IteratorOrIterable) -> tp.Iterator[T]:
@silence_excs(StopIteration) @silence_excs(StopIteration)
@for_argument(iter) @for_argument(iter)
def odd(sq: IteratorOrIterable) -> tp.Iterator[T]: def odd(sq: Iteratable) -> tp.Iterator[T]:
""" """
Return only elements with odd indices in this iterable. Return only elements with odd indices in this iterable.
""" """
...@@ -161,7 +158,7 @@ def odd(sq: IteratorOrIterable) -> tp.Iterator[T]: ...@@ -161,7 +158,7 @@ def odd(sq: IteratorOrIterable) -> tp.Iterator[T]:
return return
def count(sq: IteratorOrIterable, start: tp.Optional[int] = None, step: int = 1, def count(sq: Iteratable, start: tp.Optional[int] = None, step: int = 1,
start_at: tp.Optional[int] = None) -> tp.Iterator[int]: start_at: tp.Optional[int] = None) -> tp.Iterator[int]:
""" """
Return a sequence of integers, for each entry in the sequence with provided step. Return a sequence of integers, for each entry in the sequence with provided step.
...@@ -222,8 +219,8 @@ def is_instance(classes: tp.Union[tp.Tuple[type, ...], type]) -> tp.Callable[[ob ...@@ -222,8 +219,8 @@ def is_instance(classes: tp.Union[tp.Tuple[type, ...], type]) -> tp.Callable[[ob
@for_argument(iter, iter) @for_argument(iter, iter)
def other_sequence_no_longer_than(base_sequence: IteratorOrIterable, def other_sequence_no_longer_than(base_sequence: Iteratable,
other_sequence: IteratorOrIterable) -> tp.Iterator[T]: other_sequence: Iteratable) -> tp.Iterator[T]:
""" """
Return every item in other_sequence, but limit it's p_len to that of base_sequence. Return every item in other_sequence, but limit it's p_len to that of base_sequence.
...@@ -240,7 +237,7 @@ def other_sequence_no_longer_than(base_sequence: IteratorOrIterable, ...@@ -240,7 +237,7 @@ def other_sequence_no_longer_than(base_sequence: IteratorOrIterable,
return return
def shift(iterable_: tp.Union[tp.Reversible[T], IteratorOrIterable], def shift(iterable_: tp.Union[tp.Reversible[T], Iteratable],
shift_factor: int) -> tp.Iterator[T]: shift_factor: int) -> tp.Iterator[T]:
""" """
Return this sequence, but shifted by factor elements, so that elements will appear Return this sequence, but shifted by factor elements, so that elements will appear
...@@ -277,7 +274,7 @@ def shift(iterable_: tp.Union[tp.Reversible[T], IteratorOrIterable], ...@@ -277,7 +274,7 @@ def shift(iterable_: tp.Union[tp.Reversible[T], IteratorOrIterable],
@silence_excs(StopIteration) @silence_excs(StopIteration)
def zip_shifted(*args: tp.Union[IteratorOrIterable, tp.Tuple[IteratorOrIterable, int]]) -> \ def zip_shifted(*args: tp.Union[Iteratable, tp.Tuple[Iteratable, int]]) -> \
tp.Iterator[tp.Tuple[T, ...]]: tp.Iterator[tp.Tuple[T, ...]]:
""" """
Construct an iterator, just like zip but first by cycling it's elements by it's shift factor. Construct an iterator, just like zip but first by cycling it's elements by it's shift factor.
...@@ -314,7 +311,7 @@ def zip_shifted(*args: tp.Union[IteratorOrIterable, tp.Tuple[IteratorOrIterable, ...@@ -314,7 +311,7 @@ def zip_shifted(*args: tp.Union[IteratorOrIterable, tp.Tuple[IteratorOrIterable,
@for_argument(iter) @for_argument(iter)
@silence_excs(StopIteration) @silence_excs(StopIteration)
def skip_first(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: def skip_first(iterator: Iteratable, n: int) -> tp.Iterator[T]:
""" """
Skip first n elements from given iterator. Skip first n elements from given iterator.
...@@ -364,7 +361,7 @@ class ListWrapperIterator(tp.Generic[T]): ...@@ -364,7 +361,7 @@ class ListWrapperIterator(tp.Generic[T]):
""" """
__slots__ = ('iterator', 'exhausted', 'list') __slots__ = ('iterator', 'exhausted', 'list')
def __init__(self, iterator: IteratorOrIterable): def __init__(self, iterator: Iteratable):
self.iterator = iter(iterator) self.iterator = iter(iterator)
self.exhausted = False self.exhausted = False
self.list = [] self.list = []
...@@ -424,7 +421,7 @@ class ListWrapperIterator(tp.Generic[T]): ...@@ -424,7 +421,7 @@ class ListWrapperIterator(tp.Generic[T]):
@silence_excs(StopIteration) @silence_excs(StopIteration)
@for_argument(iter) @for_argument(iter)
def stop_after(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: def stop_after(iterator: Iteratable, n: int) -> tp.Iterator[T]:
""" """
Stop this iterator after returning n elements, even if it's longer than that. Stop this iterator after returning n elements, even if it's longer than that.
...@@ -441,7 +438,7 @@ def stop_after(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]: ...@@ -441,7 +438,7 @@ def stop_after(iterator: IteratorOrIterable, n: int) -> tp.Iterator[T]:
@for_argument(iter) @for_argument(iter)
def n_th(iterator: IteratorOrIterable, n: int = 0) -> T: def n_th(iterator: Iteratable, n: int = 0) -> T:
""" """
Obtain n-th element (counting from 0) of an iterable Obtain n-th element (counting from 0) of an iterable
...@@ -510,7 +507,7 @@ class IteratorListAdapter: ...@@ -510,7 +507,7 @@ class IteratorListAdapter:
@silence_excs(StopIteration, returns=True) @silence_excs(StopIteration, returns=True)
def is_empty(iterable: IteratorOrIterable, exhaust: bool = True) -> bool: def is_empty(iterable: Iteratable, exhaust: bool = True) -> bool:
""" """
Checks whether an iterator is empty. Checks whether an iterator is empty.
...@@ -532,7 +529,7 @@ def is_empty(iterable: IteratorOrIterable, exhaust: bool = True) -> bool: ...@@ -532,7 +529,7 @@ def is_empty(iterable: IteratorOrIterable, exhaust: bool = True) -> bool:
return False return False
def map_list(fun: tp.Callable, iterable: IteratorOrIterable) -> tp.List: def map_list(fun: tp.Callable, iterable: Iteratable) -> tp.List:
""" """
A syntactic sugar for A syntactic sugar for
...@@ -568,7 +565,7 @@ def to_iterator(fun): ...@@ -568,7 +565,7 @@ def to_iterator(fun):
return inner return inner
def smart_zip(*iterators: IteratorOrIterable) -> tp.Iterator[tp.Tuple[T, ...]]: def smart_zip(*iterators: Iteratable) -> tp.Iterator[tp.Tuple[T, ...]]:
""" """
Zip in such a way that resulted tuples are automatically expanded. Zip in such a way that resulted tuples are automatically expanded.
...@@ -593,7 +590,7 @@ def smart_zip(*iterators: IteratorOrIterable) -> tp.Iterator[tp.Tuple[T, ...]]: ...@@ -593,7 +590,7 @@ def smart_zip(*iterators: IteratorOrIterable) -> tp.Iterator[tp.Tuple[T, ...]]:
yield tuple(a) yield tuple(a)
def enumerate2(iterable: IteratorOrIterable, start: int = 0, def enumerate2(iterable: Iteratable, start: int = 0,
step: int = 1) -> tp.Iterator[tp.Tuple[int, T]]: step: int = 1) -> tp.Iterator[tp.Tuple[int, T]]:
""" """
Enumerate with a custom step Enumerate with a custom step
...@@ -608,7 +605,7 @@ def enumerate2(iterable: IteratorOrIterable, start: int = 0, ...@@ -608,7 +605,7 @@ def enumerate2(iterable: IteratorOrIterable, start: int = 0,
v += step v += step
def smart_enumerate(iterator: IteratorOrIterable, start: int = 0, def smart_enumerate(iterator: Iteratable, start: int = 0,
step: int = 1) -> tp.Iterator[tp.Tuple]: step: int = 1) -> tp.Iterator[tp.Tuple]:
""" """
An enumerate that talks pretty with lists of tuples. Consider An enumerate that talks pretty with lists of tuples. Consider
...@@ -640,7 +637,7 @@ def smart_enumerate(iterator: IteratorOrIterable, start: int = 0, ...@@ -640,7 +637,7 @@ def smart_enumerate(iterator: IteratorOrIterable, start: int = 0,
@for_argument(iter) @for_argument(iter)
def take_n(iterator: IteratorOrIterable, n: int, skip: int = 0) -> tp.List[T]: def take_n(iterator: Iteratable, n: int, skip: int = 0) -> tp.List[T]:
""" """
Take (first) n elements of an iterator, or the entire iterator, whichever comes first Take (first) n elements of an iterator, or the entire iterator, whichever comes first
......
import copyimport typing as tp from satella.coding.decorators.decorators import wrapsfrom .iterators import n_thfrom satella.coding.recast_exceptions import rethrow_as T = tp.TypeVar('T')U = tp.TypeVar('U')IteratorOrIterable = tp.Union[tp.Iterator[T], tp.Iterable[T]] def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[tp.Callable[[], T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: IteratorOrIterable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: IteratorOrIterable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: IteratorOrIterable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: tp.Callable[[T], bool], seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i +=1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self import copyimport typing as tp from satella.coding.decorators.decorators import wrapsfrom .iterators import n_thfrom satella.coding.recast_exceptions import rethrow_asfrom ..typing import T, Iteratable def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[tp.Callable[[], T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: tp.Callable[[T], bool], seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i +=1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self
\ No newline at end of file \ No newline at end of file
......
...@@ -4,10 +4,9 @@ import typing as tp ...@@ -4,10 +4,9 @@ import typing as tp
from concurrent.futures import ThreadPoolExecutor, Executor, Future from concurrent.futures import ThreadPoolExecutor, Executor, Future
from satella.coding.recast_exceptions import silence_excs from satella.coding.recast_exceptions import silence_excs
from satella.coding.typing import K, V
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
K = tp.TypeVar('K')
V = tp.TypeVar('V')
class CacheDict(tp.Mapping[K, V]): class CacheDict(tp.Mapping[K, V]):
......
...@@ -5,7 +5,7 @@ from satella.coding.recast_exceptions import rethrow_as ...@@ -5,7 +5,7 @@ from satella.coding.recast_exceptions import rethrow_as
from satella.configuration.schema import Descriptor, descriptor_from_dict from satella.configuration.schema import Descriptor, descriptor_from_dict
from satella.exceptions import ConfigurationValidationError from satella.exceptions import ConfigurationValidationError
T = tp.TypeVar('T') from satella.coding.typing import T
class DictObject(dict, tp.MutableMapping[tp.Hashable, T]): class DictObject(dict, tp.MutableMapping[tp.Hashable, T]):
......
...@@ -8,9 +8,7 @@ from ..heaps import TimeBasedSetHeap ...@@ -8,9 +8,7 @@ from ..heaps import TimeBasedSetHeap
from ..singleton import Singleton from ..singleton import Singleton
from ...concurrent.monitor import Monitor from ...concurrent.monitor import Monitor
from ...recast_exceptions import rethrow_as, silence_excs from ...recast_exceptions import rethrow_as, silence_excs
from ...typing import K, V
K = tp.TypeVar('K')
V = tp.TypeVar('V')
class Cleanupable(metaclass=ABCMeta): class Cleanupable(metaclass=ABCMeta):
......
import copy import copy
import typing as tp import typing as tp
K = tp.TypeVar('K') from satella.coding.typing import T, K, V
V = tp.TypeVar('V')
T = tp.TypeVar('T')
class DirtyDict(tp.MutableMapping[K, V]): class DirtyDict(tp.MutableMapping[K, V]):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment