diff --git a/README.md b/README.md index 53220703ad329204a79517286c53612f73859f53..a4ff7eed3d752edbec640e44260ce56fc9a69e5f 100644 --- a/README.md +++ b/README.md @@ -7,4 +7,7 @@ [](https://codeclimate.com/github/smok-serwis/tempsdb/maintainability) [](https://app.circleci.com/pipelines/github/smok-serwis/tempsdb) -Embedded Cython library for time series that you need to upload somewhere +Embedded Cython library for time series that you need to upload somewhere. + +Stores time series with an 8-byte (64-bit) timestamp and a fixed length of data. +So no variable encoding for you! diff --git a/docs/exceptions.rst b/docs/exceptions.rst index 0ddde5531e56638b972c9ce88a63ccdf48509fc5..f42a9204878b9686209f89891a481b978231d624 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -12,3 +12,5 @@ The exceptions that inherit from it are: .. autoclass:: tempsdb.exceptions.Corruption .. autoclass:: tempsdb.exceptions.InvalidState + +.. autoclass:: tempsdb.exceptions.AlreadyExists diff --git a/docs/index.rst b/docs/index.rst index 5e884362a0addcbf46684fb3d3df31eabd219100..896d0bbb82a8e1cfbbcb2fde119859ea1ad7977a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,6 +16,9 @@ Welcome to tempsdb's documentation! It tries to use mmap where possible, and in general be as zero-copy as possible (ie. the only time data is unserialized is when a particular entry is read). +Stores time series with an 8-byte (64-bit) timestamp and a fixed length of data. +So no variable encoding for you! 
+ Indices and tables ================== diff --git a/requirements.txt b/requirements.txt index 4a937e6d111e48f2328331fe823927f0710727a4..450194ebdf9acd0013cdf36aa8cad4f00eff18b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ satella ujson +snakehouse diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index 5f9574e0d6a91158360a2cc4e7ae5e02dd9e17d0..7467316b7928a2c22da73494d6449eb6118ecffa 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -13,5 +13,7 @@ cdef class Chunk: cpdef void close(self) cpdef tuple get_piece_at(self, unsigned int index) cpdef int put(self, unsigned long long timestamp, bytes data) except -1 + cdef inline int length(self): + return self.entries cpdef Chunk create_chunk(str path, list data) diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 7dba60504e49d99db0187d87ed4d214b5686c3b7..180e6494073fca48808889f6d620c5a448bd84f7 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -3,7 +3,7 @@ import threading import typing as tp import struct import mmap -from .exceptions import Corruption, InvalidState +from .exceptions import Corruption, InvalidState, AlreadyExists STRUCT_L = struct.Struct('>L') STRUCT_Q = struct.Struct('>Q') @@ -85,7 +85,7 @@ cdef class Chunk: yield self.get_piece_at(i) def __len__(self): - return self.entries + return self.length() cpdef void close(self): """ @@ -126,8 +126,11 @@ cpdef Chunk create_chunk(str path, list data): Must be nonempty and sorted by timestamp. 
:type data: tp.List[tp.Tuple[int, bytes]] :raises ValueError: entries in data were not of equal size, or data was empty or data - was not sorted by timestamp or same timestamp appeared twice + was not sorted by timestamp or same timestamp appeared twice + :raises AlreadyExists: chunk already exists """ + if os.path.exists(path): + raise AlreadyExists('chunk already exists!') if not data: raise ValueError('Data is empty') file = open(path, 'wb') diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index dfa83dd10493a00909827938121ac991ee1f9fe3..0de222550691bd9ce779f98ef5288081d701cbe5 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -1,6 +1,11 @@ +from .series cimport TimeSeries + + cdef class Database: cdef: str path bint closed + object lock cpdef void close(self) + cpdef TimeSeries get_series(self, str name) diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index bfbc12f9fa19f7e4d3c2d2f9d2155981558b8aee..676689e89946158c4571b7b48179337cd7ada736 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -1,3 +1,10 @@ +import os +import threading + +from tempsdb.exceptions import DoesNotExist +from .series cimport TimeSeries + + cdef class Database: """ A basic TempsDB object. 
@@ -5,6 +12,20 @@ cdef class Database: def __init__(self, path: str): self.path = path self.closed = False + self.open_series = {} + self.lock = threading.Lock() + + cpdef TimeSeries get_series(self, name: str): + cdef TimeSeries result + if name in self.open_series: + result = self.open_series[name] + else: + with self.lock: + if not os.path.isdir(os.path.join(self.path, name)): + raise DoesNotExist('series %s does not exist' % (name, )) + self.open_series[name] = result = TimeSeries(self, name) + return result + cpdef void close(self): """ @@ -12,4 +33,7 @@ cdef class Database: """ if self.closed: return + cdef TimeSeries series + for series in self.open_series.values(): + series.close() self.closed = True diff --git a/tempsdb/exceptions.pyx b/tempsdb/exceptions.pyx index 538c08dc300709d2413aa8aa50e6e9b35547fbd9..8ad47d9b759d3108a0ee86461c65463c83457a1b 100644 --- a/tempsdb/exceptions.pyx +++ b/tempsdb/exceptions.pyx @@ -9,5 +9,10 @@ class DoesNotExist(TempsDBError): class Corruption(TempsDBError): """Corruption was detected in the dataset""" + class InvalidState(TempsDBError): """An attempt was made to write to a resource that's closed""" + + +class AlreadyExists(TempsDBError): + """Provided object already exists""" diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 6eeb69c096f295285bbffbc6d7cbea55265aedef..3fc5dbe11abfbf779948cf27d9ba2cb7c25fb5da 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -10,8 +10,6 @@ cdef class TimeSeries: Database parent str name unsigned int max_entries_per_chunk - double last_synced - readonly double interval_between_synces readonly unsigned long long last_entry_synced readonly unsigned int block_size readonly unsigned long long last_entry_ts @@ -20,12 +18,13 @@ cdef class TimeSeries: list data_in_memory Chunk last_chunk + cpdef int delete(self) except -1 cdef dict _get_metadata(self) cpdef void close(self) cpdef Chunk open_chunk(self, unsigned long long name) cpdef int mark_synced_up_to(self, unsigned long long 
timestamp) except -1 - cpdef int try_sync(self) except -1 - cpdef int _sync_metadata(self) except -1 cpdef int put(self, unsigned long long timestamp, bytes data) except -1 cpdef int sync(self) except -1 +cpdef TimeSeries create_series(Database parent, str name, unsigned int block_size, + int max_entries_per_chunk) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 924ec278c6621e5836360574533463826b7bb3c3..425044eef47ac9f6c241fa9684e555413cbb2b1b 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -1,3 +1,4 @@ +import shutil import threading import time import ujson @@ -5,7 +6,7 @@ from satella.files import read_in_file from .chunks cimport create_chunk, Chunk from .database cimport Database -from .exceptions import DoesNotExist, Corruption, InvalidState +from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists import os DEF METADATA_FILE_NAME = 'metadata.txt' @@ -42,28 +43,32 @@ cdef class TimeSeries: except ValueError: raise Corruption('Corrupted series') + self.open_chunks = {} # tp.Dict[int, Chunk] + self.last_synced = time.monotonic() + files_s.remove('metadata.txt') - self.chunks = [] # type: tp.List[int] # sorted by ASC - for chunk in files: + if not files_s: + self.last_chunk = None + self.open_chunks = {} + self.chunks = [] + else: + self.chunks = [] # type: tp.List[int] # sorted by ASC + for chunk in files: + try: + self.chunks.append(int(chunk)) + except ValueError: + raise Corruption('Detected invalid file "%s"' % (chunk, )) + + self.chunks.sort() try: - self.chunks.append(int(chunk)) - except ValueError: - raise Corruption('Detected invalid file "%s"' % (chunk, )) + self.last_entry_ts = metadata['last_entry_ts'] + self.block_size = metadata['block_size'] + self.max_entries_per_chunk = metadata['max_entries_per_chunk'] + self.last_entry_synced = metadata['last_entry_synced'] + except KeyError: + raise Corruption('Could not read metadata item') - self.chunks.sort() - try: - self.last_entry_ts = 
metadata['last_entry_ts'] - self.block_size = metadata['block_size'] - self.max_entries_per_block = metadata['max_entries_per_block'] - self.last_entry_synced = metadata['last_entry_synced'] - self.interval_between_synces = metadata['interval_between_synces'] - except KeyError: - raise Corruption('Could not read metadata item') - - self.data_in_memory = [] - self.open_chunks = {} # tp.Dict[int, Chunk] - self.last_synced = time.monotonic() - self.last_chunk = Chunk(os.path.join(self.path, str(max(self.chunks)))) + self.last_chunk = Chunk(os.path.join(self.path, str(max(self.chunks)))) cpdef Chunk open_chunk(self, unsigned long long name): """ @@ -106,7 +111,7 @@ cdef class TimeSeries: :type timestamp: int """ self.last_entry_synced = timestamp - self._sync_metadata() + self.sync() return 0 cpdef int sync(self) except -1: @@ -120,42 +125,18 @@ cdef class TimeSeries: cdef: unsigned long long min_ts = self.data_in_memory[0][0] str path = os.path.join(self.path, str(min_ts)) - with self.lock: - self.last_synced = time.monotonic() - if not self.data_in_memory: - return 0 - - chunk = create_chunk(path, self.data_in_memory) - self.chunks.append(chunk.min_ts) - self.data_in_memory = [] - self._sync_metadata() + with self.lock, open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out: + ujson.dump(self._get_metadata(), f_out) return 0 cdef dict _get_metadata(self): return { 'last_entry_ts': self.last_entry_ts, 'block_size': self.block_size, - 'max_entries_per_block': self.max_entries_per_block, - 'last_entry_synced': self.last_entry_synced, - 'interval_between_synces': self.interval_between_synces + 'max_entries_per_chunk': self.max_entries_per_chunk, + 'last_entry_synced': self.last_entry_synced } - cpdef int _sync_metadata(self) except -1: - with open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out: - ujson.dump(self._get_metadata(), f_out) - return 0 - - cpdef int try_sync(self) except -1: - """ - Check if synchronization is necessary, and if so, perform 
it. - - Prefer this to :meth:`~tempsdb.series.Series.sync` - """ - if len(self.data_in_memory) == self.max_entries_per_block or \ - time.monotonic() - self.last_synced > self.interval_between_synces: - self.sync() - return 0 - cpdef int put(self, unsigned long long timestamp, bytes data) except -1: """ Append an entry. @@ -175,13 +156,42 @@ cdef class TimeSeries: raise ValueError('Timestamp not larger than previous timestamp') with self.lock: - if len(self.last_chunk) >= self.max_entries_per_block: - self.last_chunk.close() + if self.last_chunk is None: self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)), [(timestamp, data)]) + self.open_chunks[timestamp] = self.last_chunk + elif self.last_chunk.length() >= self.max_entries_per_chunk: + self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)), + [(timestamp, data)]) + self.chunks.append(timestamp) else: self.last_chunk.put(timestamp, data) self.last_entry_ts = timestamp return 0 + + cpdef int delete(self) except -1: + """ + Erase this series from the disk + """ + self.close() + shutil.rmtree(self.path) + + +cpdef TimeSeries create_series(Database parent, str name, unsigned int block_size, + int max_entries_per_chunk): + cdef path = os.path.join(parent.path, name) + if os.path.exists(path): + raise AlreadyExists('This series already exists!') + + os.mkdir(path) + with open(os.path.join(path, METADATA_FILE_NAME), 'w') as f_out: + ujson.dump({ + 'last_entry_ts': 0, + 'block_size': block_size, + 'max_entries_per_chunk': max_entries_per_chunk, + 'last_entry_synced': 0 + }, f_out + ) + return TimeSeries(parent, name) diff --git a/tests/test_db.py b/tests/test_db.py index 6a63d632ec03136a3eacf2f5ecd28214636e0db5..1f8b5b524989b057430452e3307d42f7669b8636 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,9 +1,10 @@ import os import unittest from tempsdb.chunks import create_chunk - +from tempsdb.series import create_series class TestDB(unittest.TestCase): + def test_chunk(self): 
data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk('chunk.db', data) diff --git a/unittest.Dockerfile b/unittest.Dockerfile index 610937e4c063e23c06aef87c709bc26b76ca20dc..bfcae7c7a5e3c6197e16d7152847bcc8b40c513d 100644 --- a/unittest.Dockerfile +++ b/unittest.Dockerfile @@ -1,6 +1,6 @@ FROM python:3.8 -RUN pip install satella snakehouse nose2 wheel +RUN pip install satella snakehouse nose2 wheel ujson ADD . /app WORKDIR /app