Skip to content
Snippets Groups Projects
Commit d19e6d29 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

v2.8.10

parent 7d0de335
No related branches found
No related tags found
No related merge requests found
# v2.8.10
* improvements to CacheDict
__version__ = '2.8.10_a1'
__version__ = '2.8.10'
......@@ -2,6 +2,8 @@ import time
import typing as tp
from concurrent.futures import ThreadPoolExecutor, Executor, Future
from satella.coding.recast_exceptions import silence_excs
K, V = tp.TypeVar('K'), tp.TypeVar('V')
......@@ -19,12 +21,16 @@ class CacheDict(tp.Mapping[K, V]):
If an expired value is read, it will block until the result is available.
Else, the value is served straight from fast memory.
Note that value_getter raising KeyError is not cached, so don't use this
cache for situations where misses are frequent.
:param stale_interval: time in seconds after which an entry will be stale, ie.
it will be served from cache, but a task will be launched in background to
refresh it
:param expiration_interval: time in seconds after which an entry will be ejected
from dict, and further calls to get it will block until the entry is available
:param value_getter: a callable that accepts a key, and returns a value for given entry
:param value_getter: a callable that accepts a key, and returns a value for given entry.
If value_getter raises KeyError, then given entry will be evicted from the cache
:param value_getter_executor: an executor to execute the value_getter function in background.
If None is passed, a ThreadPoolExecutor will be used with max_workers of 4.
"""
......@@ -47,15 +53,19 @@ class CacheDict(tp.Mapping[K, V]):
self.value_getter_executor = value_getter_executor
self.data = {} # type: tp.Dict[K, V]
self.timestamp_data = {} # type: tp.Dict[K, float]
self.missed_cache = {} # type: tp.Dict[K, bool]
def get_value_block(self, key: K) -> V:
"""
Get a value using value_getter. Block until it's available. Store it into the cache.
"""
future = self.value_getter_executor.submit(self.value_getter, key)
value = future.result()
self.data[key] = value
self.timestamp_data[key] = time.monotonic()
try:
value = future.result()
except KeyError:
self.try_delete(key)
raise
self[key] = value
return value
def schedule_a_fetch(self, key: K) -> None:
......@@ -65,11 +75,27 @@ class CacheDict(tp.Mapping[K, V]):
future = self.value_getter_executor.submit(self.value_getter, key)
def on_done_callback(fut: Future) -> None:
result = fut.result()
try:
result = fut.result()
except KeyError:
self.try_delete(key)
self[key] = result
future.add_done_callback(on_done_callback)
@silence_excs(KeyError)
def try_delete(self, key: K) -> None:
"""
Syntactic sugar for
>>> try:
>>> del self[key]
>>> except KeyError:
>>> pass
"""
del self[key]
def __getitem__(self, item: K) -> V:
if item not in self.data:
return self.get_value_block(item)
......
......@@ -25,6 +25,8 @@ class TestMisc(unittest.TestCase):
self.value = 2
def __call__(self, key):
if self.value is None:
raise KeyError('no value available')
self.called_on[key] = time.monotonic()
return self.value
......@@ -49,6 +51,15 @@ class TestMisc(unittest.TestCase):
self.assertGreaterEqual(cg.called_on[2], now)
del cd[2]
self.assertEqual(len(cd), 0)
cg.value = None
self.assertRaises(KeyError, lambda: cd[2])
cg.value = 2
self.assertEqual(cd[2], 2)
cg.value = None
time.sleep(1.5)
self.assertEqual(cd[2], 2)
time.sleep(0.6)
self.assertRaises(KeyError, lambda: cd[2])
def test_dictobject_dictobject(self):
a = DictObject(a=5, k=3)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment