diff --git a/.circleci/config.yml b/.circleci/config.yml
index 933e85d60092d6b3e49a4b979353527cd4e49949..0a9101cb6e7f1f175c74779ce4451c00ca5320bb 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -14,12 +14,10 @@ jobs:
       - run:
           command: |
             sudo pip install satella snakehouse
-            sudo python setup.py install
-            sudo pip install pytest-xdist pytest-cov pytest pytest-forked pluggy py mock
           name: Install necessary modules
       - run:
           command: |
-            pytest
+            sudo python setup.py test
           name: Test

 workflows:
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000000000000000000000000000000000000..eb2762e9d02a4f5f7e09170a16017dbfc72df62b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,3 @@
+.git
+.circleci
+docs
diff --git a/.gitignore b/.gitignore
index 626a3d1a702e6fe01eff924adf63de3592a9bb75..e51a1344b61ddd49a377c83403342bcd2fb2adbe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,8 @@ share/python-wheels/
 *.egg
 MANIFEST
 docs/_build
+*.c
+*.h

 # PyInstaller
 # Usually these files are written by a python script from a template
diff --git a/setup.cfg b/setup.cfg
index d18f3d6eedcd2fb09570acd65aa756b40b39b6e2..c9fdb14cfd3e327a13a6f5620f583084ad59ea31 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,7 +32,3 @@ max-line-length = 100

 [pep8]
 max-line-length = 100
-
-[bdist_wheel]
-universal = 1
-
diff --git a/setup.py b/setup.py
index 056b905768a2590bbe5f6d5e7e26cb413497a2cd..4bfc77d89d856a93a8887bd4e44ebaef2e33cc81 100644
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,5 @@ setup(name='tempsdb',
       compiler_directives={
           'language_level': '3',
       }),
-      python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*',
-      zip_safe=False
+      python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*'
 )
diff --git a/tempsdb/__init__.py b/tempsdb/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..93a0b2e8f779eb4b71867716e1321d66782a5d9b 100644
--- a/tempsdb/__init__.py
+++ b/tempsdb/__init__.py
@@ -0,0 +1,2 @@
+from tempsdb.__bootstrap__ import bootstrap_cython_submodules
+bootstrap_cython_submodules()
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 3640b7ac2bf7b0b7a7345f76d0ea69b730634acd..524f5a6b00e0b3ca03eb55683f326ec8c9db4c74 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -19,6 +19,7 @@ cdef class Chunk:
     :ivar min_ts: timestamp of the first entry stored (int)
     :ivar max_ts: timestamp of the last entry stored (int)
     :ivar block_size: size of the data entries (int)
+    :ivar entries: number of entries in this chunk (int)
     """
     def __init__(self, path: str):
         self.closed = False
@@ -31,7 +32,7 @@ cdef class Chunk:
             raise Corruption('Empty chunk file!')
         try:
             self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(self.file.read(20))
-        except IOError:
+        except struct.error:
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
         self.pointer = 8
         self.entries = (os.path.getsize(self.path)-20) // self.block_size
@@ -65,16 +66,17 @@ cdef class Chunk:

         return ts, self.mmap[starting_index+8:stopping_index]

-
 cpdef Chunk create_chunk(str path, list data):
     """
     Creates a new chunk on disk

     :param path: path to the new chunk file
     :type path: str
-    :param data: data to write, must be nonempty
+    :param data: data to write, a list of (timestamp, entry to write) tuples.
+        Must be nonempty and sorted by timestamp.
     :type data: tp.List[tp.Tuple[int, bytes]]
-    :raises ValueError: entries in data were not of equal size, or data was empty
+    :raises ValueError: entries in data were not of equal size, data was empty, data
+        was not sorted by timestamp, or the same timestamp appeared twice
     """
     if not data:
         raise ValueError('Data is empty')
@@ -85,20 +87,27 @@ cpdef Chunk create_chunk(str path, list data):
         bytes b
         unsigned long long ts
         unsigned long block_size = len(data[0][1])
+        unsigned long long last_ts = 0
+        bint first_element = True
     for ts, b in data:
         if ts < min_ts:
             min_ts = ts
         elif ts > max_ts:
             max_ts = ts
-    file.write(STRUCT_QQL.pack(min_ts, max_ts, len(data[0][1])))
+    file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size))
     try:
         for ts, b in data:
+            if not first_element:
+                if ts <= last_ts:
+                    raise ValueError('Timestamp appeared twice or data was not sorted')
             if len(b) != block_size:
                 raise ValueError('Block size has entries of not equal length')
             file.write(STRUCT_Q.pack(ts))
             file.write(b)
-        file.close()
+            last_ts = ts
+            first_element = False
+        file.close()
     except ValueError:
         file.close()
         os.unlink(path)
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index f5678d3ec628921e4776a51653383959e6ff40f5..7aefa9b80bd8781e9e8c492858ebfe8f6b85d9fe 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -2,11 +2,24 @@ from .database cimport Database

 cdef class TimeSeries:
     cdef:
+        object lock
         str path
         Database parent
         str name
-        int block_size
-        unsigned long long last_entry_ts
+        unsigned int max_entries_per_block
+        double last_synced
+        readonly double interval_between_synces
+        readonly unsigned long long last_entry_synced
+        readonly unsigned int block_size
+        readonly unsigned long long last_entry_ts
         list chunks
+        dict open_chunks
+        list data_in_memory
+
+    cdef dict _get_metadata(self)
+    cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
+    cpdef int try_sync(self) except -1
+    cpdef int _sync_metadata(self) except -1
+    cpdef int put(self, unsigned long long timestamp, bytes data) except -1
+    cpdef int sync(self) except -1
-
-    cpdef void sync(self)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 79fddcb8969ec87f8cd1e4620f665dcbb25c048a..d87eba5bd2cf72517f9f2d5bce441f49efebb112 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -1,14 +1,26 @@
+import threading
+import time
+
 import ujson
 from satella.files import read_in_file

-from .chunks cimport Chunk
+from .chunks cimport Chunk, create_chunk
 from .database cimport Database
 from .exceptions import DoesNotExist, Corruption

 import os

+DEF METADATA_FILE_NAME = 'metadata.txt'

 cdef class TimeSeries:
+    """
+    This class is thread-safe
+
+    :ivar last_entry_ts: timestamp of the last entry added (int)
+    :ivar last_entry_synced: timestamp of the last synchronized entry (int)
+    :ivar block_size: size of the writable block of data
+    """
     def __init__(self, parent: Database, name: str):
+        self.lock = threading.RLock()   # re-entrant, since put() calls sync() while holding it
         self.parent = parent
         self.name = name
@@ -18,7 +30,7 @@ cdef class TimeSeries:

         self.path = os.path.join(self.parent.path, self.name)

-        cdef str metadata_s = read_in_file(os.path.join(self.path, 'metadata.txt'),
+        cdef str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
                                            'utf-8', 'invalid json')
         cdef dict metadata
         try:
@@ -29,18 +41,100 @@ cdef class TimeSeries:

         cdef list files = os.listdir(self.path)
         cdef set files_s = set(files)
         files_s.remove('metadata.txt')

-        self.chunks = []
+        self.chunks = []    # type: tp.List[int] # sorted ascending
         cdef str chunk
         for chunk in files_s:
             try:
                 self.chunks.append(int(chunk))
             except ValueError:
                 raise Corruption('Detected invalid file "%s"' % (chunk, ))
-        self.last_entry_ts = metadata['last_entry_ts']
-        self.block_size = metadata['block_size']
+        self.chunks.sort()
+        try:
+            self.last_entry_ts = metadata['last_entry_ts']
+            self.block_size = metadata['block_size']
+            self.max_entries_per_block = metadata['max_entries_per_block']
+            self.last_entry_synced = metadata['last_entry_synced']
+            self.interval_between_synces = metadata['interval_between_synces']
+        except KeyError:
+            raise Corruption('Could not read metadata item')

-    cpdef void sync(self):
+        self.data_in_memory = []
+        self.open_chunks = {}       # tp.Dict[int, Chunk]
+        self.last_synced = time.monotonic()
+
+    cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
+        """
+        Mark the series as synced up to a particular timestamp
+
+        :param timestamp: timestamp of the last synced entry
+        :type timestamp: int
+        """
+        self.last_entry_synced = timestamp
+        self._sync_metadata()
+        return 0
+
+    cpdef int sync(self) except -1:
         """
         Synchronize the data kept in memory with the data kept on disk
         """
+        cdef:
+            unsigned long long min_ts
+            str path
+            Chunk chunk
+        with self.lock:
+            self.last_synced = time.monotonic()
+            if not self.data_in_memory:
+                return 0
+
+            min_ts = self.data_in_memory[0][0]
+            path = os.path.join(self.path, str(min_ts))
+            chunk = create_chunk(path, self.data_in_memory)
+            self.chunks.append(chunk.min_ts)
+            self.data_in_memory = []
+            self._sync_metadata()
+        return 0
+
+    cdef dict _get_metadata(self):
+        return {
+            'last_entry_ts': self.last_entry_ts,
+            'block_size': self.block_size,
+            'max_entries_per_block': self.max_entries_per_block,
+            'last_entry_synced': self.last_entry_synced,
+            'interval_between_synces': self.interval_between_synces
+        }
+
+    cpdef int _sync_metadata(self) except -1:
+        with open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
+            ujson.dump(self._get_metadata(), f_out)
+        return 0
+
+    cpdef int try_sync(self) except -1:
+        """
+        Check if synchronization is necessary, and if so, perform it.
+
+        Prefer this to :meth:`~tempsdb.series.TimeSeries.sync`
+        """
+        if len(self.data_in_memory) == self.max_entries_per_block or \
+                time.monotonic() - self.last_synced > self.interval_between_synces:
+            self.sync()
+        return 0
+
+    cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
+        """
+        Append an entry.
+
+        :param timestamp: timestamp, must be larger than the current last_entry_ts
+        :type timestamp: int
+        :param data: data to write
+        :type data: bytes
+        :raises ValueError: timestamp not larger than the previous timestamp, or invalid block size
+        """
+        if len(data) != self.block_size:
+            raise ValueError('Invalid block size')
+        if timestamp <= self.last_entry_ts:
+            raise ValueError('Timestamp not larger than previous timestamp')
+
+        with self.lock:
+            self.data_in_memory.append((timestamp, data))
+            self.last_entry_ts = timestamp
+            if len(self.data_in_memory) >= self.max_entries_per_block:
+                self.sync()
+        return 0
diff --git a/tests/test_db.py b/tests/test_db.py
index 6186db71e54c396da08daec6dd584fefbe5271ec..56413578c53be2d16df3ea72c7729c2183273795 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -1,10 +1,10 @@
 import unittest

-from tempsdb.chunks import create_chunk
-

 class TestDB(unittest.TestCase):
     def test_chunk(self):
+        from tempsdb.chunks import create_chunk
+
         data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
         chunk = create_chunk('chunk.db', data)
         self.assertEqual(chunk.min_ts, 0)
diff --git a/unittest.Dockerfile b/unittest.Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..cb64eb186826be299fcfd363890535d90601d8c4
--- /dev/null
+++ b/unittest.Dockerfile
@@ -0,0 +1,8 @@
+FROM python:3.8
+
+RUN pip install satella snakehouse nose2
+
+ADD . /app
+WORKDIR /app
+
+CMD ["python", "setup.py", "test"]
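
For reference, the on-disk chunk layout that create_chunk writes above is a 20-byte header (min_ts and max_ts as unsigned 64-bit integers, block_size as an unsigned 32-bit integer) followed by repeated records of an 8-byte timestamp plus block_size bytes of payload. Below is a minimal pure-Python sketch of a reader for that layout; it assumes STRUCT_QQL and STRUCT_Q are declared with standard little-endian sizes ('<QQL' and '<Q'), which the diff itself does not show.

import struct

HEADER = struct.Struct('<QQL')   # min_ts, max_ts, block_size -- assumed byte order
TIMESTAMP = struct.Struct('<Q')  # one unsigned 64-bit timestamp per record

def read_chunk(path):
    """Decode every (timestamp, data) pair from a chunk file."""
    with open(path, 'rb') as f:
        min_ts, max_ts, block_size = HEADER.unpack(f.read(HEADER.size))
        entries = []
        while True:
            raw_ts = f.read(TIMESTAMP.size)
            if not raw_ts:          # EOF: all records consumed
                break
            ts, = TIMESTAMP.unpack(raw_ts)
            entries.append((ts, f.read(block_size)))
    return min_ts, max_ts, block_size, entries

# e.g., after test_chunk above has run:
#   read_chunk('chunk.db')
#   -> (0, 4, 4, [(0, b'ala '), (1, b'ma  '), (4, b'kota')])

Chunk.__init__ in the diff parses the same header with STRUCT_QQL before mmap-ing the records, so a reader like this is only a debugging aid, not part of the API.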