From 24f2d3e440364c818d5b6aedef31be6299521cf9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Mon, 30 Nov 2020 21:53:03 +0100
Subject: [PATCH] v0.1

---
 setup.py              |   7 +--
 tempsdb/chunks.pxd    |   3 +-
 tempsdb/chunks.pyx    |  20 +++++--
 tempsdb/database.pxd  |   3 ++
 tempsdb/database.pyx  |  30 ++++++++++-
 tempsdb/iterators.pxd |   2 +-
 tempsdb/iterators.pyx |  12 ++---
 tempsdb/series.pxd    |  20 ++++---
 tempsdb/series.pyx    | 118 +++++++++++++++++++++++++++---------------
 tests/test_db.py      |  17 +++++-
 10 files changed, 166 insertions(+), 66 deletions(-)

diff --git a/setup.py b/setup.py
index 9c4d64f..a185a6a 100644
--- a/setup.py
+++ b/setup.py
@@ -16,11 +16,12 @@ def find_pyx(*path) -> tp.List[str]:
 # extensions = [Extension("tempsdb.chunks", ['tempsdb/chunks.pyx']),
 #               Extension("tempsdb.database", ['tempsdb/database.pyx']),
 #               Extension('tempsdb.exceptions', ['tempsdb/exceptions.pyx']),
-#               Extension('tempsdb.series', ['tempsdb/series.pyx'])]
-
+#               Extension('tempsdb.series', ['tempsdb/series.pyx']),
+#               Extension('tempsdb.iterators', ['tempsdb/iterators.pyx'])]
+#
 
 setup(name='tempsdb',
-      version='0.1_a5',
+      version='0.1',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index 34dece1..a93eea6 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -24,13 +24,14 @@ cdef class Chunk:
         object write_lock
 
     cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry)
-    cpdef void close(self)
+    cpdef int close(self) except -1
     cdef tuple get_piece_at(self, unsigned int index)
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cpdef unsigned int find_left(self, unsigned long long timestamp)
     cpdef unsigned int find_right(self, unsigned long long timestamp)
     cdef int extend(self) except -1
+    cpdef int delete(self) except -1
 
     cdef inline unsigned long long name(self):
         """
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 13b5541..d310794 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -133,8 +133,11 @@ cdef class Chunk:
         Adds PAGE_SIZE bytes to this file
         """
         self.file_size += self.page_size
+        self.file.seek(0, 2)
+        cdef bytearray ba = bytearray(self.page_size)
+        ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries)
+        self.file.write(ba)
         self.mmap.resize(self.file_size)
-        self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries)
 
     cdef unsigned long long get_timestamp_at(self, unsigned int index):
         """
@@ -148,6 +151,14 @@ cdef class Chunk:
         cdef unsigned long offset = HEADER_SIZE+index*self.block_size_plus
         return STRUCT_Q.unpack(self.mmap[offset:offset+TIMESTAMP_SIZE])[0]
 
+    cpdef int delete(self) except -1:
+        """
+        Close and delete this chunk.
+        """
+        self.close()
+        os.unlink(self.path)
+        return 0
+
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append a record to this chunk
@@ -209,18 +220,19 @@ cdef class Chunk:
     def __len__(self):
         return self.length()
 
-    cpdef void close(self):
+    cpdef int close(self) except -1:
         """
         Close the chunk and close the allocated resources
         """
         if self.closed:
-            return
+            return 0
         if self.parent:
             with self.parent.open_lock:
-                del self.parent.open_chunks[self.min_ts]
+                del self.parent.open_chunks[self.name()]
         self.parent = None
         self.mmap.close()
         self.file.close()
+        return 0
 
     def __del__(self):
         self.close()
diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd
index 10209f1..57240a2 100644
--- a/tempsdb/database.pxd
+++ b/tempsdb/database.pxd
@@ -11,6 +11,9 @@ cdef class Database:
     cpdef void close(self)
     cpdef TimeSeries get_series(self, str name)
     cpdef void register_memory_pressure_manager(self, object mpm)
+    cpdef TimeSeries create_series(self, str name, int block_size,
+                                   unsigned long entries_per_chunk,
+                                   int page_size=*)
 
 cpdef Database create_database(str path)
 
diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx
index 468a429..bee8618 100644
--- a/tempsdb/database.pyx
+++ b/tempsdb/database.pyx
@@ -2,7 +2,7 @@ import os
 import threading
 
 from tempsdb.exceptions import DoesNotExist, AlreadyExists
-from .series cimport TimeSeries
+from .series cimport TimeSeries, create_series
 
 
 cdef class Database:
@@ -38,6 +38,9 @@ cdef class Database:
             with self.lock:
                 # Check a second time due to the lock
                 if name in self.open_series:
+                    if self.open_series[name].closed:
+                        del self.open_series[name]
+                        return self.get_series(name)
                     return self.open_series[name]
                 if not os.path.isdir(path):
                     raise DoesNotExist('series %s does not exist' % (name, ))
@@ -46,6 +49,31 @@ cdef class Database:
                     result.register_memory_pressure_manager(self.mpm)
         return result
 
+    cpdef TimeSeries create_series(self, str name, int block_size,
+                                   unsigned long entries_per_chunk,
+                                   int page_size=4096):
+        """
+        Create a new series
+        
+        :param name: name of the series
+        :type name: str
+        :param block_size: size of the data field
+        :type block_size: int
+        :param entries_per_chunk: entries per chunk file
+        :type entries_per_chunk: int
+        :param page_size: size of a single page
+        :type page_size: int
+        :return: new series
+        :rtype: TimeSeries
+        """
+        if os.path.isdir(os.path.join(self.path, name)):
+            raise AlreadyExists('Series already exists')
+        cdef TimeSeries series = create_series(os.path.join(self.path, name),
+                                               block_size,
+                                               entries_per_chunk, page_size=page_size)
+        self.open_series[name] = series
+        return series
+
     cpdef void register_memory_pressure_manager(self, object mpm):
         """
         Register a satella MemoryPressureManager_ to close chunks if low on memory.
diff --git a/tempsdb/iterators.pxd b/tempsdb/iterators.pxd
index 49b67d6..f5f4715 100644
--- a/tempsdb/iterators.pxd
+++ b/tempsdb/iterators.pxd
@@ -14,5 +14,5 @@ cdef class Iterator:
         Chunk current_chunk
 
     cpdef void close(self)
-    cdef int _get_next(self) except -1
+    cdef int get_next(self) except -1
     cpdef tuple next(self)
diff --git a/tempsdb/iterators.pyx b/tempsdb/iterators.pyx
index 216874f..73a0502 100644
--- a/tempsdb/iterators.pyx
+++ b/tempsdb/iterators.pyx
@@ -43,15 +43,15 @@ cdef class Iterator:
         self.closed = True
         cdef Chunk chunk
         for chunk in self.chunks:
-            self.parent.done_chunk(chunk.name())
+            self.parent.decref_chunk(chunk.name())
         self.chunks = None
 
-    cdef int _get_next(self) except -1:
+    cdef int get_next(self) except -1:
         """
         Fetch next chunk, set i, is_first, is_last and limit appropriately
         """
         if self.current_chunk is not None:
-            self.parent.done_chunk(self.current_chunk.name())
+            self.parent.decref_chunk(self.current_chunk.name())
             self.is_first = False
         else:
             self.is_first = True
@@ -92,9 +92,9 @@ cdef class Iterator:
         """
         try:
             if self.current_chunk is None:
-                self._get_next()
-            if self.i == self.limit:
-                self._get_next()
+                self.get_next()
+            elif self.i == self.limit:
+                self.get_next()
             return self.current_chunk.get_piece_at(self.i)
         except StopIteration:
             return None
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 0725547..24a6ac3 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -14,25 +14,29 @@ cdef class TimeSeries:
         readonly unsigned long long last_entry_ts
         unsigned int page_size
         list chunks
-        dict refs_chunks        # tp.Dict[int, int]
-        dict open_chunks
-        list data_in_memory
+        dict refs_chunks        # type: tp.Dict[int, int]
+        dict open_chunks        # type: tp.Dict[int, Chunk]
         Chunk last_chunk
         object mpm      # satella.instrumentation.memory.MemoryPressureManager
 
-    cpdef void register_memory_pressure_manager(self, object mpm)
+    cdef void register_memory_pressure_manager(self, object mpm)
     cpdef int delete(self) except -1
-    cdef dict _get_metadata(self)
+    cdef dict get_metadata(self)
     cpdef void close(self)
-    cpdef void done_chunk(self, unsigned long long name)
-    cpdef Chunk open_chunk(self, unsigned long long name)
+    cdef void incref_chunk(self, unsigned long long name)
+    cdef void decref_chunk(self, unsigned long long name)
+    cdef Chunk open_chunk(self, unsigned long long name)
+    cdef int sync_metadata(self) except -1
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cpdef int close_chunks(self) except -1
     cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop)
-    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)
+    cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)
     cpdef int trim(self, unsigned long long timestamp) except -1
 
+    cdef inline int get_references_for(self, unsigned long long timestamp):
+        return self.refs_chunks.get(timestamp, 0)
+
 cpdef TimeSeries create_series(str path, unsigned int block_size,
                                int max_entries_per_chunk, int page_size=*)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 5a19b4f..f7a350e 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -1,12 +1,9 @@
-import itertools
 import shutil
 import threading
-import time
 import ujson
 from satella.files import read_in_file
 
 from .chunks cimport create_chunk, Chunk
-from .database cimport Database
 from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists
 import os
 
@@ -27,8 +24,8 @@ cdef class TimeSeries:
     """
     def __init__(self, path: str):
         self.mpm = None
-        self.lock = threading.Lock()
-        self.open_lock = threading.Lock()
+        self.lock = threading.RLock()
+        self.open_lock = threading.RLock()
         self.refs_chunks = {}
         self.closed = False
 
@@ -41,9 +38,10 @@ cdef class TimeSeries:
             str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
                                          'utf-8', 'invalid json')
             dict metadata
+            str filename
             list files = os.listdir(self.path)
-            set files_s = set(files)
-            str chunk
+            unsigned long long last_chunk_name
+
         try:
             metadata = ujson.loads(metadata_s)      # raises ValueError
             # raises KeyError
@@ -58,32 +56,38 @@ cdef class TimeSeries:
 
         self.open_chunks = {}       # tp.Dict[int, Chunk]
 
-        files_s.remove('metadata.txt')
-        if not files_s:
+        if not len(files):
+            raise Corruption('Empty directory!')
+        elif len(files) == 1:
+            # empty series
             self.last_chunk = None
             self.chunks = []
             self.last_entry_ts = 0
         else:
             self.chunks = []        # type: tp.List[int] # sorted by ASC
-            for chunk in files:
+            for filename in files:
+                if filename == METADATA_FILE_NAME:
+                    continue
                 try:
-                    self.chunks.append(int(chunk))
+                    self.chunks.append(int(filename))
                 except ValueError:
-                    raise Corruption('Detected invalid file "%s"' % (chunk, ))
+                    raise Corruption('Detected invalid file "%s"' % (filename, ))
             self.chunks.sort()
-            self.last_chunk = Chunk(self, os.path.join(self.path, str(max(self.chunks))))
-            self.open_chunks[self.last_chunk.min_ts] = self.last_chunk
+
+            last_chunk_name = self.chunks[-1]
+            self.last_chunk = self.open_chunk(last_chunk_name)
             self.last_entry_ts = self.last_chunk.max_ts
 
-    cpdef void done_chunk(self, unsigned long long name):
-        """
-        Signal that we are done with given chunk and that it can be freed.
-        
-        Releases the reference to a chunk.
-        """
+    cdef void decref_chunk(self, unsigned long long name):
         self.refs_chunks[name] -= 1
 
-    cpdef Chunk open_chunk(self, unsigned long long name):
+    cdef void incref_chunk(self, unsigned long long name):
+        if name not in self.refs_chunks:
+            self.refs_chunks[name] = 1
+        else:
+            self.refs_chunks[name] += 1
+
+    cdef Chunk open_chunk(self, unsigned long long name):
         """
         Opens a provided chunk.
         
@@ -100,28 +104,51 @@ cdef class TimeSeries:
             raise InvalidState('Series is closed')
         if name not in self.chunks:
             raise DoesNotExist('Invalid chunk!')
+        cdef Chunk chunk
         with self.open_lock:
             if name not in self.open_chunks:
-                self.open_chunks[name] = Chunk(self,
-                                               os.path.join(self.path, str(name)),
-                                               self.page_size)
-        if name not in self.refs_chunks:
-            self.refs_chunks[name] = 1
-        else:
-            self.refs_chunks[name] += 1
-        return self.open_chunks[name]
+                self.open_chunks[name] = chunk = Chunk(self,
+                                                       os.path.join(self.path, str(name)),
+                                                       self.page_size)
+            else:
+                chunk = self.open_chunks[name]
+            self.incref_chunk(name)
+        return chunk
 
     cpdef int trim(self, unsigned long long timestamp) except -1:
         """
         Delete all entries earlier than timestamp.
         
         Note that this will drop entire chunks, so it may be possible that some entries will linger
-        on. This will not delete opened chunks, but it will delete them on release.
+        on. This will not delete currently opened chunks!
         
         :param timestamp: timestamp to delete entries earlier than
         :type timestamp: int
         """
-        # todo: write it
+        if len(self.chunks) == 1:
+            return 0
+        cdef:
+            unsigned long long chunk_to_delete
+            int refs
+        try:
+            with self.open_lock:
+                while len(self.chunks) >= 2 and timestamp > self.chunks[1]:
+                    chunk_to_delete = self.chunks[0]
+                    if chunk_to_delete in self.open_chunks:
+                        refs = self.refs_chunks.get(chunk_to_delete, 0)
+                        if not refs:
+                            self.open_chunks[chunk_to_delete].delete()
+                        else:
+                            # I would delete it, but it's open...
+                            return 0
+                    else:
+                        os.unlink(os.path.join(self.path, str(chunk_to_delete)))
+                    del self.chunks[0]
+                else:
+                    return 0
+        except IndexError:
+            return 0
+        return 0
 
     cpdef void close(self):
         """
@@ -131,15 +158,18 @@ cdef class TimeSeries:
         """
         if self.closed:
             return
-        cdef Chunk chunk
-        for chunk in self.data_in_memory.values():
+        cdef:
+            Chunk chunk
+            list open_chunks
+        open_chunks = list(self.open_chunks.values())
+        for chunk in open_chunks:
             chunk.close()
         if self.mpm is not None:
             self.mpm.cancel()
             self.mpm = None
         self.closed = True
 
-    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
+    cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
         """
         Return the index of chunk that should have given timestamp
         
@@ -210,9 +240,13 @@ cdef class TimeSeries:
         :type timestamp: int
         """
         self.last_entry_synced = timestamp
-        self.sync()
+        self.sync_metadata()
         return 0
 
+    cdef int sync_metadata(self) except -1:
+        with self.lock, open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
+            ujson.dump(self.get_metadata(), f_out)
+
     cpdef int sync(self) except -1:
         """
         Synchronize the data kept in the memory with these kept on disk
@@ -222,15 +256,14 @@ cdef class TimeSeries:
         if self.closed:
             raise InvalidState('series is closed')
 
-        with self.lock, open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
-            ujson.dump(self._get_metadata(), f_out)
+        self.sync_metadata()
 
-        if self.last_chunk:
+        if self.last_chunk is not None:
             self.last_chunk.sync()
 
         return 0
 
-    cdef dict _get_metadata(self):
+    cdef dict get_metadata(self):
         return {
                 'block_size': self.block_size,
                 'max_entries_per_chunk': self.max_entries_per_chunk,
@@ -238,7 +271,7 @@ cdef class TimeSeries:
                 'page_size': self.page_size
             }
 
-    cpdef void register_memory_pressure_manager(self, object mpm):
+    cdef void register_memory_pressure_manager(self, object mpm):
         """
         Register a memory pressure manager.
         
@@ -285,16 +318,19 @@ cdef class TimeSeries:
             raise InvalidState('series is closed')
         if len(data) != self.block_size:
             raise ValueError('Invalid block size, was %s should be %s' % (len(data), self.block_size))
-        if timestamp <= self.last_entry_ts:
+        if timestamp <= self.last_entry_ts and self.last_entry_ts:
             raise ValueError('Timestamp not larger than previous timestamp')
 
         with self.lock, self.open_lock:
             # If this is indeed our first chunk, or we've exceeded the limit of entries per chunk
             if self.last_chunk is None or self.last_chunk.length() >= self.max_entries_per_chunk:
                 # Create a next chunk
+                if self.last_chunk is not None:
+                    self.decref_chunk(self.last_chunk.name())
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                                timestamp, data, self.page_size)
                 self.open_chunks[timestamp] = self.last_chunk
+                self.incref_chunk(timestamp)
                 self.chunks.append(timestamp)
             else:
                 self.last_chunk.append(timestamp, data)
diff --git a/tests/test_db.py b/tests/test_db.py
index 105fb27..58b3529 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -1,8 +1,19 @@
 import os
+import sys
 import unittest
 
 
 class TestDB(unittest.TestCase):
+    def test_write_series(self):
+        from tempsdb.series import create_series
+        series = create_series('test3', 10, 4096)
+        for i in range(8000):
+            series.append(i, b'\x00'*10)
+        series.trim(4100)
+
+        self.assertEqual(len(os.listdir('test3')), 2)
+        series.close()
+
     def test_create_series(self):
         from tempsdb.series import create_series
 
@@ -20,9 +31,13 @@ class TestDB(unittest.TestCase):
         self.do_verify_series(series, 0, 500)
         self.do_verify_series(series, 0, 1200)
         self.do_verify_series(series, 0, 1800)
+        series.close()
+        print('after close')
 
     def do_verify_series(self, series, start, stop):
-        items = list(series.iterate_range(start, stop))
+        it = series.iterate_range(start, stop)
+        items = list(it)
+        it.close()
         self.assertGreaterEqual(items[0][0], start)
         self.assertLessEqual(items[-1][0], stop)
 
-- 
GitLab