Skip to content
Snippets Groups Projects
Commit 3061138e authored by Piotr Maślanka
Browse files

add merge_list, v2.14.39

parent c236fa62
No related branches found
Tags v2.14.39
No related merge requests found
# v2.14.39
* added `merge_list`
__version__ = '2.14.39a1'
__version__ = '2.14.39'
......@@ -6,6 +6,7 @@ import typing as tp
from satella.coding.decorators import for_argument
from .jsonify import jsonify
from .merger import merge_series
from .merge_list import merge_list
from .percentile import percentile
from .base64 import b64encode
from .interpol import linear_interpolate
......@@ -13,7 +14,8 @@ from .words import hashables_to_int
__all__ = ['stringify', 'split_shuffle_and_join', 'one_tuple', 'none_if_false',
'merge_series', 'pad_to_multiple_of_length', 'clip', 'hashables_to_int',
'jsonify', 'intify', 'percentile', 'b64encode', 'linear_interpolate']
'jsonify', 'intify', 'percentile', 'b64encode', 'linear_interpolate',
'merge_list']
from satella.coding.typing import T, NoArgCallable, Appendable, Number, Predicate
......
from satella.coding.structures import Heap
from satella.coding.typing import K, V
import typing as tp
class merge_list(tp.Iterator[tp.Tuple[K, V]]):
    """
    Merge any number of sorted lists into a single sorted stream.

    This is an iterator which consumes elements only as they are required.

    Each list must yield tuples of (key, value) and has to be sorted by key, ascending.

    Whenever the same key occurs more than once (across lists or within a single list),
    merge_function is called with the value accumulated so far and the next value,
    and a single (key, merged value) tuple is emitted for that key.

    Entries having equal keys are merged in the order of their source list's position,
    so the values themselves are never compared and do not have to be orderable.

    :param lists: sorted iterables of (key, value) tuples to merge
    :param merge_function: a callable that accepts two values stored under the same key
        and returns the value that should replace them
    :return: an iterator of (key, value) tuples in ascending key order
    """
    __slots__ = ('merge_func', 'its', 'heap', 'available_lists',
                 'k', 'v', 'i', 'closed')

    def __iter__(self):
        return self

    def __init__(self, *lists: tp.Iterable[tp.Tuple[K, V]],
                 merge_function: tp.Callable[[V, V], V]):
        self.its = [iter(y) for y in lists]
        self.merge_func = merge_function
        self.available_lists = set()    # indices of the lists that are not exhausted yet
        self.heap = Heap()
        self.closed = False

        # Prime the heap with the head of every non-empty list.
        # Entries are stored as (key, list index, value): the heap holds at most one entry
        # per list, so the (key, index) prefix is unique and a key tie is always settled
        # by the index - the value never takes part in a comparison.
        for i, it in enumerate(self.its):
            try:
                k, v = next(it)
            except StopIteration:
                continue
            self.heap.push((k, i, v))
            self.available_lists.add(i)

        # Keep one entry "pending" in k/v/i; __next__ emits it once it is known
        # that no further entry shares its key.
        try:
            self.k, self.v, self.i = self.pop()
        except IndexError:
            # every list was empty
            self.closed = True

    def __next__(self):
        if self.closed:
            raise StopIteration()

        # Fold every following entry with the same key into the pending value.
        # This is a loop rather than a recursive call, so an arbitrarily long run
        # of equal keys cannot exhaust the interpreter's recursion limit.
        while True:
            try:
                k2, v2, i2 = self.pop()
            except IndexError:
                # nothing left - the pending entry is the last one to be emitted
                self.closed = True
                return self.k, self.v

            if k2 == self.k:
                self.v = self.merge_func(self.v, v2)
                continue

            result = self.k, self.v
            self.k, self.v, self.i = k2, v2, i2
            return result

    def pop(self):
        """
        Remove the smallest entry from the heap, refilling the heap from the list
        that the entry came from.

        :return: a tuple of (key, value, index of the source list)
        :raises IndexError: the heap is empty
        """
        k, i, v = self.heap.pop()
        if i in self.available_lists:
            try:
                k2, v2 = next(self.its[i])
            except StopIteration:
                self.available_lists.remove(i)
            else:
                self.heap.push((k2, i, v2))
        return k, v, i
import enum
import operator
import unittest
import base64
from satella.coding.transforms import stringify, split_shuffle_and_join, one_tuple, \
merge_series, pad_to_multiple_of_length, clip, b64encode, linear_interpolate, \
hashables_to_int, none_if_false
hashables_to_int, none_if_false, merge_list
class TestTransforms(unittest.TestCase):
def test_merge_list(self):
    """merge_list joins sorted (key, value) lists, adding up values of equal keys."""
    # each case: (first input, second input, expected merged output)
    cases = [
        # keys shared between both inputs get their values summed
        ([(1, 1), (2, 2), (3, 3)],
         [(1, 2), (3, 2), (4, 4)],
         [(1, 3), (2, 2), (3, 5), (4, 4)]),
        # a key repeated within a single input is folded in as well
        ([(1, 1), (1, 3), (2, 2), (3, 3)],
         [(1, 2), (3, 2), (4, 4)],
         [(1, 6), (2, 2), (3, 5), (4, 4)]),
        # empty inputs yield an empty result
        ([], [], []),
    ]
    for first, second, expected in cases:
        merged = merge_list(first, second, merge_function=operator.add)
        self.assertEqual(list(merged), expected)
def test_none_if_false(self):
self.assertEqual(none_if_false(1), 1)
self.assertEqual(none_if_false(''), None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment