From 4983d2b9f5dce8a6e1fd2414f1f88b5794ae1927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 4 Mar 2024 09:32:56 +0100 Subject: [PATCH] fix SyncableDroppable --- CHANGELOG.md | 1 + .../coding/structures/syncable_droppable.py | 113 +++++++++--------- 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe08aa85..516c3e9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The software * added satella.instrumentation.cpu_time.get_own_cpu_time() * better documentation for satella.time.sleep() * fixed behaviour of PIDFileLock +* multiple minor code changes to improve pylint statistics Build process ------------- diff --git a/satella/coding/structures/syncable_droppable.py b/satella/coding/structures/syncable_droppable.py index df624158..9c05d684 100644 --- a/satella/coding/structures/syncable_droppable.py +++ b/satella/coding/structures/syncable_droppable.py @@ -5,7 +5,6 @@ from abc import ABCMeta, abstractmethod import typing as tp from satella.coding.concurrent.monitor import RMonitor -from satella.coding.recast_exceptions import silence_excs from satella.coding.sequences import try_close from satella.coding.typing import V, K, KVTuple @@ -310,6 +309,60 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]): if self.start_entry is None: self.start_entry = key + def _assert_correctness(self, maximum_entries): + if not self.start_entry: + raise ValueError('Nothing to synchronize!') + if self.synced_up_to == self.data_in_memory[-1][0]: + raise ValueError('Nothing to synchronize!') + + def _on_sync_request_synced_up_none(self, maximum_entries): + # Sync everything + iterator = self.db_storage.iterate(None) + try: + data = list(iterator) + if len(data) < maximum_entries: + entries_left = maximum_entries - len(data) + if entries_left == math.inf: + data = itertools.chain(data, self.data_in_memory) + else: + data = itertools.chain(data, self.data_in_memory[:entries_left]) + return data + finally: + try_close(iterator) + + def _on_sync_request_synced_up_is_not_none(self, maximum_entries): + if self.first_key_in_memory <= self.synced_up_to: + # Means we have to sync from memory + if self.synced_up_to is None: + v = self.data_in_memory + else: + index = bisect.bisect_right([y[0] for y in self.data_in_memory], + self.synced_up_to) + if maximum_entries == math.inf: + v = self.data_in_memory[index:] + else: + v = self.data_in_memory[index:index + maximum_entries] + else: + # We have to start off the disk + data = [] + iterator = self.db_storage.iterate(self.start_entry) + try: + while len(data) < maximum_entries: + try: + data.append(next(iterator)) + except StopIteration: + for index, tpl in enumerate(self.data_in_memory): + if len(data) >= maximum_entries: + break + if self.synced_up_to is not None: + if tpl[0] > self.synced_up_to: + break + v = itertools.chain(data, self.data_in_memory[:index]) + break + finally: + try_close(iterator) + return v + def on_sync_request(self, maximum_entries: tp.Optional[int] = math.inf) -> tp.Iterator[KVTuple]: """ Return an iterator that will provide the source of the data for synchronization. @@ -321,57 +374,12 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]): :return: an iterator of (KVTuple) that should be synchronized against the server :raise ValueError: nothing to synchronize! """ - if not self.start_entry: - raise ValueError('Nothing to synchronize!') - if self.synced_up_to == self.data_in_memory[-1][0]: - raise ValueError('Nothing to synchronize!') + self._assert_correctness(maximum_entries) + if self.synced_up_to is None: - # Sync everything - iterator = self.db_storage.iterate(None) - try: - data = list(iterator) - if len(data) < maximum_entries: - entries_left = maximum_entries - len(data) - if entries_left == math.inf: - data = itertools.chain(data, self.data_in_memory) - else: - data = itertools.chain(data, self.data_in_memory[:entries_left]) - v = data - finally: - try_close(iterator) + v = self._on_sync_request_synced_up_none(maximum_entries) else: - if self.first_key_in_memory <= self.synced_up_to: - # Means we have to sync from memory - if self.synced_up_to is None: - v = self.data_in_memory - else: - index = bisect.bisect_right([y[0] for y in self.data_in_memory], - self.synced_up_to) - if maximum_entries == math.inf: - v = self.data_in_memory[index:] - else: - v = self.data_in_memory[index:index + maximum_entries] - else: - # We have to start off the disk - data = [] - iterator = self.db_storage.iterate(self.start_entry) - try: - while len(data) < maximum_entries: - try: - data.append(next(iterator)) - except StopIteration: - for index, tpl in enumerate(self.data_in_memory): - if len(data) >= maximum_entries: - break - if self.synced_up_to is not None: - if tpl[0] > self.synced_up_to: - break - v = itertools.chain(data, self.data_in_memory[:index]) - break - else: - v = data - finally: - try_close(iterator) + v = self._on_sync_request_synced_up_is_not_none(maximum_entries) return v def on_synced_up_to(self, key: K) -> None: @@ -387,7 +395,4 @@ class SyncableDroppable(RMonitor, tp.Generic[K, V]): """ :return: key of the first element stored in memory """ - if not self.data_in_memory: - return None - else: - return self.data_in_memory[0][0] + return None if not self.data_in_memory else self.data_in_memory[0][0] -- GitLab