Skip to content
Snippets Groups Projects
Commit d6caf9c9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

hallo heap

parent 03791ddf
No related branches found
No related tags found
No related merge requests found
## v2.0.11
* dodano Heap
## v2.0.10
* bugfix release
......
......@@ -7,6 +7,6 @@ from __future__ import print_function, absolute_import, division
from .typecheck import typed, List, Tuple, Dict, NewType, Callable, Sequence, \
TypeVar, Generic, Mapping, Iterable, Union, Any, Optional, CallSignature
from .structures import TimeBasedHeap, CallableGroup
from .structures import TimeBasedHeap, CallableGroup, Heap
from .monitor import Monitor, RMonitor
from .algos import merge_dicts
......@@ -2,13 +2,19 @@
from __future__ import print_function, absolute_import, division
import six
import logging
import copy
import heapq
from .typecheck import typed, Callable
import functools
from .typecheck import typed, Callable, Iterable
logger = logging.getLogger(__name__)
__all__ = [
'CallableGroup',
'Heap',
'TimeBasedHeap'
]
class CallableGroup(object):
......@@ -81,7 +87,134 @@ class CallableGroup(object):
return results
class TimeBasedHeap(object):
def _extras_to_one(fun):
@functools.wraps(fun)
def inner(self, a, *args):
return fun(self, ((a, ) + args) if len(args) > 0 else a)
return inner
class Heap(object):
"""
Sane heap as object - not like heapq.
Goes from lowest-to-highest (first popped is smallest).
Standard Python comparision rules apply.
Not thread-safe
"""
__slots__ = ('heap', ) # this is rather private, plz
# TODO needs tests
@typed(object, (None, Iterable))
def __init__(self, from_list=None):
if from_list is None:
self.heap = []
else:
self.heap = heapq.heapify(list(from_list))
# TODO needs tests
@_extras_to_one
def push(self, item):
"""
Use it like:
heap.push(3)
or:
heap.push(4, myobject)
"""
heapq.heappush(self.heap, item)
# TODO needs tests
def __deepcopy__(self):
h = Heap()
h.heap = copy.deepcopy(self.heap)
return h
# TODO needs tests
def __copy__(self):
h = Heap()
h.heap = copy.copy(self.heap)
return h
# TODO needs tests
def pop(self):
"""
:raises IndexError: on empty heap
"""
return heapq.heappop(self.heap)
# TODO needs tests
@typed(object, Callable, Callable)
def filtermap(self, filterer=lambda i: True, mapfun=lambda i: i):
"""
Get only items that return True when condition(item) is True. Apply a transform: item' = item(condition) on
the rest. Maintain heap invariant.
"""
self.heap = [mapfun(s) for s in self.heap if filterer(s)]
heapq.heapify(self.heap)
@typed(returns=bool)
def __bool__(self):
"""
Is this empty?
"""
return len(self.heap) > 0
@typed(returns=Iterable)
def iter_ascending(self):
"""
Return an iterator returning all elements in this heap sorted ascending.
State of the heap is not changed
:return: Iterator
"""
cph = self.copy()
while cph:
yield cph.pop()
@typed(object, object, returns=Iterable)
def get_less_than(self, less):
"""
Return all elements less (sharp inequality) than particular value.
This changes state of the heap
:param less: value to compare against
:return: Iterator
"""
while self:
if self.heap[0] < less:
return
yield self.pop()
@typed(returns=Iterable)
def iter_descending(self):
"""
Return an iterator returning all elements in this heap sorted descending.
State of the heap is not changed
:return: Iterator
"""
return reversed(self.iter_ascending())
@typed(returns=six.integer_types)
def __len__(self):
return len(self)
def __str__(self):
return '<satella.coding.Heap: %s elements>' % (len(self.heap, ))
def __unicode__(self):
return six.text_type(str(self))
def __repr__(self):
return u'<satella.coding.Heap>'
def __in__(self, item):
return item in self.heap
class TimeBasedHeap(Heap):
"""
A heap of items sorted by timestamps.
......@@ -98,7 +231,7 @@ class TimeBasedHeap(object):
"""
Initialize an empty heap
"""
self.heap = []
super(TimeBasedHeap, self).__init__()
@typed(None, (float, int), None)
def put(self, timestamp, item):
......@@ -107,7 +240,7 @@ class TimeBasedHeap(object):
:param timestamp: timestamp for this item
:param item: object
"""
heapq.heappush(self.heap, (timestamp, item))
self.push(timestamp, item)
@typed(None, (float, int))
def pop_less_than(self, timestamp):
......@@ -117,17 +250,11 @@ class TimeBasedHeap(object):
Items will be removed from heap
:return: list of tuple(timestamp::float, item)
"""
out = []
while len(self.heap) > 0:
if self.heap[0][0] >= timestamp:
return out
out.append(heapq.heappop(self.heap))
return out
return list(self.get_less_than(timestamp))
def remove(self, item):
"""
Remove all things equal to item
"""
self.heap = [q for q in self.heap if q != item]
heapq.heapify(self.heap)
self.filter(lambda i: i != item)
......@@ -2,7 +2,7 @@
from setuptools import setup, find_packages
setup(name='satella',
version='2.0.10',
version='2.0.11rc1',
description=u'Utilities for writing servers in Python',
author=u'Piotr Maślanka',
author_email='piotrm@smok.co',
......
......@@ -2,7 +2,7 @@
from __future__ import print_function, absolute_import, division
import six
import unittest
from satella.coding import TimeBasedHeap
from satella.coding import TimeBasedHeap, Heap
class TestTimeBasedHeap(unittest.TestCase):
......@@ -19,3 +19,21 @@ class TestTimeBasedHeap(unittest.TestCase):
self.assertIn((10, 'ala'), q)
self.assertIn((20, 'ma'), q)
self.assertNotIn((30, 'kota'), q)
class TestHeap(unittest.TestCase):
def test_tbh(self):
tbh = Heap()
tbh.put((10, 'ala'))
tbh.put(20, 'ma')
self.assertIn((10, 'ala'), tbh)
self.assertIn((20, 'ma'), tbh)
tbh.filtermap(lambda x: x[0] != 20, lambda x: x[0]+10, 'azomg')
self.assertIn((20, 'azomg'), tbh)
self.assertNotIn((10, 'ala'), tbh)
self.assertNotIn((20, 'ma'), tbh)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment