diff --git a/docs/index.rst b/docs/index.rst index d4621d42f2fcf0c67e5541b1f081bc9ac0242cbe..d14fa1b792248dd81c81d12c55b72fd7d138d5e0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,6 +13,7 @@ Welcome to tempsdb's documentation! usage exceptions chunks + memory-pressure-manager It tries to use mmap for reads and writes, and in general is as zero-copy as possible (ie. the only time data is unserialized is when a particular entry is read). diff --git a/docs/memory-pressure-manager.rst b/docs/memory-pressure-manager.rst new file mode 100644 index 0000000000000000000000000000000000000000..07368473a227ca45e8b8d6eb04b4e0a0b7b8c5d8 --- /dev/null +++ b/docs/memory-pressure-manager.rst @@ -0,0 +1,14 @@ +Integration with Satella's MemoryPressureManager +================================================ + +This library integrates itself with satella's MemoryPressureManager_. + +.. _MemoryPressureManager: https://satella.readthedocs.io/en/latest/instrumentation/memory.html + +While memory pressure remains in severity 1, it will close the non-required chunks every 30 seconds. + +To attach an MPM to a database, use +:meth:`tempsdb.database.Database.register_memory_pressure_manager`. + +Series will automatically inherit the parent database's `MemoryPressureManager`. 
+ diff --git a/requirements.txt b/requirements.txt index 2fe238cfb6d36a0700aac897d9433cd41260142e..fa7a84806dee5b4115760f6bc21d313b5d4a5353 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -satella +satella>=2.14.21 ujson snakehouse six diff --git a/setup.py b/setup.py index fd459ccef3b67830a4054523199c38fc221613f2..93a82b19baa05c77a0253dc39c6f75265fe58713 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ def find_pyx(*path) -> tp.List[str]: setup(name='tempsdb', version='0.1_a2', packages=['tempsdb'], - install_requires=['satella', 'ujson'], + install_requires=['satella>=2.14.21', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], compiler_directives={ 'language_level': '3', diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index 145996269502726f06c1a76fe6bad93b7e7bf904..2a747569bdfebca475c673441efb10265e89f8f7 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -28,7 +28,7 @@ cdef class Chunk: cpdef unsigned int find_left(self, unsigned long long timestamp) cpdef unsigned int find_right(self, unsigned long long timestamp) - cdef unsigned long long name(self): + cdef inline unsigned long long name(self): """ :return: the name of this chunk :rtype: int diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 4764a5c86db0316c39a8bbf33a51b38b932b33ec..65eedaab83192a55e8b440e550596b4739feece0 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -184,7 +184,7 @@ cdef class Chunk: if self.closed: return if self.parent: - with self.parent.fopen_lock: + with self.parent.open_lock: del self.parent.open_chunks[self.min_ts] self.parent = None self.mmap.close() diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index 3050a8b76d6937f282d4489edd1cf5de87295e86..10209f1bd601b430672577c2113be24290cee9cd 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -6,9 +6,11 @@ cdef class Database: str path bint closed object lock + object mpm cpdef void close(self) cpdef TimeSeries get_series(self, str 
name) + cpdef void register_memory_pressure_manager(self, object mpm) cpdef Database create_database(str path) diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index 24705dc211e23bd6fb9d1ff4ab24a1e74924050e..468a4298305f585048ea1ce84832016cceab8224 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -16,6 +16,7 @@ cdef class Database: self.closed = False self.open_series = {} self.lock = threading.Lock() + self.mpm = None cpdef TimeSeries get_series(self, name: str): """ @@ -41,8 +42,24 @@ cdef class Database: if not os.path.isdir(path): raise DoesNotExist('series %s does not exist' % (name, )) self.open_series[name] = result = TimeSeries(path) + if self.mpm is not None: + result.register_memory_pressure_manager(self.mpm) return result + cpdef void register_memory_pressure_manager(self, object mpm): + """ + Register a satella MemoryPressureManager_ to close chunks if low on memory. + + .. _MemoryPressureManager: https://satella.readthedocs.io/en/latest/instrumentation/memory.html + + :param mpm: MemoryPressureManager to use + :type mpm: satella.instrumentation.memory.MemoryPressureManager + """ + self.mpm = mpm + cdef TimeSeries series + for series in self.open_series.values(): + series.register_memory_pressure_manager(mpm) + def __del__(self): self.close() diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 66e9084db154e3ed8ebe38da07ec17ca1c55d8f3..ddf8575b6f07f92b641036cf4b6096bf1bc0005b 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -4,7 +4,8 @@ from .chunks cimport Chunk cdef class TimeSeries: cdef: bint closed - object lock, fopen_lock + object lock # lock to hold while writing + object open_lock # lock to hold while opening or closing chunks readonly str path unsigned int max_entries_per_chunk readonly unsigned long long last_entry_synced @@ -14,7 +15,9 @@ cdef class TimeSeries: dict open_chunks list data_in_memory Chunk last_chunk + object mpm # satella.instrumentation.memory.MemoryPressureManager + cpdef void 
register_memory_pressure_manager(self, object mpm) cpdef int delete(self) except -1 cdef dict _get_metadata(self) cpdef void close(self) @@ -22,5 +25,6 @@ cdef class TimeSeries: cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cpdef int sync(self) except -1 + cpdef int close_chunks(self) except -1 cpdef TimeSeries create_series(str path, unsigned int block_size, int max_entries_per_chunk) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 135ae6e95f287fca23f006bab1c93e071f74c4cf..b1fa7df635ea2114d9ba5848454d1a65551c0db5 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -21,8 +21,9 @@ cdef class TimeSeries: :ivar path: path to the directory containing the series (str) """ def __init__(self, path: str): + self.mpm = None self.lock = threading.Lock() - self.fopen_lock = threading.Lock() + self.open_lock = threading.Lock() self.closed = False self.path = path @@ -84,7 +85,7 @@ cdef class TimeSeries: raise InvalidState('Series is closed') if name not in self.chunks: raise DoesNotExist('Invalid chunk!') - with self.fopen_lock: + with self.open_lock: if name not in self.open_chunks: self.open_chunks[name] = Chunk(self, os.path.join(self.path, str(name))) return self.open_chunks[name] @@ -100,6 +101,9 @@ cdef class TimeSeries: cdef Chunk chunk for chunk in self.data_in_memory.values(): chunk.close() + if self.mpm is not None: + self.mpm.cancel() + self.mpm = None self.closed = True cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: @@ -137,6 +141,37 @@ cdef class TimeSeries: 'last_entry_synced': self.last_entry_synced } + cpdef void register_memory_pressure_manager(self, object mpm): + """ + Register a memory pressure manager. + + This registers :meth:`~tempsdb.series.TimeSeries.close_chunks` to be called every + 30 seconds for as long as the manager remains in severity 1. 
+        """
+        self.mpm = mpm.register_on_remaining_in_severity(1, 30)(self.close_chunks)
+
+    cpdef int close_chunks(self) except -1:
+        """
+        Close every open chunk except the one currently being written to.
+
+        Does nothing if no chunk has been written yet, or if the series
+        consists of a single chunk.
+
+        :return: 0 on success
+        :rtype: int
+        """
+        if self.last_chunk is None:
+            return 0
+        if len(self.chunks) == 1:
+            return 0
+        cdef:
+            unsigned long long chunk_name
+            list chunks
+            Chunk chunk
+            unsigned long long last_chunk_name = self.last_chunk.name()
+
+        # Take the snapshot of open chunk names under the lock, so the dict is
+        # not being modified by open_chunk() while its keys are copied.
+        with self.open_lock:
+            chunks = list(self.open_chunks.keys())
+
+        # The lock is released before closing anything: Chunk.close() acquires
+        # parent.open_lock itself (a non-reentrant threading.Lock) and deletes
+        # its own entry from open_chunks. Holding the lock here would deadlock,
+        # and deleting the entry a second time would raise KeyError.
+        for chunk_name in chunks:
+            if chunk_name == last_chunk_name:
+                continue    # append() writes to this chunk, keep it open
+            chunk = self.open_chunks.get(chunk_name)
+            if chunk is not None:
+                chunk.close()
+        return 0
+
 cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ Append an entry. @@ -160,21 +195,25 @@ cdef class TimeSeries: self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)), [(timestamp, data)]) self.open_chunks[timestamp] = self.last_chunk + self.chunks.append(timestamp) elif self.last_chunk.length() >= self.max_entries_per_chunk: self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)), [(timestamp, data)]) self.chunks.append(timestamp) else: self.last_chunk.append(timestamp, data) - self.last_entry_ts = timestamp return 0 cpdef int delete(self) except -1: """ - Erase this series from the disk + Erase this series from the disk. Series must be opened to do that. + + :raises InvalidState: series is not opened """ + if self.closed: + raise InvalidState('series is closed') self.close() shutil.rmtree(self.path)