diff --git a/README.md b/README.md index 9bcc7aaa80dd8dfd89d7eb96adfcf097c645b027..c5041e0650c7966038a4c96dfe17509e88322b8e 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ Then copy your resulting wheel and install it via pip on the target system. * `Iterator`'s destructor will emit a warning if you forget to close it explicitly. * added option for transparent gzip compression Please note that gzip disables mmap! +* experimental gzip support for constant-length time series ## v0.4.2 diff --git a/docs/usage.rst b/docs/usage.rst index f06750d8a81de50aec5d1f2838c36ae67406f6af..268e062571083fcd9106a770901b923e7b549202 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -19,6 +19,14 @@ Start off by instantiating an object .. autoclass:: tempsdb.database.Database :members: +Note that if you specify a `gzip_level` argument in +:meth:`~tempsdb.database.Database.create_series`, GZIP compression will be used. + +Note that gzip-compressed series are very slow to read, since every seek needs +to start from scratch. This will be fixed in the future. + +Also, any gzip-opened series will raise a warning, since their support is experimental at best. + You can create new databases via .. 
autofunction:: tempsdb.database.create_database diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd index e7c27dc3ea1f2a4e3669c0317b770a3df59096bc..37afb91368e9f904310e664e85aebccabd746f80 100644 --- a/tempsdb/chunks/base.pxd +++ b/tempsdb/chunks/base.pxd @@ -4,6 +4,7 @@ cdef class AlternativeMMap: cdef: object io, file_lock_object unsigned long size + unsigned long long pointer cdef class Chunk: diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 7dc170e7399a90aa8b5bea73897490441d7dce64..1ef8201f510046670527566857d55d51a3c44fcc 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -1,10 +1,11 @@ -import bz2 import io import os import threading import typing as tp import struct import mmap + +from .gzip cimport ReadWriteGzipFile from ..exceptions import Corruption, InvalidState, AlreadyExists from ..series cimport TimeSeries @@ -20,7 +21,9 @@ DEF FOOTER_SIZE = 4 cdef class AlternativeMMap: """ - An alternative mmap implementation used when mmap cannot allocate due to memory issues + An alternative mmap implementation used when mmap cannot allocate due to memory issues. 
+ + Note that opening gzip files is slow, as the script needs to iterate """ def flush(self): self.io.flush() @@ -33,17 +36,22 @@ cdef class AlternativeMMap: def __init__(self, io_file: io.BinaryIO, file_lock_object): self.io = io_file - self.io.seek(0, 2) - self.size = self.io.tell() + if isinstance(io_file, ReadWriteGzipFile): + self.size = io_file.size() + else: + self.io.seek(0, 2) + self.size = self.io.tell() self.file_lock_object = file_lock_object def __getitem__(self, item: slice) -> bytes: cdef: unsigned long start = item.start unsigned long stop = item.stop + bytes b with self.file_lock_object: - self.io.seek(start, 0) + if start != self.io.tell(): + self.io.seek(start, 0) return self.io.read(stop-start) def __setitem__(self, key: slice, value: bytes): @@ -51,7 +59,8 @@ cdef class AlternativeMMap: unsigned long start = key.start with self.file_lock_object: - self.io.seek(start, 0) + if not isinstance(self.io, ReadWriteGzipFile): + self.io.seek(0, 2) self.io.write(value) def close(self): @@ -232,8 +241,8 @@ cdef class Chunk: if self.file_lock_object: self.file_lock_object.acquire() try: + self.mmap.seek(self.file_size) self.file_size += self.page_size - self.file.seek(0, 2) ba = bytearray(self.page_size) ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) self.file.write(ba) diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index c30f1363447c30f002175f3e8f900bf8f5b5fd52..90c2d4cddfe9397a46635d1a26a565d4e641f2d1 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -1,10 +1,10 @@ -import bz2 import gzip import os import typing as tp import struct import warnings +from tempsdb.chunks.gzip import ReadWriteGzipFile from ..series cimport TimeSeries from .base cimport Chunk @@ -28,7 +28,7 @@ cdef class DirectChunk(Chunk): Note that you can only use gzip if you set use_descriptor_access to True :param gzip_compression_level: gzip compression level to use. 0 is default and means - gzip disabled. 
import gzip
import threading


cdef class ReadWriteGzipFile:
    """
    A gzip-compressed file supporting appending writes and sequential
    reads, guarded by a single re-entrant lock.

    Two handles are kept open on the same path: ``rw_file`` (append-only
    writer) and ``ro_file`` (reader). A gzip reader cannot see data
    appended after it was opened, so the reader is reopened after every
    write/flush; as a side effect this resets :meth:`tell` to 0.

    :param path: path to the gzip file
    :param compresslevel: gzip compression level, 1 (fastest) to 9 (best
        compression); defaults to 1
    """
    def __init__(self, path: str, compresslevel: int = 1):
        self.path = path
        self.compress_level = compresslevel
        self.ro_file = gzip.GzipFile(path, 'rb', compresslevel=self.compress_level)
        self.rw_file = gzip.GzipFile(path, 'ab', compresslevel=self.compress_level)
        # pointer tracks the uncompressed read position of ro_file
        self.pointer = 0
        self.lock = threading.RLock()

    def flush(self):
        """Flush pending writes and reopen the reader (resets tell() to 0)."""
        with self.lock:
            self.rw_file.flush()
            self.reopen_read()

    def size(self):
        """
        Return the total uncompressed size, in bytes.

        This is O(n): the whole stream must be decompressed, since gzip
        stores no uncompressed length that can be trusted for appended
        files. Leaves the read pointer at end-of-file.
        """
        cdef bytes b
        with self.lock:
            self.seek(0, 0)
            # scan in 64 KiB chunks; tiny reads make this needlessly slow
            b = self.read(65536)
            while b:
                b = self.read(65536)
            return self.pointer

    def close(self):
        """Close both underlying file handles."""
        with self.lock:
            self.ro_file.close()
            self.rw_file.close()

    cdef int reopen_read(self) except -1:
        # A GzipFile reader cannot observe data appended after it was
        # opened, so recreate it; the read pointer falls back to 0.
        with self.lock:
            self.ro_file.close()
            self.ro_file = gzip.GzipFile(self.path, 'rb', compresslevel=self.compress_level)
            self.pointer = 0
        return 0

    def read(self, int maxinplen):
        """Read and return up to maxinplen bytes from the current position."""
        cdef bytes b
        with self.lock:
            b = self.ro_file.read(maxinplen)
            self.pointer += len(b)
        return b

    def write(self, bytes value):
        """
        Always an append, despite what
        :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.tell` and
        :meth:`~tempsdb.chunks.gzip.ReadWriteGzipFile.seek` may say.

        Resets the read pointer to 0 (the reader is reopened so the new
        data becomes visible).
        """
        with self.lock:
            self.rw_file.write(value)
            self.reopen_read()

    def seek(self, unsigned long pos, int mode):
        """
        Move the read pointer.

        :param pos: offset (non-negative only — backwards relative seeks
            and offsets past end-of-file are not supported)
        :param mode: 0 - absolute, 1 - relative to the current position,
            2 - pos bytes before the end of file
        """
        cdef unsigned long target
        with self.lock:
            if mode == 2:
                # size() leaves the reader at EOF, so pointer == size here
                target = self.size() - pos
            elif mode == 1:
                # relative seek: previously the offset was compared against
                # the absolute pointer and the pointer was never updated,
                # leaving tell() stale after any relative seek
                target = self.pointer + pos
            else:
                target = pos
            if target != self.pointer:
                self.ro_file.seek(target, 0)
                self.pointer = target

    def tell(self):
        """Return the current read position (uncompressed offset)."""
        return self.pointer