From 887b721b6a4b346cfae934f37214806fa93796d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Fri, 11 Dec 2020 16:38:11 +0100
Subject: [PATCH] started working on varlen series

---
 README.md            |   4 +-
 setup.py             |   2 +-
 tempsdb/database.pyx |   1 +
 tempsdb/series.pxd   |   1 +
 tempsdb/series.pyx   |  18 +++++
 tempsdb/varlen.pxd   |  23 +++++++
 tempsdb/varlen.pyx   | 152 +++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 199 insertions(+), 2 deletions(-)
 create mode 100644 tempsdb/varlen.pxd
 create mode 100644 tempsdb/varlen.pyx

diff --git a/README.md b/README.md
index 9133982..1683474 100644
--- a/README.md
+++ b/README.md
@@ -52,10 +52,12 @@ Then copy your resulting wheel and install it via pip on the target system.
 
 # Changelog
 
-## v0.4.5
+## v0.5
 
 * if page_size is default, it won't be written as part of the metadata
 * added support for per-series metadata
+* added `TimeSeries.append_padded`
+* added variable length series
 
 ## v0.4.4
 
diff --git a/setup.py b/setup.py
index 73b48e0..826957f 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ if 'CI' in os.environ:
 
 
 setup(name='tempsdb',
-      version='0.4.5a3',
+      version='0.5a1',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.24', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx
index 9412572..8a57db6 100644
--- a/tempsdb/database.pyx
+++ b/tempsdb/database.pyx
@@ -206,4 +206,5 @@ cpdef Database create_database(str path):
     if os.path.exists(path):
         raise AlreadyExists('directory already exists')
     os.mkdir(path)
+    os.mkdir(os.path.join(path, 'varlen'))
     return Database(path)
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 7f12446..5c6974e 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -32,6 +32,7 @@ cdef class TimeSeries:
     cdef int sync_metadata(self) except -1
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
+    cpdef int append_padded(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cpdef int close_chunks(self) except -1
     cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 98bc8a9..f6393c3 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -372,6 +372,24 @@ cdef class TimeSeries:
                         pass
         return 0
 
+    cpdef int append_padded(self, unsigned long long timestamp, bytes data) except -1:
+        """
+        Same as :meth:`~tempsdb.series.TimeSeries.append` but will accept data shorter
+        than block_size.
+        
+        It will be padded with zeros.
+
+        :param timestamp: timestamp, must be larger than current last_entry_ts
+        :param data: data to write
+        :raises ValueError: Timestamp not larger than previous timestamp or invalid block size
+        :raises InvalidState: the resource is closed
+        """
+        cdef int data_len = len(data)
+        if data_len > self.block_size:
+            raise ValueError('Data too long')
+        data = data + b'\x00'*(self.block_size - data_len)
+        self.append(timestamp, data)
+
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append an entry.
diff --git a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd
new file mode 100644
index 0000000..d5dabd3
--- /dev/null
+++ b/tempsdb/varlen.pxd
@@ -0,0 +1,23 @@
+from .series cimport TimeSeries
+
+
+cdef class VarlenSeries:
+    cdef:
+        bint closed
+        int size_field
+        object size_struct
+        readonly str path
+        readonly str name
+        TimeSeries root_series
+        list series
+        list length_profile
+        int max_entries_per_chunk
+        int current_maximum_length
+
+    cpdef int close(self) except -1
+    cpdef int get_length_for(self, int index)
+    cpdef int add_series(self) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
+
+cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile,
+                                        int max_entries_per_chunk)
diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx
new file mode 100644
index 0000000..d259edc
--- /dev/null
+++ b/tempsdb/varlen.pyx
@@ -0,0 +1,152 @@
+import os
+import struct
+
+from tempsdb.exceptions import Corruption, AlreadyExists
+from .series cimport TimeSeries, create_series
+
+
+cdef class VarlenSeries:
+    """
+    A time series housing variable length data.
+
+    It does that by splitting the data into chunks and encoding them in multiple
+    series.
+
+    :param path: path to directory containing the series
+    :param name: name of the series
+    """
+    def __init__(self, path: str, name: str):
+        self.closed = False
+        self.path = path
+        self.name = name
+        self.root_series = TimeSeries(os.path.join(path, 'root'), 'root')
+        self.max_entries_per_chunk = self.root_series.max_entries_per_chunk
+        try:
+            self.size_field = self.root_series.metadata['size_field']
+            self.length_profile = self.root_series.metadata['length_profile']
+        except (KeyError, TypeError):
+            raise Corruption('required keys not present or invalid in root subseries')
+
+        if self.size_field == 1:
+            self.size_struct = struct.Struct('<B')
+        elif self.size_field == 2:
+            self.size_struct = struct.Struct('<H')
+        elif self.size_field == 4:
+            self.size_struct = struct.Struct('<L')
+        else:
+            self.root_series.close()
+            raise Corruption('Invalid size_field!')
+
+        cdef:
+            list sub_series = []
+            str dir_name
+        for dir_name in os.listdir(path):
+            if dir_name != 'root':
+                sub_series.append(dir_name)
+
+        try:
+            sub_series.sort(key=lambda x: int(x))
+        except ValueError:
+            raise Corruption('Invalid directory name')
+
+        cdef:
+            int i = 1
+            int tot_length = self.length_profile[0]
+        self.series = [self.root_series]
+        for dir_name in sub_series:
+            tot_length += self.get_length_for(i)
+            i += 1
+            self.series.append(TimeSeries(os.path.join(path, dir_name), dir_name))
+
+        self.current_maximum_length = tot_length
+
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
+        """
+        Append an entry to the series
+        
+        :param timestamp: timestamp to append it with
+        :param data: data to write
+        """
+        cdef int data_len = len(data)
+        if data_len < self.get_length_for(0):
+            data = self.size_struct.pack(len(data)) + data
+            self.root_series.append_padded(timestamp, data)
+            return 0
+
+        while self.current_maximum_length < data_len:
+            self.add_series()
+
+        # At this point data is too large to be put in a single series
+        cdef:
+            bytes data_to_put = self.size_struct.pack(len(data)) + data[:self.get_length_for(0)]
+            int pointer = self.get_length_for(0)
+            int segment = 1
+            int cur_len
+        self.root_series.append(timestamp, data_to_put)
+        while pointer < len(data):
+            cur_len = self.get_length_for(segment)
+            data_to_put = data[pointer:pointer+cur_len]
+            self.series[segment].append_padded(timestamp, data_to_put)
+            pointer += cur_len
+            segment += 1
+
+    cpdef int add_series(self) except -1:
+        """
+        Creates a new series to hold part of our data
+        
+        Updates :attr:`~tempsdb.varlen.VarlenSeries.current_maximum_length`.
+        """
+        cdef:
+            int new_name = len(self.series)
+            int new_len = self.get_length_for(new_name)
+            str new_name_s = str(new_name)
+            TimeSeries series = create_series(os.path.join(self.path, new_name_s),
+                                              new_name_s,
+                                              new_len,
+                                              self.max_entries_per_chunk)
+        self.series.append(series)
+        self.current_maximum_length += new_len
+
+    cpdef int get_length_for(self, int index):
+        """
+        Get the entry length (block size) used by the sub-series at a particular index.
+        
+        :param index: index of the time series, numbered from 0
+        """
+        return self.length_profile[-1 if index >= len(self.length_profile) else index]
+
+    cpdef int close(self) except -1:
+        """
+        Close this series
+        """
+        if self.closed:
+            return 0
+
+        self.closed = True
+        cdef TimeSeries series
+        for series in self.series:
+            series.close()
+
+
+cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile,
+                                        int max_entries_per_chunk):
+    """
+    Create a variable length series
+    
+    :raises AlreadyExists: directory exists at given path
+    :raises ValueError: invalid length profile or max_entries_per_chunk
+    """
+    if os.path.exists(path):
+        raise AlreadyExists('directory present at path')
+    if not length_profile or not max_entries_per_chunk:
+        raise ValueError('invalid parameter')
+
+    os.mkdir(path)
+    cdef TimeSeries root_series = create_series(os.path.join(path, 'root'),
+                                                'root',
+                                                size_struct+length_profile[0],
+                                                max_entries_per_chunk)
+    root_series.set_metadata({'size_field': size_struct,
+                              'length_profile': length_profile})
+    root_series.close()
+    return VarlenSeries(path, name)
-- 
GitLab