Skip to content
Snippets Groups Projects
Commit 4983d2b9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fix SyncableDroppable

parent 2ad63a45
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@ The software
* added satella.instrumentation.cpu_time.get_own_cpu_time()
* better documentation for satella.time.sleep()
* fixed behaviour of PIDFileLock
* multiple minor code changes to improve pylint statistics
Build process
-------------
......
......@@ -5,7 +5,6 @@ from abc import ABCMeta, abstractmethod
import typing as tp
from satella.coding.concurrent.monitor import RMonitor
from satella.coding.recast_exceptions import silence_excs
from satella.coding.sequences import try_close
from satella.coding.typing import V, K, KVTuple
......@@ -310,6 +309,60 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]):
if self.start_entry is None:
self.start_entry = key
def _assert_correctness(self, maximum_entries):
if not self.start_entry:
raise ValueError('Nothing to synchronize!')
if self.synced_up_to == self.data_in_memory[-1][0]:
raise ValueError('Nothing to synchronize!')
def _on_sync_request_synced_up_none(self, maximum_entries):
# Sync everything
iterator = self.db_storage.iterate(None)
try:
data = list(iterator)
if len(data) < maximum_entries:
entries_left = maximum_entries - len(data)
if entries_left == math.inf:
data = itertools.chain(data, self.data_in_memory)
else:
data = itertools.chain(data, self.data_in_memory[:entries_left])
return data
finally:
try_close(iterator)
def _on_sync_request_synced_up_is_not_none(self, maximum_entries):
if self.first_key_in_memory <= self.synced_up_to:
# Means we have to sync from memory
if self.synced_up_to is None:
v = self.data_in_memory
else:
index = bisect.bisect_right([y[0] for y in self.data_in_memory],
self.synced_up_to)
if maximum_entries == math.inf:
v = self.data_in_memory[index:]
else:
v = self.data_in_memory[index:index + maximum_entries]
else:
# We have to start off the disk
data = []
iterator = self.db_storage.iterate(self.start_entry)
try:
while len(data) < maximum_entries:
try:
data.append(next(iterator))
except StopIteration:
for index, tpl in enumerate(self.data_in_memory):
if len(data) >= maximum_entries:
break
if self.synced_up_to is not None:
if tpl[0] > self.synced_up_to:
break
v = itertools.chain(data, self.data_in_memory[:index])
break
finally:
try_close(iterator)
return v
def on_sync_request(self, maximum_entries: tp.Optional[int] = math.inf) -> tp.Iterator[KVTuple]:
"""
Return an iterator that will provide the source of the data for synchronization.
......@@ -321,57 +374,12 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]):
:return: an iterator of (KVTuple) that should be synchronized against the server
:raise ValueError: nothing to synchronize!
"""
if not self.start_entry:
raise ValueError('Nothing to synchronize!')
if self.synced_up_to == self.data_in_memory[-1][0]:
raise ValueError('Nothing to synchronize!')
self._assert_correctness(maximum_entries)
if self.synced_up_to is None:
# Sync everything
iterator = self.db_storage.iterate(None)
try:
data = list(iterator)
if len(data) < maximum_entries:
entries_left = maximum_entries - len(data)
if entries_left == math.inf:
data = itertools.chain(data, self.data_in_memory)
else:
data = itertools.chain(data, self.data_in_memory[:entries_left])
v = data
finally:
try_close(iterator)
v = self._on_sync_request_synced_up_none(maximum_entries)
else:
if self.first_key_in_memory <= self.synced_up_to:
# Means we have to sync from memory
if self.synced_up_to is None:
v = self.data_in_memory
else:
index = bisect.bisect_right([y[0] for y in self.data_in_memory],
self.synced_up_to)
if maximum_entries == math.inf:
v = self.data_in_memory[index:]
else:
v = self.data_in_memory[index:index + maximum_entries]
else:
# We have to start off the disk
data = []
iterator = self.db_storage.iterate(self.start_entry)
try:
while len(data) < maximum_entries:
try:
data.append(next(iterator))
except StopIteration:
for index, tpl in enumerate(self.data_in_memory):
if len(data) >= maximum_entries:
break
if self.synced_up_to is not None:
if tpl[0] > self.synced_up_to:
break
v = itertools.chain(data, self.data_in_memory[:index])
break
else:
v = data
finally:
try_close(iterator)
v = self._on_sync_request_synced_up_is_not_none(maximum_entries)
return v
def on_synced_up_to(self, key: K) -> None:
......@@ -387,7 +395,4 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]):
"""
:return: key of the first element stored in memory
"""
if not self.data_in_memory:
return None
else:
return self.data_in_memory[0][0]
return None if not self.data_in_memory else self.data_in_memory[0][0]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment