diff --git a/README.md b/README.md
index 92f62b3d994ef15a82e03b11a7cb9c698194ddd7..9c04d17a98b79f63ff6577ebcbb465c588e6fb2e 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,7 @@ Then copy your resulting wheel and install it via pip on the target system.
 * if mmap is used, the kernel will be informed after loading the chunk
   that we don't need it's memory right now
+* deleting a `TimeSeries` will now correctly return a zero
 * both `Database`, `TimeSeries` and `Chunk` destructor will close and emit
   a warning if the user forgot to
 * if page_size is default, it won't be written as part of the metadata
 
@@ -63,7 +64,7 @@ Then copy your resulting wheel and install it via pip on the target system.
 * added `append_padded`
 * added metadata support, `metadata` property and `set_metadata` call
 * added variable length series
-* added experimental support for gzipping (constant-length series only for now)
+* added experimental support for gzipping time series
 * fixed a bug where getting a series that was already closed would TypeError
 * following additions to `Chunk`:
     * `get_slice_of_piece_at`
diff --git a/docs/index.rst b/docs/index.rst
index b289c9b21dc17c27124c5800c81354016975c65e..661603e6521d106257c0113bcfbd0cbe87b92c03 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -27,7 +27,7 @@ You can also manually select the descriptor-based implementation if you want to.
 
 .. versionadded:: 0.5
 
-Experimental support for gzipping time series is added. Note that reading from gzipped files is for now
+Experimental support for gzipping time series is added. Note that reading from gzipped files might be
 slow, as every seek requires reading the file from the beginning.
 Warnings will be issued while using gzipped series to remind you of this fact.
 
diff --git a/setup.py b/setup.py
index c2db648f5bb48b2cfa4fd2dfd664220957ad388d..86f800f9e42743cdbd7b111da3e265de9c98b613 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ if 'CI' in os.environ:
 
 
 setup(name='tempsdb',
-      version='0.5.0a8',
+      version='0.5.0a9',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx
index 2a69104e8c04d5e904ac8f013da3c808cdb452cd..2a7538b45286ea9eaf2934f184e1a091323d5007 100644
--- a/tempsdb/chunks/base.pyx
+++ b/tempsdb/chunks/base.pyx
@@ -37,8 +37,10 @@ cdef class AlternativeMMap:
     def __init__(self, io_file: io.BinaryIO, file_lock_object):
         self.io = io_file
+        cdef ReadWriteGzipFile rw_gz
         if isinstance(io_file, ReadWriteGzipFile):
-            self.size = io_file.size()
+            rw_gz = io_file
+            self.size = rw_gz.size
         else:
             self.io.seek(0, 2)
             self.size = self.io.tell()
 
@@ -330,7 +332,6 @@ cdef class Chunk:
         return 0
 
     cpdef int extend(self) except -1:
-        raise NotImplementedError('Abstract method!')
         return 0
 
     cpdef int delete(self) except -1:
diff --git a/tempsdb/chunks/direct.pxd b/tempsdb/chunks/direct.pxd
index f2ee1265e45fcccb21953d42fd973b93f671feec..ddee23ac118c52441b2959382f25393efe8a02d3 100644
--- a/tempsdb/chunks/direct.pxd
+++ b/tempsdb/chunks/direct.pxd
@@ -6,6 +6,5 @@ cdef class DirectChunk(Chunk):
         int gzip
 
     cpdef object open_file(self, str path)
-    cpdef int extend(self) except -1
     cpdef int after_init(self) except -1
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
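A note on the `rw_gz.size` attribute that `AlternativeMMap` now reads: gzip streams do not carry a trustworthy decompressed length (the ISIZE trailer stores it modulo 2**32, and only for the last member), which is why `ReadWriteGzipFile.__init__` further down in this diff computes `size` by draining the stream once at open time. A minimal pure-Python sketch of that computation; the function name is illustrative, not part of tempsdb:

```python
import gzip

def decompressed_size(path: str, block: int = 128) -> int:
    """Measure a gzip file's decompressed length by reading it through.

    The ISIZE trailer only records the length modulo 2**32 (and only for
    the final member), so a full sequential read is the reliable way.
    """
    total = 0
    with gzip.GzipFile(path, 'rb') as f:
        b = f.read(block)
        while b:
            total += len(b)
            b = f.read(block)
    return total
```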
diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx
index 763804ecc25c017f0a61e259136ce12f4ce0d1d1..4f23a977fa01255a88829c231825a8aa44c140be 100644
--- a/tempsdb/chunks/direct.pyx
+++ b/tempsdb/chunks/direct.pyx
@@ -3,8 +3,8 @@ import typing as tp
 import struct
 import warnings
 
-from tempsdb.chunks.gzip import ReadWriteGzipFile
 from ..series cimport TimeSeries
+from .gzip cimport ReadWriteGzipFile
 from .base cimport Chunk
 
 
@@ -64,12 +64,11 @@ cdef class DirectChunk(Chunk):
         else:
             return super().open_file(path)
 
-    cpdef int extend(self) except -1:
-        return 0
-
     cpdef int after_init(self) except -1:
+        cdef ReadWriteGzipFile rw_gz
         if isinstance(self.file, ReadWriteGzipFile):
-            self.file_size = self.file.size()
+            rw_gz = self.file
+            self.file_size = rw_gz.size
         else:
             self.file.seek(0, os.SEEK_END)
             self.file_size = self.file.tell()
@@ -87,7 +86,8 @@ cdef class DirectChunk(Chunk):
         self.file_lock_object.acquire()
         try:
             self.file_size += self.block_size_plus
-            self.file.seek(self.pointer, 0)
+            if not isinstance(self.file, ReadWriteGzipFile):
+                self.file.seek(self.pointer, 0)
             b = STRUCT_Q.pack(timestamp) + data
             self.file.write(b)
             self.mmap.resize(self.file_size)
diff --git a/tempsdb/chunks/gzip.pxd b/tempsdb/chunks/gzip.pxd
index 4ab65baded1b8f47e4963749008f1b32c3453806..65d766e7f6e8e9e2a43c4bf70cbb83aa7ff9ea57 100644
--- a/tempsdb/chunks/gzip.pxd
+++ b/tempsdb/chunks/gzip.pxd
@@ -5,6 +5,8 @@ cdef class ReadWriteGzipFile:
         int compress_level
         object lock
         unsigned long pointer
+        unsigned long size
+        bint needs_flush_before_read
 
-    cdef int reopen_read(self) except -1
+    cpdef int flush(self) except -1
 
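The two fields declared above, `size` and `needs_flush_before_read`, support a lazy-flush scheme that the `gzip.pyx` diff below fills in: a write only marks the handle dirty, and the read handle is flushed and reopened on the next read rather than after every write. A rough pure-Python sketch of the same pattern, assuming a single writer and reader per path; the class name is illustrative, not part of tempsdb:

```python
import gzip
import threading

class DeferredFlushGzip:
    """Sketch: one append-mode writer plus one reader over the same file."""

    def __init__(self, path: str, compresslevel: int = 1) -> None:
        self.path = path
        self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=compresslevel)
        self.ro_file = gzip.GzipFile(path, 'rb')
        self.lock = threading.RLock()
        self.needs_flush_before_read = False

    def write(self, value: bytes) -> None:
        with self.lock:
            self.rw_file.write(value)
            self.needs_flush_before_read = True   # defer the reopen

    def read(self, n: int) -> bytes:
        with self.lock:
            if self.needs_flush_before_read:
                self.rw_file.flush()              # make appended data visible
                self.ro_file.close()
                self.ro_file = gzip.GzipFile(self.path, 'rb')
                self.needs_flush_before_read = False
            # Note: reading past what has been flushed can raise EOFError,
            # since the gzip member has no end-of-stream trailer yet.
            return self.ro_file.read(n)
```

The payoff is that a burst of appends costs a single flush-and-reopen on the next read, instead of one reopen per write as the old `reopen_read()` approach did.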
diff --git a/tempsdb/chunks/gzip.pyx b/tempsdb/chunks/gzip.pyx
index 3d2f6eb2eeae73be393ca4fc75e03730a7159270..db1895925205f253c75b377c30e135da6e160a4b 100644
--- a/tempsdb/chunks/gzip.pyx
+++ b/tempsdb/chunks/gzip.pyx
@@ -6,40 +6,35 @@ cdef class ReadWriteGzipFile:
     def __init__(self, path: str, compresslevel: int = gzip._COMPRESS_LEVEL_FAST):
         self.path = path
         self.compress_level = compresslevel
-        self.ro_file = gzip.GzipFile(path, 'rb', compresslevel=self.compress_level)
+        self.ro_file = gzip.GzipFile(path, 'rb')
         self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=self.compress_level)
         self.pointer = 0
         self.lock = threading.RLock()
+        self.needs_flush_before_read = False
+        cdef bytes b
+        b = self.read(128)
+        while b:
+            b = self.read(128)
+        self.size = self.pointer
 
-    def flush(self):
+    cpdef int flush(self) except -1:
         self.rw_file.flush()
-        self.reopen_read()
-
-    def size(self):
-        cdef:
-            bytes b
-        with self.lock:
-            self.seek(0, 0)
-            b = self.read(128)
-            while b:
-                b = self.read(128)
-        return self.pointer
+        self.ro_file.close()
+        self.ro_file = gzip.GzipFile(self.path, 'rb')
+        self.pointer = 0
+        self.needs_flush_before_read = False
+        return 0
 
     def close(self):
         with self.lock:
-            self.ro_file.close()
             self.rw_file.close()
-
-    cdef int reopen_read(self) except -1:
-        with self.lock:
             self.ro_file.close()
-            self.ro_file = gzip.GzipFile(self.path, 'rb', compresslevel=self.compress_level)
-            self.pointer = 0
-            return 0
 
     def read(self, int maxinplen):
         cdef bytes b
         with self.lock:
+            if self.needs_flush_before_read:
+                self.flush()
             b = self.ro_file.read(maxinplen)
            self.pointer += len(b)
         return b
@@ -52,17 +47,22 @@ cdef class ReadWriteGzipFile:
         """
         with self.lock:
             self.rw_file.write(value)
-            self.reopen_read()
+            self.size += len(value)
+            self.needs_flush_before_read = True
 
     def seek(self, unsigned long pos, int mode):
+        if self.needs_flush_before_read:
+            self.flush()
         if mode == 2:
-            self.pointer = self.size()-pos
-            self.ro_file.seek(self.pointer, 0)
-        else:
+            self.seek(self.size-pos, 0)
+        elif mode == 0:
             if pos != self.pointer:
-                self.ro_file.seek(pos, mode)
-                if mode == 0:
-                    self.pointer = pos
+                self.ro_file.seek(pos, 0)
+                self.pointer = pos
+        elif mode == 1:
+            raise NotImplementedError('Unimplemented seek mode')
+        else:
+            raise ValueError('Invalid seek mode')
 
     def tell(self):
         return self.pointer
diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd
index 9c6d167fa182b94f19743ce1f9deb05cfdd65fc4..c729f440103e73068026473157966243e6cce560 100644
--- a/tempsdb/database.pxd
+++ b/tempsdb/database.pxd
@@ -23,7 +23,8 @@ cdef class Database:
                                      int gzip_level=*)
     cpdef VarlenSeries create_varlen_series(self, str name, list length_profile,
                                             int size_struct,
-                                            unsigned long entries_per_chunk)
+                                            unsigned long entries_per_chunk,
+                                            int gzip_level=*)
     cpdef list get_open_series(self)
     cpdef list get_all_series(self)
     cpdef int close_all_open_series(self) except -1
diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx
index 8ed9d113d0d71c94b5abc42be3630c3da2ad3b4b..2d54421135499288f5e527c468b0b7ccf245acdc 100644
--- a/tempsdb/database.pyx
+++ b/tempsdb/database.pyx
@@ -149,7 +149,8 @@ cdef class Database:
 
     cpdef VarlenSeries create_varlen_series(self, str name, list length_profile,
                                             int size_struct,
-                                            unsigned long entries_per_chunk):
+                                            unsigned long entries_per_chunk,
+                                            int gzip_level=0):
         """
         Create a new variable length series
 
@@ -158,6 +159,7 @@ cdef class Database:
         :param size_struct: how many bytes will be used to store length?
             Valid entries are 1, 2 and 4
         :param entries_per_chunk: entries per chunk file
+        :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression.
         :return: new variable length series
         :raises AlreadyExists: series with given name already exists
         """
@@ -166,7 +168,8 @@ cdef class Database:
             cdef VarlenSeries series = create_varlen_series(os.path.join(self.path, name), name,
                                                             size_struct,
                                                             length_profile,
-                                                            entries_per_chunk)
+                                                            entries_per_chunk,
+                                                            gzip_level=gzip_level)
             self.open_varlen_series[name] = series
         return series
 
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index 26ee3fdb8bc2457d40b466bed6c572df0e6910fc..baaa9eee3b2f0832de02d6ecf4cc8b84ff4be4ec 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -443,6 +443,7 @@ cdef class TimeSeries:
             raise ValueError('Data too long')
         data = data + b'\x00'*(self.block_size - data_len)
         self.append(timestamp, data)
+        return 0
 
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
@@ -490,6 +491,7 @@ cdef class TimeSeries:
             raise InvalidState('series is closed')
         self.close()
         shutil.rmtree(self.path)
+        return 0
 
     cpdef unsigned long open_chunks_mmap_size(self):
         """
diff --git a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd
index 456364f16636035724a3c0fb83646bc5a2048031..173e81179334234a7aed239de34f359530d623da 100644
--- a/tempsdb/varlen.pxd
+++ b/tempsdb/varlen.pxd
@@ -15,6 +15,7 @@ cdef class VarlenSeries:
         int max_entries_per_chunk
         int current_maximum_length
         object mpm
+        int gzip_level
 
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
     cpdef int close(self) except -1
@@ -64,4 +65,5 @@ cdef class VarlenEntry:
     cpdef int close(self) except -1
 
 cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile,
-                                        int max_entries_per_chunk)
+                                        int max_entries_per_chunk,
+                                        int gzip_level=*)
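With the plumbing above in place, a gzipped variable length series can be requested straight from a `Database`. A usage sketch under the assumption that `Database` is constructed from its root directory path (as the `self.path` usage above suggests); the directory name, series name, and values are illustrative:

```python
from tempsdb.database import Database

# Assumption: the database is opened from its root path 'my_db'.
db = Database('my_db')

# Keyword names follow the signature added in the diff above.
varlen = db.create_varlen_series('measurements', [10, 20, 10],
                                 size_struct=2,
                                 entries_per_chunk=20,
                                 gzip_level=1)   # 0, the default, disables gzip

varlen.append(0, b'variable length payload')
varlen.close()
```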
diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx
index 19ccd2424385c0ff2e9cfb3c3b927f1f37440699..4100c47974d0aa810a921a8938edf28b025c0d4c 100644
--- a/tempsdb/varlen.pyx
+++ b/tempsdb/varlen.pyx
@@ -442,6 +442,7 @@ cdef class VarlenSeries:
         self.mpm = None
         self.name = name
         self.root_series = TimeSeries(os.path.join(path, 'root'), 'root')
+        self.gzip_level = self.root_series.gzip_level
         self.max_entries_per_chunk = self.root_series.max_entries_per_chunk
         try:
             self.size_field = self.root_series.metadata['size_field']
@@ -558,7 +559,8 @@ cdef class VarlenSeries:
             TimeSeries series = create_series(os.path.join(self.path, new_name_s),
                                               new_name_s,
                                               new_len,
-                                              self.max_entries_per_chunk)
+                                              self.max_entries_per_chunk,
+                                              gzip_level=self.gzip_level)
         self.series.append(series)
         self.current_maximum_length += new_len
 
@@ -616,9 +618,11 @@ cdef class VarlenSeries:
         else:
             raise ValueError('How did this happen?')
 
+from tempsdb.series cimport TimeSeries
 
 cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile,
-                                        int max_entries_per_chunk):
+                                        int max_entries_per_chunk,
+                                        int gzip_level=0):
     """
     Create a variable length series
 
@@ -627,10 +631,13 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis
     :param size_struct: size of the length indicator. Must be one of 1, 2, 3 or 4.
     :param length_profile: series' length profile
     :param max_entries_per_chunk: maximum entries per a chunk file
+    :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression.
     :return: newly created VarlenSeries
     :raises AlreadyExists: directory exists at given path
     :raises ValueError: invalid length profile or max_entries_per_chunk or size_struct
     """
+    from .series import create_series
+
     if os.path.exists(path):
         raise AlreadyExists('directory present at paht')
     if not length_profile or not max_entries_per_chunk:
@@ -642,7 +649,8 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis
     cdef TimeSeries root_series = create_series(os.path.join(path, 'root'),
                                                 'root',
                                                 size_struct+length_profile[0],
-                                                max_entries_per_chunk)
+                                                max_entries_per_chunk,
+                                                gzip_level=gzip_level)
     root_series.set_metadata({'size_field': size_struct,
                               'length_profile': length_profile})
     root_series.close()
diff --git a/tests/test_varlen.py b/tests/test_varlen.py
index a88933a0bd1b815dd707d484a3f7c46ff53b2ba7..78f3608a24769cd98bbd36a219addfd37e1bd8fb 100644
--- a/tests/test_varlen.py
+++ b/tests/test_varlen.py
@@ -1,15 +1,11 @@
-import logging
 import os
 import unittest
 
-from tempsdb.varlen import create_varlen_series
-from tempsdb.database import Database
-
-logger = logging.getLogger(__name__)
-
 
 class TestVarlen(unittest.TestCase):
     def test_varlen(self):
+        from tempsdb.varlen import create_varlen_series
+
         series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')]
         varlen = create_varlen_series('test_dir', 'test_dir', 2, [10, 20, 10], 20)
 
@@ -23,3 +19,21 @@ class TestVarlen(unittest.TestCase):
         lst = [(ts, v.to_bytes()) for ts, v in it]
         it.close()
         self.assertEqual(lst, series)
+
+    def test_varlen_gzip(self):
+        from tempsdb.varlen import create_varlen_series
+
+        series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')]
+        varlen = create_varlen_series('test_dir.gz', 'test_dir.gz', 2, [10, 20, 10], 20,
+                                      gzip_level=1)
+
+        varlen.append(*series[0])
+        self.assertEqual(len(os.listdir('test_dir.gz')), 2)
+
+        varlen.append(*series[1])
+        self.assertEqual(len(os.listdir('test_dir.gz')), 3)
+
+        it = varlen.iterate_range(0, 20)
+        lst = [(ts, v.to_bytes()) for ts, v in it]
+        it.close()
+        self.assertEqual(lst, series)
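A closing reference note on the rewritten `ReadWriteGzipFile.seek()` earlier in this diff: the `mode` argument follows the standard whence constants, and after this change only absolute and end-relative seeks are honoured. The gloss on mode 0 describes how `GzipFile` emulates seeking, per the caveat in the docs change above:

```python
import os

# Whence values as handled by ReadWriteGzipFile.seek() after this change:
#   0 == os.SEEK_SET  absolute; gzip emulates this by rewinding and
#                     reading forward, hence the documented slowness
#   1 == os.SEEK_CUR  relative to current position; raises NotImplementedError
#   2 == os.SEEK_END  end-relative; resolved as an absolute seek to size - pos
assert (os.SEEK_SET, os.SEEK_CUR, os.SEEK_END) == (0, 1, 2)
```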