Skip to content
Snippets Groups Projects
Commit 72aeccfe authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

v2.7.44

parent ef15d114
No related branches found
No related tags found
No related merge requests found
# v2.7.44
* add `get_timestamp` to `ExpiringEntryDict`
* added `enumerate`
......@@ -47,6 +47,11 @@ is_last
.. autofunction:: satella.coding.sequences.is_last
enumerate
---------
.. autofunction:: satella.coding.sequences.enumerate
add_next
--------
......
__version__ = '2.7.44_a2'
__version__ = '2.7.44'
from .choose import choose
from .iterators import infinite_counter, take_n, is_instance, skip_first, zip_shifted, stop_after, \
iter_dict_of_list, shift, other_sequence_no_longer_than, count, even, odd, n_th
iter_dict_of_list, shift, other_sequence_no_longer_than, count, even, odd, n_th, enumerate
from .sequences import is_last, add_next, half_cartesian, group_quantity, Multirun
__all__ = ['choose', 'infinite_counter', 'take_n', 'is_instance', 'is_last', 'add_next',
'half_cartesian', 'skip_first', 'zip_shifted', 'stop_after', 'group_quantity',
'iter_dict_of_list', 'shift', 'other_sequence_no_longer_than', 'count', 'n_th',
'even', 'odd', 'Multirun', 'ListDeleter']
'even', 'odd', 'Multirun', 'enumerate']
......@@ -216,6 +216,33 @@ def n_th(iterator: IteratorOrIterable, n: int = 0) -> T:
raise IndexError('Iterable was too short')
def enumerate(iterator: IteratorOrIterable, start: int = 0) -> tp.Iterator[tp.Tuple]:
"""
An enumerate that talks pretty with lists of tuples. Consider
>>> a = [(1, 2), (3, 4), (5, 6)]
>>> for i, b in enumerate(a):
>>> c, d = b
>>> ...
This function allows you just to write:
>>> for i, c, d in enumerate(a):
>>> ...
Note that elements in your iterable must be either a list of a tuple for that to work,
or need to be able to be coerced to a tuple. Otherwise, TypeError will be thrown.
:raise TypeError: could not coerce the elements in your iterable to a tuple
"""
i = start
for row in iterator:
if isinstance(row, tuple):
yield (i, ) + row
else:
yield (i,) + tuple(row)
i += 1
def take_n(iterator: IteratorOrIterable, n: int, skip: int = 0) -> tp.List[T]:
"""
Take (first) n elements of an iterator, or the entire iterator, whichever comes first
......
import typing as tp __all__ = ['is_last', 'add_next', 'half_cartesian', 'group_quantity'] from satella.coding.decorators import wrapsfrom .iterators import n_th T = tp.TypeVar('T')U = tp.TypeVar('U') # shamelessly copied from https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: tp.Iterable[T]) -> tp.Generator[tp.Tuple[bool, T], None, None]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: tp.Iterable[T], wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[ tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq1: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq1: First sequence :param seq2: Second sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. Ie. if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq1): for j, elem2 in enumerate(seq1): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: tp.Iterable[T]) -> tp.Generator[tp.List[T], None, None]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] :param length: p_len for the returning sequences :param seq: sequence to split """ entries = [] for elem in seq: if len(entries) < length: entries.append(elem) else: yield entries entries = [elem] if entries: yield entries class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) if not self.dont_return_list: results.append(element) if not self.dont_return_list: return results # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self
\ No newline at end of file
import typing as tp from satella.coding.decorators import wrapsfrom .iterators import n_th T = tp.TypeVar('T')U = tp.TypeVar('U') # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: tp.Iterable[T]) -> tp.Generator[tp.Tuple[bool, T], None, None]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: tp.Iterable[T], wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[ tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq1: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq1: First sequence :param seq2: Second sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. Ie. if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq1): for j, elem2 in enumerate(seq1): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: tp.Iterable[T]) -> tp.Generator[tp.List[T], None, None]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] :param length: p_len for the returning sequences :param seq: sequence to split """ entries = [] for elem in seq: if len(entries) < length: entries.append(elem) else: yield entries entries = [elem] if entries: yield entries class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) if not self.dont_return_list: results.append(element) if not self.dont_return_list: return results # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self
\ No newline at end of file
......
......@@ -134,6 +134,8 @@ class SetHeap(Heap):
"""
A heap with additional invariant that no two elements are the same.
Note that elements you insert in this must be eq-able and hashable, ie. you can put them in a dict.
Optimized for fast insertions and fast __contains__
#notthreadsafe
......@@ -283,6 +285,8 @@ class TimeBasedSetHeap(Heap):
"""
A heap of items sorted by timestamps, with such invariant that every item can appear at most once.
Note that elements you insert in this must be eq-able and hashable, ie. you can put them in a dict.
It is easy to ask for items, whose timestamps are LOWER than a value, and
easy to remove them.
......
......@@ -2,9 +2,15 @@ import sys
import unittest
from satella.coding import SelfClosingGenerator, hint_with_length
from satella.coding.sequences import enumerate
class TestIterators(unittest.TestCase):
def test_enumerate(self):
a = [(1, 2), (3, 4), (5, 6)]
b = list(enumerate(a))
self.assertEqual([(0, 1, 2), (1, 3, 4), (2, 5, 6)], b)
def test_hint_with_length(self):
def generator():
yield from range(1000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment