diff --git a/docs/index.rst b/docs/index.rst
index 896d0bbb82a8e1cfbbcb2fde119859ea1ad7977a..3c84af43339a10699504f43ad7edae10624ab3de 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -13,7 +13,7 @@ Welcome to tempsdb's documentation!
    time-series
    exceptions
 
-It tries to use mmap where possible, and in general be as zero-copy as possible (ie. the
+It tries to use mmap for reads where possible, and in general be as zero-copy as possible (ie. the
 only time data is unserialized is when a particular entry is read).
 
 Stored time series with a 8-bit timestamp and a fixed length of data.
diff --git a/requirements.txt b/requirements.txt
index 450194ebdf9acd0013cdf36aa8cad4f00eff18b1..2fe238cfb6d36a0700aac897d9433cd41260142e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 satella
 ujson
 snakehouse
+six
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index 7467316b7928a2c22da73494d6449eb6118ecffa..20d1bf80204f84c642385ca964788e63b206187f 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -1,5 +1,8 @@
+from .series cimport TimeSeries
+
 cdef class Chunk:
     cdef:
+        TimeSeries parent
         readonly str path
         readonly unsigned long long min_ts
         readonly unsigned long long max_ts
@@ -7,7 +10,8 @@ cdef class Chunk:
         readonly unsigned long entries
         object file
         object mmap
-        bint closed, writable
+        bint closed
+        readonly bint writable
         object write_lock
 
     cpdef void close(self)
@@ -16,4 +20,4 @@
     cdef inline int length(self):
        return self.entries
 
-cpdef Chunk create_chunk(str path, list data)
+cpdef Chunk create_chunk(TimeSeries parent, str path, list data)
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 180e6494073fca48808889f6d620c5a448bd84f7..726a05493640c27a1d39f62f1373ebc343b756be 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -4,6 +4,7 @@ import typing as tp
 import struct
 import mmap
 from .exceptions import Corruption, InvalidState, AlreadyExists
+from .series cimport TimeSeries
 
 STRUCT_L = struct.Struct('>L')
 STRUCT_Q = struct.Struct('>Q')
@@ -25,11 +26,13 @@
     :ivar max_ts: timestamp of the last entry stored (int)
     :ivar block_size: size of the data entries (int)
     :ivar entries: amount of entries in this chunk (int)
+    :ivar writable: is this chunk writable (bool)
     """
-    def __init__(self, path: str, writable: bool = True):
+    def __init__(self, parent: tp.Optional[TimeSeries], path: str, writable: bool = True):
         cdef:
             unsigned long long file_size = os.path.getsize(path)
             bytes b
+        self.parent = parent
         self.writable = writable
         self.write_lock = threading.Lock()
         self.closed = False
@@ -51,6 +54,7 @@
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
         self.entries = (file_size-HEADER_SIZE) // (self.block_size+TIMESTAMP_SIZE)
         self.max_ts, = STRUCT_Q.unpack(self.mmap[-TIMESTAMP_SIZE-self.block_size:-self.block_size])
+        self.min_ts, = STRUCT_Q.unpack(self.mmap[HEADER_SIZE:HEADER_SIZE+TIMESTAMP_SIZE])
 
     cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
         """
@@ -69,14 +73,16 @@
             raise ValueError('data not equal in length to block size!')
         if timestamp <= self.max_ts:
             raise ValueError('invalid timestamp')
+
         cdef bytearray data_to_write = bytearray(TIMESTAMP_SIZE+self.block_size)
         data_to_write[0:TIMESTAMP_SIZE] = STRUCT_Q.pack(timestamp)
         data_to_write[TIMESTAMP_SIZE:] = data
         with self.write_lock:
             self.file.seek(0, 2)
             self.file.write(data_to_write)
+            self.mmap.resize((self.entries+1)*(8+self.block_size)+HEADER_SIZE)
             self.entries += 1
-            self.mmap.resize(self.entries*(8+self.block_size)+HEADER_SIZE)
+            self.max_ts = timestamp
         return 0
 
     def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]:
@@ -93,6 +99,10 @@
         """
         if self.closed:
             return
+        if self.parent:
+            with self.parent.fopen_lock:
+                del self.parent.open_chunks[self.min_ts]
+        self.parent = None
         self.mmap.close()
         self.file.close()
 
@@ -116,10 +126,12 @@
         return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]
 
 
-cpdef Chunk create_chunk(str path, list data):
+cpdef Chunk create_chunk(TimeSeries parent, str path, list data):
     """
     Creates a new chunk on disk
 
+    :param parent: parent time series
+    :type parent: TimeSeries
     :param path: path to the new chunk file
     :type path: str
     :param data: data to write, list of tuple (timestamp, entry to write).
@@ -158,5 +170,5 @@
             os.unlink(path)
             raise
 
     file.close()
-    return Chunk(path)
+    return Chunk(parent, path)
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 3fc5dbe11abfbf779948cf27d9ba2cb7c25fb5da..da4f3db42e87518a1ea0a5a7486b4b5e3aa198a5 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -1,4 +1,3 @@
-from .database cimport Database
 from .chunks cimport Chunk
 
 
@@ -7,8 +6,6 @@ cdef class TimeSeries:
         bint closed
         object lock, fopen_lock
         str path
-        Database parent
-        str name
         unsigned int max_entries_per_chunk
         readonly unsigned long long last_entry_synced
         readonly unsigned int block_size
@@ -26,5 +23,4 @@
     cpdef int put(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
 
-cpdef TimeSeries create_series(Database parent, str name, unsigned int block_size,
-                               int max_entries_per_chunk)
+cpdef TimeSeries create_series(str path, unsigned int block_size, int max_entries_per_chunk)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 425044eef47ac9f6c241fa9684e555413cbb2b1b..c0344884b5d1380e3640e08262baf14f42a594f5 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -15,27 +15,25 @@
     """
     This is thread-safe
 
-    :ivar last_entry_ts: timestamp of the last entry added (int)
+    :ivar last_entry_ts: timestamp of the last entry added or 0 if no entries yet (int)
     :ivar last_entry_synced: timestamp of the last synchronized entry (int)
     :ivar block_size: size of the writable block of data
     """
-    def __init__(self, parent: Database, name: str):
+    def __init__(self, path: str):
         self.lock = threading.Lock()
         self.fopen_lock = threading.Lock()
-        self.parent = parent
-        self.name = name
         self.closed = False
 
-        if not os.path.isdir(self.parent.path, name):
-            raise DoesNotExist('Chosen time series does not exist')
+        self.path = path
 
-        self.path = os.path.join(self.parent.path, self.name)
+        if not os.path.isdir(self.path):
+            raise DoesNotExist('Chosen time series does not exist')
 
         cdef:
             str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
                                           'utf-8', 'invalid json')
             dict metadata
-            list files = os.path.listdir(self.path)
+            list files = os.listdir(self.path)
             set files_s = set(files)
             str chunk
         try:
@@ -44,13 +42,12 @@
             raise Corruption('Corrupted series')
 
         self.open_chunks = {}  # tp.Dict[int, Chunk]
-        self.last_synced = time.monotonic()
 
         files_s.remove('metadata.txt')
         if not files_s:
             self.last_chunk = None
-            self.open_chunks = {}
             self.chunks = []
+            self.last_entry_ts = 0
         else:
             self.chunks = []  # type: tp.List[int] # sorted by ASC
             for chunk in files:
@@ -61,14 +58,15 @@
             self.chunks.sort()
 
             try:
-                self.last_entry_ts = metadata['last_entry_ts']
                 self.block_size = metadata['block_size']
                 self.max_entries_per_chunk = metadata['max_entries_per_chunk']
                 self.last_entry_synced = metadata['last_entry_synced']
             except KeyError:
                 raise Corruption('Could not read metadata item')
 
-            self.last_chunk = Chunk(os.path.join(self.path, str(max(self.chunks))))
+            self.last_chunk = Chunk(self, os.path.join(self.path, str(max(self.chunks))))
+            self.open_chunks[self.last_chunk.min_ts] = self.last_chunk
+            self.last_entry_ts = self.last_chunk.max_ts
 
     cpdef Chunk open_chunk(self, unsigned long long name):
         """
@@ -87,7 +85,7 @@
             raise DoesNotExist('Invalid chunk!')
         with self.fopen_lock:
             if name not in self.open_chunks:
-                self.open_chunks[name] = Chunk(os.path.join(self.path, str(name)))
+                self.open_chunks[name] = Chunk(self, os.path.join(self.path, str(name)))
         return self.open_chunks[name]
 
     cpdef void close(self):
@@ -131,7 +129,6 @@
     cdef dict _get_metadata(self):
         return {
-                'last_entry_ts': self.last_entry_ts,
                 'block_size': self.block_size,
                 'max_entries_per_chunk': self.max_entries_per_chunk,
                 'last_entry_synced': self.last_entry_synced
         }
@@ -157,11 +154,11 @@
 
         with self.lock:
             if self.last_chunk is None:
-                self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)),
+                self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                                [(timestamp, data)])
                 self.open_chunks[timestamp] = self.last_chunk
             elif self.last_chunk.length() >= self.max_entries_per_chunk:
-                self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)),
+                self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                                [(timestamp, data)])
                 self.chunks.append(timestamp)
             else:
@@ -179,19 +176,17 @@
         shutil.rmtree(self.path)
 
 
-cpdef TimeSeries create_series(Database parent, str name, unsigned int block_size,
+cpdef TimeSeries create_series(str path, unsigned int block_size,
                                int max_entries_per_chunk):
-    cdef path = os.path.join(parent.path, name)
     if os.path.exists(path):
         raise AlreadyExists('This series already exists!')
    os.mkdir(path)
     with open(os.path.join(path, METADATA_FILE_NAME), 'w') as f_out:
         ujson.dump({
-            'last_entry_ts': 0,
             'block_size': block_size,
             'max_entries_per_chunk': max_entries_per_chunk,
             'last_entry_synced': 0
         }, f_out
         )
 
-    return TimeSeries(parent, name)
+    return TimeSeries(path)
diff --git a/tests/test_db.py b/tests/test_db.py
index 1f8b5b524989b057430452e3307d42f7669b8636..7c5bbd7da533324d5c005225316abaef680631e1 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -3,11 +3,15 @@
 import unittest
 from tempsdb.chunks import create_chunk
 from tempsdb.series import create_series
 
+
 class TestDB(unittest.TestCase):
+    def test_create_series(self):
+        series = create_series('test', 8, 10)
+
     def test_chunk(self):
         data = [(0, b'ala '), (1, b'ma '), (4, b'kota')]
-        chunk = create_chunk('chunk.db', data)
+        chunk = create_chunk(None, 'chunk.db', data)
         self.assertEqual(chunk.min_ts, 0)
         self.assertEqual(chunk.max_ts, 4)
         self.assertEqual(chunk.block_size, 4)
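
For reference, here is how the reworked series API reads in use: a series now opens from a plain directory path instead of a (database, name) pair, and chunks carry a reference to their parent series. A minimal sketch, assuming the extension modules are compiled and importable; the 'test_dir' path and the payload values are illustrative, not taken from the patch:

    from tempsdb.series import create_series

    # Create a series storing 8-byte entries, rolling over to a new
    # chunk once the current one holds 10 entries.
    series = create_series('test_dir', 8, 10)

    # put() takes a timestamp and a bytes payload of exactly block_size
    # bytes; timestamps must be strictly increasing.
    series.put(1, b'\x00' * 8)
    series.put(2, b'\x01' * 8)

    series.close()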
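At the chunk level, create_chunk() now takes the parent series as its first argument, and the updated test passes None to exercise a chunk standalone, which the new close() tolerates by skipping the parent deregistration block. A sketch extending that test under the same assumptions; the put() call and its values are illustrative:

    from tempsdb.chunks import create_chunk

    # Standalone chunk, as in test_chunk: no parent series to deregister
    # from on close().
    chunk = create_chunk(None, 'chunk.db', [(0, b'ala '), (1, b'ma '), (4, b'kota')])
    assert (chunk.min_ts, chunk.max_ts, chunk.block_size) == (0, 4, 4)

    # Appending requires a timestamp greater than max_ts and a payload of
    # exactly block_size bytes; put() now also advances max_ts and grows
    # the mmap before bumping the entry count.
    chunk.put(5, b'psa ')

    # Iteration yields (timestamp, bytes) tuples, per __iter__'s signature.
    for ts, payload in chunk:
        print(ts, payload)

    chunk.close()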