diff --git a/docs/exceptions.rst b/docs/exceptions.rst index 4d833a3c0f37105a8b916cdfe671b1523feaf45a..0ddde5531e56638b972c9ce88a63ccdf48509fc5 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -10,3 +10,5 @@ The exceptions that inherit from it are: .. autoclass:: tempsdb.exceptions.DoesNotExist .. autoclass:: tempsdb.exceptions.Corruption + +.. autoclass:: tempsdb.exceptions.InvalidState diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index 0c43076db8fe0344b9462acd95c8d8424a1566c3..5f9574e0d6a91158360a2cc4e7ae5e02dd9e17d0 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -7,10 +7,11 @@ cdef class Chunk: readonly unsigned long entries object file object mmap - bint closed + bint closed, writable + object write_lock cpdef void close(self) cpdef tuple get_piece_at(self, unsigned int index) - + cpdef int put(self, unsigned long long timestamp, bytes data) except -1 cpdef Chunk create_chunk(str path, list data) diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 43fea54c14bd34cfe6313c91e495ca1c1cf132fa..7dba60504e49d99db0187d87ed4d214b5686c3b7 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -1,12 +1,13 @@ import os +import threading import typing as tp import struct import mmap -from .exceptions import Corruption +from .exceptions import Corruption, InvalidState -STRUCT_QQL = struct.Struct('>QQL') +STRUCT_L = struct.Struct('>L') STRUCT_Q = struct.Struct('>Q') -DEF HEADER_SIZE = 20 +DEF HEADER_SIZE = 4 DEF TIMESTAMP_SIZE = 8 @@ -14,7 +15,7 @@ cdef class Chunk: """ Represents a single chunk of time series. - This also implements an iterator interface. This will iterate with tp.Tuple[int, bytes]. + This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes]. :param path: path to the chunk file :type path: str @@ -25,26 +26,58 @@ cdef class Chunk: :ivar block_size: size of the data entries (int) :ivar entries: amount of entries in this chunk (int) """ - def __init__(self, path: str): + def __init__(self, path: str, writable: bool = True): cdef: unsigned long long file_size = os.path.getsize(path) bytes b + self.writable = writable + self.write_lock = threading.Lock() self.closed = False self.path = path - self.file = open(self.path, 'rb') + self.file = open(self.path, 'rb+' if self.writable else 'rb') try: - self.mmap = mmap.mmap(self.file.fileno(), file_size, access=mmap.ACCESS_READ) + if self.writable: + self.mmap = mmap.mmap(self.file.fileno(), file_size) + else: + self.mmap = mmap.mmap(self.file.fileno(), file_size, access=mmap.ACCESS_READ) except OSError as e: self.file.close() self.closed = True raise Corruption(f'Empty chunk file!') try: - self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(self.mmap[:HEADER_SIZE]) + self.block_size, = STRUCT_L.unpack(self.mmap[:HEADER_SIZE]) except struct.error: self.close() raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) - print(f'Readed in {file_size} bytes bs={self.block_size}') self.entries = (file_size-HEADER_SIZE) // (self.block_size+TIMESTAMP_SIZE) + self.max_ts, = STRUCT_Q.unpack(self.mmap[-TIMESTAMP_SIZE-self.block_size:-self.block_size]) + + cpdef int put(self, unsigned long long timestamp, bytes data) except -1: + """ + Append a record to this chunk + + :param timestamp: timestamp of the entry + :type timestamp: int + :param data: data to write + :type data: bytes + :raises InvalidState: chunk is closed or not writable + :raises ValueError: invalid timestamp or data + """ + if self.closed or not self.writable: + raise InvalidState('chunk is closed') + if len(data) != self.block_size: + raise ValueError('data not equal in length to block size!') + if timestamp <= self.max_ts: + raise ValueError('invalid timestamp') + cdef bytearray data_to_write = bytearray(TIMESTAMP_SIZE+self.block_size) + data_to_write[0:TIMESTAMP_SIZE] = STRUCT_Q.pack(timestamp) + data_to_write[TIMESTAMP_SIZE:] = data + with self.write_lock: + self.file.seek(0, 2) + self.file.write(data_to_write) + self.entries += 1 + self.mmap.resize(self.entries*(8+self.block_size)+HEADER_SIZE) + return 0 def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]: cdef unsigned long i = 0 @@ -99,21 +132,13 @@ cpdef Chunk create_chunk(str path, list data): raise ValueError('Data is empty') file = open(path, 'wb') cdef: - unsigned long long min_ts = 0xFFFFFFFFFFFFFFFF - unsigned long long max_ts = 0 bytes b unsigned long long ts unsigned long block_size = len(data[0][1]) unsigned long long last_ts = 0 bint first_element = True - for ts, b in data: - if ts < min_ts: - min_ts = ts - elif ts > max_ts: - max_ts = ts - - file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size)) + file.write(STRUCT_L.pack(block_size)) try: for ts, b in data: if not first_element: diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index dc550e891b90cadff8bdf928c7f1904f6fb2c761..dfa83dd10493a00909827938121ac991ee1f9fe3 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -1,3 +1,6 @@ cdef class Database: cdef: str path + bint closed + + cpdef void close(self) diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index df55dd42c6cd2d846834f91539f7eafdb7633939..bfbc12f9fa19f7e4d3c2d2f9d2155981558b8aee 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -4,4 +4,12 @@ cdef class Database: """ def __init__(self, path: str): self.path = path + self.closed = False + cpdef void close(self): + """ + Close this TempsDB database + """ + if self.closed: + return + self.closed = True diff --git a/tempsdb/exceptions.pyx b/tempsdb/exceptions.pyx index f90293ff47b95117b58dec73c4f0a79338f15d7c..538c08dc300709d2413aa8aa50e6e9b35547fbd9 100644 --- a/tempsdb/exceptions.pyx +++ b/tempsdb/exceptions.pyx @@ -8,3 +8,6 @@ class DoesNotExist(TempsDBError): class Corruption(TempsDBError): """Corruption was detected in the dataset""" + +class InvalidState(TempsDBError): + """An attempt was made to write to a resource that's closed""" diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 7aefa9b80bd8781e9e8c492858ebfe8f6b85d9fe..6eeb69c096f295285bbffbc6d7cbea55265aedef 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -1,8 +1,11 @@ from .database cimport Database +from .chunks cimport Chunk + cdef class TimeSeries: cdef: - object lock + bint closed + object lock, fopen_lock str path Database parent str name @@ -15,8 +18,11 @@ cdef class TimeSeries: list chunks dict open_chunks list data_in_memory + Chunk last_chunk cdef dict _get_metadata(self) + cpdef void close(self) + cpdef Chunk open_chunk(self, unsigned long long name) cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int try_sync(self) except -1 cpdef int _sync_metadata(self) except -1 diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 7d0c20209f5266d551934a34f55241192a9c7ede..924ec278c6621e5836360574533463826b7bb3c3 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -3,9 +3,9 @@ import time import ujson from satella.files import read_in_file -from .chunks cimport create_chunk +from .chunks cimport create_chunk, Chunk from .database cimport Database -from .exceptions import DoesNotExist, Corruption +from .exceptions import DoesNotExist, Corruption, InvalidState import os DEF METADATA_FILE_NAME = 'metadata.txt' @@ -20,28 +20,30 @@ cdef class TimeSeries: """ def __init__(self, parent: Database, name: str): self.lock = threading.Lock() + self.fopen_lock = threading.Lock() self.parent = parent self.name = name + self.closed = False if not os.path.isdir(self.parent.path, name): raise DoesNotExist('Chosen time series does not exist') self.path = os.path.join(self.parent.path, self.name) - - cdef str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME), + cdef: + str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME), 'utf-8', 'invalid json') - cdef dict metadata + dict metadata + list files = os.path.listdir(self.path) + set files_s = set(files) + str chunk try: metadata = ujson.loads(metadata_s) except ValueError: raise Corruption('Corrupted series') - cdef list files = os.path.listdir(self.path) - cdef set files_s = set(files) files_s.remove('metadata.txt') self.chunks = [] # type: tp.List[int] # sorted by ASC - cdef str chunk for chunk in files: try: self.chunks.append(int(chunk)) @@ -61,6 +63,40 @@ cdef class TimeSeries: self.data_in_memory = [] self.open_chunks = {} # tp.Dict[int, Chunk] self.last_synced = time.monotonic() + self.last_chunk = Chunk(os.path.join(self.path, str(max(self.chunks)))) + + cpdef Chunk open_chunk(self, unsigned long long name): + """ + Opens a provided chunk + + :param name: name of the chunk + :type name: int + :return: chunk + :rtype: Chunk + :raises DoesNotExist: chunk not found + :raises InvalidState: resource closed + """ + if self.closed: + raise InvalidState('Series is closed') + if name not in self.chunks: + raise DoesNotExist('Invalid chunk!') + with self.fopen_lock: + if name not in self.open_chunks: + self.open_chunks[name] = Chunk(os.path.join(self.path, str(name))) + return self.open_chunks[name] + + cpdef void close(self): + """ + Close the series. + + No further operations can be executed on it afterwards. + """ + if self.closed: + return + cdef Chunk chunk + for chunk in self.data_in_memory.values(): + chunk.close() + self.closed = True cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: """ @@ -76,7 +112,11 @@ cdef class TimeSeries: cpdef int sync(self) except -1: """ Synchronize the data kept in the memory with these kept on disk + + :raises InvalidState: the resource is closed """ + if self.closed: + raise InvalidState('series is closed') cdef: unsigned long long min_ts = self.data_in_memory[0][0] str path = os.path.join(self.path, str(min_ts)) @@ -125,15 +165,23 @@ cdef class TimeSeries: :param data: data to write :type data: bytes :raises ValueError: Timestamp not larger than previous timestamp or invalid block size + :raises InvalidState: the resource is closed """ + if self.closed: + raise InvalidState('series is closed') if len(data) != self.block_size: raise ValueError('Invalid block size') if timestamp <= self.last_entry_ts: raise ValueError('Timestamp not larger than previous timestamp') with self.lock: - self.data_in_memory.append((timestamp, data)) + if len(self.last_chunk) >= self.max_entries_per_block: + self.last_chunk.close() + self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)), + [(timestamp, data)]) + else: + self.last_chunk.put(timestamp, data) + self.last_entry_ts = timestamp - if len(self.data_in_memory) >= self.max_entries_per_block: - self.sync() + return 0 diff --git a/tests/test_db.py b/tests/test_db.py index 58d72ec946846189ee86cd9ca74a8b36c20bc70a..6a63d632ec03136a3eacf2f5ecd28214636e0db5 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,3 +1,4 @@ +import os import unittest from tempsdb.chunks import create_chunk @@ -14,3 +15,6 @@ class TestDB(unittest.TestCase): self.assertEqual(chunk.get_piece_at(2), (4, b'kota')) self.assertEqual(len(chunk), 3) self.assertEqual(list(iter(chunk)), data) + chunk.put(5, b'test') + chunk.close() + self.assertEqual(os.path.getsize('chunk.db'), 4+4*12)