From 8a7b8a1cd36e9db2f20600661fadd4703e59a25e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 Nov 2020 17:04:51 +0100 Subject: [PATCH] write chunk --- docs/exceptions.rst | 12 +++++ docs/index.rst | 5 ++ docs/time-series.rst | 4 ++ requirements.txt | 1 + setup.py | 2 +- tempsdb/chunks.pxd | 17 +++++++ tempsdb/chunks.pyx | 107 +++++++++++++++++++++++++++++++++++++++++ tempsdb/exceptions.pyx | 10 ++++ tempsdb/series.pxd | 6 +++ tempsdb/series.pyx | 40 +++++++++++++++ tests/test_db.py | 13 ++++- 11 files changed, 214 insertions(+), 3 deletions(-) create mode 100644 docs/exceptions.rst create mode 100644 docs/time-series.rst create mode 100644 tempsdb/chunks.pxd create mode 100644 tempsdb/chunks.pyx create mode 100644 tempsdb/exceptions.pyx diff --git a/docs/exceptions.rst b/docs/exceptions.rst new file mode 100644 index 0000000..4d833a3 --- /dev/null +++ b/docs/exceptions.rst @@ -0,0 +1,12 @@ +Exceptions +========== + +The base TempsDB exception is + +.. autoclass:: tempsdb.exceptions.TempsDBError + +The exceptions that inherit from it are: + +.. autoclass:: tempsdb.exceptions.DoesNotExist + +.. autoclass:: tempsdb.exceptions.Corruption diff --git a/docs/index.rst b/docs/index.rst index 3c8d264..5e88436 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,11 @@ Welcome to tempsdb's documentation! :maxdepth: 2 :caption: Contents: + time-series + exceptions + +It tries to use mmap where possible, and in general be as zero-copy as possible (ie. the +only time data is unserialized is when a particular entry is read). Indices and tables diff --git a/docs/time-series.rst b/docs/time-series.rst new file mode 100644 index 0000000..8ac5564 --- /dev/null +++ b/docs/time-series.rst @@ -0,0 +1,4 @@ +Time series +=========== + +The name of the series must be a valid name for a directory on your filesystem. diff --git a/requirements.txt b/requirements.txt index 35cb25c..4a937e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ satella +ujson diff --git a/setup.py b/setup.py index 89b3186..056b905 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def find_pyx(*path) -> tp.List[str]: setup(name='tempsdb', version='0.1_a1', packages=find_packages(include=['tempsdb', 'tempsdb.*']), - install_requires=['satella'], + install_requires=['satella', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], compiler_directives={ 'language_level': '3', diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd new file mode 100644 index 0000000..eb0a6a0 --- /dev/null +++ b/tempsdb/chunks.pxd @@ -0,0 +1,17 @@ +cdef class Chunk: + cdef: + readonly str path + readonly unsigned long long min_ts + readonly unsigned long long max_ts + readonly unsigned long block_size + unsigned long pointer + readonly unsigned long entries + object file + object mmap + bint closed + + cpdef void close(self) + cpdef tuple get_piece_at(self, unsigned int index) + + +cpdef Chunk create_chunk(str path, list data) diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx new file mode 100644 index 0000000..3640b7a --- /dev/null +++ b/tempsdb/chunks.pyx @@ -0,0 +1,107 @@ +import os +import typing as tp +import struct +import mmap +from .exceptions import Corruption + +STRUCT_QQL = struct.Struct('>QQL') +STRUCT_Q = struct.Struct('>Q') + + +cdef class Chunk: + """ + Represents a single chunk of time series + + :param path: path to the chunk file + :type path: str + + :ivar path: path to the chunk (str) + :ivar min_ts: timestamp of the first entry stored (int) + :ivar max_ts: timestamp of the last entry stored (int) + :ivar block_size: size of the data entries (int) + """ + def __init__(self, path: str): + self.closed = False + self.path = path + cdef bytes b + self.file = open(self.path, 'rb') + try: + self.mmap = mmap.mmap(self.file.fileno(), 0) + except OSError: + raise Corruption('Empty chunk file!') + try: + self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(self.file.read(16)) + except IOError: + raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) + self.pointer = 8 + self.entries = (os.path.getsize(self.path)-20) / self.block_size + + cpdef void close(self): + """ + Close the chunk and close the allocated resources + """ + if self.closed: + return + self.mmap.close() + self.file.close() + + def __del__(self): + self.close() + + cpdef tuple get_piece_at(self, unsigned int index): + """ + Return a piece of data at a particular index, numbered from 0 + + :return: at piece of data at given index + :rtype: tp.Tuple[int, bytes] + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = 20 + index * self.block_size + unsigned long stopping_index = starting_index + self.block_size + bytes bytes_at = self.mmap[starting_index:stopping_index] + unsigned long long ts = STRUCT_Q.unpack(self.mmap[starting_index:starting_index+8])[0] + return ts, self.mmap[starting_index+8:stopping_index] + + + +cpdef Chunk create_chunk(str path, list data): + """ + Creates a new chunk on disk + + :param path: path to the new chunk file + :type path: str + :param data: data to write, must be nonempty + :type data: tp.List[tp.Tuple[int, bytes]] + :raises ValueError: entries in data were not of equal size, or data was empty + """ + if not data: + raise ValueError('Data is empty') + file = open(path, 'wb') + cdef: + unsigned long long min_ts = 0xFFFFFFFFFFFFFFFF + unsigned long long max_ts = 0 + bytes b + unsigned long long ts + unsigned long block_size = len(data[0][1]) + for ts, b in data: + if ts < min_ts: + min_ts = ts + elif ts > max_ts: + max_ts = ts + + file.write(STRUCT_QQL.pack(min_ts, max_ts, len(data[0][1]))) + try: + for ts, b in data: + if len(b) != block_size: + raise ValueError('Block size has entries of not equal length') + file.write(STRUCT_Q.pack(ts)) + file.write(b) + file.close() + except ValueError: + file.close() + os.unlink(path) + raise + return Chunk(path) + diff --git a/tempsdb/exceptions.pyx b/tempsdb/exceptions.pyx new file mode 100644 index 0000000..f90293f --- /dev/null +++ b/tempsdb/exceptions.pyx @@ -0,0 +1,10 @@ +class TempsDBError(Exception): + """Base class for TempsDB errors""" + + +class DoesNotExist(TempsDBError): + """The required resource does not exist""" + + +class Corruption(TempsDBError): + """Corruption was detected in the dataset""" diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 533dfad..f5678d3 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -2,5 +2,11 @@ from .database cimport Database cdef class TimeSeries: cdef: + str path Database parent str name + int block_size + unsigned long long last_entry_ts + list chunks + + cpdef void sync(self) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 654b7e7..79fddcb 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -1,6 +1,46 @@ +import ujson +from satella.files import read_in_file + +from .chunks cimport Chunk from .database cimport Database +from .exceptions import DoesNotExist, Corruption +import os + cdef class TimeSeries: def __init__(self, parent: Database, name: str): self.parent = parent self.name = name + + if not os.path.isdir(self.parent.path, name): + raise DoesNotExist('Chosen time series does not exist') + + self.path = os.path.join(self.parent.path, self.name) + + + cdef str metadata_s = read_in_file(os.path.join(self.path, 'metadata.txt'), + 'utf-8', 'invalid json') + cdef dict metadata + try: + metadata = ujson.loads(metadata_s) + except ValueError: + raise Corruption('Corrupted series') + + cdef list files = os.path.listdir(self.path) + cdef set files_s = set(files) + files_s.remove('metadata.txt') + self.chunks = [] + cdef str chunk + for chunk in files_s: + try: + self.chunks.append(int(chunk)) + except ValueError: + raise Corruption('Detected invalid file "%s"' % (chunk, )) + + self.last_entry_ts = metadata['last_entry_ts'] + self.block_size = metadata['block_size'] + + cpdef void sync(self): + """ + Synchronize the data kept in the memory with these kept on disk + """ diff --git a/tests/test_db.py b/tests/test_db.py index d8b2ce0..6186db7 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,6 +1,15 @@ import unittest +from tempsdb.chunks import create_chunk + class TestDB(unittest.TestCase): - def test_something(self): - ... + def test_chunk(self): + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] + chunk = create_chunk('chunk.db', data) + self.assertEqual(chunk.min_ts, 0) + self.assertEqual(chunk.max_ts, 4) + self.assertEqual(chunk.bs, 4) + self.assertEqual(chunk.get_piece_at(0), (0, b'ala ')) + self.assertEqual(chunk.get_piece_at(1), (1, b'ma ')) + self.assertEqual(chunk.get_piece_at(2), (4, b'kota')) -- GitLab