From b52af300920e5b5b134ad0f268433a2c3f012710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 14 Dec 2020 17:03:33 +0100 Subject: [PATCH] rebased feature/gzip onto develop --- README.md | 1 + docs/index.rst | 7 ++ setup.py | 3 +- tempsdb/chunks/base.pxd | 11 ++- tempsdb/chunks/base.pyx | 100 +++++++++++++++++++--- tempsdb/chunks/normal.pxd | 14 +-- tempsdb/chunks/normal.pyx | 173 +------------------------------------- tempsdb/database.pxd | 4 +- tempsdb/series.pyx | 24 +++--- tempsdb/varlen.pyx | 2 +- 10 files changed, 122 insertions(+), 217 deletions(-) diff --git a/README.md b/README.md index c5041e0..633ee56 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ Then copy your resulting wheel and install it via pip on the target system. * added `append_padded` * added metadata support, `metadata` property and `set_metadata` call * added variable length series +* added experimental support for gzipping (constant-length series only for now) * fixed a bug where getting a series that was already closed would TypeError * following additions to `Chunk`: * `get_slice_of_piece_at` diff --git a/docs/index.rst b/docs/index.rst index 44fcf65..b289c9b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -25,6 +25,13 @@ So no variable encoding for you! When mmap fails due to memory issues, this falls back to slower fwrite()/fread() implementation. You can also manually select the descriptor-based implementation if you want to. +.. versionadded:: 0.5 + +Experimental support for gzipping time series is added. Note that reading from gzipped files is for now +slow, as every seek requires reading the file from the beginning. + +Warnings will be issued while using gzipped series to remind you of this fact. + Indices and tables ================== diff --git a/setup.py b/setup.py index 587c9e7..c2db648 100644 --- a/setup.py +++ b/setup.py @@ -2,6 +2,7 @@ import os import typing as tp from Cython.Build import cythonize +from satella.distutils import monkey_patch_parallel_compilation from satella.files import find_files from distutils.core import setup @@ -27,7 +28,7 @@ if 'CI' in os.environ: setup(name='tempsdb', - version='0.5.0_a1', + version='0.5.0a8', packages=['tempsdb'], install_requires=['satella>=2.14.21', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd index 37afb91..a220b7c 100644 --- a/tempsdb/chunks/base.pxd +++ b/tempsdb/chunks/base.pxd @@ -22,9 +22,14 @@ cdef class Chunk: object file, mmap, file_lock_object bint closed cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry) - cpdef int close(self) except -1 + cpdef int close(self, bint force=*) except -1 cdef tuple get_piece_at(self, unsigned int index) cdef int sync(self) except -1 + cpdef unsigned long long get_timestamp_at(self, unsigned int index) + cpdef bytes get_value_at(self, unsigned int index) + cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop) + cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start) + cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 cpdef unsigned int find_left(self, unsigned long long timestamp) cpdef unsigned int find_right(self, unsigned long long timestamp) cpdef object open_file(self, str path) @@ -34,6 +39,8 @@ cdef class Chunk: cpdef int delete(self) except -1 cpdef int switch_to_descriptor_based_access(self) except -1 cpdef unsigned long get_mmap_size(self) + cdef void incref(self) + cdef int decref(self) except -1 cdef inline unsigned long long name(self): """ @@ -48,5 +55,3 @@ cdef class Chunk: :rtype: int """ return self.entries - - cdef unsigned long long get_timestamp_at(self, unsigned int index) diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 1ef8201..6e23429 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -6,7 +6,7 @@ import struct import mmap from .gzip cimport ReadWriteGzipFile -from ..exceptions import Corruption, InvalidState, AlreadyExists +from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen from ..series cimport TimeSeries @@ -172,6 +172,63 @@ cdef class Chunk: self.after_init() + cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1: + """ + Return a particular byte of given element at given index. + + When index is negative, or larger than block_size, the behaviour is undefined + + :param index: index of the element + :param byte_index: index of the byte + :return: value of the byte + :raises ValueError: index too large + """ + if index > self.entries: + raise ValueError('index too large') + cdef: + unsigned long offset = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + byte_index + return self.mmap[offset] + + cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start): + """ + Return a slice of data from given element starting at given index to the end + + :param index: index of the element + :param start: starting index + :return: a byte slice + """ + return self.get_slice_of_piece_at(index, start, self.block_size) + + cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop): + """ + Return a slice of data from given element + + :param index: index of the element + :param start: starting offset of data + :param stop: stopping offset of data + :return: a byte slice + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + start + unsigned long stopping_index = starting_index + stop + return self.mmap[starting_index:stopping_index] + + cpdef unsigned long long get_timestamp_at(self, unsigned int index): + """ + Return a timestamp at a particular location + + Passing an invalid index will result in an undefined behaviour. + + :param index: index of element + :return: the timestamp + """ + cdef: + unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus + unsigned long stopping_index = starting_index + TIMESTAMP_SIZE + return STRUCT_Q.unpack(self.mmap[starting_index:stopping_index])[0] + cpdef unsigned int find_left(self, unsigned long long timestamp): """ Return an index i of position such that ts[i] <= timestamp and @@ -251,16 +308,6 @@ cdef class Chunk: if self.file_lock_object: self.file_lock_object.release() - cdef unsigned long long get_timestamp_at(self, unsigned int index): - """ - Get timestamp at given entry - - :param index: index of the entry - :return: timestamp at this entry - """ - cdef unsigned long offset = HEADER_SIZE+index*self.block_size_plus - return STRUCT_Q.unpack(self.mmap[offset:offset+TIMESTAMP_SIZE])[0] - cpdef int delete(self) except -1: """ Close and delete this chunk. @@ -324,15 +371,42 @@ cdef class Chunk: def __len__(self): return self.length() - cpdef int close(self) except -1: + cdef void incref(self): + if self.parent is not None: + self.parent.incref_chunk(self.min_ts) + + cdef int decref(self) except -1: + if self.parent is not None: + self.parent.decref_chunk(self.name()) + if self.parent.get_references_for(self.name()) < 0: + raise ValueError('reference of chunk fell below zero!') + return 0 + + cpdef bytes get_value_at(self, unsigned int index): + """ + Return only the value at a particular index, numbered from 0 + + :return: value at given index + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + unsigned long stopping_index = starting_index + self.block_size + return self.mmap[starting_index:stopping_index] + + cpdef int close(self, bint force=False) except -1: """ Close the chunk and close the allocated resources """ if self.closed: return 0 + cdef unsigned long long name = self.name() if self.parent: with self.parent.open_lock: - del self.parent.open_chunks[self.name()] + if self.parent.get_references_for(name) and not force: + raise StillOpen('chunk still open!') + del self.parent.open_chunks[name] self.parent = None self.mmap.close() self.file.close() diff --git a/tempsdb/chunks/normal.pxd b/tempsdb/chunks/normal.pxd index ff1585c..abeba72 100644 --- a/tempsdb/chunks/normal.pxd +++ b/tempsdb/chunks/normal.pxd @@ -1,4 +1,4 @@ -from .series cimport TimeSeries +from ..series cimport TimeSeries cdef class AlternativeMMap: @@ -22,22 +22,10 @@ cdef class Chunk: object file, mmap, file_lock_object bint closed - cdef void incref(self) - cdef int decref(self) except -1 - cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry) cpdef int close(self, bint force=*) except -1 - cpdef unsigned long long get_timestamp_at(self, unsigned int index) - cdef tuple get_piece_at(self, unsigned int index) - cpdef bytes get_value_at(self, unsigned int index) - cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop) - cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start) - cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cdef int sync(self) except -1 - cpdef unsigned int find_left(self, unsigned long long timestamp) - cpdef unsigned int find_right(self, unsigned long long timestamp) cdef int extend(self) except -1 - cpdef int delete(self) except -1 cpdef int switch_to_descriptor_based_access(self) except -1 cpdef int switch_to_mmap_based_access(self) except -1 cpdef unsigned long get_mmap_size(self) diff --git a/tempsdb/chunks/normal.pyx b/tempsdb/chunks/normal.pyx index 0656288..acd662c 100644 --- a/tempsdb/chunks/normal.pyx +++ b/tempsdb/chunks/normal.pyx @@ -6,8 +6,8 @@ import struct import mmap import warnings -from .exceptions import Corruption, InvalidState, AlreadyExists, StillOpen -from .series cimport TimeSeries +from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen +from ..series cimport TimeSeries DEF HEADER_SIZE = 4 DEF TIMESTAMP_SIZE = 8 @@ -134,17 +134,6 @@ cdef class Chunk: self.mmap = AlternativeMMap(self.file, self.file_lock_object) return 0 - cdef void incref(self): - if self.parent is not None: - self.parent.incref_chunk(self.min_ts) - - cdef int decref(self) except -1: - if self.parent is not None: - self.parent.decref_chunk(self.name()) - if self.parent.get_references_for(self.name()) < 0: - raise ValueError('reference of chunk fell below zero!') - return 0 - def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, use_descriptor_access: bool = False): cdef bytes b @@ -189,111 +178,6 @@ cdef class Chunk: # Inform the OS that we don't need the header anymore self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) - cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1: - """ - Return a particular byte of given element at given index. - - When index is negative, or larger than block_size, the behaviour is undefined - - :param index: index of the element - :param byte_index: index of the byte - :return: value of the byte - :raises ValueError: index too large - """ - if index > self.entries: - raise ValueError('index too large') - cdef: - unsigned long offset = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + byte_index - return self.mmap[offset] - - cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start): - """ - Return a slice of data from given element starting at given index to the end - - :param index: index of the element - :param start: starting index - :return: a byte slice - """ - return self.get_slice_of_piece_at(index, start, self.block_size) - - cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop): - """ - Return a slice of data from given element - - :param index: index of the element - :param start: starting offset of data - :param stop: stopping offset of data - :return: a byte slice - """ - if index >= self.entries: - raise IndexError('Index too large') - cdef: - unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + start - unsigned long stopping_index = starting_index + stop - return self.mmap[starting_index:stopping_index] - - cpdef unsigned long long get_timestamp_at(self, unsigned int index): - """ - Return a timestamp at a particular location - - Passing an invalid index will result in an undefined behaviour. - - :param index: index of element - :return: the timestamp - """ - cdef: - unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus - unsigned long stopping_index = starting_index + TIMESTAMP_SIZE - return STRUCT_Q.unpack(self.mmap[starting_index:stopping_index])[0] - - cpdef unsigned int find_left(self, unsigned long long timestamp): - """ - Return an index i of position such that ts[i] <= timestamp and - (timestamp-ts[i]) -> min. - - Used as bound in searches: you start from this index and finish at - :meth:`~tempsdb.chunks.Chunk.find_right`. - - :param timestamp: timestamp to look for, must be smaller or equal to largest element - in the chunk - :return: index such that ts[i] <= timestamp and (timestamp-ts[i]) -> min, or length of the - array if timestamp is larger than largest element in this chunk - """ - cdef: - unsigned int hi = self.length() - unsigned int lo = 0 - unsigned int mid - while lo < hi: - mid = (lo+hi)//2 - if self.get_timestamp_at(mid) < timestamp: - lo = mid+1 - else: - hi = mid - return lo - - cpdef unsigned int find_right(self, unsigned long long timestamp): - """ - Return an index i of position such that ts[i] > timestamp and - (ts[i]-timestamp) -> min - - Used as bound in searches: you start from - :meth:`~tempsdb.chunks.Chunk.find_right` and finish at this inclusive. - - :param timestamp: timestamp to look for - :return: index such that ts[i] > timestamp and (ts[i]-timestamp) -> min - """ - cdef: - unsigned int hi = self.length() - unsigned int lo = 0 - unsigned int mid - while lo < hi: - mid = (lo+hi)//2 - if timestamp < self.get_timestamp_at(mid): - hi = mid - else: - lo = mid+1 - return lo - def __getitem__(self, index: tp.Union[int, slice]): if isinstance(index, slice): return self.iterate_range(index.start, index.stop) @@ -331,14 +215,6 @@ cdef class Chunk: if self.file_lock_object: self.file_lock_object.release() - cpdef int delete(self) except -1: - """ - Close and delete this chunk. - """ - self.close() - os.unlink(self.path) - return 0 - cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ Append a record to this chunk. @@ -372,22 +248,6 @@ cdef class Chunk: self.pointer += self.block_size_plus return 0 - cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry): - """ - Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive). - - :param starting_entry: index of starting entry - :param stopping_entry: index of stopping entry - :return: an iterator - :rtype: tp.Iterator[tp.Tuple[int, bytes]] - """ - return self._iterate(starting_entry, stopping_entry) - - def _iterate(self, starting_entry: int, stopping_entry: int): - cdef int i - for i in range(starting_entry, stopping_entry): - yield self.get_piece_at(i) - def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]: return self._iterate(0, self.entries) @@ -422,35 +282,6 @@ cdef class Chunk: warnings.warn('You forgot to close a Chunk') self.close() - cpdef bytes get_value_at(self, unsigned int index): - """ - Return only the value at a particular index, numbered from 0 - - :return: value at given index - """ - if index >= self.entries: - raise IndexError('Index too large') - cdef: - unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus - unsigned long stopping_index = starting_index + self.block_size - return self.mmap[starting_index:stopping_index] - - cdef tuple get_piece_at(self, unsigned int index): - """ - Return a piece of data at a particular index, numbered from 0 - - :return: at piece of data at given index - :rtype: tp.Tuple[int, bytes] - """ - if index >= self.entries: - raise IndexError('Index too large') - cdef: - unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus - unsigned long stopping_index = starting_index + self.block_size_plus - unsigned long long ts = STRUCT_Q.unpack( - self.mmap[starting_index:starting_index+TIMESTAMP_SIZE])[0] - return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index] - cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, bytes data, int page_size, bint descriptor_based_access=False): diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index ee4eeaa..9c6d167 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -19,11 +19,11 @@ cdef class Database: cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, int page_size=*, - bint use_descriptor_based_access=*) + bint use_descriptor_based_access=*, + int gzip_level=*) cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, int size_struct, unsigned long entries_per_chunk) - bint use_descriptor_based_access=*) int gzip_level=*) cpdef list get_open_series(self) cpdef list get_all_series(self) cpdef int close_all_open_series(self) except -1 diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index fcd205e..cab2074 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -107,8 +107,6 @@ cdef class TimeSeries: raise DoesNotExist('Chosen time series does not exist') cdef: - str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME), - 'utf-8', 'invalid json') dict metadata str filename list files = os.listdir(self.path) @@ -469,12 +467,12 @@ cdef class TimeSeries: self.decref_chunk(self.last_chunk.name()) self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)), timestamp, data, self.page_size, - descriptor_based_access=is_descriptor, - use_direct_mode=is_descriptor, + descriptor_based_access=self.descriptor_based_access, + use_direct_mode=bool(self.gzip_level), gzip_compression_level=self.gzip_level) self.open_chunks[timestamp] = self.last_chunk self.incref_chunk(timestamp) - self.chunks.append((timestamp, is_descriptor, bool(self.gzip_level))) + self.chunks.append((timestamp, bool(self.gzip_level), bool(self.gzip_level))) else: self.last_chunk.append(timestamp, data) self.last_entry_ts = timestamp @@ -514,19 +512,19 @@ cdef class TimeSeries: cpdef TimeSeries create_series(str path, str name, unsigned int block_size, int max_entries_per_chunk, int page_size=DEFAULT_PAGE_SIZE, - bint use_descriptor_based_access=False): - int max_entries_per_chunk, int page_size=4096, bint use_descriptor_based_access=False, int gzip_level=0): if os.path.exists(path): raise AlreadyExists('This series already exists!') - + os.mkdir(path) cdef dict meta = { 'block_size': block_size, 'max_entries_per_chunk': max_entries_per_chunk, - 'last_entry_synced': 0, - 'page_size': page_size, - 'gzip_level': gzip_level - }, f_out - ) + 'last_entry_synced': 0 + } + if page_size != DEFAULT_PAGE_SIZE: + meta['page_size'] = page_size + if gzip_level: + meta['gzip_level'] = gzip_level + write_json_to_file(os.path.join(path, 'metadata.txt'), meta) return TimeSeries(path, name) diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx index 41b9355..19ccd24 100644 --- a/tempsdb/varlen.pyx +++ b/tempsdb/varlen.pyx @@ -4,7 +4,7 @@ import typing as tp import struct import warnings -from .chunks cimport Chunk +from .chunks.base cimport Chunk from .exceptions import Corruption, AlreadyExists, StillOpen from .iterators cimport Iterator from .series cimport TimeSeries, create_series -- GitLab