diff --git a/docs/chunks.rst b/docs/chunks.rst
index 640ace6df1be31a98a834aa2cb9575293d8d988e..14bb6e2a406ec8fb48037252ac3e5755ab36860f 100644
--- a/docs/chunks.rst
+++ b/docs/chunks.rst
@@ -16,3 +16,5 @@ A file storing a chunk consists as follows:
 * repeated
     * 8 bytes unsigned long long - timestamp
     * block_size bytes of data
+
+The file is padded with zeros to a multiple of `page_size`, and its last four bytes hold the amount of entries as an `unsigned long`.
diff --git a/docs/memory-pressure-manager.rst b/docs/memory-pressure-manager.rst
index 87c918e3c79bccc88285ee84085152542716db74..d150b7ad475e922b6aa7b4a52c47b41ae0b7b474 100644
--- a/docs/memory-pressure-manager.rst
+++ b/docs/memory-pressure-manager.rst
@@ -1,7 +1,9 @@
 Integration with Satella's MemoryPressureManager
 ================================================
 
-This library integrates itself with satella's MemoryPressureManager_.
+This library integrates itself with Satella_'s MemoryPressureManager_.
+
+.. _Satella: https://github.com/piotrmaslanka/satella
 
 It will close the non-required chunks when remaining in severity 1 each 30 seconds.
diff --git a/setup.py b/setup.py
index 93a82b19baa05c77a0253dc39c6f75265fe58713..e9cffff53b26dc4c33375374d21bce6a604bf3d6 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,11 @@
 import os
 import typing as tp
+
+from Cython.Build import cythonize
 from satella.files import find_files
 from distutils.core import setup
+
+from setuptools import Extension
 from snakehouse import Multibuild, build
 
 
@@ -9,14 +13,26 @@ def find_pyx(*path) -> tp.List[str]:
     return list(find_files(os.path.join(*path), r'(.*)\.pyx', scan_subdirectories=True))
 
 
+# extensions = [Extension("tempsdb.chunks", ['tempsdb/chunks.pyx']),
+#               Extension("tempsdb.database", ['tempsdb/database.pyx']),
+#               Extension('tempsdb.exceptions', ['tempsdb/exceptions.pyx']),
+#               Extension('tempsdb.series', ['tempsdb/series.pyx'])]
+
+
 setup(name='tempsdb',
-      version='0.1_a2',
+      version='0.1_a3',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
+                        gdb_debug=True,
                         compiler_directives={
                             'language_level': '3',
                         }),
+      # ext_modules=cythonize(extensions,
+      #                       gdb_debug=True,
+      #                       compiler_directives={
+      #                           'language_level': '3',
+      #                       }),
       python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*',
       test_suite="tests",
       zip_safe=False
diff --git a/tempsdb/__init__.py b/tempsdb/__init__.py
index 93a0b2e8f779eb4b71867716e1321d66782a5d9b..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644
--- a/tempsdb/__init__.py
+++ b/tempsdb/__init__.py
@@ -1,2 +0,0 @@
-from tempsdb.__bootstrap__ import bootstrap_cython_submodules
-bootstrap_cython_submodules()
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index 2a747569bdfebca475c673441efb10265e89f8f7..335d48ae0032f53d5660d1a50d81e951eb7697fc 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -14,6 +14,9 @@ cdef class Chunk:
         unsigned int block_size_plus
         readonly unsigned int block_size
         readonly unsigned long entries
+        unsigned long file_size
+        unsigned long pointer       # position to write next entry at
+        unsigned long page_size
         object file
         object mmap
         bint closed
@@ -27,6 +30,7 @@ cdef class Chunk:
     cpdef int sync(self) except -1
     cpdef unsigned int find_left(self, unsigned long long timestamp)
     cpdef unsigned int find_right(self, unsigned long long timestamp)
+    cdef int extend(self) except -1
 
     cdef inline unsigned long long name(self):
         """
@@ -42,16 +46,7 @@ cdef class Chunk:
         """
         return self.entries
 
-    cdef inline unsigned long long get_timestamp_at(self, unsigned int index):
-        """
-        Get timestamp at given entry
-
-        :param index: index of the entry
-        :type index: int
-        :return: timestamp at this entry
-        :rtype: int
-        """
-        cdef unsigned long offset = HEADER_SIZE+index*self.block_size_plus
-        return STRUCT_Q.unpack(self.mmap[offset:offset+TIMESTAMP_SIZE])[0]
+    cdef unsigned long long get_timestamp_at(self, unsigned int index)
+
 
-cpdef Chunk create_chunk(TimeSeries parent, str path, list data)
+cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size)
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 65eedaab83192a55e8b440e550596b4739feece0..0f114cfc3b84baee441ef12fcd9fc1640d7f97cf 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -8,6 +8,7 @@ from .series cimport TimeSeries
 
 DEF HEADER_SIZE = 4
 DEF TIMESTAMP_SIZE = 8
+DEF FOOTER_SIZE = 4
 STRUCT_Q = struct.Struct('<Q')
 STRUCT_L = struct.Struct('<L')
 
@@ -31,10 +32,10 @@ cdef class Chunk:
     :ivar entries: amount of entries in this chunk (int)
     :ivar writable: is this chunk writable (bool)
     """
-    def __init__(self, parent: tp.Optional[TimeSeries], path: str, writable: bool = True):
-        cdef:
-            unsigned long long file_size = os.path.getsize(path)
-            bytes b
+    def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, writable: bool = True):
+        cdef bytes b
+        self.file_size = os.path.getsize(path)
+        self.page_size = page_size
         self.parent = parent
         self.writable = writable
         self.write_lock = threading.Lock()
@@ -43,9 +44,9 @@ cdef class Chunk:
         self.file = open(self.path, 'rb+' if self.writable else 'rb')
         try:
             if self.writable:
-                self.mmap = mmap.mmap(self.file.fileno(), file_size)
+                self.mmap = mmap.mmap(self.file.fileno(), self.file_size)
             else:
-                self.mmap = mmap.mmap(self.file.fileno(), file_size, access=mmap.ACCESS_READ)
+                self.mmap = mmap.mmap(self.file.fileno(), self.file_size, access=mmap.ACCESS_READ)
         except OSError as e:
             self.file.close()
             self.closed = True
@@ -56,9 +57,10 @@ cdef class Chunk:
         except struct.error:
             self.close()
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
-        self.entries = (file_size-HEADER_SIZE) // (self.block_size_plus)
-        self.max_ts, = STRUCT_Q.unpack(self.mmap[file_size-self.block_size_plus:file_size-self.block_size])
-        self.min_ts, = STRUCT_Q.unpack(self.mmap[HEADER_SIZE:HEADER_SIZE+TIMESTAMP_SIZE])
+        self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size])
+        self.pointer = self.entries*self.block_size_plus+HEADER_SIZE
+        self.max_ts = self.get_timestamp_at(self.entries-1)
+        self.min_ts = self.get_timestamp_at(0)
 
     cpdef unsigned int find_left(self, unsigned long long timestamp):
         """
@@ -125,6 +127,26 @@ cdef class Chunk:
             self.mmap.flush()
         return 0
 
+    cdef int extend(self) except -1:
+        """
+        Adds page_size bytes to this file
+        """
+        self.file_size += self.page_size
+        self.mmap.resize(self.file_size)
+        self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries)
+
+    cdef unsigned long long get_timestamp_at(self, unsigned int index):
+        """
+        Get timestamp at given entry
+
+        :param index: index of the entry
+        :type index: int
+        :return: timestamp at this entry
+        :rtype: int
+        """
+        cdef unsigned long offset = HEADER_SIZE+index*self.block_size_plus
+        return STRUCT_Q.unpack(self.mmap[offset:offset+TIMESTAMP_SIZE])[0]
+
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append a record to this chunk
@@ -142,13 +164,20 @@ cdef class Chunk:
             raise ValueError('data not equal in length to block size!')
         if timestamp <= self.max_ts:
             raise ValueError('invalid timestamp')
-        cdef unsigned long long pointer_at_end = (self.entries+1)*self.block_size_plus + HEADER_SIZE
+
+        if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus:
+            self.extend()
+
         with self.write_lock:
-            self.mmap.resize(pointer_at_end)
-            self.mmap[pointer_at_end-self.block_size_plus:pointer_at_end-self.block_size] = STRUCT_Q.pack(timestamp)
-            self.mmap[pointer_at_end-self.block_size:pointer_at_end] = data
+            # Append the entry
+            self.mmap[self.pointer:self.pointer+TIMESTAMP_SIZE] = STRUCT_Q.pack(timestamp)
+            self.mmap[self.pointer+TIMESTAMP_SIZE:self.pointer+TIMESTAMP_SIZE+self.block_size] = data
             self.entries += 1
+            # Update the entries count stored in the footer
+            self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries)
+            # Update properties
             self.max_ts = timestamp
+            self.pointer += self.block_size_plus
         return 0
 
     cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry):
@@ -210,7 +239,7 @@ cdef class Chunk:
 
         return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]
 
-cpdef Chunk create_chunk(TimeSeries parent, str path, list data):
+cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     """
     Creates a new chunk on disk
 
@@ -221,6 +250,8 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data):
     :param data: data to write, list of tuple (timestamp, entry to write). Must be nonempty and
         sorted by timestamp.
     :type data: tp.List[tp.Tuple[int, bytes]]
+    :param page_size: size of a single page for storage
+    :type page_size: int
     :raises ValueError: entries in data were not of equal size, or data was empty or data
         was not sorted by timestamp or same timestamp appeared twice
     :raises AlreadyExists: chunk already exists
@@ -234,10 +265,12 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data):
         bytes b
         unsigned long long ts
         unsigned long block_size = len(data[0][1])
+        unsigned long file_size = 0
         unsigned long long last_ts = 0
+        unsigned int entries = len(data)
         bint first_element = True
 
-    file.write(STRUCT_L.pack(block_size))
+    file_size += file.write(STRUCT_L.pack(block_size))
     try:
         for ts, b in data:
             if not first_element:
@@ -245,14 +278,24 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data):
                 raise ValueError('Timestamp appeared twice or data was not sorted')
             if len(b) != block_size:
                 raise ValueError('Block size has entries of not equal length')
-            file.write(STRUCT_Q.pack(ts))
-            file.write(b)
+            file_size += file.write(STRUCT_Q.pack(ts))
+            file_size += file.write(b)
             last_ts = ts
             first_element = False
     except ValueError:
         file.close()
        os.unlink(path)
         raise
+
+    # Pad the data with zeros up to a multiple of page_size
+    cdef unsigned long bytes_to_pad = page_size - (file_size % page_size)
+    file.write(b'\x00' * bytes_to_pad)
+
+    # Write a full footer page whose last four bytes hold the entries count
+    cdef bytearray footer = bytearray(page_size)
+    footer[-4:] = STRUCT_L.pack(entries)
+    file.write(footer)
     file.close()
+    print('Finished creating chunk')
-    return Chunk(parent, path)
+    return Chunk(parent, path, page_size)
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index ddf8575b6f07f92b641036cf4b6096bf1bc0005b..5ef0aa291f5a9379ec898a97929576b375b4339e 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -11,6 +11,7 @@ cdef class TimeSeries:
         readonly unsigned long long last_entry_synced
         readonly unsigned int block_size
         readonly unsigned long long last_entry_ts
+        unsigned int page_size
         list chunks
         dict open_chunks
         list data_in_memory
@@ -27,4 +28,5 @@ cdef class TimeSeries:
     cpdef int sync(self) except -1
     cpdef int close_chunks(self) except -1
 
-cpdef TimeSeries create_series(str path, unsigned int block_size, int max_entries_per_chunk)
+cpdef TimeSeries create_series(str path, unsigned int block_size,
+                               int max_entries_per_chunk, int page_size=*)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index b1fa7df635ea2114d9ba5848454d1a65551c0db5..81e10bf93b6908744dcaf387345de2ed0bcd1cee 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -63,6 +63,7 @@ cdef class TimeSeries:
                 self.block_size = metadata['block_size']
                 self.max_entries_per_chunk = metadata['max_entries_per_chunk']
                 self.last_entry_synced = metadata['last_entry_synced']
+                self.page_size = metadata['page_size']
             except KeyError:
                 raise Corruption('Could not read metadata item')
 
@@ -138,7 +139,8 @@ cdef class TimeSeries:
         return {
             'block_size': self.block_size,
             'max_entries_per_chunk': self.max_entries_per_chunk,
-            'last_entry_synced': self.last_entry_synced
+            'last_entry_synced': self.last_entry_synced,
+            'page_size': self.page_size
         }
 
     cpdef void register_memory_pressure_manager(self, object mpm):
@@ -193,12 +195,12 @@ cdef class TimeSeries:
         with self.lock:
             if self.last_chunk is None:
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)])
+                                               [(timestamp, data)], self.page_size)
                 self.open_chunks[timestamp] = self.last_chunk
                 self.chunks.append(timestamp)
             elif self.last_chunk.length() >= self.max_entries_per_chunk:
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)])
+                                               [(timestamp, data)], self.page_size)
                 self.chunks.append(timestamp)
             else:
                 self.last_chunk.append(timestamp, data)
@@ -219,7 +221,7 @@ cdef class TimeSeries:
 
 
 cpdef TimeSeries create_series(str path, unsigned int block_size,
-                               int max_entries_per_chunk):
+                               int max_entries_per_chunk, int page_size=4096):
     if os.path.exists(path):
         raise AlreadyExists('This series already exists!')
 
@@ -228,7 +230,8 @@ cpdef TimeSeries create_series(str path, unsigned int block_size,
         ujson.dump({
             'block_size': block_size,
             'max_entries_per_chunk': max_entries_per_chunk,
-            'last_entry_synced': 0
+            'last_entry_synced': 0,
+            'page_size': page_size
         }, f_out
         )
     return TimeSeries(path)
diff --git a/tests/test_db.py b/tests/test_db.py
index cd603659fd18e5e0bbdd5109085085a017a70939..f3d3a1e22ef7fc93752aa9e7b9cb72ff8bd9b645 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -1,17 +1,17 @@
 import os
 import unittest
-from tempsdb.chunks import create_chunk
-from tempsdb.series import create_series
 
 
 class TestDB(unittest.TestCase):
-
     def test_create_series(self):
+        from tempsdb.series import create_series
+
         series = create_series('test', 8, 10)
 
     def test_chunk(self):
+        from tempsdb.chunks import create_chunk
         data = [(0, b'ala '), (1, b'ma '), (4, b'kota')]
-        chunk = create_chunk(None, 'chunk.db', data)
+        chunk = create_chunk(None, 'chunk.db', data, 4096)
         self.assertEqual(chunk.min_ts, 0)
         self.assertEqual(chunk.max_ts, 4)
         self.assertEqual(chunk.block_size, 4)
@@ -36,4 +36,4 @@ class TestDB(unittest.TestCase):
         self.assertEqual(chunk.find_right(5), 4)
         self.assertEqual(chunk.find_right(6), 4)
         chunk.close()
-        self.assertEqual(os.path.getsize('chunk.db'), 4+4*12)
+        self.assertEqual(os.path.getsize('chunk.db'), 8192)
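
A note on the on-disk format these changes produce: a chunk file now starts with a 4-byte little-endian unsigned long holding block_size, followed by one record per entry (an 8-byte unsigned long long timestamp plus block_size bytes of data), zero padding up to a page boundary, and finally a whole page whose last four bytes hold the entries count. Keeping the file page-aligned is what lets Chunk.append write entries into the mmap in place, resizing only once per page via extend() instead of on every append. A minimal reader for this layout in plain Python, offered as an illustrative sketch rather than as part of this changeset (read_chunk is a hypothetical helper, not a library function):

    import struct

    HEADER_SIZE = 4      # unsigned long: block_size
    TIMESTAMP_SIZE = 8   # unsigned long long: timestamp
    FOOTER_SIZE = 4      # unsigned long: amount of entries

    def read_chunk(path):
        """Read back every (timestamp, data) entry of a chunk file."""
        with open(path, 'rb') as f:
            raw = f.read()
        block_size, = struct.unpack('<L', raw[:HEADER_SIZE])
        # The entries count sits in the file's last four bytes, so the
        # zero padding between the data and the footer never has to be scanned
        entries, = struct.unpack('<L', raw[-FOOTER_SIZE:])
        block_size_plus = TIMESTAMP_SIZE + block_size
        records = []
        for i in range(entries):
            offset = HEADER_SIZE + i * block_size_plus
            ts, = struct.unpack('<Q', raw[offset:offset + TIMESTAMP_SIZE])
            records.append((ts, raw[offset + TIMESTAMP_SIZE:offset + block_size_plus]))
        return records

For the chunk written in test_chunk above, read_chunk('chunk.db') would return [(0, b'ala '), (1, b'ma '), (4, b'kota')].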
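
The updated assertion in tests/test_db.py (8192 bytes instead of 4+4*12) follows directly from that layout. A sketch of the arithmetic, reusing the constants above; expected_chunk_size is a hypothetical mirror of create_chunk's sizing, not a library function:

    def expected_chunk_size(entries, block_size, page_size=4096):
        # Header plus one (timestamp + data) record per entry, before padding
        data_size = HEADER_SIZE + entries * (TIMESTAMP_SIZE + block_size)
        # create_chunk pads with page_size - (data_size % page_size) zero bytes,
        # which amounts to a full extra page when data_size is already aligned
        padded = data_size + (page_size - data_size % page_size)
        # ...then writes one whole footer page ending in the entries count
        return padded + page_size

    # 4 + 3*12 = 40 bytes, padded to 4096, plus a 4096-byte footer page
    assert expected_chunk_size(3, 4) == 8192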