From 0b8f0816d4792532d54ba13fa753bdeba4a398ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 14 Dec 2020 18:22:47 +0100 Subject: [PATCH] add gzip to varlen --- README.md | 3 ++- docs/index.rst | 2 +- setup.py | 2 +- tempsdb/chunks/base.pyx | 5 ++-- tempsdb/chunks/direct.pxd | 1 - tempsdb/chunks/direct.pyx | 12 ++++----- tempsdb/chunks/gzip.pxd | 4 ++- tempsdb/chunks/gzip.pyx | 54 +++++++++++++++++++-------------------- tempsdb/database.pxd | 3 ++- tempsdb/database.pyx | 7 +++-- tempsdb/series.pyx | 2 ++ tempsdb/varlen.pxd | 4 ++- tempsdb/varlen.pyx | 14 +++++++--- tests/test_varlen.py | 26 ++++++++++++++----- 14 files changed, 86 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index 92f62b3..9c04d17 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ Then copy your resulting wheel and install it via pip on the target system. * if mmap is used, the kernel will be informed after loading the chunk that we don't need it's memory right now +* deleting a `TimeSeries` will now correctly return a zero * both `Database`, `TimeSeries` and `Chunk` destructor will close and emit a warning if the user forgot to * if page_size is default, it won't be written as part of the metadata @@ -63,7 +64,7 @@ Then copy your resulting wheel and install it via pip on the target system. * added `append_padded` * added metadata support, `metadata` property and `set_metadata` call * added variable length series -* added experimental support for gzipping (constant-length series only for now) +* added experimental support for gzipping time series * fixed a bug where getting a series that was already closed would TypeError * following additions to `Chunk`: * `get_slice_of_piece_at` diff --git a/docs/index.rst b/docs/index.rst index b289c9b..661603e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -27,7 +27,7 @@ You can also manually select the descriptor-based implementation if you want to. .. versionadded:: 0.5 -Experimental support for gzipping time series is added. Note that reading from gzipped files is for now +Experimental support for gzipping time series is added. Note that reading from gzipped files might be slow, as every seek requires reading the file from the beginning. Warnings will be issued while using gzipped series to remind you of this fact. diff --git a/setup.py b/setup.py index c2db648..86f800f 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ if 'CI' in os.environ: setup(name='tempsdb', - version='0.5.0a8', + version='0.5.0a9', packages=['tempsdb'], install_requires=['satella>=2.14.21', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 2a69104..2a7538b 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -37,8 +37,10 @@ cdef class AlternativeMMap: def __init__(self, io_file: io.BinaryIO, file_lock_object): self.io = io_file + cdef ReadWriteGzipFile rw_gz if isinstance(io_file, ReadWriteGzipFile): - self.size = io_file.size() + rw_gz = io_file + self.size = rw_gz.size else: self.io.seek(0, 2) self.size = self.io.tell() @@ -330,7 +332,6 @@ cdef class Chunk: return 0 cpdef int extend(self) except -1: - raise NotImplementedError('Abstract method!') return 0 cpdef int delete(self) except -1: diff --git a/tempsdb/chunks/direct.pxd b/tempsdb/chunks/direct.pxd index f2ee126..ddee23a 100644 --- a/tempsdb/chunks/direct.pxd +++ b/tempsdb/chunks/direct.pxd @@ -6,6 +6,5 @@ cdef class DirectChunk(Chunk): int gzip cpdef object open_file(self, str path) - cpdef int extend(self) except -1 cpdef int after_init(self) except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index 763804e..4f23a97 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -3,8 +3,8 @@ import typing as tp import struct import warnings -from tempsdb.chunks.gzip import ReadWriteGzipFile from ..series cimport TimeSeries +from .gzip cimport ReadWriteGzipFile from .base cimport Chunk @@ -64,12 +64,11 @@ cdef class DirectChunk(Chunk): else: return super().open_file(path) - cpdef int extend(self) except -1: - return 0 - cpdef int after_init(self) except -1: + cdef ReadWriteGzipFile rw_gz if isinstance(self.file, ReadWriteGzipFile): - self.file_size = self.file.size() + rw_gz = self.file + self.file_size = rw_gz.size else: self.file.seek(0, os.SEEK_END) self.file_size = self.file.tell() @@ -87,7 +86,8 @@ cdef class DirectChunk(Chunk): self.file_lock_object.acquire() try: self.file_size += self.block_size_plus - self.file.seek(self.pointer, 0) + if not isinstance(self.file, ReadWriteGzipFile): + self.file.seek(self.pointer, 0) b = STRUCT_Q.pack(timestamp) + data self.file.write(b) self.mmap.resize(self.file_size) diff --git a/tempsdb/chunks/gzip.pxd b/tempsdb/chunks/gzip.pxd index 4ab65ba..65d766e 100644 --- a/tempsdb/chunks/gzip.pxd +++ b/tempsdb/chunks/gzip.pxd @@ -5,6 +5,8 @@ cdef class ReadWriteGzipFile: int compress_level object lock unsigned long pointer + unsigned long size + bint needs_flush_before_read - cdef int reopen_read(self) except -1 + cpdef int flush(self) except -1 diff --git a/tempsdb/chunks/gzip.pyx b/tempsdb/chunks/gzip.pyx index 3d2f6eb..db18959 100644 --- a/tempsdb/chunks/gzip.pyx +++ b/tempsdb/chunks/gzip.pyx @@ -6,40 +6,35 @@ cdef class ReadWriteGzipFile: def __init__(self, path: str, compresslevel: int = gzip._COMPRESS_LEVEL_FAST): self.path = path self.compress_level = compresslevel - self.ro_file = gzip.GzipFile(path, 'rb', compresslevel=self.compress_level) + self.ro_file = gzip.GzipFile(path, 'rb') self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=self.compress_level) self.pointer = 0 self.lock = threading.RLock() + self.needs_flush_before_read = False + cdef bytes b + b = self.read(128) + while b: + b = self.read(128) + self.size = self.pointer - def flush(self): + cpdef int flush(self) except -1: self.rw_file.flush() - self.reopen_read() - - def size(self): - cdef: - bytes b - with self.lock: - self.seek(0, 0) - b = self.read(128) - while b: - b = self.read(128) - return self.pointer + self.ro_file.close() + self.ro_file = gzip.GzipFile(self.path, 'rb') + self.pointer = 0 + self.needs_flush_before_read = False + return 0 def close(self): with self.lock: - self.ro_file.close() self.rw_file.close() - - cdef int reopen_read(self) except -1: - with self.lock: self.ro_file.close() - self.ro_file = gzip.GzipFile(self.path, 'rb', compresslevel=self.compress_level) - self.pointer = 0 - return 0 def read(self, int maxinplen): cdef bytes b with self.lock: + if self.needs_flush_before_read: + self.flush() b = self.ro_file.read(maxinplen) self.pointer += len(b) return b @@ -52,17 +47,22 @@ cdef class ReadWriteGzipFile: """ with self.lock: self.rw_file.write(value) - self.reopen_read() + self.size += len(value) + self.needs_flush_before_read = True def seek(self, unsigned long pos, int mode): + if self.needs_flush_before_read: + self.flush() if mode == 2: - self.pointer = self.size()-pos - self.ro_file.seek(self.pointer, 0) - else: + self.seek(self.size-pos, 0) + elif mode == 0: if pos != self.pointer: - self.ro_file.seek(pos, mode) - if mode == 0: - self.pointer = pos + self.ro_file.seek(pos, 0) + self.pointer = pos + elif mode == 1: + raise NotImplementedError('Unimplemented seek mode') + else: + raise ValueError('Invalid seek mode') def tell(self): return self.pointer diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index 9c6d167..c729f44 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -23,7 +23,8 @@ cdef class Database: int gzip_level=*) cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, int size_struct, - unsigned long entries_per_chunk) + unsigned long entries_per_chunk, + int gzip_level=*) cpdef list get_open_series(self) cpdef list get_all_series(self) cpdef int close_all_open_series(self) except -1 diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index 8ed9d11..2d54421 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -149,7 +149,8 @@ cdef class Database: cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, int size_struct, - unsigned long entries_per_chunk): + unsigned long entries_per_chunk, + int gzip_level=0): """ Create a new variable length series @@ -158,6 +159,7 @@ cdef class Database: :param size_struct: how many bytes will be used to store length? Valid entries are 1, 2 and 4 :param entries_per_chunk: entries per chunk file + :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression. :return: new variable length series :raises AlreadyExists: series with given name already exists """ @@ -166,7 +168,8 @@ cdef class Database: cdef VarlenSeries series = create_varlen_series(os.path.join(self.path, name), name, size_struct, length_profile, - entries_per_chunk) + entries_per_chunk, + gzip_level=gzip_level) self.open_varlen_series[name] = series return series diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 26ee3fd..baaa9ee 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -443,6 +443,7 @@ cdef class TimeSeries: raise ValueError('Data too long') data = data + b'\x00'*(self.block_size - data_len) self.append(timestamp, data) + return 0 cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ @@ -490,6 +491,7 @@ cdef class TimeSeries: raise InvalidState('series is closed') self.close() shutil.rmtree(self.path) + return 0 cpdef unsigned long open_chunks_mmap_size(self): """ diff --git a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd index 456364f..173e811 100644 --- a/tempsdb/varlen.pxd +++ b/tempsdb/varlen.pxd @@ -15,6 +15,7 @@ cdef class VarlenSeries: int max_entries_per_chunk int current_maximum_length object mpm + int gzip_level cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int close(self) except -1 @@ -64,4 +65,5 @@ cdef class VarlenEntry: cpdef int close(self) except -1 cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, - int max_entries_per_chunk) + int max_entries_per_chunk, + int gzip_level=*) diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx index 19ccd24..4100c47 100644 --- a/tempsdb/varlen.pyx +++ b/tempsdb/varlen.pyx @@ -442,6 +442,7 @@ cdef class VarlenSeries: self.mpm = None self.name = name self.root_series = TimeSeries(os.path.join(path, 'root'), 'root') + self.gzip_level = self.root_series.gzip_level self.max_entries_per_chunk = self.root_series.max_entries_per_chunk try: self.size_field = self.root_series.metadata['size_field'] @@ -558,7 +559,8 @@ cdef class VarlenSeries: TimeSeries series = create_series(os.path.join(self.path, new_name_s), new_name_s, new_len, - self.max_entries_per_chunk) + self.max_entries_per_chunk, + gzip_level=self.gzip_level) self.series.append(series) self.current_maximum_length += new_len @@ -616,9 +618,11 @@ cdef class VarlenSeries: else: raise ValueError('How did this happen?') +from tempsdb.series cimport TimeSeries cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, - int max_entries_per_chunk): + int max_entries_per_chunk, + int gzip_level=0): """ Create a variable length series @@ -627,10 +631,13 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis :param size_struct: size of the length indicator. Must be one of 1, 2, 3 or 4. :param length_profile: series' length profile :param max_entries_per_chunk: maximum entries per a chunk file + :param gzip_level: level of gzip compression. Leave at 0 (default) to disable compression. :return: newly created VarlenSeries :raises AlreadyExists: directory exists at given path :raises ValueError: invalid length profile or max_entries_per_chunk or size_struct """ + from .series import create_series + if os.path.exists(path): raise AlreadyExists('directory present at paht') if not length_profile or not max_entries_per_chunk: @@ -642,7 +649,8 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis cdef TimeSeries root_series = create_series(os.path.join(path, 'root'), 'root', size_struct+length_profile[0], - max_entries_per_chunk) + max_entries_per_chunk, + gzip_level=gzip_level) root_series.set_metadata({'size_field': size_struct, 'length_profile': length_profile}) root_series.close() diff --git a/tests/test_varlen.py b/tests/test_varlen.py index a88933a..78f3608 100644 --- a/tests/test_varlen.py +++ b/tests/test_varlen.py @@ -1,15 +1,11 @@ -import logging import os import unittest -from tempsdb.varlen import create_varlen_series -from tempsdb.database import Database - -logger = logging.getLogger(__name__) - class TestVarlen(unittest.TestCase): def test_varlen(self): + from tempsdb.varlen import create_varlen_series + series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')] varlen = create_varlen_series('test_dir', 'test_dir', 2, [10, 20, 10], 20) @@ -23,3 +19,21 @@ class TestVarlen(unittest.TestCase): lst = [(ts, v.to_bytes()) for ts, v in it] it.close() self.assertEqual(lst, series) + + def test_varlen_gzip(self): + from tempsdb.varlen import create_varlen_series + + series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')] + varlen = create_varlen_series('test_dir.gz', 'test_dir.gz', 2, [10, 20, 10], 20, + gzip_level=1) + + varlen.append(*series[0]) + self.assertEqual(len(os.listdir('test_dir.gz')), 2) + + varlen.append(*series[1]) + self.assertEqual(len(os.listdir('test_dir.gz')), 3) + + it = varlen.iterate_range(0, 20) + lst = [(ts, v.to_bytes()) for ts, v in it] + it.close() + self.assertEqual(lst, series) -- GitLab