diff --git a/.coveragerc b/.coveragerc
index 001df73cf25030d81c4bc49cc70c5ae66bd7f27f..62eb0ed383b1c0e759180db7f5d40fe46c9e9908 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -8,7 +8,6 @@ omit=
     tests/*
     .eggs/*
     setup.py
-    tempsdb/__init__.py
 
 [report]
 include=
diff --git a/README.md b/README.md
index f99038af93a13c52e68e2d762be4e04890c666d7..9c04d17a98b79f63ff6577ebcbb465c588e6fb2e 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,9 @@ Then copy your resulting wheel and install it via pip on the target system.
 
 ## v0.5
 
+* if mmap is used, the kernel will be informed after loading the chunk that we
+  don't need its memory right now
+* deleting a `TimeSeries` will now correctly return a zero
 * both `Database`, `TimeSeries` and `Chunk` destructor will close and emit a warning
   if the user forgot to
 * if page_size is default, it won't be written as part of the metadata
@@ -61,6 +64,7 @@ Then copy your resulting wheel and install it via pip on the target system.
 * added `append_padded`
 * added metadata support, `metadata` property and `set_metadata` call
 * added variable length series
+* added experimental support for gzipping time series
 * fixed a bug where getting a series that was already closed would TypeError
 * following additions to `Chunk`:
     * `get_slice_of_piece_at`
@@ -80,10 +84,18 @@ Then copy your resulting wheel and install it via pip on the target system.
 
 ## v0.4.3
 
+* improved handling of mmap failures when memory is too low
+* slightly reduced `metadata.txt` size by defaulting `page_size`
+* moved `Chunk`
+* added support for gzipping
+* added `DirectChunk`
 * iterating and writing at the same time from multiple threads made safe
 * added `TimeSeries.disable_mmap`
 * `Iterator`'s destructor will emit a warning if you forget to close it explicitly.
+* added an option for transparent gzip compression.
+  Please note that gzip disables mmap!
+* experimental gzip support for constant-length time series
 
 ## v0.4.2
 
diff --git a/docs/chunks.rst b/docs/chunks.rst
index 14bb6e2a406ec8fb48037252ac3e5755ab36860f..d45a93c2c3c24b3e00f02bfee579e5e0f5e98733 100644
--- a/docs/chunks.rst
+++ b/docs/chunks.rst
@@ -1,16 +1,34 @@
 Chunk
 =====
 
-For your convenience the class :class:`~tempsdb.chunks.Chunk` was also documented, but don't use
+.. versionadded:: 0.5
+
+There are two kinds of chunks - a "normal" chunk and a "direct" chunk.
+
+The difference is that a normal chunk will preallocate a page ahead, in order for writes to be fast,
+while a direct chunk will write only as much data as is strictly required.
+
+Only "direct" chunks can be gzipped, since a normal chunk preallocates a page ahead, which
+would prevent modifications made to it post-factum.
+
+For your convenience the class :class:`~tempsdb.chunks.base.Chunk` was also documented, but don't use
 it directly:
 
-.. autoclass:: tempsdb.chunks.Chunk
+.. autoclass:: tempsdb.chunks.base.Chunk
    :members:
 
 Data stored in files is little endian.
 
+A way to tell which kind of chunk we are dealing with is to look at its extension.
+Chunks that have:
+
+* no extension - are normal chunks
+* `.direct` extension - are direct chunks
+* `.gz` extension - are direct and gzipped chunks
 
-A file storing a chunk consists as follows:
+Normal chunk
+------------
+A file storing a normal chunk is laid out as follows:
 
 * 4 bytes unsigned int - block size
 * repeated
     * 8 bytes unsigned long long - timestamp
     * block_size bytes of data
 
 It's padded to `page_size` with zeros, and four last bytes is the `unsigned long` amount of entries
+
+Direct chunk
+------------
+A file storing a direct chunk is laid out as follows:
+
+* 4 bytes unsigned int - block size
+* repeated
+    * 8 bytes unsigned long long - timestamp
+    * block_size bytes of data
+
+Note that a direct chunk can be gzipped. If its file name ends with `.gz`, then it is
+a gzipped direct chunk.
diff --git a/docs/index.rst b/docs/index.rst
index 44fcf65ebe75181fb7d1c4ebfb8d1b24a5bf597f..661603e6521d106257c0113bcfbd0cbe87b92c03 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -25,6 +25,13 @@ So no variable encoding for you!
 When mmap fails due to memory issues, this falls back to slower fwrite()/fread() implementation.
 You can also manually select the descriptor-based implementation if you want to.
 
+.. versionadded:: 0.5
+
+Experimental support for gzipping time series has been added. Note that reading from gzipped files might be
+slow, as every seek requires reading the file from the beginning.
+
+Warnings will be issued while using gzipped series to remind you of this fact.
+
 Indices and tables
 ==================
 
diff --git a/docs/usage.rst b/docs/usage.rst
index f06750d8a81de50aec5d1f2838c36ae67406f6af..268e062571083fcd9106a770901b923e7b549202 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -19,6 +19,14 @@ Start off by instantiating an object
 .. autoclass:: tempsdb.database.Database
    :members:
 
+Note that if you specify a `gzip_level` argument in
+:meth:`~tempsdb.database.Database.create_series`, GZIP compression will be used.
+
+Note that gzip-compressed series are very slow to read, since every seek needs
+to start from scratch. This will be fixed in the future.
+
+Also, any gzip-opened series will emit a warning, since their support is experimental at best.
+
 You can create new databases via
 
 ..
autofunction:: tempsdb.database.create_database diff --git a/requirements.txt b/requirements.txt index 2fe238cfb6d36a0700aac897d9433cd41260142e..39ce8d3b742bdad46630addbf64b257d9e1fb775 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ satella ujson -snakehouse +snakehouse>=1.2.3 six diff --git a/setup.py b/setup.py index 4ec7056f62d7a45e316eae3049dbc5d2931585bd..69dafaa661f1082e4ec743ba1ea99b7d588d4fde 100644 --- a/setup.py +++ b/setup.py @@ -6,8 +6,7 @@ from satella.files import find_files from distutils.core import setup from setuptools import Extension -from snakehouse import Multibuild, build -from satella.distutils import monkey_patch_parallel_compilation +from snakehouse import Multibuild, build, monkey_patch_parallel_compilation def find_pyx(*path) -> tp.List[str]: @@ -23,15 +22,17 @@ monkey_patch_parallel_compilation() # Extension('tempsdb.iterators', ['tempsdb/iterators.pyx'])] # directives = {'language_level': '3'} +m_kwargs = {} if 'CI' in os.environ: directives.update(profile=True, linetrace=True, embedsignature=True) + m_kwargs['define_macros'] = [("CYTHON_TRACE_NOGIL", "1")] setup(name='tempsdb', - version='0.5a3', + version='0.5a10', packages=['tempsdb'], - install_requires=['satella', 'ujson'], - ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], + install_requires=['satella>=2.14.21', 'ujson'], + ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb'), **m_kwargs), ], compiler_directives=directives), # ext_modules=cythonize(extensions, # gdb_debug=True, diff --git a/tempsdb/chunks/__init__.py b/tempsdb/chunks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks/base.pxd similarity index 88% rename from tempsdb/chunks.pxd rename to tempsdb/chunks/base.pxd index ff1585ccd6d7c86ed9ac47fb19e664c64b1745b2..83ccb8a1c269063068544d26692ec196190f7735 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks/base.pxd @@ -1,10 +1,10 @@ -from .series cimport TimeSeries - +from ..series cimport TimeSeries cdef class AlternativeMMap: cdef: object io, file_lock_object unsigned long size + unsigned long long pointer cdef class Chunk: @@ -21,26 +21,27 @@ cdef class Chunk: readonly unsigned long page_size object file, mmap, file_lock_object bint closed - - cdef void incref(self) - cdef int decref(self) except -1 cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry) cpdef int close(self, bint force=*) except -1 - cpdef unsigned long long get_timestamp_at(self, unsigned int index) cdef tuple get_piece_at(self, unsigned int index) + cdef int sync(self) except -1 + cpdef unsigned long long get_timestamp_at(self, unsigned int index) cpdef bytes get_value_at(self, unsigned int index) cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop) cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start) cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 - cpdef int append(self, unsigned long long timestamp, bytes data) except -1 - cdef int sync(self) except -1 cpdef unsigned int find_left(self, unsigned long long timestamp) cpdef unsigned int find_right(self, unsigned long long timestamp) - cdef int extend(self) except -1 + cpdef object open_file(self, str path) + cpdef int extend(self) except -1 + cpdef int after_init(self) except -1 + cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cpdef int delete(self) except 
-1
    cpdef int switch_to_descriptor_based_access(self) except -1
    cpdef int switch_to_mmap_based_access(self) except -1
    cpdef unsigned long get_mmap_size(self)
+    cdef void incref(self)
+    cdef int decref(self) except -1
 
     cdef inline unsigned long long name(self):
         """
         :return: the name of this chunk
         :rtype: int
         """
         return self.entries
-
-
-cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
-                         bytes data, int page_size,
-                         bint descriptor_based_access=*)
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks/base.pyx
similarity index 78%
rename from tempsdb/chunks.pyx
rename to tempsdb/chunks/base.pyx
index 065628832e4d50e6f54fa8e89ccaeb3560ad91e1..2a7538b45286ea9eaf2934f184e1a091323d5007 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks/base.pyx
@@ -6,20 +6,25 @@ import struct
 import mmap
 import warnings
 
-from .exceptions import Corruption, InvalidState, AlreadyExists, StillOpen
-from .series cimport TimeSeries
+from .gzip cimport ReadWriteGzipFile
+from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen
+from ..series cimport TimeSeries
+
 
-DEF HEADER_SIZE = 4
-DEF TIMESTAMP_SIZE = 8
-DEF FOOTER_SIZE = 4
 STRUCT_Q = struct.Struct('<Q')
 STRUCT_L = struct.Struct('<L')
 STRUCT_LQ = struct.Struct('<LQ')
+DEF HEADER_SIZE = 4
+DEF TIMESTAMP_SIZE = 8
+DEF FOOTER_SIZE = 4
+
 
 cdef class AlternativeMMap:
     """
-    An alternative mmap implementation used when mmap cannot allocate due to memory issues
+    An alternative mmap implementation used when mmap cannot allocate due to memory issues.
+
+    Note that opening gzip files is slow, as every seek may require reading the file
+    from the beginning.
     """
     def flush(self):
         self.io.flush()
@@ -32,8 +37,13 @@ cdef class AlternativeMMap:
     def __init__(self, io_file: io.BinaryIO, file_lock_object):
         self.io = io_file
-        self.io.seek(0, io.SEEK_END)
-        self.size = self.io.tell()
+        cdef ReadWriteGzipFile rw_gz
+        if isinstance(io_file, ReadWriteGzipFile):
+            rw_gz = io_file
+            self.size = rw_gz.size
+        else:
+            self.io.seek(0, 2)
+            self.size = self.io.tell()
         self.file_lock_object = file_lock_object
 
     def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]:
@@ -59,17 +69,25 @@ cdef class AlternativeMMap:
             self[key:key+1] = bytes([value])
         else:
             with self.file_lock_object:
-                self.io.seek(start, 0)
+                if not isinstance(self.io, ReadWriteGzipFile):
+                    self.io.seek(start, 0)
                 self.io.write(value)
 
     def close(self):
         pass
 
 
 cdef class Chunk:
     """
     Represents a single chunk of time series.
 
+    This implementation is the default - it allocates a page ahead of the stream and
+    writes the number of currently written entries to the end of the page. Suitable for SSD
+    and RAM media.
+
     This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes],
     as well as a sequence protocol.
@@ -134,17 +152,24 @@ cdef class Chunk: self.mmap = AlternativeMMap(self.file, self.file_lock_object) return 0 - cdef void incref(self): - if self.parent is not None: - self.parent.incref_chunk(self.min_ts) + cpdef int after_init(self) except -1: + """ + Load the :attr:`~Chunk.entries`, :attr:`~Chunk.pointer` and :attr:`~Chunk.max_ts` + + :meta private: + """ + self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size]) + self.pointer = self.entries*self.block_size_plus+HEADER_SIZE + self.max_ts = self.get_timestamp_at(self.entries-1) - cdef int decref(self) except -1: - if self.parent is not None: - self.parent.decref_chunk(self.name()) - if self.parent.get_references_for(self.name()) < 0: - raise ValueError('reference of chunk fell below zero!') + if self.pointer >= self.page_size: + # Inform the OS that we don't need the header anymore + self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) return 0 + cpdef object open_file(self, str path): + return open(self.path, 'rb+') + def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, use_descriptor_access: bool = False): cdef bytes b @@ -153,7 +178,7 @@ cdef class Chunk: self.parent = parent self.closed = False self.path = path - self.file = open(self.path, 'rb+') + self.file = self.open_file(path) self.file_lock_object = None if use_descriptor_access: @@ -163,10 +188,7 @@ cdef class Chunk: try: self.mmap = mmap.mmap(self.file.fileno(), 0) except OSError as e: - if e.errno in (11, # EAGAIN - memory is too low - 12, # ENOMEM - no memory space available - 19, # ENODEV - fs does not support mmapping - 75): # EOVERFLOW - too many pages would have been used + if e.errno in (11, 12): # Cannot allocate memory or memory range exhausted self.file_lock_object = threading.Lock() self.mmap = AlternativeMMap(self.file, self.file_lock_object) else: @@ -181,9 +203,11 @@ cdef class Chunk: self.close() raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) - self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size]) - self.pointer = self.entries*self.block_size_plus+HEADER_SIZE - self.max_ts = self.get_timestamp_at(self.entries-1) + self.entries = 0 + self.max_ts = 0 + self.pointer = 0 + + self.after_init() if self.pointer >= self.page_size: # Inform the OS that we don't need the header anymore @@ -307,29 +331,8 @@ cdef class Chunk: self.mmap.flush() return 0 - cdef int extend(self) except -1: - """ - Adds PAGE_SIZE bytes to this file - """ - cdef bytearray ba - if self.file_lock_object: - self.file_lock_object.acquire() - try: - self.file_size += self.page_size - self.file.seek(0, io.SEEK_END) - ba = bytearray(self.page_size) - ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) - self.file.write(ba) - try: - self.mmap.resize(self.file_size) - except OSError as e: - if e.errno == 12: # ENOMEM - self.switch_to_descriptor_based_access() - else: - raise - finally: - if self.file_lock_object: - self.file_lock_object.release() + cpdef int extend(self) except -1: + return 0 cpdef int delete(self) except -1: """ @@ -355,21 +358,7 @@ cdef class Chunk: :param data: data to write :raises InvalidState: chunk is closed """ - if self.closed: - raise InvalidState('chunk is closed') - - if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus: - self.extend() - cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE - # Append entry - self.mmap[self.pointer:ptr_end] = STRUCT_Q.pack(timestamp) - 
self.mmap[ptr_end:ptr_end+self.block_size] = data - self.entries += 1 - # Update entries count - self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries) - # Update properties - self.max_ts = timestamp - self.pointer += self.block_size_plus + raise NotImplementedError('Abstract method!') return 0 cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry): @@ -394,6 +383,30 @@ cdef class Chunk: def __len__(self): return self.length() + cdef void incref(self): + if self.parent is not None: + self.parent.incref_chunk(self.min_ts) + + cdef int decref(self) except -1: + if self.parent is not None: + self.parent.decref_chunk(self.name()) + if self.parent.get_references_for(self.name()) < 0: + raise ValueError('reference of chunk fell below zero!') + return 0 + + cpdef bytes get_value_at(self, unsigned int index): + """ + Return only the value at a particular index, numbered from 0 + + :return: value at given index + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + unsigned long stopping_index = starting_index + self.block_size + return self.mmap[starting_index:stopping_index] + cpdef int close(self, bint force=False) except -1: """ Close the chunk and close the allocated resources @@ -422,19 +435,6 @@ cdef class Chunk: warnings.warn('You forgot to close a Chunk') self.close() - cpdef bytes get_value_at(self, unsigned int index): - """ - Return only the value at a particular index, numbered from 0 - - :return: value at given index - """ - if index >= self.entries: - raise IndexError('Index too large') - cdef: - unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus - unsigned long stopping_index = starting_index + self.block_size - return self.mmap[starting_index:stopping_index] - cdef tuple get_piece_at(self, unsigned int index): """ Return a piece of data at a particular index, numbered from 0 @@ -443,7 +443,8 @@ cdef class Chunk: :rtype: tp.Tuple[int, bytes] """ if index >= self.entries: - raise IndexError('Index too large') + raise IndexError('Index too large, got %s while max entries is %s' % (index, + self.entries)) cdef: unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus unsigned long stopping_index = starting_index + self.block_size_plus @@ -451,48 +452,3 @@ cdef class Chunk: self.mmap[starting_index:starting_index+TIMESTAMP_SIZE])[0] return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index] - -cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, - bytes data, int page_size, bint descriptor_based_access=False): - """ - Creates a new chunk on disk - - :param parent: parent time series - :param path: path to the new chunk file - :param timestamp: timestamp for first entry to contain - :param data: data of the first entry - :param page_size: size of a single page for storage - :param descriptor_based_access: whether to use descriptor based access instead of mmap. 
-        Default is False
-    :raises ValueError: entries in data were not of equal size, or data was empty or data
-        was not sorted by timestamp or same timestamp appeared twice
-    :raises AlreadyExists: chunk already exists
-    """
-    if os.path.exists(path):
-        raise AlreadyExists('chunk already exists!')
-    if not data:
-        raise ValueError('Data is empty')
-    file = open(path, 'wb')
-    cdef:
-        bytes b
-        unsigned long long ts
-        unsigned long block_size = len(data)
-        unsigned long file_size = 0
-        unsigned long long last_ts = 0
-        unsigned int entries = 1
-        bint first_element = True
-    file_size += file.write(STRUCT_L.pack(block_size))
-    file_size += file.write(STRUCT_Q.pack(timestamp))
-    file_size += file.write(data)
-
-    # Pad this thing to page_size
-    cdef unsigned long bytes_to_pad = page_size - (file_size % page_size)
-    file.write(b'\x00' * bytes_to_pad)
-
-    # Create a footer at the end
-    cdef bytearray footer = bytearray(page_size)
-    footer[-4:] = b'\x01\x00\x00\x00'  # 1 in little endian
-    file.write(footer)
-    file.close()
-    return Chunk(parent, path, page_size, use_descriptor_access=descriptor_based_access)
-
diff --git a/tempsdb/chunks/direct.pxd b/tempsdb/chunks/direct.pxd
new file mode 100644
index 0000000000000000000000000000000000000000..ddee23ac118c52441b2959382f25393efe8a02d3
--- /dev/null
+++ b/tempsdb/chunks/direct.pxd
@@ -0,0 +1,10 @@
+from .base cimport Chunk
+
+
+cdef class DirectChunk(Chunk):
+    cdef:
+        int gzip
+
+    cpdef object open_file(self, str path)
+    cpdef int after_init(self) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx
new file mode 100644
index 0000000000000000000000000000000000000000..4f23a977fa01255a88829c231825a8aa44c140be
--- /dev/null
+++ b/tempsdb/chunks/direct.pyx
@@ -0,0 +1,100 @@
+import os
+import typing as tp
+import struct
+import warnings
+
+from ..series cimport TimeSeries
+from .gzip cimport ReadWriteGzipFile
+from .base cimport Chunk
+
+
+STRUCT_Q = struct.Struct('<Q')
+DEF HEADER_SIZE = 4
+DEF TIMESTAMP_SIZE = 8
+DEF FOOTER_SIZE = 4
+
+
+cdef class DirectChunk(Chunk):
+    """
+    Alternative implementation that extends the file as-it-goes, without allocating an entire page
+    in advance.
+
+    This is also the only chunk type capable of supporting gzip.
+
+    Note that if your system doesn't like frequent mmap resizing, try to use it with
+    `use_descriptor_access=True`.
+
+    Note that you can only use gzip if you set `use_descriptor_access` to True.
+
+    :param gzip_compression_level: gzip compression level to use. 0 is the default and means
+        gzip is disabled. If given, a warning will be emitted as gzip support is still experimental.
+ :raises ValueError: non-direct descriptor was requested and gzip was enabled + """ + def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, + use_descriptor_access: tp.Optional[bool] = None, + gzip_compression_level: int = 0): + if path.endswith('.gz'): + warnings.warn('Please pass the path without .gz') + path = path.replace('.gz', '') + if path.endswith('.direct'): + warnings.warn('Please pass the path without .direct') + path = path.replace('.direct', '') + if use_descriptor_access is None: + use_descriptor_access = False + if gzip_compression_level: + warnings.warn('Gzip support is experimental') + use_descriptor_access = True + + self.gzip = gzip_compression_level + + if gzip_compression_level: + path = path + '.gz' + else: + path = path + '.direct' + + if gzip_compression_level: + if not use_descriptor_access: + raise ValueError('Use descriptor access must be enabled when using gzip') + super().__init__(parent, path, page_size, + use_descriptor_access=use_descriptor_access | bool(gzip_compression_level)) + + cpdef object open_file(self, str path): + if self.gzip: + return ReadWriteGzipFile(path, compresslevel=self.gzip) + else: + return super().open_file(path) + + cpdef int after_init(self) except -1: + cdef ReadWriteGzipFile rw_gz + if isinstance(self.file, ReadWriteGzipFile): + rw_gz = self.file + self.file_size = rw_gz.size + else: + self.file.seek(0, os.SEEK_END) + self.file_size = self.file.tell() + self.entries = (self.file_size - HEADER_SIZE) // self.block_size_plus + self.pointer = self.file_size + d = (self.file_size - self.block_size) - (self.file_size-self.block_size_plus) + cdef bytes b = self.mmap[self.file_size-self.block_size_plus:self.file_size-self.block_size] + + self.max_ts, = STRUCT_Q.unpack(b) + return 0 + + cpdef int append(self, unsigned long long timestamp, bytes data) except -1: + cdef bytes b + if self.file_lock_object: + self.file_lock_object.acquire() + try: + self.file_size += self.block_size_plus + if not isinstance(self.file, ReadWriteGzipFile): + self.file.seek(self.pointer, 0) + b = STRUCT_Q.pack(timestamp) + data + self.file.write(b) + self.mmap.resize(self.file_size) + self.pointer += self.block_size_plus + self.entries += 1 + finally: + if self.file_lock_object: + self.file_lock_object.release() + return 0 + diff --git a/tempsdb/chunks/gzip.pxd b/tempsdb/chunks/gzip.pxd new file mode 100644 index 0000000000000000000000000000000000000000..65d766e7f6e8e9e2a43c4bf70cbb83aa7ff9ea57 --- /dev/null +++ b/tempsdb/chunks/gzip.pxd @@ -0,0 +1,12 @@ +cdef class ReadWriteGzipFile: + cdef: + str path + object ro_file, rw_file + int compress_level + object lock + unsigned long pointer + unsigned long size + bint needs_flush_before_read + + cpdef int flush(self) except -1 + diff --git a/tempsdb/chunks/gzip.pyx b/tempsdb/chunks/gzip.pyx new file mode 100644 index 0000000000000000000000000000000000000000..db1895925205f253c75b377c30e135da6e160a4b --- /dev/null +++ b/tempsdb/chunks/gzip.pyx @@ -0,0 +1,68 @@ +import gzip +import threading + + +cdef class ReadWriteGzipFile: + def __init__(self, path: str, compresslevel: int = gzip._COMPRESS_LEVEL_FAST): + self.path = path + self.compress_level = compresslevel + self.ro_file = gzip.GzipFile(path, 'rb') + self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=self.compress_level) + self.pointer = 0 + self.lock = threading.RLock() + self.needs_flush_before_read = False + cdef bytes b + b = self.read(128) + while b: + b = self.read(128) + self.size = self.pointer + + cpdef int flush(self) except 
-1: + self.rw_file.flush() + self.ro_file.close() + self.ro_file = gzip.GzipFile(self.path, 'rb') + self.pointer = 0 + self.needs_flush_before_read = False + return 0 + + def close(self): + with self.lock: + self.rw_file.close() + self.ro_file.close() + + def read(self, int maxinplen): + cdef bytes b + with self.lock: + if self.needs_flush_before_read: + self.flush() + b = self.ro_file.read(maxinplen) + self.pointer += len(b) + return b + + def write(self, bytes value): + """ + Always an append, despite what + :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.tell` and + :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.seek` may say. + """ + with self.lock: + self.rw_file.write(value) + self.size += len(value) + self.needs_flush_before_read = True + + def seek(self, unsigned long pos, int mode): + if self.needs_flush_before_read: + self.flush() + if mode == 2: + self.seek(self.size-pos, 0) + elif mode == 0: + if pos != self.pointer: + self.ro_file.seek(pos, 0) + self.pointer = pos + elif mode == 1: + raise NotImplementedError('Unimplemented seek mode') + else: + raise ValueError('Invalid seek mode') + + def tell(self): + return self.pointer diff --git a/tempsdb/chunks/maker.pxd b/tempsdb/chunks/maker.pxd new file mode 100644 index 0000000000000000000000000000000000000000..ed6cd7b49cec72cbe9e96bf5d3044fd333019e19 --- /dev/null +++ b/tempsdb/chunks/maker.pxd @@ -0,0 +1,7 @@ +from .base cimport Chunk +from ..series cimport TimeSeries + + +cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, + bytes data, int page_size, bint descriptor_based_access=*, + bint use_direct_mode=*, int gzip_compression_level=*) diff --git a/tempsdb/chunks/maker.pyx b/tempsdb/chunks/maker.pyx new file mode 100644 index 0000000000000000000000000000000000000000..703858a27ecb32bbcd532445c4c027bff2063019 --- /dev/null +++ b/tempsdb/chunks/maker.pyx @@ -0,0 +1,83 @@ +import gzip +import os +import struct + +from tempsdb.exceptions import AlreadyExists +from .base cimport Chunk +from .normal cimport NormalChunk +from .direct cimport DirectChunk +from ..series cimport TimeSeries + + +STRUCT_Q = struct.Struct('<Q') +STRUCT_L = struct.Struct('<L') + + +cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, + bytes data, int page_size, bint descriptor_based_access=False, + bint use_direct_mode = False, int gzip_compression_level=0): + """ + Creates a new chunk on disk + + :param parent: parent time series + :param path: path to the new chunk file + :param timestamp: timestamp for first entry to contain + :param data: data of the first entry + :param page_size: size of a single page for storage + :param descriptor_based_access: whether to use descriptor based access instead of mmap. + Default is False + :param use_direct_mode: if True, chunk will be created using direct mode, without page + preallocation + :param gzip_compression_level: gzip compression level. Use 0 to disable compression. 
+ :raises ValueError: entries in data were not of equal size, or data was empty or data + was not sorted by timestamp or same timestamp appeared twice + :raises AlreadyExists: chunk already exists + """ + cdef str original_path = path + if os.path.exists(path): + raise AlreadyExists('chunk already exists!') + if not data: + raise ValueError('Data is empty') + if not gzip_compression_level and use_direct_mode: + path = path + '.direct' + elif gzip_compression_level: + path = path + '.gz' + + if gzip_compression_level: + file = gzip.open(path, 'wb', compresslevel=gzip_compression_level) + else: + file = open(path, 'wb') + cdef: + bytes b + unsigned long long ts + unsigned long block_size = len(data) + unsigned long file_size = 0 + unsigned long long last_ts = 0 + unsigned int entries = 1 + bint first_element = True + file_size += file.write(STRUCT_L.pack(block_size)) + file_size += file.write(STRUCT_Q.pack(timestamp)) + file_size += file.write(data) + cdef: + bytearray footer + unsigned long bytes_to_pad + if not use_direct_mode: + # Pad this thing to page_size + bytes_to_pad = page_size - (file_size % page_size) + file.write(b'\x00' * bytes_to_pad) + + # Create a footer at the end + footer = bytearray(page_size) + footer[-4:] = b'\x01\x00\x00\x00' # 1 in little endian + file.write(footer) + file.close() + if gzip_compression_level: + return DirectChunk(parent, original_path, page_size, use_descriptor_access=True, + gzip_compression_level=gzip_compression_level) + else: + if use_direct_mode: + return DirectChunk(parent, original_path, page_size, + use_descriptor_access=descriptor_based_access) + else: + return NormalChunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) + diff --git a/tempsdb/chunks/normal.pxd b/tempsdb/chunks/normal.pxd new file mode 100644 index 0000000000000000000000000000000000000000..3d6d8361c956d5752b23582c2b89f0b26c199e6d --- /dev/null +++ b/tempsdb/chunks/normal.pxd @@ -0,0 +1,6 @@ +from .base cimport Chunk + + +cdef class NormalChunk(Chunk): + cpdef int append(self, unsigned long long timestamp, bytes data) except -1 + cpdef int extend(self) except -1 diff --git a/tempsdb/chunks/normal.pyx b/tempsdb/chunks/normal.pyx new file mode 100644 index 0000000000000000000000000000000000000000..8339d979d78f1f47f915f257d1087b95e414f187 --- /dev/null +++ b/tempsdb/chunks/normal.pyx @@ -0,0 +1,92 @@ +import io +import struct + +from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen +from .base cimport Chunk + +DEF HEADER_SIZE = 4 +DEF TIMESTAMP_SIZE = 8 +DEF FOOTER_SIZE = 4 +STRUCT_Q = struct.Struct('<Q') +STRUCT_L = struct.Struct('<L') + + +cdef class NormalChunk(Chunk): + """ + Represents a single chunk of time series. + + This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes], + as well as a sequence protocol. + + This will try to mmap opened files, but if mmap fails due to not enough memory this + will use descriptor-based access. 
+ + :param parent: parent time series + :param path: path to the chunk file + :param use_descriptor_access: whether to use descriptor based access instead of mmap + + :ivar path: path to the chunk (str) + :ivar min_ts: timestamp of the first entry stored (int) + :ivar max_ts: timestamp of the last entry stored (int) + :ivar block_size: size of the data entries (int) + :ivar entries: amount of entries in this chunk (int) + :ivar page_size: size of the page (int) + """ + + cpdef int extend(self) except -1: + """ + Adds PAGE_SIZE bytes to this file + """ + cdef bytearray ba + if self.file_lock_object: + self.file_lock_object.acquire() + try: + self.file_size += self.page_size + self.file.seek(0, io.SEEK_END) + ba = bytearray(self.page_size) + ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) + self.file.write(ba) + try: + self.mmap.resize(self.file_size) + except OSError as e: + if e.errno == 12: # ENOMEM + self.switch_to_descriptor_based_access() + else: + raise + finally: + if self.file_lock_object: + self.file_lock_object.release() + return 0 + + cpdef int append(self, unsigned long long timestamp, bytes data) except -1: + """ + Append a record to this chunk. + + Might range from very fast (just a memory operation) to quite slow (adding a new page + to the file). + + Simultaneous writing is not thread-safe. + + Timestamp and data is not checked for, this is supposed to be handled by + :class:`~tempsdb.series.TimeSeries`. + + :param timestamp: timestamp of the entry + :param data: data to write + :raises InvalidState: chunk is closed + """ + if self.closed: + raise InvalidState('chunk is closed') + + if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus: + self.extend() + cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE + # Append entry + self.mmap[self.pointer:ptr_end] = STRUCT_Q.pack(timestamp) + self.mmap[ptr_end:ptr_end+self.block_size] = data + self.entries += 1 + # Update entries count + self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries) + # Update properties + self.max_ts = timestamp + self.pointer += self.block_size_plus + return 0 diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index 9d75efc741a3ad845aece4c59001946982bd5dd5..c729f440103e73068026473157966243e6cce560 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -19,10 +19,12 @@ cdef class Database: cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, int page_size=*, - bint use_descriptor_based_access=*) + bint use_descriptor_based_access=*, + int gzip_level=*) cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, int size_struct, - unsigned long entries_per_chunk) + unsigned long entries_per_chunk, + int gzip_level=*) cpdef list get_open_series(self) cpdef list get_all_series(self) cpdef int close_all_open_series(self) except -1 diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index 4a79f1414a8098e778867c31f6823a7b75601023..2d54421135499288f5e527c468b0b7ccf245acdc 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -149,7 +149,8 @@ cdef class Database: cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, int size_struct, - unsigned long entries_per_chunk): + unsigned long entries_per_chunk, + int gzip_level=0): """ Create a new variable length series @@ -158,6 +159,7 @@ cdef class Database: :param size_struct: how many bytes will be used to store length? 
Valid entries are 1, 2 and 4 :param entries_per_chunk: entries per chunk file + :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression. :return: new variable length series :raises AlreadyExists: series with given name already exists """ @@ -166,7 +168,8 @@ cdef class Database: cdef VarlenSeries series = create_varlen_series(os.path.join(self.path, name), name, size_struct, length_profile, - entries_per_chunk) + entries_per_chunk, + gzip_level=gzip_level) self.open_varlen_series[name] = series return series @@ -201,7 +204,8 @@ cdef class Database: cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, int page_size=4096, - bint use_descriptor_based_access=False): + bint use_descriptor_based_access=False, + int gzip_level=0): """ Create a new series @@ -211,6 +215,7 @@ cdef class Database: :param page_size: size of a single page. Default is 4096 :param use_descriptor_based_access: whether to use descriptor based access instead of mmap. Default is False + :param gzip_level: gzip compression level. Default is 0 which means "don't use gzip" :return: new series :raises ValueError: block size was larger than page_size plus a timestamp :raises AlreadyExists: series with given name already exists @@ -219,10 +224,13 @@ cdef class Database: raise ValueError('Invalid block size, pick larger page') if os.path.isdir(os.path.join(self.path, name)): raise AlreadyExists('Series already exists') + if gzip_level: + warnings.warn('Gzip support is experimental') cdef TimeSeries series = create_series(os.path.join(self.path, name), name, block_size, entries_per_chunk, page_size=page_size, - use_descriptor_based_access=use_descriptor_based_access) + use_descriptor_based_access=use_descriptor_based_access, + gzip_level=gzip_level) self.open_series[name] = series return series diff --git a/tempsdb/iterators.pxd b/tempsdb/iterators.pxd index b5befd8b773d2442ce57746cc0bdd5becd806118..c84a722e7cfae4ef0430fe219b255eea251c7a49 100644 --- a/tempsdb/iterators.pxd +++ b/tempsdb/iterators.pxd @@ -1,4 +1,4 @@ -from .chunks cimport Chunk +from .chunks.base cimport Chunk from .series cimport TimeSeries diff --git a/tempsdb/iterators.pyx b/tempsdb/iterators.pyx index e898745aafadc13521d105c6e732942d47386445..b7d5d11871006f7783e06131af0caad0dd30489e 100644 --- a/tempsdb/iterators.pyx +++ b/tempsdb/iterators.pyx @@ -1,7 +1,7 @@ import typing as tp import warnings -from .chunks cimport Chunk +from .chunks.base cimport Chunk from .series cimport TimeSeries import collections diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 08976e03015f2b3f31907397a7e37839e34dc0b0..b91210b28b1a0ad548188012c7e1adc5130e9696 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -1,4 +1,4 @@ -from .chunks cimport Chunk +from .chunks.base cimport Chunk from .iterators cimport Iterator @@ -13,6 +13,7 @@ cdef class TimeSeries: readonly unsigned long long last_entry_synced readonly int block_size readonly unsigned long long last_entry_ts + readonly int gzip_level unsigned int page_size readonly dict metadata readonly bint descriptor_based_access @@ -28,7 +29,7 @@ cdef class TimeSeries: cpdef int close(self) except -1 cdef void incref_chunk(self, unsigned long long name) cdef void decref_chunk(self, unsigned long long name) - cdef Chunk open_chunk(self, unsigned long long name) + cdef Chunk open_chunk(self, unsigned long long name, bint is_direct, bint is_gzip) cdef int sync_metadata(self) except -1 cpdef int mark_synced_up_to(self, unsigned long long timestamp) 
except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 @@ -48,4 +49,5 @@ cdef class TimeSeries: cpdef TimeSeries create_series(str path, str name, unsigned int block_size, int max_entries_per_chunk, int page_size=*, - bint use_descriptor_based_access=*) + bint use_descriptor_based_access=*, + int gzip_level=*) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index fced5eab089672bceaf8c1cedab003b28226c45b..c1966ffd8536853d00db88175e54f304bfe57891 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -5,7 +5,10 @@ import warnings from satella.json import write_json_to_file, read_json_from_file -from .chunks cimport create_chunk, Chunk +from .chunks.base cimport Chunk +from .chunks.normal cimport NormalChunk +from .chunks.direct cimport DirectChunk +from .chunks.maker cimport create_chunk from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists import os @@ -109,41 +112,53 @@ cdef class TimeSeries: str filename list files = os.listdir(self.path) unsigned long long last_chunk_name + bint is_direct + bint is_gzip try: metadata = read_json_from_file(os.path.join(self.path, METADATA_FILE_NAME)) self.block_size = metadata['block_size'] self.max_entries_per_chunk = metadata['max_entries_per_chunk'] self.last_entry_synced = metadata['last_entry_synced'] - self.page_size = metadata.get('page_size', DEFAULT_PAGE_SIZE) + self.page_size = metadata.get('page_size', 4096) self.metadata = metadata.get('metadata') + self.gzip_level = metadata.get('gzip_level', 0) + except ValueError: + raise Corruption('Corrupted series') except (OSError, ValueError) as e: raise Corruption('Corrupted series: %s' % (e, )) except KeyError: raise Corruption('Could not read metadata item') self.open_chunks = {} # tp.Dict[int, Chunk] + self.chunks = [] # type: tp.List[tp.Tuple[int, bool, bool]] # sorted by ASC + #: timestamp, is_direct, is_gzip if not len(files): raise Corruption('Empty directory!') elif len(files) == 1: # empty series self.last_chunk = None - self.chunks = [] self.last_entry_ts = 0 else: - self.chunks = [] # type: tp.List[int] # sorted by ASC for filename in files: if filename == METADATA_FILE_NAME: continue + is_gzip = filename.endswith('.gz') + if is_gzip: + filename = filename.replace('.gz', '') + is_direct = filename.endswith('.direct') + if is_direct: + filename = filename.replace('.direct', '') + is_direct |= is_gzip try: self.chunks.append(int(filename)) except ValueError: raise Corruption('Detected invalid file "%s"' % (filename, )) self.chunks.sort() - last_chunk_name = self.chunks[-1] - self.last_chunk = self.open_chunk(last_chunk_name) + last_chunk_name, is_direct, is_gzip = self.chunks[-1] + self.last_chunk = self.open_chunk(last_chunk_name, is_direct, is_gzip) self.last_entry_ts = self.last_chunk.max_ts cdef void decref_chunk(self, unsigned long long name): @@ -155,28 +170,41 @@ cdef class TimeSeries: else: self.refs_chunks[name] += 1 - cdef Chunk open_chunk(self, unsigned long long name): + cdef Chunk open_chunk(self, unsigned long long name, bint is_direct, bint is_gzip): """ Opens a provided chunk. Acquires a reference to the chunk. :param name: name of the chunk + :param is_direct: is this a direct chunk? + :param is_gzip: is this a gzipped chunk? 
:return: chunk :raises DoesNotExist: chunk not found :raises InvalidState: resource closed + :raises ValueError: chunk was gzipped but not direct """ if self.closed: raise InvalidState('Series is closed') - if name not in self.chunks: - raise DoesNotExist('Invalid chunk!') + if name not in (v[0] for v in self.chunks): + raise DoesNotExist('Invalid chunk') + if is_gzip and not is_direct: + raise ValueError('Chunk that is gzipped must be direct') cdef Chunk chunk with self.open_lock: if name not in self.open_chunks: - self.open_chunks[name] = chunk = Chunk(self, - os.path.join(self.path, str(name)), - self.page_size, - use_descriptor_access=self.descriptor_based_access) + if is_direct: + chunk = DirectChunk(self, + os.path.join(self.path, str(name)), + self.page_size, + use_descriptor_access=True, + gzip_compression_level=self.gzip_level if is_gzip else 0) + else: + chunk = NormalChunk(self, + os.path.join(self.path, str(name)), + self.page_size, + use_descriptor_access=self.descriptor_based_access) + self.open_chunks[name] = chunk else: chunk = self.open_chunks[name] self.incref_chunk(name) @@ -184,20 +212,36 @@ cdef class TimeSeries: cpdef int trim(self, unsigned long long timestamp) except -1: """ - Delete all entries earlier than timestamp that are closed. + Delete all entries earlier than timestamp. Note that this will drop entire chunks, so it may be possible that some entries will linger - on. - - This will affect only closed chunks. Chunks ready to delete that are closed after - this will not be deleted, as :meth:`~tempsdb.series.TimeSeries.trim` will need - to be called again. + on. This will not delete currently opened chunks! :param timestamp: timestamp to delete entries earlier than """ + if len(self.chunks) == 1: + return 0 cdef: unsigned long long chunk_to_delete int refs + try: + with self.open_lock: + while len(self.chunks) >= 2 and timestamp > self.chunks[1][0]: + chunk_to_delete = self.chunks[0][0] + if chunk_to_delete in self.open_chunks: + refs = self.refs_chunks.get(chunk_to_delete, 0) + if not refs: + self.open_chunks[chunk_to_delete].delete() + else: + # I would delete it, but it's open... 
+ return 0 + else: + os.unlink(os.path.join(self.path, str(chunk_to_delete))) + del self.chunks[0] + else: + return 0 + except IndexError: + return 0 if len(self.chunks) > 1: try: with self.open_lock: @@ -251,13 +295,13 @@ cdef class TimeSeries: unsigned int mid while lo < hi: mid = (lo+hi)//2 - if self.chunks[mid] < timestamp: + if self.chunks[mid][0] < timestamp: lo = mid+1 else: hi = mid try: - if self.chunks[lo] == timestamp: + if self.chunks[lo][0] == timestamp: return lo else: return lo-1 @@ -278,8 +322,8 @@ cdef class TimeSeries: if start > stop: raise ValueError('start larger than stop') - if start < self.chunks[0]: - start = self.chunks[0] + if start < self.chunks[0][0]: + start = self.chunks[0][0] if stop > self.last_entry_ts: stop = self.last_entry_ts @@ -293,7 +337,8 @@ cdef class TimeSeries: Chunk chunk for chunk_index in range(ch_start, ch_stop+1): - chunks.append(self.open_chunk(self.chunks[chunk_index])) + ts, is_direct, is_gzip = self.chunks[chunk_index] + chunks.append(self.open_chunk(ts, is_direct, is_gzip)) return Iterator(self, start, stop, chunks) cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: @@ -403,6 +448,7 @@ cdef class TimeSeries: raise ValueError('Data too long') data = data + b'\x00'*(self.block_size - data_len) self.append(timestamp, data) + return 0 cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ @@ -428,10 +474,12 @@ cdef class TimeSeries: self.decref_chunk(self.last_chunk.name()) self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)), timestamp, data, self.page_size, - descriptor_based_access=self.descriptor_based_access) + descriptor_based_access=self.descriptor_based_access, + use_direct_mode=bool(self.gzip_level), + gzip_compression_level=self.gzip_level) self.open_chunks[timestamp] = self.last_chunk self.incref_chunk(timestamp) - self.chunks.append(timestamp) + self.chunks.append((timestamp, bool(self.gzip_level), bool(self.gzip_level))) else: self.last_chunk.append(timestamp, data) self.last_entry_ts = timestamp @@ -472,17 +520,20 @@ cdef class TimeSeries: cpdef TimeSeries create_series(str path, str name, unsigned int block_size, int max_entries_per_chunk, int page_size=DEFAULT_PAGE_SIZE, - bint use_descriptor_based_access=False): + bint use_descriptor_based_access=False, + int gzip_level=0): if os.path.exists(path): raise AlreadyExists('This series already exists!') - + os.mkdir(path) cdef dict meta = { 'block_size': block_size, 'max_entries_per_chunk': max_entries_per_chunk, 'last_entry_synced': 0 - } + } if page_size != DEFAULT_PAGE_SIZE: meta['page_size'] = page_size - os.mkdir(path) - write_json_to_file(os.path.join(path, METADATA_FILE_NAME), meta) - return TimeSeries(path, name) + if gzip_level: + meta['gzip_level'] = gzip_level + write_json_to_file(os.path.join(path, 'metadata.txt'), meta) + return TimeSeries(path, name, + use_descriptor_based_access=use_descriptor_based_access) diff --git a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd index 456364f16636035724a3c0fb83646bc5a2048031..e792533cadb3c89774abdc8e03a460d1686a86c4 100644 --- a/tempsdb/varlen.pxd +++ b/tempsdb/varlen.pxd @@ -1,4 +1,4 @@ -from .series cimport TimeSeries +from .series cimport TimeSeries, create_series cdef class VarlenSeries: @@ -15,6 +15,7 @@ cdef class VarlenSeries: int max_entries_per_chunk int current_maximum_length object mpm + int gzip_level cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int close(self) except -1 @@ -64,4 +65,5 @@ cdef class VarlenEntry: 
cpdef int close(self) except -1 cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, - int max_entries_per_chunk) + int max_entries_per_chunk, + int gzip_level=*) diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx index 41b9355b225b4a6ad8b0824a0e42a4c6810b3e46..4e1b39386ea92bcebfc856b56c6c7a8a1814f395 100644 --- a/tempsdb/varlen.pyx +++ b/tempsdb/varlen.pyx @@ -4,10 +4,10 @@ import typing as tp import struct import warnings -from .chunks cimport Chunk +from .chunks.base cimport Chunk from .exceptions import Corruption, AlreadyExists, StillOpen from .iterators cimport Iterator -from .series cimport TimeSeries, create_series +from tempsdb.series cimport TimeSeries, create_series cdef class VarlenEntry: @@ -442,6 +442,7 @@ cdef class VarlenSeries: self.mpm = None self.name = name self.root_series = TimeSeries(os.path.join(path, 'root'), 'root') + self.gzip_level = self.root_series.gzip_level self.max_entries_per_chunk = self.root_series.max_entries_per_chunk try: self.size_field = self.root_series.metadata['size_field'] @@ -551,6 +552,8 @@ cdef class VarlenSeries: Updates :attr:`~tempsdb.varlen.VarlenSeries.current_maximum_length`. """ + from tempsdb.series import create_series + cdef: int new_name = len(self.series) int new_len = self.get_length_for(new_name) @@ -558,7 +561,10 @@ cdef class VarlenSeries: TimeSeries series = create_series(os.path.join(self.path, new_name_s), new_name_s, new_len, - self.max_entries_per_chunk) + self.max_entries_per_chunk, + gzip_level=self.gzip_level) + if self.mpm is not None: + series.register_memory_pressure_manager(self.mpm) self.series.append(series) self.current_maximum_length += new_len @@ -616,9 +622,11 @@ cdef class VarlenSeries: else: raise ValueError('How did this happen?') +from tempsdb.series cimport TimeSeries cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, - int max_entries_per_chunk): + int max_entries_per_chunk, + int gzip_level=0): """ Create a variable length series @@ -627,10 +635,13 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis :param size_struct: size of the length indicator. Must be one of 1, 2, 3 or 4. :param length_profile: series' length profile :param max_entries_per_chunk: maximum entries per a chunk file + :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression. 
:return: newly created VarlenSeries :raises AlreadyExists: directory exists at given path :raises ValueError: invalid length profile or max_entries_per_chunk or size_struct """ + from tempsdb.series import create_series + if os.path.exists(path): raise AlreadyExists('directory present at paht') if not length_profile or not max_entries_per_chunk: @@ -642,7 +653,8 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis cdef TimeSeries root_series = create_series(os.path.join(path, 'root'), 'root', size_struct+length_profile[0], - max_entries_per_chunk) + max_entries_per_chunk, + gzip_level=gzip_level) root_series.set_metadata({'size_field': size_struct, 'length_profile': length_profile}) root_series.close() diff --git a/tests/test_db.py b/tests/test_db.py index 4e24666d12c8a0ba23684576d56f4a196b37f7a4..91716d13aff99da11b48d2fc3f184eecaf63c714 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,8 +1,6 @@ import os import unittest -from tempsdb.chunks import Chunk - class TestDB(unittest.TestCase): def test_write_series(self): @@ -42,6 +40,25 @@ class TestDB(unittest.TestCase): self.do_verify_series(series, 0, 1800) series.close() + def test_create_series_gzip(self): + from tempsdb.series import create_series + + series = create_series('test.gz', 'test.gz', 1, 10, gzip_level=6) + start, ts = 127, 100 + for i in range(20): + series.append(ts, bytes(bytearray([start]))) + start -= 1 + ts += 100 + + self.do_verify_series(series, 0, 2000) + self.do_verify_series(series, 500, 2000) + self.do_verify_series(series, 1000, 2000) + self.do_verify_series(series, 1500, 2000) + self.do_verify_series(series, 0, 500) + self.do_verify_series(series, 0, 1200) + self.do_verify_series(series, 0, 1800) + series.close() + def do_verify_series(self, series, start, stop): it = series.iterate_range(start, stop) items = list(it) @@ -50,11 +67,13 @@ class TestDB(unittest.TestCase): self.assertLessEqual(items[-1][0], stop) def test_chunk_alternative(self): - from tempsdb.chunks import create_chunk + from tempsdb.chunks.normal import NormalChunk + from tempsdb.chunks.maker import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096) chunk.close() - chunk = Chunk(None, 'chunk_a.db', 4096, use_descriptor_access=True) + chunk = NormalChunk(None, 'chunk_a.db', 4096, use_descriptor_access=True) chunk.append(1, b'ma ') chunk.append(4, b'kota') self.assertEqual(chunk.min_ts, 0) @@ -84,7 +103,8 @@ class TestDB(unittest.TestCase): self.assertEqual(os.path.getsize('chunk.db'), 8192) def test_chunk(self): - from tempsdb.chunks import create_chunk + from tempsdb.chunks.maker import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk.db', 0, b'ala ', 4096) chunk.append(1, b'ma ') diff --git a/tests/test_varlen.py b/tests/test_varlen.py index a88933a0bd1b815dd707d484a3f7c46ff53b2ba7..78f3608a24769cd98bbd36a219addfd37e1bd8fb 100644 --- a/tests/test_varlen.py +++ b/tests/test_varlen.py @@ -1,15 +1,11 @@ -import logging import os import unittest -from tempsdb.varlen import create_varlen_series -from tempsdb.database import Database - -logger = logging.getLogger(__name__) - class TestVarlen(unittest.TestCase): def test_varlen(self): + from tempsdb.varlen import create_varlen_series + series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')] varlen = create_varlen_series('test_dir', 'test_dir', 2, [10, 20, 10], 20) @@ -23,3 +19,21 @@ class TestVarlen(unittest.TestCase): 
lst = [(ts, v.to_bytes()) for ts, v in it]
         it.close()
         self.assertEqual(lst, series)
+
+    def test_varlen_gzip(self):
+        from tempsdb.varlen import create_varlen_series
+
+        series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')]
+        varlen = create_varlen_series('test_dir.gz', 'test_dir.gz', 2, [10, 20, 10], 20,
+                                      gzip_level=1)
+
+        varlen.append(*series[0])
+        self.assertEqual(len(os.listdir('test_dir.gz')), 2)
+
+        varlen.append(*series[1])
+        self.assertEqual(len(os.listdir('test_dir.gz')), 3)
+
+        it = varlen.iterate_range(0, 20)
+        lst = [(ts, v.to_bytes()) for ts, v in it]
+        it.close()
+        self.assertEqual(lst, series)
diff --git a/unittest.Dockerfile b/unittest.Dockerfile
index bccaee128e06f1d891c83204193b8f90f442213e..e3aa11357ec9d96d01b3682aa1c796aa43876629 100644
--- a/unittest.Dockerfile
+++ b/unittest.Dockerfile
@@ -1,15 +1,14 @@
 FROM python:3.8
-RUN pip install satella>=2.14.24 snakehouse nose2 wheel ujson coverage
+RUN pip install satella "snakehouse>=1.2.3" nose2 wheel ujson coverage
 ADD tempsdb /app/tempsdb
 ADD setup.py /app/setup.py
 ADD .coveragerc /app/.coveragerc
 ADD setup.cfg /app/setup.cfg
 WORKDIR /app
-
 ENV CI=true
 RUN python setup.py build_ext --inplace
 ADD tests /app/tests
-CMD ["nose2", "-vv"]
+CMD ["coverage", "run", "-m", "nose2", "-vv"]
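
Appendix: a minimal usage sketch of the experimental gzip support introduced by this changeset. It mirrors the
test_create_series_gzip test in tests/test_db.py above; the path, block size and values are illustrative only,
not prescribed by the library.

from tempsdb.series import create_series

# block_size=1, 10 entries per chunk, gzip compression level 6 (0 would disable gzip)
series = create_series('test.gz', 'test.gz', 1, 10, gzip_level=6)
try:
    ts, value = 100, 127
    for _ in range(20):
        series.append(ts, bytes(bytearray([value])))  # one-byte payload per entry
        ts += 100
        value -= 1

    # Reading a gzipped series is slow: every seek re-reads the file from the beginning.
    it = series.iterate_range(0, 2000)
    try:
        for timestamp, data in it:
            print(timestamp, data)
    finally:
        it.close()
finally:
    series.close()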
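
The direct-chunk layout documented in docs/chunks.rst above (a 4-byte little-endian block size followed by
repeated 8-byte little-endian timestamp plus block_size bytes of data, optionally wrapped in gzip) can be
illustrated with a short standalone reader. This is a sketch only, assuming that layout; the helper name
read_direct_chunk is hypothetical and not part of the library API.

import gzip
import struct

def read_direct_chunk(path):
    """Yield (timestamp, data) pairs from a direct (optionally gzipped) chunk file."""
    opener = gzip.open if path.endswith('.gz') else open
    with opener(path, 'rb') as f:
        block_size, = struct.unpack('<L', f.read(4))   # 4-byte little-endian block size
        while True:
            header = f.read(8)                         # 8-byte little-endian timestamp
            if len(header) < 8:
                break                                  # end of file
            timestamp, = struct.unpack('<Q', header)
            yield timestamp, f.read(block_size)        # block_size bytes of data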