Skip to content
Snippets Groups Projects
Commit 3de331c6 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

2.5.7

parent fabbc056
No related branches found
Tags v1.0.3
No related merge requests found
# v2.5.7
* _TBA_
* optimized `Ranking`
* added `SlicableDeque`
# v2.5.6
......
......@@ -135,3 +135,15 @@ Ranking
.. autoclass:: satella.coding.structures.Ranking
:members:
SortedList
==========
.. autoclass:: satella.coding.structures.SortedList
:members:
SliceableDeque
==============
.. autoclass:: satella.coding.structures.SliceableDeque
:members:
__version__ = '2.5.7a1'
__version__ = '2.5.7'
......@@ -5,10 +5,11 @@ from .singleton import Singleton, SingletonWithRegardsTo
from .structures import SetHeap, OmniHashableMixin, TimeBasedHeap, Heap
from .typednamedtuple import typednamedtuple
from .ranking import Ranking
from .sorted_list import SortedList
from .sorted_list import SortedList, SliceableDeque
__all__ = [
'SortedList',
'SliceableDeque',
'Ranking',
'TwoWayDictionary',
'typednamedtuple',
......
import logging
import collections
import typing as tp
from satella.coding.recast_exceptions import rethrow_as
from .sorted_list import SortedList
logger = logging.getLogger(__name__)
......@@ -9,10 +9,8 @@ __all__ = ['Ranking']
T = tp.TypeVar('T')
Id = tp.NewType('Id', int)
# todo optimize this
class Ranking(tp.Generic[T]):
"""
A set of objects with them being ranked by their single property with the assumption that this
......@@ -20,6 +18,9 @@ class Ranking(tp.Generic[T]):
Positions are counted from 0, where 0 has the least key value.
Essentially, this is a SortedList with the option to query at which position can be given
element found.
Example usage:
>>> Entry = collections.namedtuple('Entry', ('key', ))
......@@ -31,53 +32,36 @@ class Ranking(tp.Generic[T]):
>>> assert ranking[-1] == e3 # Get the last element
>>> assert ranking.get_position_of(e1) == 0
"""
__slots__ = ('items', 'key', 'ranking', 'reverse_ranking')
__slots__ = ('items', 'key', 'ranking', 'element_to_position')
def __init__(self, items: tp.List[T], key: tp.Callable[[T], int]):
self.items = items
self.key = key
self.ranking = {} # tp.Dict[Id, int]
self.reverse_ranking = {} # tp.Dict[int, T]
self.recalculate_ranking()
def recalculate_ranking(self) -> None:
ranking = [(self.key(t), t) for t in self.items] # type: tp.List[tp.Tuple[int, T]]
ranking.sort()
self.ranking = {}
self.reverse_ranking = {}
for position, ranking_row in enumerate(ranking):
t = ranking_row[1]
self.ranking[id(t)] = position
self.reverse_ranking[position] = t
self.ranking = SortedList(items, key=key) # type: SortedList[T]
self.element_to_position = {} # type: tp.Dict[int, int]
for position, item in enumerate(self.ranking):
self.element_to_position[id(item)] = position
def calculate_ranking_for(self, item: T) -> int:
return self.element_to_position[id(item)]
def add(self, item: T) -> None:
"""
Add a single element to the ranking and recalculate it
"""
self.add_many([item])
index = self.ranking.add(item)
for position, item in enumerate(self.ranking[index:], start=index):
self.element_to_position[id(item)] = position
def remove(self, item: T) -> None:
"""
Remove a single element from the ranking and recalculate it
"""
self.remove_many([item])
def add_many(self, items: tp.List[T]) -> None:
"""
Add many elements to the ranking and recalculate it
"""
self.items.extend(items)
self.recalculate_ranking()
index = self.ranking.index(item)
self.ranking.remove(item)
for position, item in enumerate(self.ranking[index:], start=index):
self.element_to_position[id(item)] = position
def remove_many(self, items: tp.List[T]) -> None:
"""
Remove multiple elements from the ranking and recalculate it
"""
for item in items:
self.items.remove(item)
self.recalculate_ranking()
@rethrow_as(KeyError, ValueError)
def get_position_of(self, item: T) -> int:
"""
Return the position in the ranking of element item
......@@ -86,19 +70,19 @@ class Ranking(tp.Generic[T]):
:return: position
:raises ValueError: this element is not in the ranking
"""
return self.ranking[id(item)]
return self.ranking.index(item)
def __getitem__(self, item: int) -> T:
"""
Return n-th item in ranking.
:param item: position in ranking. Can be negative
:param item: position in ranking. Can be negative, or even a slice
"""
return self.reverse_ranking[item % len(self.ranking)]
return self.ranking[item]
def get_sorted(self) -> tp.Iterator[T]:
"""
Return all the elements sorted from the least key value to the highest key value
"""
for i in range(len(self.ranking)):
yield self.reverse_ranking[i]
yield from self.ranking
......@@ -4,6 +4,8 @@ import collections
logger = logging.getLogger(__name__)
__all__ = ['SortedList', 'SliceableDeque']
T = tp.TypeVar('T')
......@@ -22,9 +24,9 @@ class SortedList(tp.Generic[T]):
def __init__(self, items: tp.Iterable[T] = (), key: tp.Callable[[T], int] = lambda a: a):
sort = sorted((key(item), item) for item in items)
self.items = collections.deque(a[1] for a in sort)
self.keys = collections.deque(a[0] for a in sort)
self.key = key
self.items = SliceableDeque(a[1] for a in sort) # type: SliceableDeque[T]
self.keys = collections.deque(a[0] for a in sort) # type: collections.deque[int]
self.key = key # type: tp.Callable[[T], int]
def __contains__(self, item: T) -> bool:
return item in self.items
......@@ -44,7 +46,8 @@ class SortedList(tp.Generic[T]):
for elem in elements:
self.add(elem)
def __getitem__(self, item: int) -> tp.Union[T, tp.List[T]]:
def __getitem__(self, item: tp.Union[slice, int]) -> tp.Union[T, tp.Iterator[T]]:
logger.warning(f'Querying with {item} items is {self.items}')
return self.items[item]
def remove(self, other: T) -> None:
......@@ -77,3 +80,49 @@ class SortedList(tp.Generic[T]):
self.keys.insert(index, key_value)
return index
class SliceableDeque:
"""
A collections.deque that supports slicing
"""
__slots__ = ('deque', )
def __bool__(self) -> bool:
return bool(self.deque)
def __init__(self, *args, **kwargs):
self.deque = collections.deque(*args, **kwargs)
def __setitem__(self, key: int, value: T) -> None:
self.deque[key] = value
def __delitem__(self, key: int) -> None:
del self.deque[key]
def __iter__(self) -> tp.Iterator[T]:
return iter(self.deque)
def __len__(self) -> int:
return len(self.deque)
def __reversed__(self) -> tp.Iterator[T]:
return reversed(self.deque)
def __contains__(self, item: T) -> bool:
return item in self.deque
def __getattr__(self, item: str):
return getattr(self.deque, item)
def __getitem__(self, item) -> tp.Union[tp.Iterator[T], T]:
tot_length = len(self)
if type(item) is slice:
start, stop, step = item.indices(tot_length)
def generator():
for index in range(start, stop, step):
yield self.deque[index]
return generator()
else:
return self.deque[item]
import abc
import copy
import unittest
import collections
import copy
import logging
import unittest
import mock
from satella.coding.structures import TimeBasedHeap, Heap, typednamedtuple, \
OmniHashableMixin, DictObject, apply_dict_object, Immutable, frozendict, SetHeap, \
DictionaryView, HashableWrapper, TwoWayDictionary, Ranking, SortedList
DictionaryView, HashableWrapper, TwoWayDictionary, Ranking, SortedList, SliceableDeque
logger = logging.getLogger(__name__)
class TestMisc(unittest.TestCase):
def test_sliceable_deque(self):
sd = SliceableDeque([1, 2, 3, 4, 5, 6, 7])
self.assertEqual(list(sd[1:-1]), [2, 3, 4, 5, 6])
self.assertEqual(sd.__getitem__(0), 1)
def test_sorted_list(self):
sl = SortedList([3, 2, 1], lambda a: a)
self.assertEqual(sl[0], 1)
......@@ -32,11 +36,11 @@ class TestMisc(unittest.TestCase):
self.assertRaises(ValueError, lambda: TwoWayDictionary([(1, 2), (2, 2)]))
def test_ranking(self):
Entry = collections.namedtuple('Entry', ('a', ))
Entry = collections.namedtuple('Entry', ('a',))
e1 = Entry(1)
e2 = Entry(2)
e3 = Entry(3)
ranking = Ranking([e1, e2], lambda e: e.a)
ranking = Ranking([e1, e2], lambda e: e.a) # type: Ranking[Entry]
self.assertEqual(ranking[0], e1)
self.assertEqual(ranking[1], e2)
......@@ -48,9 +52,12 @@ class TestMisc(unittest.TestCase):
ranking.remove(e1)
self.assertEqual(list(ranking.get_sorted()), [e2, e3])
self.assertEqual(ranking[-1], e3)
e25 = Entry(2.5)
ranking.add(e25)
self.assertEqual(list(ranking.get_sorted()), [e2, e25, e3])
def test_two_way_dict(self):
twd = TwoWayDictionary({1:2, 3:4})
twd = TwoWayDictionary({1: 2, 3: 4})
self.assertEqual(twd.reverse[4], 3)
del twd[1]
self.assertRaises(KeyError, lambda: twd[1])
......@@ -80,8 +87,8 @@ class TestMisc(unittest.TestCase):
self.assertEqual(nw(), 5)
def test_dictionary_view(self):
a = {1:2, 3:4}
b = {4:5, 6:7}
a = {1: 2, 3: 4}
b = {4: 5, 6: 7}
dva = DictionaryView(a, b)
self.assertEqual(dva[1], 2)
......@@ -103,8 +110,6 @@ class TestMisc(unittest.TestCase):
self.assertEqual(len(dvb), 4)
def test_setheap(self):
a = SetHeap([1, 2, 3])
self.assertIn(2, a)
......@@ -237,7 +242,7 @@ class TestDictObject(unittest.TestCase):
self.assertRaises(AttributeError, delete)
def test_copying(self):
a = DictObject({1:2})
a = DictObject({1: 2})
b = copy.copy(a)
self.assertEqual(a, b)
self.assertFalse(id(a) == id(b))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment