diff --git a/README.md b/README.md index 61bde9f309062acbf5c4013d1aa932199c1142ae..3b66ce0bf181c31c03e714f8fb3c5823d68966be 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ Embedded Cython library for time series that you need to upload somewhere. -Stored time series with a 8-bit timestamp and a fixed length of data. -So no variable encoding for you! +Stored time series with a 8-bit timestamp and a data, which can be of +fixed length or variable. # Installation @@ -54,7 +54,7 @@ Then copy your resulting wheel and install it via pip on the target system. ## v0.5 -* both `Database` and `TimeSeries` destructor will close and +* both `Database`, `TimeSeries` and `Chunk` destructor will close and emit a warning if the user forgot to * if page_size is default, it won't be written as part of the metadata * added support for per-series metadata @@ -69,6 +69,8 @@ Then copy your resulting wheel and install it via pip on the target system. * `get_byte_of_piece` * `get_timestamp_at` * fixed the behaviour of `AlternativeMMaps` when passed a single index to __getitem__ and __setitem__ +* added `StillOpen` exception, chunk won't allow to close itself if it has any + remaining references ## v0.4.4 diff --git a/docs/exceptions.rst b/docs/exceptions.rst index f42a9204878b9686209f89891a481b978231d624..b8125bfbeb01f2ed90bc95a3d086dd31af840f2c 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -14,3 +14,5 @@ The exceptions that inherit from it are: .. autoclass:: tempsdb.exceptions.InvalidState .. autoclass:: tempsdb.exceptions.AlreadyExists + +.. autoclass:: tempsdb.exceptions.StillOpen diff --git a/docs/usage.rst b/docs/usage.rst index 0e89575f242c132ec7edf319298fe8ed623e0417..46d3af9f57da02657b67401d887bd8f38a7c3d80 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -1,6 +1,8 @@ How this does work? =================== +.. note:: This is about fixed length data time series. + Data is stored in so called chunks. A chunk's last page can be actively appended to, or a chunk is immutable. diff --git a/docs/varlen.rst b/docs/varlen.rst index 4275eaaffc6ed43a1dc79e934dfcd08cf1523356..908d1eb84e5f2d44dd9ed8d6affa15a5a2dea37e 100644 --- a/docs/varlen.rst +++ b/docs/varlen.rst @@ -1,7 +1,7 @@ Variable length series ====================== -Version 0.5 added support for variable length series. +.. versionadded:: 0.5 How does it work? ----------------- @@ -13,7 +13,7 @@ For each series you specify so-called length profile. It is a list of ints, each a block size for next series created. If an entry cannot fit in the already created series, a new one will be created. Note that the last entry of this array will loop forever, so if you for example put a 1024 byte data in a varlen series of length profile [10, 255] there will be a total -of 5 normal time series created to accomodate it, with length of: +of 5 normal time series created to accommodate it, with length of: * 10 * 255 * 255 @@ -33,6 +33,9 @@ Note that the only valid sizes of `size_struct` are: * 3 for maximum length of 16777215 * 4 for maximum length of 4294967295 +Also note that variable length series live in a different namespace than standard +time series, so you can name them the same. + Accessing them -------------- diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index a1c7b7fc81a12c5ed4d4f6512817c8850b0cf375..ff1585ccd6d7c86ed9ac47fb19e664c64b1745b2 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -22,10 +22,13 @@ cdef class Chunk: object file, mmap, file_lock_object bint closed + cdef void incref(self) + cdef int decref(self) except -1 cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry) - cpdef int close(self) except -1 + cpdef int close(self, bint force=*) except -1 cpdef unsigned long long get_timestamp_at(self, unsigned int index) cdef tuple get_piece_at(self, unsigned int index) + cpdef bytes get_value_at(self, unsigned int index) cpdef bytes get_slice_of_piece_at(self, unsigned int index, int start, int stop) cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start) cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 429a5afa4c6ed70e7015b27efc5f95de05e3d82b..3d3066a08d09fd875e9932d8db535d4316c63938 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -4,7 +4,9 @@ import threading import typing as tp import struct import mmap -from .exceptions import Corruption, InvalidState, AlreadyExists +import warnings + +from .exceptions import Corruption, InvalidState, AlreadyExists, StillOpen from .series cimport TimeSeries DEF HEADER_SIZE = 4 @@ -132,6 +134,17 @@ cdef class Chunk: self.mmap = AlternativeMMap(self.file, self.file_lock_object) return 0 + cdef void incref(self): + if self.parent is not None: + self.parent.incref_chunk(self.min_ts) + + cdef int decref(self) except -1: + if self.parent is not None: + self.parent.decref_chunk(self.min_ts) + if self.parent.refs_chunks[self.name()] < 0: + raise ValueError('reference of chunk fell below zero!') + return 0 + def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, use_descriptor_access: bool = False): cdef bytes b @@ -216,7 +229,7 @@ cdef class Chunk: raise IndexError('Index too large') cdef: unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + start - unsigned long stopping_index = starting_index + self.block_size_plus + stop + unsigned long stopping_index = starting_index + stop return self.mmap[starting_index:stopping_index] cpdef unsigned long long get_timestamp_at(self, unsigned int index): @@ -381,23 +394,47 @@ cdef class Chunk: def __len__(self): return self.length() - cpdef int close(self) except -1: + cpdef int close(self, bint force=False) except -1: """ Close the chunk and close the allocated resources + + :param force: whether to close the chunk even if it's open somewhere + :raises StillOpen: this chunk has a parent attached and the parent + says that this chunk is still being referred to """ if self.closed: return 0 + cdef unsigned long long name = self.name() if self.parent: with self.parent.open_lock: - del self.parent.open_chunks[self.name()] + if not force and self.parent.refs_chunks.get(name, 0) > 0: + raise StillOpen('this chunk is opened') + del self.parent.refs_chunks[name] + del self.parent.open_chunks[name] self.parent = None self.mmap.close() self.file.close() return 0 - def __del__(self): + def __del__(self) -> None: + if self.closed: + return + warnings.warn('You forgot to close a Chunk') self.close() + cpdef bytes get_value_at(self, unsigned int index): + """ + Return only the value at a particular index, numbered from 0 + + :return: value at given index + """ + if index >= self.entries: + raise IndexError('Index too large') + cdef: + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + unsigned long stopping_index = starting_index + self.block_size + return self.mmap[starting_index:stopping_index] + cdef tuple get_piece_at(self, unsigned int index): """ Return a piece of data at a particular index, numbered from 0 diff --git a/tempsdb/exceptions.pyx b/tempsdb/exceptions.pyx index 023cc8a4f6335a889b288eab8bf837bee294bee2..fd213a324d9bb927e9a80ffd9742411b37f41b79 100644 --- a/tempsdb/exceptions.pyx +++ b/tempsdb/exceptions.pyx @@ -17,3 +17,7 @@ class InvalidState(TempsDBError): class AlreadyExists(TempsDBError): """Provided object already exists""" ... + +class StillOpen(TempsDBError): + """This resource has outstanding references and cannot be closed""" + ... diff --git a/tempsdb/iterators.pxd b/tempsdb/iterators.pxd index 31e4d5c71eaf9e8d554d0006cd787a8a47e2f8e3..b5befd8b773d2442ce57746cc0bdd5becd806118 100644 --- a/tempsdb/iterators.pxd +++ b/tempsdb/iterators.pxd @@ -13,7 +13,7 @@ cdef class Iterator: bint closed Chunk current_chunk - cpdef void close(self) + cpdef int close(self) except -1 cdef int get_next(self) except -1 cpdef tuple next_item(self) cdef tuple next_item_pos(self) diff --git a/tempsdb/iterators.pyx b/tempsdb/iterators.pyx index d5af6f7f563ff1f3a1bb31a46d23ceab5113bd79..e898745aafadc13521d105c6e732942d47386445 100644 --- a/tempsdb/iterators.pyx +++ b/tempsdb/iterators.pyx @@ -50,7 +50,7 @@ cdef class Iterator: warnings.warn('You forgot to close an Iterator. Please close them explicitly!') self.close() - cpdef void close(self): + cpdef int close(self) except -1: """ Close this iterator, release chunks. @@ -61,12 +61,13 @@ cdef class Iterator: No-op if iterator is already closed. """ if self.closed: - return + return 0 self.closed = True cdef Chunk chunk for chunk in self.chunks: - self.parent.decref_chunk(chunk.name()) + chunk.decref() self.chunks = None + return 0 cdef int get_next(self) except -1: """ @@ -109,15 +110,19 @@ cdef class Iterator: cdef tuple next_item_pos(self): """ - :return: the timestamp of next element and a position of it within the current chunk - :rtype: tp.Tuple[int, int] + Note that this increases the chunk reference count. + + :return: the timestamp of next element and a position of it within the current chunk, + along with that chunk + :rtype: tp.Tuple[int, int, Chunk] """ try: if self.current_chunk is None: self.get_next() elif self.i == self.limit: self.get_next() - return self.current_chunk.get_timestamp_at(self.i), self.i + self.current_chunk.incref() + return self.current_chunk.get_timestamp_at(self.i), self.i, self.current_chunk except StopIteration: return None finally: diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index 5c6974ee6b497c88147804b1754e9de3c031c3d9..4ef27a8ecada834cf6b21b4d3485b45e65738b6d 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -25,7 +25,7 @@ cdef class TimeSeries: cdef void register_memory_pressure_manager(self, object mpm) cpdef int delete(self) except -1 cdef dict get_metadata(self) - cpdef void close(self) + cpdef int close(self) except -1 cdef void incref_chunk(self, unsigned long long name) cdef void decref_chunk(self, unsigned long long name) cdef Chunk open_chunk(self, unsigned long long name) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 05e03163098517ff92ec14787ca0f9e5e692930b..a6905e8dd26141904f601b57fb9e7df398cf9155 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -217,7 +217,7 @@ cdef class TimeSeries: pass return 0 - cpdef void close(self): + cpdef int close(self) except -1: """ Close the series. @@ -226,14 +226,16 @@ cdef class TimeSeries: cdef: Chunk chunk list open_chunks - if not self.closed: - open_chunks = list(self.open_chunks.values()) - for chunk in open_chunks: - chunk.close() - if self.mpm is not None: - self.mpm.cancel() - self.mpm = None - self.closed = True + if self.closed: + return 0 + open_chunks = list(self.open_chunks.values()) + for chunk in open_chunks: + chunk.close(True) + if self.mpm is not None: + self.mpm.cancel() + self.mpm = None + self.closed = True + return 0 cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp): """ @@ -357,8 +359,12 @@ cdef class TimeSeries: cpdef int close_chunks(self) except -1: """ - Close all superficially opened chunks. + Close all chunks opened by read requests that are not referred to anymore. + + No-op if closed. """ + if self.closed: + return 0 if self.last_chunk is None: return 0 if len(self.chunks) == 1: diff --git a/tempsdb/varlen.pxd b/tempsdb/varlen.pxd index 67bac8b7bd4a4dfc14d2951a131b6ddc7c2eedd4..456364f16636035724a3c0fb83646bc5a2048031 100644 --- a/tempsdb/varlen.pxd +++ b/tempsdb/varlen.pxd @@ -7,35 +7,43 @@ cdef class VarlenSeries: int size_field int references object size_struct - readonly str path - readonly str name + str path + str name TimeSeries root_series - list series + readonly list series list length_profile int max_entries_per_chunk int current_maximum_length object mpm + cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int close(self) except -1 cpdef int delete(self) except -1 cdef int get_length_for(self, int index) + cpdef int trim(self, unsigned long long timestamp) except -1 cdef int add_series(self) except -1 cdef void register_memory_pressure_manager(self, object mpm) cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cpdef long long get_maximum_length(self) except -1 + cpdef VarlenIterator iterate_range(self, unsigned long long start, unsigned long long stop, + bint direct_bytes=*) cdef class VarlenIterator: cdef: bint closed - object parent - list chunk_positions + VarlenSeries parent + list positions + list timestamps + list chunks list iterators unsigned long long start unsigned long long stop bint direct_bytes cpdef int close(self) except -1 + cpdef VarlenEntry get_next(self) + cdef int advance_series(self, int index, bint force) except -1 cdef class VarlenEntry: @@ -53,7 +61,7 @@ cdef class VarlenEntry: cpdef int get_byte_at(self, int index) except -1 cpdef bint endswith(self, bytes v) cpdef bint startswith(self, bytes v) - + cpdef int close(self) except -1 cpdef VarlenSeries create_varlen_series(str path, str name, int size_struct, list length_profile, int max_entries_per_chunk) diff --git a/tempsdb/varlen.pyx b/tempsdb/varlen.pyx index 229bd38a5fa3e10a84241a7df9ff7a0822b5fa5f..e4eed496cc75bc36c92571bc58c7fd00436af29f 100644 --- a/tempsdb/varlen.pyx +++ b/tempsdb/varlen.pyx @@ -5,7 +5,7 @@ import struct import warnings from .chunks cimport Chunk -from .exceptions import Corruption, AlreadyExists +from .exceptions import Corruption, AlreadyExists, StillOpen from .iterators cimport Iterator from .series cimport TimeSeries, create_series @@ -19,7 +19,10 @@ cdef class VarlenEntry: values are routinely longer than 20-40 bytes. This behaves as a bytes object, in particular it can be sliced, iterated, - and it's length obtained. It also overloads __bytes__. + and it's length obtained. It also overloads __bytes__. It's also directly comparable + and hashable, and boolable. + + This acquires a reference to the chunk it refers, and releases it upon destruction. Once :meth:`~tempsdb.varlen.VarlenEntry.to_bytes` is called, it's result will be cached. @@ -28,7 +31,12 @@ cdef class VarlenEntry: item_no: tp.List[int]): self.parent = parent self.item_no = item_no - self.chunks = chunks + cdef Chunk chunk + self.chunks = [] + for chunk in chunks: + if chunk is not None: + chunk.incref() + self.chunks.append(chunk) self.data = None #: cached data, filled in by to_bytes self.len = -1 @@ -45,15 +53,11 @@ cdef class VarlenEntry: if self.data is not None: return self.data.startswith(v) - if self.len > -1: - if len(v) > self.len: - return False - else: - if len(v) > self.length(): - return False + if len(v) > self.length(): + return False - cdef bytes b = self.slice(0, self.length) - return self.length == v + cdef bytes b = self.slice(0, self.len) + return b == v cpdef bint endswith(self, bytes v): """ @@ -80,6 +84,18 @@ cdef class VarlenEntry: cdef bytes b = self.slice(self.len-len_v, self.len) return b == v + def __gt__(self, other) -> bool: + return self.to_bytes() > other + + def __le__(self, other) -> bool: + return self.to_bytes() < other + + def __eq__(self, other) -> bool: + return self.to_bytes() == other + + def __hash__(self) -> bool: + return hash(self.to_bytes()) + def __bool__(self) -> bool: if self.data is not None: return bool(self.data) @@ -98,14 +114,14 @@ cdef class VarlenEntry: if self.len > -1: return self.len cdef bytes b = self.chunks[0].get_slice_of_piece_at(self.item_no[0], 0, self.parent.size_field) - b = b[:self.parent.size_field] + assert len(b) == self.parent.size_field, 'Invalid slice!' self.len = self.parent.size_struct.unpack(b)[0] return self.len def __contains__(self, item: bytes) -> bool: return item in self.to_bytes() - def __getitem__(self, item): + def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]: if isinstance(item, slice): return self.slice(item.start, item.stop) else: @@ -123,15 +139,17 @@ cdef class VarlenEntry: int pointer = 0 int segment = 0 int seg_len = 0 + int offset = self.parent.size_field if self.data is not None: return self.data[index] while pointer < index and segment < len(self.chunks): seg_len = self.parent.get_length_for(segment) if seg_len+pointer > index: return self.chunks[segment].get_byte_of_piece(self.item_no[segment], - index-pointer) + offset+index-pointer) pointer += seg_len segment += 1 + offset = 0 raise ValueError('Index too large') cpdef bytes slice(self, int start, int stop): @@ -173,11 +191,12 @@ cdef class VarlenEntry: int len_to_read = self.parent.get_length_for(segment) - start_reading_at Chunk chunk = self.chunks[segment] bytes temp_data + int offset = self.parent.size_field while write_pointer < length and len(self.chunks) < segment: if chunk_len-start_reading_at >= + (length - write_pointer): # We have all the data that we require b[write_pointer:length] = chunk.get_slice_of_piece_at(self.item_no[segment], - 0, length-write_pointer) + offset, offset+length-write_pointer) return bytes(b) temp_data = chunk.get_slice_of_piece_at(self.item_no[segment], 0, chunk_len) @@ -185,6 +204,7 @@ cdef class VarlenEntry: write_pointer += chunk_len segment += 1 start_reading_at = 0 + offset = 0 raise ValueError('invalid indices') @@ -202,17 +222,19 @@ cdef class VarlenEntry: int segment = 0 bytes cur_data int cur_data_len - while pointer < length: - cur_data = self.chunks[segment].get_piece_at(self.item_no[segment])[1] - cur_data_len = len(cur_data) + int offset = self.parent.size_field + while pointer < length and segment < len(self.chunks): + cur_data = self.chunks[segment].get_value_at(self.item_no[segment]) + cur_data_len = self.parent.get_length_for(segment) if cur_data_len > length-pointer: - b[pointer:length] = cur_data[:cur_data_len-(length-pointer)] - else: - b[pointer:pointer+cur_data_len] = cur_data + b[pointer:length] = cur_data[offset:length-pointer+offset] + break + b[pointer:pointer+cur_data_len] = cur_data[offset:cur_data_len+offset] pointer += cur_data_len segment += 1 - if self.data is None: - self.data = bytes(b) + offset = 0 + first_segment = False + self.data = bytes(b) self.len = length return self.data @@ -225,6 +247,25 @@ cdef class VarlenEntry: def __len__(self) -> int: return self.length() + cpdef int close(self) except -1: + """ + Close this object and release all the references. + + It is not necessary to call, since the destructor will call this. + .. warning:: Do not let your VarlenEntries outlive the iterator itself! + It will be impossible to close the iterator. + """ + cdef Chunk chunk + if self.chunks is None: + return 0 + for chunk in self.chunks: + chunk.decref() + self.chunks = None + return 0 + + def __del__(self) -> None: + self.close() + STRUCT_L = struct.Struct('<L') class ThreeByteStruct: @@ -240,7 +281,10 @@ cdef class VarlenIterator: """ A result of a varlen series query. - Please close it when you're done. + This iterator will close itself when completed. If you break out of it's + iteration, please close it youself via + :meth:`~tempsdb.varlen.VarlenIterator.close` + If you forget to do that, a warning will be issued and the destructor will close it automatically. @@ -259,21 +303,83 @@ cdef class VarlenIterator: self.stop = stop self.direct_bytes = direct_bytes self.closed = False - self.chunk_positions = [] + cdef int amount_series = len(self.parent.series) + self.positions = [None] * amount_series + self.chunks = [None] * amount_series + self.timestamps = [None] * amount_series self.iterators = [] - self.next_timestamps = [] cdef: TimeSeries series Iterator iterator - for series in self.parent.series: - iterator = series.iterate_range(start, stop) - iterator.get_next() + Chunk chunk + unsigned int pos + unsigned long long ts + tuple tpl + int i + for i in range(amount_series): + iterator = self.parent.series[i].iterate_range(start, stop) self.iterators.append(iterator) - self.chunk_positions.append(iterator.i) + for i in range(amount_series): + iterator = self.iterators[i] + self.advance_series(i, True) + + cdef int advance_series(self, int index, bint force) except -1: + cdef: + Iterator iterator = self.iterators[index] + tuple tpl + Chunk old_chunk, chunk + if iterator is None and not force: + return 0 + + tpl = iterator.next_item_pos() + if tpl is None: + self.timestamps[index] = None + self.positions[index] = None + old_chunk = self.chunks[index] + if old_chunk is not None: + old_chunk.decref() + self.chunks[index] = None + iterator.close() + self.iterators[index] = None + else: + ts, pos, chunk = tpl + self.timestamps[index] = ts + self.positions[index] = pos + self.chunks[index] = chunk + return 0 + + cpdef VarlenEntry get_next(self): + """ + Return next element of the iterator, or None if no more available. + """ + if self.timestamps[0] is None: + return None + cdef: + unsigned long long ts = self.timestamps[0] + list chunks = [] + list positions = [] + int i + + for i in range(len(self.chunks)): + if self.timestamps[i] is None: + break + elif self.timestamps[i] == ts: + chunks.append(self.chunks[i]) + positions.append(self.positions[i]) + self.advance_series(i, False) + return VarlenEntry(self.parent, chunks, positions) - def __next__(self) -> tp.Tuple[int, tp.Union[bytes, VarlenEntry]]: - ... + def __next__(self): + cdef VarlenEntry varlen_entry = self.get_next() + if varlen_entry is None: + self.close() + raise StopIteration('iterator exhausted') + else: + if self.direct_bytes: + return varlen_entry.timestamp(), varlen_entry.to_bytes() + else: + return varlen_entry.timestamp(), varlen_entry def __iter__(self): return self @@ -281,13 +387,22 @@ cdef class VarlenIterator: cpdef int close(self) except -1: """ Close this iterator and release all the resources + + No-op if already closed. """ - cdef Iterator iterator - if not self.closed: - self.parent.references -= 1 - self.closed = True - for iterator in self.iterators: + cdef: + Iterator iterator + Chunk chunk + if self.closed: + return 0 + self.closed = True + for iterator in self.iterators: + if iterator is not None: iterator.close() + for chunk in self.chunks: + if chunk is not None: + chunk.decref() + self.parent.references -= 1 return 0 def __del__(self): @@ -312,6 +427,13 @@ cdef class VarlenSeries: for series in self.series: series.register_memory_pressure_manager(mpm) + cpdef VarlenIterator iterate_range(self, unsigned long long start, unsigned long long stop, + bint direct_bytes=False): + """ + Return an iterator with the data + """ + return VarlenIterator(self, start, stop, direct_bytes=direct_bytes) + def __init__(self, path: str, name: str): self.closed = False self.path = path @@ -361,6 +483,22 @@ cdef class VarlenSeries: self.current_maximum_length = tot_length + cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: + """ + Mark the series as synchronized up to particular period + + :param timestamp: timestamp of synchronization + """ + self.root_series.mark_synced_up_to(timestamp) + return 0 + + @property + def last_entry_synced(self) -> int: + """ + :return: timestamp of the last entry synchronized. Starting value is 0 + """ + return self.root_series.last_entry_synced + cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ Append an entry to the series @@ -432,15 +570,34 @@ cdef class VarlenSeries: cpdef int close(self) except -1: """ - Close this series + Close this series. + + No-op if already closed. + + :raises StillOpen: some references are being held """ if self.closed: return 0 + if self.references: + raise StillOpen('still some iterators around') + self.closed = True cdef TimeSeries series for series in self.series: series.close() + return 0 + + cpdef int trim(self, unsigned long long timestamp) except -1: + """ + Try to delete all entries younger than timestamp + + :param timestamp: timestamp that separates alive entries from the dead + """ + cdef TimeSeries series + for series in self.series: + series.trim(timestamp) + return 0 cpdef long long get_maximum_length(self) except -1: """ diff --git a/tests/test_varlen.py b/tests/test_varlen.py index 9c64dd2bd6b94a772bfda5a50f6fc8b85205ed0e..3299d2a6313b8a6bcbf2d0430f4d6d795df6f20b 100644 --- a/tests/test_varlen.py +++ b/tests/test_varlen.py @@ -1,17 +1,24 @@ +import logging import os import unittest from tempsdb.varlen import create_varlen_series +logger = logging.getLogger(__name__) + class TestVarlen(unittest.TestCase): def test_varlen(self): + series = [(0, b'test skarabeusza'), (10, b'test skarabeuszatest skarabeusza')] varlen = create_varlen_series('test_dir', 'test_dir', 2, [10, 20, 10], 20) - try: - varlen.append(0, b'test skarabeusza') - self.assertEqual(len(os.listdir('test_dir')), 2) - varlen.append(10, b'test skarabeuszatest skarabeusza') - self.assertEqual(len(os.listdir('test_dir')), 3) - finally: - varlen.close() + varlen.append(*series[0]) + self.assertEqual(len(os.listdir('test_dir')), 2) + + varlen.append(*series[1]) + self.assertEqual(len(os.listdir('test_dir')), 3) + + it = varlen.iterate_range(0, 20) + lst = [(ts, v.to_bytes()) for ts, v in it] + it.close() + self.assertEqual(lst, series) diff --git a/unittest.Dockerfile b/unittest.Dockerfile index fc3401c6e587a78d17cd00b64f700b38678858a9..bccaee128e06f1d891c83204193b8f90f442213e 100644 --- a/unittest.Dockerfile +++ b/unittest.Dockerfile @@ -10,7 +10,6 @@ WORKDIR /app ENV CI=true RUN python setup.py build_ext --inplace - ADD tests /app/tests -CMD ["coverage", "run", "-m", "nose2", "-vv"] +CMD ["nose2", "-vv"]