From 96af6311877e5d4dbc27df528c0cb2d954ae15ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Mon, 30 Nov 2020 16:03:56 +0100
Subject: [PATCH] integrate with MemoryPressureManager

---
 docs/index.rst                   |  1 +
 docs/memory-pressure-manager.rst | 14 ++++++++++
 requirements.txt                 |  2 +-
 setup.py                         |  2 +-
 tempsdb/chunks.pxd               |  2 +-
 tempsdb/chunks.pyx               |  2 +-
 tempsdb/database.pxd             |  2 ++
 tempsdb/database.pyx             | 17 ++++++++++++
 tempsdb/series.pxd               |  6 +++-
 tempsdb/series.pyx               | 47 +++++++++++++++++++++++++++++---
 10 files changed, 86 insertions(+), 9 deletions(-)
 create mode 100644 docs/memory-pressure-manager.rst

diff --git a/docs/index.rst b/docs/index.rst
index d4621d4..d14fa1b 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -13,6 +13,7 @@ Welcome to tempsdb's documentation!
    usage
    exceptions
    chunks
+   memory-pressure-manager
 
 It tries to use mmap for reads and writes, and in general is as zero-copy as possible (ie. the
 only time data is unserialized is when a particular entry is read).
diff --git a/docs/memory-pressure-manager.rst b/docs/memory-pressure-manager.rst
new file mode 100644
index 0000000..0736847
--- /dev/null
+++ b/docs/memory-pressure-manager.rst
@@ -0,0 +1,14 @@
+Integration with Satella's MemoryPressureManager
+================================================
+
+This library integrates itself with satella's MemoryPressureManager_.
+
+.. _MemoryPressureManager: https://satella.readthedocs.io/en/latest/instrumentation/memory.html
+
+It will close the non-required chunks when remaining in severity 1 each 30 seconds.
+
+To attach a MPM to a database, use
+:meth:`tempsdb.database.Database.register_memory_pressure_manager`.
+
+Series will automatically inherit the parent database's `MemoryPressureManager`.
+
diff --git a/requirements.txt b/requirements.txt
index 2fe238c..fa7a848 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-satella
+satella>=2.14.21
 ujson
 snakehouse
 six
diff --git a/setup.py b/setup.py
index fd459cc..93a82b1 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ def find_pyx(*path) -> tp.List[str]:
 setup(name='tempsdb',
       version='0.1_a2',
       packages=['tempsdb'],
-      install_requires=['satella', 'ujson'],
+      install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
                         compiler_directives={
                             'language_level': '3',
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index 1459962..2a74756 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -28,7 +28,7 @@ cdef class Chunk:
     cpdef unsigned int find_left(self, unsigned long long timestamp)
     cpdef unsigned int find_right(self, unsigned long long timestamp)
 
-    cdef unsigned long long name(self):
+    cdef inline unsigned long long name(self):
         """
         :return: the name of this chunk
         :rtype: int 
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 4764a5c..65eedaa 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -184,7 +184,7 @@ cdef class Chunk:
         if self.closed:
             return
         if self.parent:
-            with self.parent.fopen_lock:
+            with self.parent.open_lock:
                 del self.parent.open_chunks[self.min_ts]
         self.parent = None
         self.mmap.close()
diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd
index 3050a8b..10209f1 100644
--- a/tempsdb/database.pxd
+++ b/tempsdb/database.pxd
@@ -6,9 +6,11 @@ cdef class Database:
         str path
         bint closed
         object lock
+        object mpm
 
     cpdef void close(self)
     cpdef TimeSeries get_series(self, str name)
+    cpdef void register_memory_pressure_manager(self, object mpm)
 
 cpdef Database create_database(str path)
 
diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx
index 24705dc..468a429 100644
--- a/tempsdb/database.pyx
+++ b/tempsdb/database.pyx
@@ -16,6 +16,7 @@ cdef class Database:
         self.closed = False
         self.open_series = {}
         self.lock = threading.Lock()
+        self.mpm = None
 
     cpdef TimeSeries get_series(self, name: str):
         """
@@ -41,8 +42,24 @@ cdef class Database:
                 if not os.path.isdir(path):
                     raise DoesNotExist('series %s does not exist' % (name, ))
                 self.open_series[name] = result = TimeSeries(path)
+                if self.mpm is not None:
+                    result.register_memory_pressure_manager(self.mpm)
         return result
 
+    cpdef void register_memory_pressure_manager(self, object mpm):
+        """
+        Register a satella MemoryPressureManager_ to close chunks if low on memory.
+        
+        .. _MemoryPressureManager: https://satella.readthedocs.io/en/latest/instrumentation/memory.html
+        
+        :param mpm: MemoryPressureManager to use
+        :type mpm: satella.instrumentation.memory.MemoryPressureManager
+        """
+        self.mpm = mpm
+        cdef TimeSeries series
+        for series in self.open_series.values():
+            series.register_memory_pressure_manager(mpm)
+
     def __del__(self):
         self.close()
 
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 66e9084..ddf8575 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -4,7 +4,8 @@ from .chunks cimport Chunk
 cdef class TimeSeries:
     cdef:
         bint closed
-        object lock, fopen_lock
+        object lock             # lock to hold while writing
+        object open_lock        # lock to hold while opening or closing chunks
         readonly str path
         unsigned int max_entries_per_chunk
         readonly unsigned long long last_entry_synced
@@ -14,7 +15,9 @@ cdef class TimeSeries:
         dict open_chunks
         list data_in_memory
         Chunk last_chunk
+        object mpm      # satella.instrumentation.memory.MemoryPressureManager
 
+    cpdef void register_memory_pressure_manager(self, object mpm)
     cpdef int delete(self) except -1
     cdef dict _get_metadata(self)
     cpdef void close(self)
@@ -22,5 +25,6 @@ cdef class TimeSeries:
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
+    cpdef int close_chunks(self) except -1
 
 cpdef TimeSeries create_series(str path, unsigned int block_size, int max_entries_per_chunk)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 135ae6e..b1fa7df 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -21,8 +21,9 @@ cdef class TimeSeries:
     :ivar path: path to the directory containing the series (str)
     """
     def __init__(self, path: str):
+        self.mpm = None
         self.lock = threading.Lock()
-        self.fopen_lock = threading.Lock()
+        self.open_lock = threading.Lock()
         self.closed = False
 
         self.path = path
@@ -84,7 +85,7 @@ cdef class TimeSeries:
             raise InvalidState('Series is closed')
         if name not in self.chunks:
             raise DoesNotExist('Invalid chunk!')
-        with self.fopen_lock:
+        with self.open_lock:
             if name not in self.open_chunks:
                 self.open_chunks[name] = Chunk(self, os.path.join(self.path, str(name)))
         return self.open_chunks[name]
@@ -100,6 +101,9 @@ cdef class TimeSeries:
         cdef Chunk chunk
         for chunk in self.data_in_memory.values():
             chunk.close()
+        if self.mpm is not None:
+            self.mpm.cancel()
+            self.mpm = None
         self.closed = True
 
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
@@ -137,6 +141,37 @@ cdef class TimeSeries:
                 'last_entry_synced': self.last_entry_synced
             }
 
+    cpdef void register_memory_pressure_manager(self, object mpm):
+        """
+        Register a memory pressure manager.
+        
+        This registers :meth:`~tempsdb.series.TimeSeries.close_chunks` as remaining in severity
+        to be called each 30 minutes.
+        """
+        self.mpm = mpm.register_on_remaining_in_severity(1, 30)(self.close_chunks)
+
+    cpdef int close_chunks(self) except -1:
+        """
+        Close all superficially opened chunks
+        """
+        if self.last_chunk is None:
+            return 0
+        if len(self.chunks) == 1:
+            return 0
+        cdef:
+            unsigned long long chunk_name
+            list chunks = list(self.open_chunks.keys())
+            unsigned long long last_chunk_name = self.last_chunk.name()
+
+        with self.open_lock:
+            for chunk_name in chunks:
+                if chunk_name != last_chunk_name:
+                    continue
+                else:
+                    self.open_chunks[chunk_name].close()
+                    del self.open_chunks[chunk_name]
+        return 0
+
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append an entry.
@@ -160,21 +195,25 @@ cdef class TimeSeries:
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                                [(timestamp, data)])
                 self.open_chunks[timestamp] = self.last_chunk
+                self.chunks.append(timestamp)
             elif self.last_chunk.length() >= self.max_entries_per_chunk:
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                                [(timestamp, data)])
                 self.chunks.append(timestamp)
             else:
                 self.last_chunk.append(timestamp, data)
-
             self.last_entry_ts = timestamp
 
         return 0
 
     cpdef int delete(self) except -1:
         """
-        Erase this series from the disk
+        Erase this series from the disk. Series must be opened to do that.
+        
+        :raises InvalidState: series is not opened
         """
+        if self.closed:
+            raise InvalidState('series is closed')
         self.close()
         shutil.rmtree(self.path)
 
-- 
GitLab