From 1794489ff07962a845282f324e22f726ca559c76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 Nov 2020 17:58:51 +0100 Subject: [PATCH] fix tests, some more series --- .circleci/config.yml | 4 +- .dockerignore | 3 ++ .gitignore | 2 + setup.cfg | 4 -- setup.py | 3 +- tempsdb/__init__.py | 2 + tempsdb/chunks.pyx | 21 ++++++--- tempsdb/series.pxd | 19 ++++++-- tempsdb/series.pyx | 108 ++++++++++++++++++++++++++++++++++++++++--- tests/test_db.py | 4 +- unittest.Dockerfile | 8 ++++ 11 files changed, 151 insertions(+), 27 deletions(-) create mode 100644 .dockerignore create mode 100644 unittest.Dockerfile diff --git a/.circleci/config.yml b/.circleci/config.yml index 933e85d..0a9101c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -14,12 +14,10 @@ jobs: - run: command: | sudo pip install satella snakehouse - sudo python setup.py install - sudo pip install pytest-xdist pytest-cov pytest pytest-forked pluggy py mock name: Install necessary modules - run: command: | - pytest + sudo python setup.py test name: Test workflows: diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..eb2762e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.git +.circleci +docs diff --git a/.gitignore b/.gitignore index 626a3d1..e51a134 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,8 @@ share/python-wheels/ *.egg MANIFEST docs/_build +*.c +*.h # PyInstaller # Usually these files are written by a python script from a template diff --git a/setup.cfg b/setup.cfg index d18f3d6..c9fdb14 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,7 +32,3 @@ max-line-length = 100 [pep8] max-line-length = 100 - -[bdist_wheel] -universal = 1 - diff --git a/setup.py b/setup.py index 056b905..4bfc77d 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,5 @@ setup(name='tempsdb', compiler_directives={ 'language_level': '3', }), - python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*', - zip_safe=False + python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*' ) diff --git a/tempsdb/__init__.py b/tempsdb/__init__.py index e69de29..93a0b2e 100644 --- a/tempsdb/__init__.py +++ b/tempsdb/__init__.py @@ -0,0 +1,2 @@ +from tempsdb.__bootstrap__ import bootstrap_cython_submodules +bootstrap_cython_submodules() diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 3640b7a..524f5a6 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -19,6 +19,7 @@ cdef class Chunk: :ivar min_ts: timestamp of the first entry stored (int) :ivar max_ts: timestamp of the last entry stored (int) :ivar block_size: size of the data entries (int) + :ivar entries: amount of entries in this chunk (int) """ def __init__(self, path: str): self.closed = False @@ -31,7 +32,7 @@ cdef class Chunk: raise Corruption('Empty chunk file!') try: self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(self.file.read(16)) - except IOError: + except struct.error: raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) self.pointer = 8 self.entries = (os.path.getsize(self.path)-20) / self.block_size @@ -65,16 +66,17 @@ cdef class Chunk: return ts, self.mmap[starting_index+8:stopping_index] - cpdef Chunk create_chunk(str path, list data): """ Creates a new chunk on disk :param path: path to the new chunk file :type path: str - :param data: data to write, must be nonempty + :param data: data to write, list of tuple (timestamp, entry to write). + Must be nonempty and sorted by timestamp. :type data: tp.List[tp.Tuple[int, bytes]] - :raises ValueError: entries in data were not of equal size, or data was empty + :raises ValueError: entries in data were not of equal size, or data was empty or data + was not sorted by timestamp or same timestamp appeared twice """ if not data: raise ValueError('Data is empty') @@ -85,20 +87,27 @@ cpdef Chunk create_chunk(str path, list data): bytes b unsigned long long ts unsigned long block_size = len(data[0][1]) + unsigned long long last_ts = 0 + bint first_element = True for ts, b in data: if ts < min_ts: min_ts = ts elif ts > max_ts: max_ts = ts - file.write(STRUCT_QQL.pack(min_ts, max_ts, len(data[0][1]))) + file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size)) try: for ts, b in data: + if not first_element: + if ts <= last_ts: + raise ValueError('Timestamp appeared twice or data was not sorted') if len(b) != block_size: raise ValueError('Block size has entries of not equal length') file.write(STRUCT_Q.pack(ts)) file.write(b) - file.close() + last_ts = ts + first_element = False + file.close() except ValueError: file.close() os.unlink(path) diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index f5678d3..7aefa9b 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -2,11 +2,24 @@ from .database cimport Database cdef class TimeSeries: cdef: + object lock str path Database parent str name - int block_size - unsigned long long last_entry_ts + unsigned int max_entries_per_chunk + double last_synced + readonly double interval_between_synces + readonly unsigned long long last_entry_synced + readonly unsigned int block_size + readonly unsigned long long last_entry_ts list chunks + dict open_chunks + list data_in_memory + + cdef dict _get_metadata(self) + cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 + cpdef int try_sync(self) except -1 + cpdef int _sync_metadata(self) except -1 + cpdef int put(self, unsigned long long timestamp, bytes data) except -1 + cpdef int sync(self) except -1 - cpdef void sync(self) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 79fddcb..d87eba5 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -1,14 +1,26 @@ +import threading +import time + import ujson from satella.files import read_in_file -from .chunks cimport Chunk +from .chunks cimport Chunk, create_chunk from .database cimport Database from .exceptions import DoesNotExist, Corruption import os +DEF METADATA_FILE_NAME = 'metadata.txt' cdef class TimeSeries: + """ + This is thread-safe + + :ivar last_entry_ts: timestamp of the last entry added (int) + :ivar last_entry_synced: timestamp of the last synchronized entry (int) + :ivar block_size: size of the writable block of data + """ def __init__(self, parent: Database, name: str): + self.lock = threading.Lock() self.parent = parent self.name = name @@ -18,7 +30,7 @@ cdef class TimeSeries: self.path = os.path.join(self.parent.path, self.name) - cdef str metadata_s = read_in_file(os.path.join(self.path, 'metadata.txt'), + cdef str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME), 'utf-8', 'invalid json') cdef dict metadata try: @@ -29,18 +41,100 @@ cdef class TimeSeries: cdef list files = os.path.listdir(self.path) cdef set files_s = set(files) files_s.remove('metadata.txt') - self.chunks = [] + self.chunks = [] # type: tp.List[int] # sorted by ASC cdef str chunk - for chunk in files_s: + for chunk in files: try: self.chunks.append(int(chunk)) except ValueError: raise Corruption('Detected invalid file "%s"' % (chunk, )) - self.last_entry_ts = metadata['last_entry_ts'] - self.block_size = metadata['block_size'] + self.chunks.sort() + try: + self.last_entry_ts = metadata['last_entry_ts'] + self.block_size = metadata['block_size'] + self.max_entries_per_block = metadata['max_entries_per_block'] + self.last_entry_synced = metadata['last_entry_synced'] + self.interval_between_synces = metadata['interval_between_synces'] + except KeyError: + raise Corruption('Could not read metadata item') - cpdef void sync(self): + self.data_in_memory = [] + self.open_chunks = {} # tp.Dict[int, Chunk] + self.last_synced = time.monotonic() + + cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: + """ + Mark the series as synced up to particular timestamp + + :param timestamp: timestamp of the last synced entry + :type timestamp: int + """ + self.last_entry_synced = timestamp + self._sync_metadata() + return 0 + + cpdef int sync(self) except -1: """ Synchronize the data kept in the memory with these kept on disk """ + cdef: + unsigned long long min_ts = self.data_in_memory[0][0] + str path = os.path.join(self.path, str(min_ts)) + with self.lock: + self.last_synced = time.monotonic() + if not self.data_in_memory: + return 0 + + chunk = create_chunk(path, self.data_in_memory) + self.chunks.append(chunk.min_ts) + self.data_in_memory = [] + self._sync_metadata() + return 0 + + cdef dict _get_metadata(self): + return { + 'last_entry_ts': self.last_entry_ts, + 'block_size': self.block_size, + 'max_entries_per_block': self.max_entries_per_block, + 'last_entry_synced': self.last_entry_synced, + 'interval_between_synces': self.interval_between_synces + } + + cpdef int _sync_metadata(self) except -1: + with open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out: + ujson.dump(self._get_metadata(), f_out) + return 0 + + cpdef int try_sync(self) except -1: + """ + Check if synchronization is necessary, and if so, perform it. + + Prefer this to :meth:`~tempsdb.series.Series.sync` + """ + if len(self.data_in_memory) == self.max_entries_per_block or \ + time.monotonic() - self.last_synced > self.interval_between_synces: + self.sync() + return 0 + + cpdef int put(self, unsigned long long timestamp, bytes data) except -1: + """ + Append an entry. + + :param timestamp: timestamp, must be larger than current last_entry_ts + :type timestamp: int + :param data: data to write + :type data: bytes + :raises ValueError: Timestamp not larger than previous timestamp or invalid block size + """ + if len(data) != self.block_size: + raise ValueError('Invalid block size') + if timestamp <= self.last_entry_ts: + raise ValueError('Timestamp not larger than previous timestamp') + + with self.lock: + self.data_in_memory.append((timestamp, data)) + self.last_entry_ts = timestamp + if len(self.data_in_memory) >= self.max_entries_per_block: + self.sync() + return 0 diff --git a/tests/test_db.py b/tests/test_db.py index 6186db7..5641357 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,10 +1,10 @@ import unittest -from tempsdb.chunks import create_chunk - class TestDB(unittest.TestCase): def test_chunk(self): + from tempsdb.chunks import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk('chunk.db', data) self.assertEqual(chunk.min_ts, 0) diff --git a/unittest.Dockerfile b/unittest.Dockerfile new file mode 100644 index 0000000..cb64eb1 --- /dev/null +++ b/unittest.Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.8 + +RUN pip install satella snakehouse nose2 + +ADD . /app +WORKDIR /app + +CMD ["python", "setup.py", "test"] -- GitLab