From d2319dc30234b338a45b8f606e6b4ec48c5df8a8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Tue, 8 Dec 2020 19:34:56 +0100
Subject: [PATCH] first take at gzip support

---
 README.md                 |   6 +
 setup.py                  |   2 +-
 tempsdb/chunks/base.pxd   |  22 ++-
 tempsdb/chunks/base.pyx   | 349 ++++++++++++++++++++++++++++++++++++++
 tempsdb/chunks/direct.pxd |  11 ++
 tempsdb/chunks/direct.pyx |  77 +++++++++
 tempsdb/chunks/maker.pxd  |   7 +
 tempsdb/chunks/maker.pyx  |  79 +++++++++
 tempsdb/database.pxd      |   1 +
 tempsdb/database.pyx      |   7 +-
 tempsdb/iterators.pxd     |   2 +-
 tempsdb/iterators.pyx     |   2 +-
 tempsdb/series.pxd        |   6 +-
 tempsdb/series.pyx        |  97 ++++++++---
 tests/test_db.py          |  19 +++
 15 files changed, 649 insertions(+), 38 deletions(-)
 create mode 100644 tempsdb/chunks/direct.pxd
 create mode 100644 tempsdb/chunks/direct.pyx
 create mode 100644 tempsdb/chunks/maker.pxd
 create mode 100644 tempsdb/chunks/maker.pyx

diff --git a/README.md b/README.md
index f99038a..0523f9a 100644
--- a/README.md
+++ b/README.md
@@ -80,10 +80,15 @@ Then copy your resulting wheel and install it via pip on the target system.
 
 ## v0.4.3
 
+* slightly reduced the size of `metadata.txt` by making `page_size` optional
+* moved `Chunk` to `tempsdb.chunks.base`
+* added `DirectChunk`
 * iterating and writing at the same time from multiple threads made safe
 * added `TimeSeries.disable_mmap`
 * `Iterator`'s destructor will emit a warning if you forget to close it explicitly.
+* added an option for transparent gzip compression.
+  Please note that gzip disables mmap!
 
 ## v0.4.2

diff --git a/setup.py b/setup.py
index 4ec7056..bf588d2 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ if 'CI' in os.environ:
 
 setup(name='tempsdb',
-      version='0.5a3',
+      version='0.5a4',
       packages=['tempsdb'],
       install_requires=['satella', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],

diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd
index 6c3ded3..e7c27dc 100644
--- a/tempsdb/chunks/base.pxd
+++ b/tempsdb/chunks/base.pxd
@@ -1,22 +1,35 @@
 from ..series cimport TimeSeries
 
-cdef class BaseChunk:
+cdef class AlternativeMMap:
+    cdef:
+        object io, file_lock_object
+        unsigned long size
+
+
+cdef class Chunk:
     cdef:
         TimeSeries parent
         readonly str path
         readonly unsigned long long min_ts
         readonly unsigned long long max_ts
+        unsigned int block_size_plus        # block size plus the timestamp length
+        readonly unsigned int block_size
         readonly unsigned long entries
         unsigned long file_size
-
+        unsigned long pointer               # position to write the next entry at
+        readonly unsigned long page_size
+        object file, mmap, file_lock_object
+        bint closed
 
     cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry)
     cpdef int close(self) except -1
     cdef tuple get_piece_at(self, unsigned int index)
-    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cdef int sync(self) except -1
     cpdef unsigned int find_left(self, unsigned long long timestamp)
     cpdef unsigned int find_right(self, unsigned long long timestamp)
-    cdef int extend(self) except -1
+    cpdef object open_file(self, str path)
+    cpdef int extend(self) except -1
+    cpdef int after_init(self) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int delete(self) except -1
     cpdef int switch_to_descriptor_based_access(self) except -1
     cpdef unsigned long get_mmap_size(self)
@@ -36,4 +49,3 @@ cdef class Chunk:
             return self.entries
 
     cdef unsigned long long get_timestamp_at(self, unsigned int index)
-
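Editor's note: the constants used throughout `base.pyx` below (`HEADER_SIZE = 4`, `TIMESTAMP_SIZE = 8`, `FOOTER_SIZE = 4`, all little-endian) imply the following on-disk layout for a standard chunk. A minimal sketch of a reader for that layout — `read_chunk` is illustrative, not part of the library:

```python
import struct

# Layout of a standard (page-preallocated) chunk file:
#
#   [u32 block_size][entry 0][entry 1]...[zero padding][u32 entry_count]
#
# where each entry is [u64 timestamp][block_size bytes of payload].

def read_chunk(path: str):
    """Illustrative reader for the chunk format sketched above."""
    with open(path, 'rb') as f:
        raw = f.read()
    block_size, = struct.unpack('<L', raw[:4])     # header
    entries, = struct.unpack('<L', raw[-4:])       # footer: number of entries
    entry_size = 8 + block_size                    # timestamp + payload
    for i in range(entries):
        offset = 4 + i * entry_size
        ts, = struct.unpack('<Q', raw[offset:offset + 8])
        yield ts, raw[offset + 8:offset + entry_size]
```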
diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx
index e69de29..9071c0a 100644
--- a/tempsdb/chunks/base.pyx
+++ b/tempsdb/chunks/base.pyx
@@ -0,0 +1,349 @@
+import io
+import os
+import threading
+import typing as tp
+import struct
+import mmap
+from ..exceptions import Corruption, InvalidState, AlreadyExists
+from ..series cimport TimeSeries
+
+
+STRUCT_Q = struct.Struct('<Q')
+STRUCT_L = struct.Struct('<L')
+STRUCT_LQ = struct.Struct('<LQ')
+
+DEF HEADER_SIZE = 4
+DEF TIMESTAMP_SIZE = 8
+DEF FOOTER_SIZE = 4
+
+
+cdef class AlternativeMMap:
+    """
+    An alternative mmap implementation, used when mmap cannot allocate memory.
+
+    Mimics the parts of the mmap interface that :class:`~tempsdb.chunks.base.Chunk`
+    uses, backed by plain file I/O guarded by a lock.
+    """
+    def __init__(self, io_file: tp.BinaryIO, file_lock_object):
+        self.io = io_file
+        self.io.seek(0, 2)
+        self.size = self.io.tell()
+        self.file_lock_object = file_lock_object
+
+    def flush(self):
+        self.io.flush()
+
+    def madvise(self, a, b, c):
+        ...     # no-op: nothing is actually mapped
+
+    def resize(self, file_size: int):
+        self.size = file_size
+
+    def __getitem__(self, item: slice) -> bytes:
+        cdef:
+            unsigned long start = item.start
+            unsigned long stop = item.stop
+
+        with self.file_lock_object:
+            self.io.seek(start, 0)
+            return self.io.read(stop-start)
+
+    def __setitem__(self, key: slice, value: bytes):
+        cdef unsigned long start = key.start
+
+        with self.file_lock_object:
+            self.io.seek(start, 0)
+            self.io.write(value)
+
+    def close(self):
+        pass
+
+
+cdef class Chunk:
+    """
+    Represents a single chunk of a time series.
+
+    This implementation is the default - it allocates a page ahead of the stream and
+    writes the number of currently written entries at the end of the last page.
+    Suitable for SSD and RAM media.
+
+    This also implements the iterator protocol (yielding tp.Tuple[int, bytes]) and
+    the sequence protocol.
+
+    Opened files will be mmaped, but if mmap fails for lack of memory,
+    descriptor-based access will be used instead.
+
+    :param parent: parent time series
+    :param path: path to the chunk file
+    :param page_size: size of a single page
+    :param use_descriptor_access: whether to use descriptor-based access instead of mmap
+
+    :ivar path: path to the chunk (str)
+    :ivar min_ts: timestamp of the first entry stored (int)
+    :ivar max_ts: timestamp of the last entry stored (int)
+    :ivar block_size: size of the data entries (int)
+    :ivar entries: number of entries in this chunk (int)
+    :ivar page_size: size of the page (int)
+    """
+    cpdef unsigned long get_mmap_size(self):
+        """
+        :return: how many bytes are mmaped?
+        :rtype: int
+        """
+        if isinstance(self.mmap, AlternativeMMap):
+            return 0
+        else:
+            return self.file_size
+
+    cpdef int switch_to_descriptor_based_access(self) except -1:
+        """
+        Switch self to descriptor-based access instead of mmap.
+
+        No-op if already in descriptor-based mode.
+        """
+        if isinstance(self.mmap, AlternativeMMap):
+            return 0
+        self.mmap.close()
+        self.file_lock_object = threading.Lock()
+        self.mmap = AlternativeMMap(self.file, self.file_lock_object)
+        return 0
+ """ + if isinstance(self.mmap, AlternativeMMap): + return 0 + self.mmap.close() + self.file_lock_object = threading.Lock() + self.mmap = AlternativeMMap(self.file, self.file_lock_object) + return 0 + + cpdef int after_init(self) except -1: + """ + Load the :attr:`~Chunk.entries`, :attr:`~Chunk.pointer` and :attr:`~Chunk.max_ts` + + :meta private: + """ + self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size]) + self.pointer = self.entries*self.block_size_plus+HEADER_SIZE + self.max_ts = self.get_timestamp_at(self.entries-1) + + if self.pointer >= self.page_size: + # Inform the OS that we don't need the header anymore + self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) + return 0 + + cpdef object open_file(self, str path): + self.file = open(self.path, 'rb+') + + def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, + use_descriptor_access: bool = False): + cdef bytes b + self.file_size = os.path.getsize(path) + self.page_size = page_size + self.parent = parent + self.closed = False + self.path = path + self.file = self.open_file(path) + self.file_lock_object = None + + if use_descriptor_access: + self.file_lock_object = threading.Lock() + self.mmap = AlternativeMMap(self.file, self.file_lock_object) + else: + try: + self.mmap = mmap.mmap(self.file.fileno(), 0) + except OSError as e: + if e.errno == 12: # Cannot allocate memory + self.file_lock_object = threading.Lock() + self.mmap = AlternativeMMap(self.file, self.file_lock_object) + else: + self.file.close() + self.closed = True + raise Corruption(f'Failed to mmap chunk file: {e}') + + try: + self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE]) + self.block_size_plus = self.block_size + TIMESTAMP_SIZE + except struct.error: + self.close() + raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) + + self.entries = 0 + self.max_ts = 0 + self.pointer = 0 + + self.after_init() + + cpdef unsigned int find_left(self, unsigned long long timestamp): + """ + Return an index i of position such that ts[i] <= timestamp and + (timestamp-ts[i]) -> min. + + Used as bound in searches: you start from this index and finish at + :meth:`~tempsdb.chunks.Chunk.find_right`. + + :param timestamp: timestamp to look for, must be smaller or equal to largest element + in the chunk + :return: index such that ts[i] <= timestamp and (timestamp-ts[i]) -> min, or length of the + array if timestamp is larger than largest element in this chunk + """ + cdef: + unsigned int hi = self.length() + unsigned int lo = 0 + unsigned int mid + while lo < hi: + mid = (lo+hi)//2 + if self.get_timestamp_at(mid) < timestamp: + lo = mid+1 + else: + hi = mid + return lo + + cpdef unsigned int find_right(self, unsigned long long timestamp): + """ + Return an index i of position such that ts[i] > timestamp and + (ts[i]-timestamp) -> min + + Used as bound in searches: you start from + :meth:`~tempsdb.chunks.Chunk.find_right` and finish at this inclusive. 
+
+    cpdef unsigned int find_left(self, unsigned long long timestamp):
+        """
+        Return the index of the first entry whose timestamp is >= the given timestamp.
+
+        Used as the inclusive lower bound in searches: you start at this index and
+        finish just before :meth:`~tempsdb.chunks.base.Chunk.find_right`.
+
+        :param timestamp: timestamp to look for
+        :return: index of the first entry with ts >= timestamp, or the length of the
+            chunk if every entry is smaller than the timestamp
+        """
+        cdef:
+            unsigned int hi = self.length()
+            unsigned int lo = 0
+            unsigned int mid
+        while lo < hi:
+            mid = (lo+hi)//2
+            if self.get_timestamp_at(mid) < timestamp:
+                lo = mid+1
+            else:
+                hi = mid
+        return lo
+
+    cpdef unsigned int find_right(self, unsigned long long timestamp):
+        """
+        Return the index of the first entry whose timestamp is > the given timestamp.
+
+        Used as the exclusive upper bound in searches: you start at
+        :meth:`~tempsdb.chunks.base.Chunk.find_left` and finish just before this index.
+
+        :param timestamp: timestamp to look for
+        :return: index of the first entry with ts > timestamp, or the length of the
+            chunk if every entry is smaller or equal
+        """
+        cdef:
+            unsigned int hi = self.length()
+            unsigned int lo = 0
+            unsigned int mid
+        while lo < hi:
+            mid = (lo+hi)//2
+            if timestamp < self.get_timestamp_at(mid):
+                hi = mid
+            else:
+                lo = mid+1
+        return lo
+
+    def __getitem__(self, index: tp.Union[int, slice]):
+        if isinstance(index, slice):
+            return self.iterate_indices(index.start, index.stop)
+        else:
+            return self.get_piece_at(index)
+
+    cdef int sync(self) except -1:
+        """
+        Synchronize the mmap with the disk
+        """
+        self.mmap.flush()
+        return 0
+
+    cpdef int extend(self) except -1:
+        """
+        Add `page_size` bytes to this file
+        """
+        cdef bytearray ba
+        if self.file_lock_object:
+            self.file_lock_object.acquire()
+        try:
+            self.file_size += self.page_size
+            self.file.seek(0, 2)
+            ba = bytearray(self.page_size)
+            ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries)
+            self.file.write(ba)
+            self.mmap.resize(self.file_size)
+        finally:
+            if self.file_lock_object:
+                self.file_lock_object.release()
+        return 0
+
+    cdef unsigned long long get_timestamp_at(self, unsigned int index):
+        """
+        Return the timestamp at a given index
+
+        :param index: index of the entry
+        :return: timestamp of that entry
+        """
+        cdef unsigned long offset = HEADER_SIZE+index*self.block_size_plus
+        return STRUCT_Q.unpack(self.mmap[offset:offset+TIMESTAMP_SIZE])[0]
+
+    cpdef int delete(self) except -1:
+        """
+        Close and delete this chunk.
+        """
+        self.close()
+        os.unlink(self.path)
+        return 0
+
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
+        """
+        Append a record to this chunk.
+
+        Might range from very fast (just a memory operation) to quite slow (adding a
+        new page to the file).
+
+        Simultaneous writing from multiple threads is not thread-safe.
+
+        Neither the timestamp nor the data is validated here; that is handled by
+        :class:`~tempsdb.series.TimeSeries`.
+
+        :param timestamp: timestamp of the entry
+        :param data: data to write
+        :raises InvalidState: chunk is closed
+        """
+        if self.closed:
+            raise InvalidState('chunk is closed')
+
+        if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus:
+            self.extend()
+        cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE
+        # Append the entry
+        self.mmap[self.pointer:ptr_end] = STRUCT_Q.pack(timestamp)
+        self.mmap[ptr_end:ptr_end+self.block_size] = data
+        self.entries += 1
+        # Update the entry count in the footer
+        self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries)
+        # Update properties
+        self.max_ts = timestamp
+        self.pointer += self.block_size_plus
+        return 0
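Editor's note: `find_left`/`find_right` above are the classic binary searches over the sorted timestamp column. In pure Python they coincide with the standard `bisect` functions, which is a convenient way to reason about (and test) them — a sketch, not library code:

```python
import bisect

timestamps = [100, 200, 300, 400]   # chunk timestamps are strictly increasing

# find_left(ts)  behaves like bisect_left:  first index with timestamps[i] >= ts
# find_right(ts) behaves like bisect_right: first index with timestamps[i] >  ts
assert bisect.bisect_left(timestamps, 250) == 2    # first entry at/after ts=250
assert bisect.bisect_right(timestamps, 250) == 2   # first entry strictly after ts=250

# Entries covering the inclusive range [start, stop]:
start, stop = 200, 300
assert timestamps[bisect.bisect_left(timestamps, start):
                  bisect.bisect_right(timestamps, stop)] == [200, 300]
```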
+
+    cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry):
+        """
+        Return a partial iterator starting at starting_entry and ending at
+        stopping_entry (exclusive).
+
+        :param starting_entry: index of the first entry to return
+        :param stopping_entry: index of the entry to stop at (exclusive)
+        :return: an iterator
+        :rtype: tp.Iterator[tp.Tuple[int, bytes]]
+        """
+        return self._iterate(starting_entry, stopping_entry)
+
+    def _iterate(self, starting_entry: int, stopping_entry: int):
+        cdef int i
+        for i in range(starting_entry, stopping_entry):
+            yield self.get_piece_at(i)
+
+    def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]:
+        return self._iterate(0, self.entries)
+
+    def __len__(self):
+        return self.length()
+
+    cpdef int close(self) except -1:
+        """
+        Close the chunk and release the allocated resources
+        """
+        if self.closed:
+            return 0
+        if self.parent:
+            with self.parent.open_lock:
+                del self.parent.open_chunks[self.name()]
+        self.parent = None
+        self.mmap.close()
+        self.file.close()
+        self.closed = True
+        return 0
+
+    def __del__(self):
+        self.close()
+
+    cdef tuple get_piece_at(self, unsigned int index):
+        """
+        Return the piece of data at a particular index, numbered from 0
+
+        :return: a tuple of (timestamp, data) at the given index
+        :rtype: tp.Tuple[int, bytes]
+        """
+        if index >= self.entries:
+            raise IndexError('Index too large')
+        cdef:
+            unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus
+            unsigned long stopping_index = starting_index + self.block_size_plus
+            unsigned long long ts = STRUCT_Q.unpack(
+                self.mmap[starting_index:starting_index+TIMESTAMP_SIZE])[0]
+        return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]

diff --git a/tempsdb/chunks/direct.pxd b/tempsdb/chunks/direct.pxd
new file mode 100644
index 0000000..f2ee126
--- /dev/null
+++ b/tempsdb/chunks/direct.pxd
@@ -0,0 +1,11 @@
+from .base cimport Chunk
+
+
+cdef class DirectChunk(Chunk):
+    cdef:
+        int gzip
+
+    cpdef object open_file(self, str path)
+    cpdef int extend(self) except -1
+    cpdef int after_init(self) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
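Editor's note: the `DirectChunk` implemented below keeps no footer and no preallocated pages, so its entry count falls straight out of the file size. A quick sanity check of that arithmetic, with illustrative numbers:

```python
HEADER_SIZE = 4       # u32 block_size
TIMESTAMP_SIZE = 8    # u64 timestamp per entry

def direct_chunk_entries(file_size: int, block_size: int) -> int:
    """Entry count of a direct chunk, as DirectChunk.after_init() derives it."""
    return (file_size - HEADER_SIZE) // (block_size + TIMESTAMP_SIZE)

# a direct chunk with 10-byte payloads and 3 entries occupies
# 4-byte header + 3 * (8 + 10) = 58 bytes
assert direct_chunk_entries(58, 10) == 3
```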
diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx
new file mode 100644
index 0000000..5b02429
--- /dev/null
+++ b/tempsdb/chunks/direct.pyx
@@ -0,0 +1,77 @@
+import gzip
+import typing as tp
+import struct
+
+from ..series cimport TimeSeries
+from .base cimport Chunk
+
+
+STRUCT_Q = struct.Struct('<Q')
+DEF HEADER_SIZE = 4
+DEF TIMESTAMP_SIZE = 8
+DEF FOOTER_SIZE = 4
+
+
+cdef class DirectChunk(Chunk):
+    """
+    An alternative implementation that extends the file as it goes, without
+    preallocating an entire page in advance.
+
+    This is also the only chunk type capable of supporting gzip.
+
+    If your system does not handle frequent mmap resizing well, use it with
+    `use_descriptor_access=True`.
+
+    Note that gzip requires `use_descriptor_access` to be True.
+
+    :param gzip_compression_level: gzip compression level to use. The default of 0
+        means gzip is disabled.
+    :raises ValueError: gzip was enabled without descriptor-based access
+    """
+    def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int,
+                 use_descriptor_access: bool = True,
+                 gzip_compression_level: int = 0):
+        self.gzip = gzip_compression_level
+
+        if gzip_compression_level:
+            path = path + '.gz'
+        else:
+            path = path + '.direct'
+
+        if gzip_compression_level and not use_descriptor_access:
+            raise ValueError('Descriptor access must be enabled when using gzip')
+        super().__init__(parent, path, page_size, use_descriptor_access)
+
+    cpdef object open_file(self, str path):
+        if self.gzip:
+            self.file = gzip.open(path, 'rb+', compresslevel=self.gzip)
+        else:
+            self.file = open(path, 'rb+')
+
+    cpdef int extend(self) except -1:
+        # Direct chunks grow on every append - there is nothing to preallocate
+        return 0
+
+    cpdef int after_init(self) except -1:
+        self.entries = (self.file_size - HEADER_SIZE) // self.block_size_plus
+        self.pointer = self.file_size
+        # The last timestamp lives right before the last block of data
+        self.max_ts, = STRUCT_Q.unpack(
+            self.mmap[self.file_size-self.block_size_plus:self.file_size-self.block_size])
+        return 0
+
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
+        cdef bytes b
+        if self.file_lock_object:
+            self.file_lock_object.acquire()
+        try:
+            self.file_size += self.block_size_plus
+            self.file.seek(self.pointer, 0)
+            b = STRUCT_Q.pack(timestamp) + data
+            self.file.write(b)
+            self.mmap.resize(self.file_size)
+            self.pointer += self.block_size_plus
+            self.entries += 1           # keep the entry count current
+            self.max_ts = timestamp     # keep max_ts current
+        finally:
+            if self.file_lock_object:
+                self.file_lock_object.release()
+        return 0

diff --git a/tempsdb/chunks/maker.pxd b/tempsdb/chunks/maker.pxd
new file mode 100644
index 0000000..7b79b97
--- /dev/null
+++ b/tempsdb/chunks/maker.pxd
@@ -0,0 +1,7 @@
+from .base cimport Chunk
+from ..series cimport TimeSeries
+
+
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size, bint descriptor_based_access=*,
+                         bint use_direct_mode=*, int gzip_compression_level=*)
diff --git a/tempsdb/chunks/maker.pyx b/tempsdb/chunks/maker.pyx
new file mode 100644
index 0000000..e861307
--- /dev/null
+++ b/tempsdb/chunks/maker.pyx
@@ -0,0 +1,79 @@
+import gzip
+import os
+import struct
+
+from tempsdb.exceptions import AlreadyExists
+from .base cimport Chunk
+from .direct cimport DirectChunk
+from ..series cimport TimeSeries
+
+STRUCT_Q = struct.Struct('<Q')
+STRUCT_L = struct.Struct('<L')
+
+
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size, bint descriptor_based_access=False,
+                         bint use_direct_mode=False, int gzip_compression_level=0):
+    """
+    Create a new chunk on disk
+
+    :param parent: parent time series
+    :param path: path to the new chunk file
+    :param timestamp: timestamp of the first entry
+    :param data: data of the first entry
+    :param page_size: size of a single page for storage
+    :param descriptor_based_access: whether to use descriptor-based access instead
+        of mmap. Default is False
+    :param use_direct_mode: if True, the chunk will be created in direct mode,
+        without page preallocation
+    :param gzip_compression_level: gzip compression level. Use 0 to disable compression.
+    :raises ValueError: data was empty
+    :raises AlreadyExists: chunk already exists
+    """
+    if os.path.exists(path):
+        raise AlreadyExists('chunk already exists!')
+    if not data:
+        raise ValueError('Data is empty')
+
+    # DirectChunk appends the suffix itself, so keep `path` unsuffixed for it
+    cdef str file_path = path
+    if gzip_compression_level:
+        file_path = path + '.gz'
+    elif use_direct_mode:
+        file_path = path + '.direct'
+
+    if gzip_compression_level:
+        file = gzip.open(file_path, 'wb', compresslevel=gzip_compression_level)
+    else:
+        file = open(file_path, 'wb')
+    cdef:
+        unsigned long block_size = len(data)
+        unsigned long file_size = 0
+    file_size += file.write(STRUCT_L.pack(block_size))
+    file_size += file.write(STRUCT_Q.pack(timestamp))
+    file_size += file.write(data)
+    cdef:
+        bytearray footer
+        unsigned long bytes_to_pad
+    if not use_direct_mode and not gzip_compression_level:
+        # Pad this thing to page_size
+        bytes_to_pad = page_size - (file_size % page_size)
+        file.write(b'\x00' * bytes_to_pad)
+
+        # Create a footer at the end, holding the entry count
+        footer = bytearray(page_size)
+        footer[-4:] = b'\x01\x00\x00\x00'       # 1 in little endian
+        file.write(footer)
+    file.close()
+
+    if gzip_compression_level:
+        return DirectChunk(parent, path, page_size, use_descriptor_access=True,
+                           gzip_compression_level=gzip_compression_level)
+    elif use_direct_mode:
+        return DirectChunk(parent, path, page_size,
+                           use_descriptor_access=descriptor_based_access)
+    else:
+        return Chunk(parent, path, page_size,
+                     use_descriptor_access=descriptor_based_access)
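Editor's note: the factory above picks the chunk flavour from its flags. Roughly, it can be driven like this — a sketch in which the paths, timestamps and payloads are illustrative, and each call assumes its target path does not exist yet:

```python
from tempsdb.chunks.maker import create_chunk

# a standard, page-preallocated chunk (mmap access), stored as '100':
chunk = create_chunk(None, '/tmp/series/100', 100, b'\x00' * 8, page_size=4096)

# a direct chunk - grows with every append, stored as '200.direct':
chunk = create_chunk(None, '/tmp/series/200', 200, b'\x00' * 8, page_size=4096,
                     use_direct_mode=True)

# a gzipped chunk - stored as '300.gz', always descriptor-based (no mmap):
chunk = create_chunk(None, '/tmp/series/300', 300, b'\x00' * 8, page_size=4096,
                     gzip_compression_level=6)
```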
diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd
index 9d75efc..ee4eeaa 100644
--- a/tempsdb/database.pxd
+++ b/tempsdb/database.pxd
@@ -23,6 +23,7 @@ cdef class Database:
     cpdef TimeSeries create_series(self, str name, int block_size,
                                    unsigned long entries_per_chunk, int page_size=*,
-                                   bint use_descriptor_based_access=*)
+                                   bint use_descriptor_based_access=*,
+                                   int gzip_level=*)
     cpdef VarlenSeries create_varlen_series(self, str name, list length_profile,
                                             int size_struct,
                                             unsigned long entries_per_chunk)
     cpdef list get_open_series(self)
     cpdef list get_all_series(self)
     cpdef int close_all_open_series(self) except -1

diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx
index 4a79f14..bc2c889 100644
--- a/tempsdb/database.pyx
+++ b/tempsdb/database.pyx
@@ -201,7 +201,8 @@ cdef class Database:
     cpdef TimeSeries create_series(self, str name, int block_size,
                                    unsigned long entries_per_chunk, int page_size=4096,
-                                   bint use_descriptor_based_access=False):
+                                   bint use_descriptor_based_access=False,
+                                   int gzip_level=0):
         """
         Create a new series
 
@@ -211,6 +212,7 @@ cdef class Database:
         :param page_size: size of a single page. Default is 4096
         :param use_descriptor_based_access: whether to use descriptor based access instead of
             mmap. Default is False
+        :param gzip_level: gzip compression level. Default is 0, which means "don't use gzip"
         :return: new series
         :raises ValueError: block size was larger than page_size plus a timestamp
         :raises AlreadyExists: series with given name already exists
@@ -222,7 +224,8 @@ cdef class Database:
         cdef TimeSeries series = create_series(os.path.join(self.path, name), name,
                                                block_size,
                                                entries_per_chunk, page_size=page_size,
-                                               use_descriptor_based_access=use_descriptor_based_access)
+                                               use_descriptor_based_access=use_descriptor_based_access,
+                                               gzip_level=gzip_level)
         self.open_series[name] = series
         return series

diff --git a/tempsdb/iterators.pxd b/tempsdb/iterators.pxd
index b5befd8..c84a722 100644
--- a/tempsdb/iterators.pxd
+++ b/tempsdb/iterators.pxd
@@ -1,4 +1,4 @@
-from .chunks cimport Chunk
+from .chunks.base cimport Chunk
 from .series cimport TimeSeries
 

diff --git a/tempsdb/iterators.pyx b/tempsdb/iterators.pyx
index e898745..b7d5d11 100644
--- a/tempsdb/iterators.pyx
+++ b/tempsdb/iterators.pyx
@@ -1,7 +1,7 @@
 import typing as tp
 import warnings
 
-from .chunks cimport Chunk
+from .chunks.base cimport Chunk
 from .series cimport TimeSeries
 
 import collections

diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index 4f1fc9a..b91210b 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -13,6 +13,7 @@ cdef class TimeSeries:
     readonly unsigned long long last_entry_synced
     readonly int block_size
     readonly unsigned long long last_entry_ts
+    readonly int gzip_level
     unsigned int page_size
     readonly dict metadata
     readonly bint descriptor_based_access
@@ -28,7 +29,7 @@ cdef class TimeSeries:
     cpdef int close(self) except -1
     cdef void incref_chunk(self, unsigned long long name)
     cdef void decref_chunk(self, unsigned long long name)
-    cdef Chunk open_chunk(self, unsigned long long name)
+    cdef Chunk open_chunk(self, unsigned long long name, bint is_direct, bint is_gzip)
     cdef int sync_metadata(self) except -1
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
@@ -48,4 +49,5 @@ cdef class TimeSeries:
 cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
                                int max_entries_per_chunk, int page_size=*,
-                               bint use_descriptor_based_access=*)
+                               bint use_descriptor_based_access=*,
+                               int gzip_level=*)
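Editor's note: with the new `gzip_level` field, the per-series `metadata.txt` written by `create_series` (see the end of `series.pyx` below) carries five keys. An illustrative sample of its contents, expressed as the dict that gets serialized — the values are example numbers, not defaults:

```python
# contents of <series-dir>/metadata.txt after create_series(..., gzip_level=6);
# these keys mirror what TimeSeries.__init__ reads back
meta = {
    'block_size': 1,                # size of each data entry, in bytes
    'max_entries_per_chunk': 10,
    'last_entry_synced': 0,
    'page_size': 4096,              # now always stored, even at the default
    'gzip_level': 6                 # 0 = no compression
}
```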
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index fced5ea..d86bc31 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -5,7 +5,9 @@ import warnings
 
 from satella.json import write_json_to_file, read_json_from_file
 
-from .chunks cimport create_chunk, Chunk
+from .chunks.base cimport Chunk
+from .chunks.maker cimport create_chunk
+from .chunks.direct cimport DirectChunk
 from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists
 import os
 
@@ -105,18 +107,25 @@ cdef class TimeSeries:
             raise DoesNotExist('Chosen time series does not exist')
 
         cdef:
             dict metadata
             str filename
             list files = os.listdir(self.path)
             unsigned long long last_chunk_name
+            bint is_direct
+            bint is_gzip
 
         try:
             metadata = read_json_from_file(os.path.join(self.path, METADATA_FILE_NAME))
             self.block_size = metadata['block_size']
             self.max_entries_per_chunk = metadata['max_entries_per_chunk']
             self.last_entry_synced = metadata['last_entry_synced']
-            self.page_size = metadata.get('page_size', DEFAULT_PAGE_SIZE)
+            self.page_size = metadata.get('page_size', 4096)
             self.metadata = metadata.get('metadata')
+            self.gzip_level = metadata.get('gzip_level', 0)
         except (OSError, ValueError) as e:
             raise Corruption('Corrupted series: %s' % (e, ))
         except KeyError:
@@ -132,18 +141,26 @@ cdef class TimeSeries:
             self.chunks = []
             self.last_entry_ts = 0
         else:
-            self.chunks = []        # type: tp.List[int] # sorted by ASC
+            self.chunks = []        # type: tp.List[tp.Tuple[int, bool, bool]] # sorted by ASC
+            # entries are (timestamp, is_direct, is_gzip)
             for filename in files:
                 if filename == METADATA_FILE_NAME:
                     continue
+                is_gzip = filename.endswith('.gz')
+                if is_gzip:
+                    filename = filename.replace('.gz', '')
+                is_direct = filename.endswith('.direct')
+                if is_direct:
+                    filename = filename.replace('.direct', '')
+                is_direct |= is_gzip
                 try:
-                    self.chunks.append(int(filename))
+                    self.chunks.append((int(filename), is_direct, is_gzip))
                 except ValueError:
                     raise Corruption('Detected invalid file "%s"' % (filename, ))
             self.chunks.sort()
-            last_chunk_name = self.chunks[-1]
-            self.last_chunk = self.open_chunk(last_chunk_name)
+            last_chunk_name, is_direct, is_gzip = self.chunks[-1]
+            self.last_chunk = self.open_chunk(last_chunk_name, is_direct, is_gzip)
             self.last_entry_ts = self.last_chunk.max_ts
 
@@ -155,7 +172,7 @@ cdef class TimeSeries:
         else:
             self.refs_chunks[name] += 1
 
-    cdef Chunk open_chunk(self, unsigned long long name):
+    cdef Chunk open_chunk(self, unsigned long long name, bint is_direct, bint is_gzip):
         """
         Opens a provided chunk.
 
@@ -173,10 +190,18 @@ cdef class TimeSeries:
         cdef Chunk chunk
         with self.open_lock:
             if name not in self.open_chunks:
-                self.open_chunks[name] = chunk = Chunk(self,
-                                                       os.path.join(self.path, str(name)),
-                                                       self.page_size,
-                                                       use_descriptor_access=self.descriptor_based_access)
+                if is_direct:
+                    chunk = DirectChunk(self,
+                                        os.path.join(self.path, str(name)),
+                                        self.page_size,
+                                        use_descriptor_access=True,
+                                        gzip_compression_level=self.gzip_level if is_gzip else 0)
+                else:
+                    chunk = Chunk(self,
+                                  os.path.join(self.path, str(name)),
+                                  self.page_size,
+                                  use_descriptor_access=self.descriptor_based_access)
+                self.open_chunks[name] = chunk
             else:
                 chunk = self.open_chunks[name]
             self.incref_chunk(name)
 
@@ -184,20 +209,36 @@ cdef class TimeSeries:
     cpdef int trim(self, unsigned long long timestamp) except -1:
         """
-        Delete all entries earlier than timestamp that are closed.
+        Delete all entries earlier than timestamp.
 
         Note that this will drop entire chunks, so it may be possible that some entries
-        will linger on.
-
-        This will affect only closed chunks. Chunks ready to delete that are closed after
-        this will not be deleted, as :meth:`~tempsdb.series.TimeSeries.trim` will need
-        to be called again.
+        will linger on. This will not delete chunks that are currently referenced!
 
         :param timestamp: timestamp to delete entries earlier than
         """
+        if len(self.chunks) == 1:
+            return 0
         cdef:
             unsigned long long chunk_to_delete
             int refs
+        try:
+            with self.open_lock:
+                while len(self.chunks) >= 2 and timestamp > self.chunks[1][0]:
+                    chunk_to_delete = self.chunks[0][0]
+                    if chunk_to_delete in self.open_chunks:
+                        refs = self.refs_chunks.get(chunk_to_delete, 0)
+                        if not refs:
+                            self.open_chunks[chunk_to_delete].delete()
+                        else:
+                            # due for deletion, but still referenced - try again later
+                            return 0
+                    else:
+                        os.unlink(os.path.join(self.path, str(chunk_to_delete)))
+                    del self.chunks[0]
+                else:
+                    return 0
+        except IndexError:
+            return 0
-        if len(self.chunks) > 1:
-            try:
-                with self.open_lock:
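Editor's note: the series now tracks its chunks as `(timestamp, is_direct, is_gzip)` tuples, with the storage flavour encoded in the file suffix that the loop above strips. A hypothetical helper (not part of the patch) makes the reverse mapping explicit:

```python
def chunk_filename(timestamp: int, is_direct: bool, is_gzip: bool) -> str:
    """Map a chunk tuple back to its on-disk file name (illustrative only)."""
    if is_gzip:
        return '%d.gz' % timestamp       # gzipped chunks are always direct
    elif is_direct:
        return '%d.direct' % timestamp
    else:
        return str(timestamp)            # plain, page-preallocated chunk

assert chunk_filename(1000, True, True) == '1000.gz'
assert chunk_filename(1000, True, False) == '1000.direct'
assert chunk_filename(1000, False, False) == '1000'
```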
@@ -293,7 +334,8 @@ cdef class TimeSeries:
             Chunk chunk
 
         for chunk_index in range(ch_start, ch_stop+1):
-            chunks.append(self.open_chunk(self.chunks[chunk_index]))
+            ts, is_direct, is_gzip = self.chunks[chunk_index]
+            chunks.append(self.open_chunk(ts, is_direct, is_gzip))
         return Iterator(self, start, stop, chunks)
 
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
@@ -428,10 +470,12 @@ cdef class TimeSeries:
                 self.decref_chunk(self.last_chunk.name())
             self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
                                            timestamp, data, self.page_size,
-                                           descriptor_based_access=self.descriptor_based_access)
+                                           descriptor_based_access=is_descriptor,
+                                           use_direct_mode=is_descriptor,
+                                           gzip_compression_level=self.gzip_level)
             self.open_chunks[timestamp] = self.last_chunk
             self.incref_chunk(timestamp)
-            self.chunks.append(timestamp)
+            self.chunks.append((timestamp, is_descriptor, bool(self.gzip_level)))
         else:
             self.last_chunk.append(timestamp, data)
         self.last_entry_ts = timestamp
@@ -448,7 +492,6 @@ cdef class TimeSeries:
             raise InvalidState('series is closed')
         self.close()
         shutil.rmtree(self.path)
-        return 0
 
     cpdef unsigned long open_chunks_mmap_size(self):
         """
@@ -473,16 +516,18 @@ cdef class TimeSeries:
 cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
-                               int max_entries_per_chunk, int page_size=DEFAULT_PAGE_SIZE,
-                               bint use_descriptor_based_access=False):
+                               int max_entries_per_chunk, int page_size=4096,
+                               bint use_descriptor_based_access=False,
+                               int gzip_level=0):
     if os.path.exists(path):
         raise AlreadyExists('This series already exists!')
 
     cdef dict meta = {
         'block_size': block_size,
         'max_entries_per_chunk': max_entries_per_chunk,
-        'last_entry_synced': 0
-    }
-    if page_size != DEFAULT_PAGE_SIZE:
-        meta['page_size'] = page_size
+        'last_entry_synced': 0,
+        'page_size': page_size,
+        'gzip_level': gzip_level
+    }
     os.mkdir(path)
     write_json_to_file(os.path.join(path, METADATA_FILE_NAME), meta)
     return TimeSeries(path, name)

diff --git a/tests/test_db.py b/tests/test_db.py
index 4e24666..4f04068 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -42,6 +42,25 @@ class TestDB(unittest.TestCase):
         self.do_verify_series(series, 0, 1800)
         series.close()
 
+    def test_create_series_gzip(self):
+        from tempsdb.series import create_series
+
+        series = create_series('test.gz', 'test.gz', 1, 10, gzip_level=6)
+        start, ts = 127, 100
+        for i in range(20):
+            series.append(ts, bytes(bytearray([start])))
+            start -= 1
+            ts += 100
+
+        self.do_verify_series(series, 0, 2000)
+        self.do_verify_series(series, 500, 2000)
+        self.do_verify_series(series, 1000, 2000)
+        self.do_verify_series(series, 1500, 2000)
+        self.do_verify_series(series, 0, 500)
+        self.do_verify_series(series, 0, 1200)
+        self.do_verify_series(series, 0, 1800)
+        series.close()
+
     def do_verify_series(self, series, start, stop):
         it = series.iterate_range(start, stop)
         items = list(it)
-- 
GitLab
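Editor's note: taken together, the gzip path can be exercised end to end much as the new test does. A sketch — the directory and series name are illustrative:

```python
from tempsdb.series import create_series

# 1-byte blocks, at most 10 entries per chunk, transparent gzip at level 6.
# Remember: gzip chunks are DirectChunks, so access is descriptor-based (no mmap).
series = create_series('./example-series', 'example-series', 1, 10, gzip_level=6)
try:
    for i in range(20):
        # timestamps must be monotonically increasing
        series.append(100 * (i + 1), bytes([127 - i]))

    it = series.iterate_range(0, 2000)
    try:
        for timestamp, value in it:
            print(timestamp, value)
    finally:
        it.close()      # the Iterator's destructor warns if you forget this
finally:
    series.close()
```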