From afc78609d75c93421b87596752b9e38109d7d757 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Mon, 30 Nov 2020 19:08:23 +0100
Subject: [PATCH] added correct reading

---
 setup.py            |   2 +-
 tempsdb/chunks.pxd  |   5 +-
 tempsdb/chunks.pyx  |  56 +++++++++-------------
 tempsdb/series.pxd  |   3 ++
 tempsdb/series.pyx  | 114 ++++++++++++++++++++++++++++++++++++--------
 tests/test_db.py    |  24 +++++++++-
 unittest.Dockerfile |   6 ++-
 7 files changed, 152 insertions(+), 58 deletions(-)

diff --git a/setup.py b/setup.py
index e9cffff..d4b5d73 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@ def find_pyx(*path) -> tp.List[str]:
 
 
 setup(name='tempsdb',
-      version='0.1_a3',
+      version='0.1_a4',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index 335d48a..4a05a41 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -23,7 +23,7 @@ cdef class Chunk:
         readonly bint writable
         object write_lock
 
-    cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry)
+    cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry)
     cpdef void close(self)
     cpdef tuple get_piece_at(self, unsigned int index)
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
@@ -49,4 +49,5 @@ cdef class Chunk:
     cdef unsigned long long get_timestamp_at(self, unsigned int index)
 
 
-cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size)
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size)
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 0f114cf..dbea93a 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -11,6 +11,7 @@ DEF TIMESTAMP_SIZE = 8
 DEF FOOTER_SIZE = 4
 STRUCT_Q = struct.Struct('<Q')
 STRUCT_L = struct.Struct('<L')
+STRUCT_LQ = struct.Struct('<LQ')
 
 
 cdef class Chunk:
@@ -44,23 +45,23 @@ cdef class Chunk:
         self.file = open(self.path, 'rb+' if self.writable else 'rb')
         try:
             if self.writable:
-                self.mmap = mmap.mmap(self.file.fileno(), self.file_size)
+                self.mmap = mmap.mmap(self.file.fileno(), 0)
             else:
-                self.mmap = mmap.mmap(self.file.fileno(), self.file_size, access=mmap.ACCESS_READ)
+                self.mmap = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)
         except OSError as e:
             self.file.close()
             self.closed = True
             raise Corruption(f'Empty chunk file!')
         try:
-            self.block_size, = STRUCT_L.unpack(self.mmap[:HEADER_SIZE])
+            self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE])
             self.block_size_plus = self.block_size + TIMESTAMP_SIZE
         except struct.error:
             self.close()
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
+
         self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size])
         self.pointer = self.entries*self.block_size_plus+HEADER_SIZE
         self.max_ts = self.get_timestamp_at(self.entries-1)
-        self.min_ts = self.get_timestamp_at(0)
 
     cpdef unsigned int find_left(self, unsigned long long timestamp):
         """
@@ -161,7 +162,9 @@ cdef class Chunk:
         if self.closed or not self.writable:
             raise InvalidState('chunk is closed')
         if len(data) != self.block_size:
-            raise ValueError('data not equal in length to block size!')
+            raise ValueError('data (%s) not equal in length to block size (%s)!' % (
+                len(data), self.block_size
+            ))
         if timestamp <= self.max_ts:
             raise ValueError('invalid timestamp')
 
@@ -180,13 +183,13 @@ cdef class Chunk:
             self.pointer += self.block_size_plus
         return 0
 
-    cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry):
+    cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry):
         """
-        Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive)
+        Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive).
         
-        :param starting_entry: number of starting entry
+        :param starting_entry: index of starting entry
         :type starting_entry: int
-        :param stopping_entry: number of stopping entry
+        :param stopping_entry: index of stopping entry
-        :type stopping_entry:
+        :type stopping_entry: int
         :return: an iterator
         :rtype: tp.Iterator[tp.Tuple[int, bytes]]
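
Hypothetical usage of the renamed method, assuming an open Chunk as declared
above (the iterate_range name is freed up for the whole-series iterator added
in series.pyx below):

    # Walk all entries of a chunk; yields (timestamp, payload) pairs.
    for ts, payload in chunk.iterate_indices(0, chunk.entries):
        print(ts, payload)
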
@@ -239,7 +242,8 @@ cdef class Chunk:
         return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]
 
 
-cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size):
     """
     Creates a new chunk on disk
     
@@ -247,9 +251,10 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     :type parent: TimeSeries
     :param path: path to the new chunk file
     :type path: str
-    :param data: data to write, list of tuple (timestamp, entry to write).
-        Must be nonempty and sorted by timestamp.
-    :type data: tp.List[tp.Tuple[int, bytes]]
+    :param timestamp: timestamp for first entry to contain
+    :type timestamp: int
+    :param data: data of the first entry
+    :type data: bytes
     :param page_size: size of a single page for storage
     :type page_size: int
     :raises ValueError: entries in data were not of equal size, or data was empty or data
@@ -264,28 +269,9 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     cdef:
-        bytes b
-        unsigned long long ts
-        unsigned long block_size = len(data[0][1])
+        unsigned long block_size = len(data)
         unsigned long file_size = 0
-        unsigned long long last_ts = 0
-        unsigned int entries = len(data)
-        bint first_element = True
-
     file_size += file.write(STRUCT_L.pack(block_size))
-    try:
-        for ts, b in data:
-            if not first_element:
-                if ts <= last_ts:
-                    raise ValueError('Timestamp appeared twice or data was not sorted')
-            if len(b) != block_size:
-                raise ValueError('Block size has entries of not equal length')
-            file_size += file.write(STRUCT_Q.pack(ts))
-            file_size += file.write(b)
-            last_ts = ts
-            first_element = False
-    except ValueError:
-        file.close()
-        os.unlink(path)
-        raise
+    file_size += file.write(STRUCT_Q.pack(timestamp))
+    file_size += file.write(data)
 
     # Pad this thing to page_size
     cdef unsigned long bytes_to_pad = page_size - (file_size % page_size)
@@ -293,9 +284,8 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
 
     # Create a footer at the end
     cdef bytearray footer = bytearray(page_size)
-    footer[-4:] = STRUCT_L.pack(entries)
+    footer[-4:] = b'\x01\x00\x00\x00'   # 1 in little endian
     file.write(footer)
     file.close()
-    print('Finished creating chunk')
     return Chunk(parent, path, page_size)
 
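With this change a freshly created chunk holds exactly one entry; further
entries arrive through Chunk.append(). A sketch of the byte layout
create_chunk now produces, assuming the page-aligned format shown above
(header, first entry, zero padding, then a one-page footer whose last four
bytes hold the entry count):

    import struct

    def sketch_chunk_bytes(timestamp: int, data: bytes, page_size: int) -> bytes:
        body = struct.pack('<L', len(data))           # header: block size
        body += struct.pack('<Q', timestamp) + data   # the first (only) entry
        body += b'\x00' * (page_size - len(body) % page_size)  # pad to a page
        footer = bytearray(page_size)
        footer[-4:] = struct.pack('<L', 1)            # entry count == 1
        return body + bytes(footer)
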
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 5ef0aa2..83be72e 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -27,6 +27,9 @@ cdef class TimeSeries:
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cpdef int close_chunks(self) except -1
+    cpdef object iterate_range(self, unsigned long long start, unsigned long long stop)
+    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)
+
 
 cpdef TimeSeries create_series(str path, unsigned int block_size,
                                int max_entries_per_chunk, int page_size=*)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 81e10bf..2eaa526 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -1,3 +1,4 @@
+import itertools
 import shutil
 import threading
 import time
@@ -39,9 +44,16 @@ cdef class TimeSeries:
             set files_s = set(files)
             str chunk
         try:
-            metadata = ujson.loads(metadata_s)
+            metadata = ujson.loads(metadata_s)      # raises ValueError
+            # raises KeyError
+            self.block_size = metadata['block_size']
+            self.max_entries_per_chunk = metadata['max_entries_per_chunk']
+            self.last_entry_synced = metadata['last_entry_synced']
+            self.page_size = metadata['page_size']
         except ValueError:
             raise Corruption('Corrupted series')
+        except KeyError:
+            raise Corruption('Could not read metadata item')
 
         self.open_chunks = {}       # tp.Dict[int, Chunk]
 
@@ -57,16 +69,7 @@ cdef class TimeSeries:
                     self.chunks.append(int(chunk))
                 except ValueError:
                     raise Corruption('Detected invalid file "%s"' % (chunk, ))
-
             self.chunks.sort()
-            try:
-                self.block_size = metadata['block_size']
-                self.max_entries_per_chunk = metadata['max_entries_per_chunk']
-                self.last_entry_synced = metadata['last_entry_synced']
-                self.page_size = metadata['page_size']
-            except KeyError:
-                raise Corruption('Could not read metadata item')
-
-            self.last_chunk = Chunk(self, os.path.join(self.path, str(max(self.chunks))))
+            self.last_chunk = Chunk(self, os.path.join(self.path, str(max(self.chunks))), self.page_size)
             self.open_chunks[self.last_chunk.min_ts] = self.last_chunk
             self.last_entry_ts = self.last_chunk.max_ts
@@ -88,7 +91,9 @@ cdef class TimeSeries:
             raise DoesNotExist('Invalid chunk!')
         with self.open_lock:
             if name not in self.open_chunks:
-                self.open_chunks[name] = Chunk(self, os.path.join(self.path, str(name)))
+                self.open_chunks[name] = Chunk(self,
+                                               os.path.join(self.path, str(name)),
+                                               self.page_size)
         return self.open_chunks[name]
 
     cpdef void close(self):
@@ -107,6 +112,79 @@ cdef class TimeSeries:
             self.mpm = None
         self.closed = True
 
+    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
+        """
+        Return the index of the chunk that should contain the given timestamp.
+        
+        :param timestamp: timestamp to check; must be no earlier than the first
+            entry's timestamp and no later than the newest entry's timestamp
+        :type timestamp: int
+        :return: index of the chunk in self.chunks
+        :rtype: int
+        """
+        cdef:
+            unsigned int lo = 0
+            unsigned int hi = len(self.chunks)
+            unsigned int mid
+        while lo < hi:
+            mid = (lo+hi)//2
+            if self.chunks[mid] < timestamp:
+                lo = mid+1
+            else:
+                hi = mid
+
+        try:
+            if self.chunks[lo] == timestamp:
+                return lo
+            else:
+                return lo-1
+        except IndexError:
+            return len(self.chunks)-1
+
+    cpdef object iterate_range(self, unsigned long long start, unsigned long long stop):
+        """
+        Return an iterator over the collected data whose timestamps fall within the given range.
+        
+        :param start: timestamp to start at
+        :type start: int
+        :param stop: timestamp to stop at
+        :type stop: int
+        :return: an iterator with the data
+        :rtype: tp.Iterator[tp.Tuple[int, bytes]]
+        :raises ValueError: start larger than stop
+        """
+        if self.last_chunk is None:
+            return iter([])
+        if start > stop:
+            raise ValueError('start larger than stop')
+        if start < self.chunks[0]:
+            start = self.chunks[0]
+        if stop > self.last_entry_ts:
+            stop = self.last_entry_ts
+
+        cdef:
+            unsigned int ch_start = self.get_index_of_chunk_for(start)
+            unsigned int ch_stop = self.get_index_of_chunk_for(stop)
+            list iterator = []
+            bint is_first
+            bint is_last
+            unsigned int chunk_index
+            Chunk chunk
+
+        for chunk_index in range(ch_start, ch_stop+1):
+            chunk = self.open_chunk(self.chunks[chunk_index])
+            is_first = chunk_index == ch_start
+            is_last = chunk_index == ch_stop
+            if is_first and is_last:
+                return chunk.iterate_indices(chunk.find_left(start), chunk.find_right(stop))
+            elif is_first:
+                iterator.append(chunk.iterate_indices(chunk.find_left(start), chunk.entries))
+            elif is_last:
+                iterator.append(chunk.iterate_indices(0, chunk.find_right(stop)))
+            else:
+                iterator.append(chunk.iterate_indices(0, chunk.entries))
+        return itertools.chain(*iterator)
+
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
         """
         Mark the series as synced up to particular timestamp
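
The loop in get_index_of_chunk_for is a standard binary search over the
sorted list of chunk start timestamps. An equivalent pure-Python model using
bisect (a sketch for clarity, not the module's API; iterate_range clamps
start to self.chunks[0] before calling it):

    import bisect

    def index_of_chunk_for(chunks, timestamp):
        i = bisect.bisect_left(chunks, timestamp)
        if i == len(chunks):
            return len(chunks) - 1   # past the last chunk start: last chunk
        if chunks[i] == timestamp:
            return i                 # timestamp starts this chunk exactly
        return i - 1                 # otherwise it lies in the previous chunk
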
@@ -188,20 +266,18 @@ cdef class TimeSeries:
         if self.closed:
             raise InvalidState('series is closed')
         if len(data) != self.block_size:
-            raise ValueError('Invalid block size')
+            raise ValueError('Invalid block size: was %s, should be %s' % (len(data), self.block_size))
         if timestamp <= self.last_entry_ts:
             raise ValueError('Timestamp not larger than previous timestamp')
 
-        with self.lock:
-            if self.last_chunk is None:
+        with self.lock, self.open_lock:
+            # If this is indeed our first chunk, or we've exceeded the limit of entries per chunk
+            if self.last_chunk is None or self.last_chunk.length() >= self.max_entries_per_chunk:
+                # Create a next chunk
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)], self.page_size)
+                                               timestamp, data, self.page_size)
                 self.open_chunks[timestamp] = self.last_chunk
                 self.chunks.append(timestamp)
-            elif self.last_chunk.length() >= self.max_entries_per_chunk:
-                self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)], self.page_size)
-                self.chunks.append(timestamp)
             else:
                 self.last_chunk.append(timestamp, data)
             self.last_entry_ts = timestamp
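Taken together, append now rolls over to a new chunk under the series locks
whenever the current one is full, and iterate_range stitches the chunks back
into one stream. A usage sketch mirroring the test below (path and sizes are
illustrative):

    from tempsdb.series import create_series

    series = create_series('test', 1, 10)   # 1-byte blocks, 10 entries/chunk
    for i in range(20):
        series.append(100 * (i + 1), bytes([i]))   # strictly increasing ts

    # Spans two chunks; they are opened transparently as needed.
    for ts, value in series.iterate_range(500, 1500):
        print(ts, value)
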
diff --git a/tests/test_db.py b/tests/test_db.py
index f3d3a1e..d070e0e 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -6,12 +6,31 @@ class TestDB(unittest.TestCase):
     def test_create_series(self):
         from tempsdb.series import create_series
 
-        series = create_series('test', 8, 10)
+        series = create_series('test', 1, 10)
+        start, ts = 127, 100
+        for i in range(20):
+            series.append(ts, bytes(bytearray([start])))
+            start -= 1
+            ts += 100
+
+        self.do_verify_series(series, 0, 2000)
+        self.do_verify_series(series, 500, 2000)
+        self.do_verify_series(series, 1000, 2000)
+        self.do_verify_series(series, 1500, 2000)
+        self.do_verify_series(series, 0, 500)
+        self.do_verify_series(series, 0, 1200)
+        self.do_verify_series(series, 0, 1800)
+
+    def do_verify_series(self, series, start, stop):
+        items = list(series.iterate_range(start, stop))
+        self.assertGreaterEqual(items[0][0], start)
+        self.assertLessEqual(items[-1][0], stop)
 
     def test_chunk(self):
         from tempsdb.chunks import create_chunk
-        data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
-        chunk = create_chunk(None, 'chunk.db', data, 4096)
+        chunk = create_chunk(None, 'chunk.db', 0, b'ala ', 4096)
+        chunk.append(1, b'ma  ')
+        chunk.append(4, b'kota')
         self.assertEqual(chunk.min_ts, 0)
         self.assertEqual(chunk.max_ts, 4)
         self.assertEqual(chunk.block_size, 4)
diff --git a/unittest.Dockerfile b/unittest.Dockerfile
index bfcae7c..8d69b7f 100644
--- a/unittest.Dockerfile
+++ b/unittest.Dockerfile
@@ -2,8 +2,12 @@ FROM python:3.8
 
 RUN pip install satella snakehouse nose2 wheel ujson
 
-ADD . /app
+ADD tempsdb /app/tempsdb
+ADD setup.py /app/setup.py
+ADD setup.cfg /app/setup.cfg
 WORKDIR /app
 RUN python setup.py build_ext --inplace
 
+ADD tests /app/tests
+
 CMD ["nose2", "-vv"]
-- 
GitLab