Skip to content
Snippets Groups Projects
Commit 1268b34d authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

add MetrifiedExclusiveWritebackCache

parent 1e6b741e
No related branches found
Tags v2.13.4
No related merge requests found
# v2.13.4
* added `MetrifiedExclusiveWritebackCache`
......@@ -245,3 +245,5 @@ Here go they:
.. autoclass:: satella.instrumentation.metrics.structures.MetrifiedLRUCacheDict
.. autoclass:: satella.instrumentation.metrics.structures.MetrifiedExclusiveWritebackCache
__version__ = '2.13.4_a1'
__version__ = '2.13.4'
......@@ -43,7 +43,6 @@ class CacheDict(tp.Mapping[K, V]):
that will be given to user instead of throwing KeyError. If not given (default),
KeyError will be thrown
"""
def __len__(self) -> int:
return len(self.data)
......
......@@ -26,8 +26,7 @@ class ExclusiveWritebackCache(tp.Generic[K, V]):
"""
__slots__ = ('executor', 'read_method', 'write_method', 'delete_method',
'no_concurrent_executors', 'in_cache', 'cache_lock',
'cache', 'operations', '_get_queue_length',
'store_key_errors')
'cache', 'operations', 'store_key_errors')
def __init__(self, write_method: tp.Callable[[K, V], None],
read_method: tp.Callable[[K], V],
......@@ -50,26 +49,26 @@ class ExclusiveWritebackCache(tp.Generic[K, V]):
self.cache = {}
self.operations = 0
def get_queue_length(self) -> int:
"""
Return current amount of entries waiting for writeback
"""
if isinstance(self.executor, ThreadPoolExecutor):
def get_queue_length():
# noinspection PyProtectedMember
return self.executor._work_queue.qsize()
# noinspection PyProtectedMember
return self.executor._work_queue.qsize()
elif isinstance(self.executor, ProcessPoolExecutor):
def get_queue_length():
# noinspection PyProtectedMember
return self.executor._call_queue.qsize()
# noinspection PyProtectedMember
return self.executor._call_queue.qsize()
else:
def get_queue_length():
return 0
self._get_queue_length = get_queue_length
return 0
def sync(self):
def sync(self) -> None:
"""
Wait until current tasks are complete.
Note that this guarantees nothing, and is best-effort only
"""
while self._get_queue_length() > 0:
while self.get_queue_length() > 0:
time.sleep(0.1)
def fix():
......
from .cache_dict import MetrifiedCacheDict, MetrifiedLRUCacheDict
from .cache_dict import MetrifiedCacheDict, MetrifiedLRUCacheDict, MetrifiedExclusiveWritebackCache
from .threadpool import MetrifiedThreadPoolExecutor
__all__ = ['MetrifiedCacheDict', 'MetrifiedThreadPoolExecutor', 'MetrifiedLRUCacheDict']
__all__ = ['MetrifiedCacheDict', 'MetrifiedThreadPoolExecutor', 'MetrifiedLRUCacheDict',
'MetrifiedExclusiveWritebackCache']
......@@ -2,8 +2,10 @@ import logging
import time
import typing as tp
from satella.coding.structures import CacheDict, LRUCacheDict
from satella.coding.structures import CacheDict, LRUCacheDict, ExclusiveWritebackCache
from .. import Metric
from ..metric_types.callable import CallableMetric
from ..metric_types.counter import CounterMetric
from ..metric_types.measurable_mixin import MeasurableMixin
......@@ -24,9 +26,9 @@ class MetrifiedCacheDict(CacheDict):
value_getter_executor=None, cache_failures_interval=None,
time_getter=time.monotonic,
default_value_factory=None,
cache_hits: tp.Optional[Metric] = None,
cache_miss: tp.Optional[Metric] = None,
refreshes: tp.Optional[Metric] = None,
cache_hits: tp.Optional[CounterMetric] = None,
cache_miss: tp.Optional[CounterMetric] = None,
refreshes: tp.Optional[CounterMetric] = None,
how_long_refresh_takes: tp.Optional[MeasurableMixin] = None):
if refreshes:
old_value_getter = value_getter
......@@ -119,3 +121,28 @@ class MetrifiedLRUCacheDict(LRUCacheDict):
if self.cache_miss:
self.cache_miss.runtime(+1)
return super().__getitem__(item)
class MetrifiedExclusiveWritebackCache(ExclusiveWritebackCache):
__slots__ = ('cache_miss', 'cache_hits')
def __init__(self, *args,
cache_hits: tp.Optional[CounterMetric] = None,
cache_miss: tp.Optional[CounterMetric] = None,
entries_waiting: tp.Optional[CallableMetric] = None,
**kwargs):
super().__init__(*args,**kwargs)
self.cache_miss = cache_miss
self.cache_hits = cache_hits
if entries_waiting is not None:
entries_waiting.callable = self.get_queue_length()
def __getitem__(self, item):
if item in self.in_cache:
if self.cache_hits:
self.cache_hits.runtime(+1)
else:
if self.cache_miss:
self.cache_miss.runtime(+1)
return super().__getitem__(item)
......@@ -5,7 +5,7 @@ from satella.instrumentation.metrics import getMetric
import time
from satella.instrumentation.metrics.structures import MetrifiedThreadPoolExecutor, \
MetrifiedCacheDict, MetrifiedLRUCacheDict
MetrifiedCacheDict, MetrifiedLRUCacheDict, MetrifiedExclusiveWritebackCache
from .test_metrics import choose
......@@ -14,6 +14,49 @@ def wait():
class TestThreadPoolExecutor(unittest.TestCase):
def test_exclusive_writeback_cache(self):
cache_hits = getMetric('wbc.cachedict.hits', 'counter')
cache_miss = getMetric('wbc.cachedict.miss', 'counter')
entries_waiting = getMetric('wbc.cachedict.waiting', 'callable')
a = {5: 3, 4: 2, 1: 0}
b = {'no_calls': 0}
def setitem(k, v):
nonlocal a, b
b['no_calls'] += 1
a[k] = v
def getitem(k):
nonlocal a, b
b['no_calls'] += 1
return a[k]
def delitem(k):
nonlocal a, b
b['no_calls'] += 1
del a[k]
wbc = MetrifiedExclusiveWritebackCache(setitem, getitem, delitem,
cache_hits=cache_hits,
cache_miss=cache_miss,
entries_waiting=entries_waiting)
self.assertEqual(wbc[5], 3)
self.assertEqual(b['no_calls'], 1)
self.assertEqual(n_th(entries_waiting.to_metric_data().values).value, 0)
self.assertRaises(KeyError, lambda: wbc[-1])
self.assertEqual(b['no_calls'], 2)
self.assertEqual(wbc[5], 3)
self.assertEqual(b['no_calls'], 2)
self.assertEqual(n_th(cache_miss.to_metric_data().values).value, 2)
self.assertEqual(n_th(cache_hits.to_metric_data().values).value, 1)
wbc[5] = 2
wbc.sync()
self.assertEqual(a[5], 2)
self.assertEqual(b['no_calls'], 3)
del wbc[4]
wbc.sync()
self.assertRaises(KeyError, lambda: a[4])
def test_metrified_cache_dict(self):
cache_hits = getMetric('cachedict.hits', 'counter')
cache_miss = getMetric('cachedict.miss', 'counter')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment