From 8d7eb2887692455f4bff7ef1c6af8c04112aa1e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 11 Dec 2020 17:53:55 +0100 Subject: [PATCH] update the docs --- README.md | 10 ++- docs/index.rst | 1 + docs/varlen.rst | 44 ++++++++++++ tempsdb/chunks.pxd | 3 + tempsdb/chunks.pyx | 70 ++++++++++++++++--- tempsdb/database.pxd | 6 ++ tempsdb/database.pyx | 62 ++++++++++++++++- tempsdb/varlen.pxd | 23 +++++++ tempsdb/varlen.pyx | 159 ++++++++++++++++++++++++++++++++++++++++++- 9 files changed, 364 insertions(+), 14 deletions(-) create mode 100644 docs/varlen.rst diff --git a/README.md b/README.md index 1683474..6210abd 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,16 @@ Then copy your resulting wheel and install it via pip on the target system. * if page_size is default, it won't be written as part of the metadata * added support for per-series metadata -* added `TimeSeries.append_padded` +* following additions to `TimeSeries`: + * added `append_padded` + * added metadata support, `metadata` property and `set_metadata` call * added variable length series +* fixed a bug where getting a series that was already closed would raise a TypeError +* following additions to `Chunk`: + * `Chunk.get_slice_of_piece_at` + * `Chunk.get_slice_of_piece_starting_at` + * `Chunk.get_byte_of_piece` +* fixed the behaviour of `AlternativeMMaps` when passed a single index to __getitem__ and __setitem__ ## v0.4.4 diff --git a/docs/index.rst b/docs/index.rst index f0f6d89..44fcf65 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -8,6 +8,7 @@ Welcome to tempsdb's documentation! usage exceptions chunks + varlen memory-pressure-manager This is an append-only embedded time series library written in Cython. 
diff --git a/docs/varlen.rst b/docs/varlen.rst new file mode 100644 index 0000000..04579d7 --- /dev/null +++ b/docs/varlen.rst @@ -0,0 +1,44 @@ +Variable length series +====================== + +Version 0.5 added support for variable length series. + +How does it work? +----------------- + +They work by breaking down your data into smaller pieces and storing them in separate +series, prefixing with length. + +For each series you specify a so-called length profile. It is a list of ints, each representing +a block size for next series created. If an entry cannot fit in the already created series, a new one +will be created. Note that the last entry of this array will loop forever, so if you for example +put a 1024 byte data in a varlen series of length profile [10, 255] there will be a total +of 5 normal time series created to accommodate it, with length of: +* 10 +* 255 +* 255 +* 255 +* 255 + +Each entry is also prefixed by its length. The size of that field is described by an +extra parameter called `size_struct`. + +Note that the only valid sizes of `size_struct` are: +* 1 for maximum length of 255 +* 2 for maximum length of 65535 +* 4 for maximum length of 4294967295 + +Accessing them +-------------- + +Use methods :meth:`tempsdb.database.Database.create_varlen_series` and +:meth:`tempsdb.database.Database.get_varlen_series` to obtain instances of following class: + + +.. autoclass:: tempsdb.varlen.VarlenSeries + :members: + + +.. 
autoclass:: tempsdb.varlen.VarlenEntry + :members: + diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index 6db8a10..b1c430a 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -25,6 +25,9 @@ cdef class Chunk: cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry) cpdef int close(self) except -1 cdef tuple get_piece_at(self, unsigned int index) + cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop) + cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start) + cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cdef int sync(self) except -1 cpdef unsigned int find_left(self, unsigned long long timestamp) diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 9704d09..9c11818 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -34,22 +34,31 @@ cdef class AlternativeMMap: self.size = self.io.tell() self.file_lock_object = file_lock_object - def __getitem__(self, item: slice) -> bytes: + def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]: cdef: unsigned long start = item.start unsigned long stop = item.stop - + bytes b with self.file_lock_object: - self.io.seek(start, 0) - return self.io.read(stop-start) + if isinstance(item, int): + self.io.seek(item, 0) + b = self.io.read(1) + return b[0] + else: + start = item.start + stop = item.stop + self.io.seek(start, 0) + return self.io.read(stop-start) - def __setitem__(self, key: slice, value: bytes): + def __setitem__(self, key: tp.Union[int, slice], value: tp.Union[int, bytes]) -> None: cdef: unsigned long start = key.start - - with self.file_lock_object: - self.io.seek(start, 0) - self.io.write(value) + if isinstance(key, int): + self[key:key+1] = bytes([value]) + else: + with self.file_lock_object: + self.io.seek(start, 0) + self.io.write(value) def close(self): pass @@ -167,6 +176,49 
@@ cdef class Chunk: # Inform the OS that we don't need the header anymore self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) + cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1: + """ + Return a particular byte of given element at given index. + + When index is negative, or larger than block_size, the behaviour is undefined + + :param index: index of the element + :param byte_index: index of the byte + :return: value of the byte + :raises ValueError: index too large + """ + if index > self.entries: + raise ValueError('index too large') + cdef: + unsigned long offset = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + byte_index + return self.mmap[offset] + + cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start): + """ + Return a slice of data from given element starting at given index to the end + + :param index: index of the element + :param start: starting index + :return: a byte slice + """ + return self.get_slice_of_piece_at(index, start, self.block_size) + + cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop): + """ + Return a slice of data from given element + + :param index: index of the element + :param start: starting offset of data + :param stop: stopping offset of data + :return: a byte slice + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + start + unsigned long stopping_index = starting_index + self.block_size_plus + stop + return self.mmap[starting_index:stopping_index] + cpdef unsigned int find_left(self, unsigned long long timestamp): """ Return an index i of position such that ts[i] <= timestamp and diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index e3e8887..9d75efc 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -1,4 +1,5 @@ from .series cimport TimeSeries +from .varlen cimport 
VarlenSeries cdef class Database: @@ -8,15 +9,20 @@ cdef class Database: object lock object mpm dict open_series + dict open_varlen_series cpdef int close(self) except -1 cpdef TimeSeries get_series(self, str name, bint use_descriptor_based_access=*) + cpdef VarlenSeries get_varlen_series(self, str name) cpdef int register_memory_pressure_manager(self, object mpm) except -1 cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, int page_size=*, bint use_descriptor_based_access=*) + cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, + int size_struct, + unsigned long entries_per_chunk) cpdef list get_open_series(self) cpdef list get_all_series(self) cpdef int close_all_open_series(self) except -1 diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index 8a57db6..e28534e 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -5,6 +5,7 @@ from satella.coding import DictDeleter from tempsdb.exceptions import DoesNotExist, AlreadyExists from .series cimport TimeSeries, create_series +from .varlen cimport VarlenSeries, create_varlen_series cdef class Database: @@ -22,6 +23,7 @@ cdef class Database: self.path = path self.closed = False self.open_series = {} + self.open_varlen_series = {} self.lock = threading.Lock() self.mpm = None @@ -68,7 +70,7 @@ cdef class Database: if name in self.open_series: if self.open_series[name].closed: del self.open_series[name] - return self.open_series(name) + return self.get_series(name) return self.open_series[name] if not os.path.isdir(path): raise DoesNotExist('series %s does not exist' % (name, )) @@ -82,11 +84,16 @@ cdef class Database: """ Closes all open series """ - cdef TimeSeries series + cdef: + TimeSeries series + VarlenSeries v_series with self.lock: for series in self.open_series.values(): series.close() self.open_series = {} + for v_series in self.open_varlen_series.values(): + v_series.close() + self.open_varlen_series = {} return 0 cpdef 
unsigned long long get_first_entry_for(self, str name): @@ -134,6 +141,57 @@ cdef class Database: """ return os.listdir(self.path) + cpdef VarlenSeries create_varlen_series(self, str name, list length_profile, + int size_struct, + unsigned long entries_per_chunk): + """ + Create a new variable length series + + :param name: name of the series + :param length_profile: list of lengths of subsequent chunks + :param size_struct: how many bytes will be used to store length? + Valid entries are 1, 2 and 4 + :param entries_per_chunk: entries per chunk file + :return: new variable length series + :raises AlreadyExists: series with given name already exists + """ + if os.path.isdir(os.path.join(self.path, 'varlen', name)): + raise AlreadyExists('Series already exists') + cdef VarlenSeries series = create_varlen_series(os.path.join(self.path, name), name, + size_struct, + length_profile, + entries_per_chunk) + self.open_varlen_series[name] = series + return series + + + cpdef VarlenSeries get_varlen_series(self, str name): + """ + Load and return an existing variable length series + + :param name: name of the series + + :return: a loaded varlen series + :raises DoesNotExist: series does not exist + """ + if name in self.open_varlen_series: + result = self.open_varlen_series[name] + else: + path = os.path.join(self.path, 'varlen', name) + with self.lock: + # Check a second time due to the lock + if name in self.open_varlen_series: + if self.open_varlen_series[name].closed: + del self.open_varlen_series[name] + return self.get_varlen_series(name) + return self.open_varlen_series[name] + if not os.path.isdir(path): + raise DoesNotExist('series %s does not exist' % (name, )) + self.open_varlen_series[name] = result = VarlenSeries(path, name) + if self.mpm is not None: + result.register_memory_pressure_manager(self.mpm) + return result + cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, int page_size=4096, diff --git 
a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd index d5dabd3..7d5baa0 100644 --- a/tempsdb/varlen.pxd +++ b/tempsdb/varlen.pxd @@ -13,11 +13,34 @@ cdef class VarlenSeries: list length_profile int max_entries_per_chunk int current_maximum_length + object mpm cpdef int close(self) except -1 + cpdef int delete(self) except -1 cpdef int get_length_for(self, int index) cpdef int add_series(self) except -1 + cdef void register_memory_pressure_manager(self, object mpm) cpdef int append(self, unsigned long long timestamp, bytes data) except -1 + cdef inline int get_maximum_length(self): + if self.size_field == 1: + return 0xFF + elif self.size_field == 2: + return 0xFFFF + elif self.size_field == 4: + return 0xFFFFFFFF + +cdef class VarlenEntry: + cdef: + list chunks + list item_no + VarlenSeries parent + + cpdef int length(self) + cpdef bytes to_bytes(self) + cpdef unsigned long long timestamp(self) + cpdef bytes slice(self, int start, int stop) + cpdef int get_byte_at(self, int index) except -1 + cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, int max_entries_per_chunk) diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx index d259edc..e0ee320 100644 --- a/tempsdb/varlen.pyx +++ b/tempsdb/varlen.pyx @@ -1,10 +1,142 @@ import os +import shutil +import typing as tp import struct -from tempsdb.exceptions import Corruption, AlreadyExists +from .chunks cimport Chunk +from .exceptions import Corruption, AlreadyExists from .series cimport TimeSeries, create_series +cdef class VarlenEntry: + """ + An object representing the value. + + It is preferred for an object to exist, instead of copying data. + This should make tempsdb far more zero-copy. + + This behaves as a bytes object, in particular it can be sliced, iterated, + and its length obtained. It also overloads __bytes__. 
+ """ + def __init__(self, parent: VarlenSeries, chunks: tp.List[Chunk], + item_no: tp.List[int]): + self.parent = parent + self.item_no = item_no + self.chunks = chunks + + cpdef unsigned long long timestamp(self): + """ + :return: timestamp assigned to this entry + """ + return self.chunks[0].get_piece_at(self.item_no[0])[0] + + cpdef int length(self): + """ + :return: self length + """ + cdef bytes b = self.chunks[0].get_slice_of_piece_at(self.item_no[0], 0, self.parent.size_field) + b = b[:self.parent.size_field] + return self.parent.size_struct.unpack(b)[0] + + def __getitem__(self, item): + if isinstance(item, slice): + return self.slice(item.start, item.stop) + else: + return self.get_byte_at(item) + + cpdef int get_byte_at(self, int index) except -1: + """ + Return a byte at a particular index + + :param index: index of the byte + :return: the value of the byte + :raises ValueError: index too large + """ + cdef: + int pointer = 0 + int segment = 0 + int seg_len = 0 + while pointer < index and segment < len(self.chunks): + seg_len = self.parent.get_length_for(segment) + if seg_len+pointer > index: + return self.chunks[segment].get_byte_of_piece(self.item_no[segment], + index-pointer) + pointer += seg_len + segment += 1 + raise ValueError('Index too large') + + cpdef bytes slice(self, int start, int stop): + """ + Returns a slice of the entry + + :param start: position to start at + :param stop: position to stop at + :return: a slice of this entry + :raises ValueError: stop was smaller than start + """ + if stop < start: + raise ValueError('stop smaller than start') + if stop == start: + return b'' + + cdef: + int length = stop-start + bytearray b = bytearray(length) + int segment = 0 + int pointer = 0 + int next_chunk_len + int start_reading_at + + # Track down the right segment to start the read + while pointer < start: + next_chunk_len = self.parent.get_length_for(segment) + if next_chunk_len > start-pointer: + start_reading_at = start - pointer + break + 
pointer += next_chunk_len + segment += 1 + + + cdef: + int write_pointer = 0 + int chunk_len = self.parent.get_length_for(segment) + int len_to_read = self.parent.get_length_for(segment) - start_reading_at + Chunk chunk = self.chunks[segment] + while write_pointer < length: + if chunk_len > start_reading_at + length: + ... # We end this right here + + b[write_pointer:write_pointer+len_to_read] = chunk.get_slice_of_piece_at(self.item_no[segment], + start_reading_at, + 0) + + + cpdef bytes to_bytes(self): + """ + :return: value as bytes + """ + cdef: + int length = self.length() + bytearray b = bytearray(length) + int pointer = 0 + int segment = 0 + bytes cur_data + int cur_data_len + while pointer < length: + cur_data = self.chunks[segment].get_piece_at(self.item_no[segment])[1] + cur_data_len = len(cur_data) + if cur_data_len > length-pointer: + b[pointer:length] = cur_data[:cur_data_len-(length-pointer)] + else: + b[pointer:pointer+cur_data_len] = cur_data + pointer += cur_data_len + segment += 1 + return bytes(b) + + def __len__(self) -> int: + return self.length() + + cdef class VarlenSeries: """ A time series housing variable length data. 
@@ -15,9 +147,16 @@ cdef class VarlenSeries: :param path: path to directory containing the series :param name: name of the series """ + cdef void register_memory_pressure_manager(self, object mpm): + self.mpm = mpm + cdef TimeSeries series + for series in self.series: + series.register_memory_pressure_manager(mpm) + def __init__(self, path: str, name: str): self.closed = False self.path = path + self.mpm = None self.name = name self.root_series = TimeSeries(os.path.join(path, 'root'), 'root') self.max_entries_per_chunk = self.root_series.max_entries_per_chunk @@ -66,8 +205,12 @@ cdef class VarlenSeries: :param timestamp: timestamp to append it with :param data: data to write + + :raises ValueError: too long an entry """ cdef int data_len = len(data) + if data_len > self.get_maximum_length(): + raise ValueError('data too long') if data_len < self.get_length_for(0): data = self.size_struct.pack(len(data)) + data self.root_series.append_padded(timestamp, data) @@ -89,6 +232,16 @@ cdef class VarlenSeries: self.series[segment].append_padded(timestamp, data_to_put) pointer += cur_len segment += 1 + return 0 + + cpdef int delete(self) except -1: + """ + Erases this variable length series from the disk. + + Closes this series as a side-effect. 
+ """ + self.close() + shutil.rmtree(self.path) cpdef int add_series(self) except -1: """ @@ -134,12 +287,14 @@ cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, lis Create a variable length series :raises AlreadyExists: directory exists at given path - :raises ValueError: invalid length profile or max_entries_per_chunk + :raises ValueError: invalid length profile or max_entries_per_chunk or size_struct """ if os.path.exists(path): raise AlreadyExists('directory present at paht') if not length_profile or not max_entries_per_chunk: raise ValueError('invalid parameter') + if size_struct not in (1, 2, 4): + raise ValueError('invalid size_struct') os.mkdir(path) cdef TimeSeries root_series = create_series(os.path.join(path, 'root'), -- GitLab