From 8cd0fb3001159d631f7eb5e3bd493acaadef516c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 13 Dec 2020 20:29:01 +0100 Subject: [PATCH] advance gzip somewhat --- README.md | 1 + docs/usage.rst | 8 +++++ tempsdb/chunks/base.pxd | 1 + tempsdb/chunks/base.pyx | 23 +++++++++---- tempsdb/chunks/direct.pyx | 7 ++-- tempsdb/chunks/gzip.pxd | 10 ++++++ tempsdb/chunks/gzip.pyx | 68 +++++++++++++++++++++++++++++++++++++++ tempsdb/database.pyx | 2 ++ 8 files changed, 110 insertions(+), 10 deletions(-) create mode 100644 tempsdb/chunks/gzip.pxd create mode 100644 tempsdb/chunks/gzip.pyx diff --git a/README.md b/README.md index 9bcc7aa..c5041e0 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ Then copy your resulting wheel and install it via pip on the target system. * `Iterator`'s destructor will emit a warning if you forget to close it explicitly. * added option for transparent gzip compression Please note that gzip disables mmap! +* experimental gzip support for constant-length time series ## v0.4.2 diff --git a/docs/usage.rst b/docs/usage.rst index f06750d..268e062 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -19,6 +19,14 @@ Start off by instantiating an object .. autoclass:: tempsdb.database.Database :members: +Note that if you specify a `gzip_level` argument in +:meth:`~tempsdb.database.Database.create_series`, GZIP compression will be used. + +Note that gzip-compressed series are very slow to read, since every seek needs +to start from scratch. This will be fixed in the future. + +Also, any gzip-opened series will raise a warning, since their support is experimental at best. + You can create new databases via .. autofunction:: tempsdb.database.create_database diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd index e7c27dc..37afb91 100644 --- a/tempsdb/chunks/base.pxd +++ b/tempsdb/chunks/base.pxd @@ -4,6 +4,7 @@ cdef class AlternativeMMap: cdef: object io, file_lock_object unsigned long size + unsigned long long pointer cdef class Chunk: diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 7dc170e..1ef8201 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -1,10 +1,11 @@ -import bz2 import io import os import threading import typing as tp import struct import mmap + +from .gzip cimport ReadWriteGzipFile from ..exceptions import Corruption, InvalidState, AlreadyExists from ..series cimport TimeSeries @@ -20,7 +21,9 @@ DEF FOOTER_SIZE = 4 cdef class AlternativeMMap: """ - An alternative mmap implementation used when mmap cannot allocate due to memory issues + An alternative mmap implementation used when mmap cannot allocate due to memory issues. + + Note that opening gzip files is slow, as the script needs to iterate """ def flush(self): self.io.flush() @@ -33,17 +36,22 @@ cdef class AlternativeMMap: def __init__(self, io_file: io.BinaryIO, file_lock_object): self.io = io_file - self.io.seek(0, 2) - self.size = self.io.tell() + if isinstance(io_file, ReadWriteGzipFile): + self.size = io_file.size() + else: + self.io.seek(0, 2) + self.size = self.io.tell() self.file_lock_object = file_lock_object def __getitem__(self, item: slice) -> bytes: cdef: unsigned long start = item.start unsigned long stop = item.stop + bytes b with self.file_lock_object: - self.io.seek(start, 0) + if start != self.io.tell(): + self.io.seek(start, 0) return self.io.read(stop-start) def __setitem__(self, key: slice, value: bytes): @@ -51,7 +59,8 @@ cdef class AlternativeMMap: unsigned long start = key.start with self.file_lock_object: - self.io.seek(start, 0) + if not isinstance(self.io, ReadWriteGzipFile): + self.io.seek(0, 2) self.io.write(value) def close(self): @@ -232,8 +241,8 @@ cdef class Chunk: if self.file_lock_object: self.file_lock_object.acquire() try: + self.mmap.seek(self.file_size) self.file_size += self.page_size - self.file.seek(0, 2) ba = bytearray(self.page_size) ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) self.file.write(ba) diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index c30f136..90c2d4c 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -1,10 +1,10 @@ -import bz2 import gzip import os import typing as tp import struct import warnings +from tempsdb.chunks.gzip import ReadWriteGzipFile from ..series cimport TimeSeries from .base cimport Chunk @@ -28,7 +28,7 @@ cdef class DirectChunk(Chunk): Note that you can only use gzip if you set use_descriptor_access to True :param gzip_compression_level: gzip compression level to use. 0 is default and means - gzip disabled. + gzip disabled. If given, a warning will be emitted as gzip support is still experimental. :raises ValueError: non-direct descriptor was requested and gzip was enabled """ def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, @@ -43,6 +43,7 @@ cdef class DirectChunk(Chunk): if use_descriptor_access is None: use_descriptor_access = False if gzip_compression_level: + warnings.warn('Gzip support is experimental') use_descriptor_access = True self.gzip = gzip_compression_level @@ -60,7 +61,7 @@ cdef class DirectChunk(Chunk): cpdef object open_file(self, str path): if self.gzip: - return bz2.BZ2File(path, 'rb+', compresslevel=self.gzip) + return ReadWriteGzipFile(path, compresslevel=self.gzip) else: return super().open_file(path) diff --git a/tempsdb/chunks/gzip.pxd b/tempsdb/chunks/gzip.pxd new file mode 100644 index 0000000..4ab65ba --- /dev/null +++ b/tempsdb/chunks/gzip.pxd @@ -0,0 +1,10 @@ +cdef class ReadWriteGzipFile: + cdef: + str path + object ro_file, rw_file + int compress_level + object lock + unsigned long pointer + + cdef int reopen_read(self) except -1 + diff --git a/tempsdb/chunks/gzip.pyx b/tempsdb/chunks/gzip.pyx new file mode 100644 index 0000000..3d2f6eb --- /dev/null +++ b/tempsdb/chunks/gzip.pyx @@ -0,0 +1,68 @@ +import gzip +import threading + + +cdef class ReadWriteGzipFile: + def __init__(self, path: str, compresslevel: int = gzip._COMPRESS_LEVEL_FAST): + self.path = path + self.compress_level = compresslevel + self.ro_file = gzip.GzipFile(path, 'rb', compresslevel=self.compress_level) + self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=self.compress_level) + self.pointer = 0 + self.lock = threading.RLock() + + def flush(self): + self.rw_file.flush() + self.reopen_read() + + def size(self): + cdef: + bytes b + with self.lock: + self.seek(0, 0) + b = self.read(128) + while b: + b = self.read(128) + return self.pointer + + def close(self): + with self.lock: + self.ro_file.close() + self.rw_file.close() + + cdef int reopen_read(self) except -1: + with self.lock: + self.ro_file.close() + self.ro_file = gzip.GzipFile(self.path, 'rb', compresslevel=self.compress_level) + self.pointer = 0 + return 0 + + def read(self, int maxinplen): + cdef bytes b + with self.lock: + b = self.ro_file.read(maxinplen) + self.pointer += len(b) + return b + + def write(self, bytes value): + """ + Always an append, despite what + :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.tell` and + :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.seek` may say. + """ + with self.lock: + self.rw_file.write(value) + self.reopen_read() + + def seek(self, unsigned long pos, int mode): + if mode == 2: + self.pointer = self.size()-pos + self.ro_file.seek(self.pointer, 0) + else: + if pos != self.pointer: + self.ro_file.seek(pos, mode) + if mode == 0: + self.pointer = pos + + def tell(self): + return self.pointer diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index bc2c889..8ed9d11 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -221,6 +221,8 @@ cdef class Database: raise ValueError('Invalid block size, pick larger page') if os.path.isdir(os.path.join(self.path, name)): raise AlreadyExists('Series already exists') + if gzip_level: + warnings.warn('Gzip support is experimental') cdef TimeSeries series = create_series(os.path.join(self.path, name), name, block_size, entries_per_chunk, page_size=page_size, -- GitLab