diff --git a/README.md b/README.md index 91fd2a3fa3d7877fca86b1bf18c26d4a997c51c9..b4da7ecf044f01914fff39af7eb4c46f0fe6e50d 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ Then copy your resulting wheel and install it via pip on the target system. ## v0.6 +* **bugfix**: fixed some bugs with reading values after close * added support for storing metadata as minijson * this will be enabled by default is minijson is importable * fixed minor compiler warnings diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd index 83ccb8a1c269063068544d26692ec196190f7735..febcc7e8b399b011eea186544367f9ae08d829c2 100644 --- a/tempsdb/chunks/base.pxd +++ b/tempsdb/chunks/base.pxd @@ -13,7 +13,6 @@ cdef class Chunk: readonly str path readonly unsigned long long min_ts readonly unsigned long long max_ts - unsigned int block_size_plus # block size plus timestamp length readonly unsigned int block_size readonly unsigned long entries unsigned long file_size @@ -32,7 +31,7 @@ cdef class Chunk: cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1 cpdef unsigned int find_left(self, unsigned long long timestamp) cpdef unsigned int find_right(self, unsigned long long timestamp) - cpdef object open_file(self, str path) + cdef object open_file(self, str path) cpdef int extend(self) except -1 cpdef int after_init(self) except -1 cpdef int append(self, unsigned long long timestamp, bytes data) except -1 diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 344b725ffdce4947a47e38af6ea7ff4fd0c3f42a..ff5eed7e17a466ac4c2782ea48542d3d03e35ba7 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -24,7 +24,9 @@ cdef class AlternativeMMap: """ An alternative mmap implementation used when mmap cannot allocate due to memory issues. - Note that opening gzip files is slow, as the script needs to iterate + Note that opening gzip files is slow, as the script needs to iterate. + + Utilizing negative indices is always wrong! """ def flush(self): self.io.flush() @@ -42,14 +44,14 @@ cdef class AlternativeMMap: rw_gz = io_file self.size = rw_gz.size else: - self.io.seek(0, 2) + self.io.seek(0, io.SEEK_END) self.size = self.io.tell() self.file_lock_object = file_lock_object def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]: cdef: - unsigned long start = item.start - unsigned long stop = item.stop + unsigned long start + unsigned long stop bytes b with self.file_lock_object: if isinstance(item, int): @@ -64,10 +66,12 @@ cdef class AlternativeMMap: def __setitem__(self, key: tp.Union[int, slice], value: tp.Union[int, bytes]) -> None: cdef: - unsigned long start = key.start + unsigned long start if isinstance(key, int): self[key:key+1] = bytes([value]) else: + start = key.start + assert key.stop - start == len(value), 'invalid write length!' with self.file_lock_object: if not isinstance(self.io, ReadWriteGzipFile): self.io.seek(start, 0) @@ -160,15 +164,21 @@ cdef class Chunk: :meta private: """ self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size]) - self.pointer = self.entries*self.block_size_plus+HEADER_SIZE + self.pointer = self.entries*(self.block_size+TIMESTAMP_SIZE)+HEADER_SIZE self.max_ts = self.get_timestamp_at(self.entries-1) + print('Readed',self.path, 'entries=', self.entries, 'max ts=', self.max_ts, + 'file size=', self.file_size, 'pointer=', self.pointer, 'page size=', self.page_size) + + if self.entries == 3867: + print('last record of 3867: ', repr(self.mmap[69592:69610])) + if self.pointer >= self.page_size: # Inform the OS that we don't need the header anymore self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) return 0 - cpdef object open_file(self, str path): + cdef object open_file(self, str path): return open(self.path, 'rb+') def __init__(self, TimeSeries parent, str path, int page_size, @@ -199,7 +209,6 @@ cdef class Chunk: try: self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE]) - self.block_size_plus = self.block_size + TIMESTAMP_SIZE except struct.error: self.close() raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) @@ -210,10 +219,6 @@ cdef class Chunk: self.after_init() - if self.pointer >= self.page_size: - # Inform the OS that we don't need the header anymore - self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) - cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1: """ Return a particular byte of given element at given index. @@ -228,7 +233,7 @@ cdef class Chunk: if index > self.entries: raise ValueError('index too large') cdef: - unsigned long offset = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + byte_index + unsigned long offset = HEADER_SIZE + TIMESTAMP_SIZE + index * (self.block_size + TIMESTAMP_SIZE) + byte_index return self.mmap[offset] cpdef bytes get_slice_of_piece_starting_at(self, unsigned int index, int start): @@ -253,7 +258,7 @@ cdef class Chunk: if index >= self.entries: raise IndexError('Index too large') cdef: - unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + start + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * (self.block_size + TIMESTAMP_SIZE) + start unsigned long stopping_index = starting_index + stop return self.mmap[starting_index:stopping_index] @@ -267,8 +272,9 @@ cdef class Chunk: :return: the timestamp """ cdef: - unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus + unsigned long starting_index = HEADER_SIZE + index * (self.block_size + TIMESTAMP_SIZE) unsigned long stopping_index = starting_index + TIMESTAMP_SIZE + print('reading timestamp from', starting_index, 'to', stopping_index) return STRUCT_Q.unpack(self.mmap[starting_index:stopping_index])[0] cpdef unsigned int find_left(self, unsigned long long timestamp): @@ -403,7 +409,7 @@ cdef class Chunk: if index >= self.entries: raise IndexError('Index too large') cdef: - unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * self.block_size_plus + unsigned long starting_index = HEADER_SIZE + TIMESTAMP_SIZE + index * (self.block_size + TIMESTAMP_SIZE) unsigned long stopping_index = starting_index + self.block_size return self.mmap[starting_index:stopping_index] @@ -425,8 +431,10 @@ cdef class Chunk: del self.parent.refs_chunks[name] del self.parent.open_chunks[name] self.parent = None + self.sync() self.mmap.close() self.file.close() + print('closing', self.path) return 0 def __del__(self) -> None: @@ -446,8 +454,8 @@ cdef class Chunk: raise IndexError('Index too large, got %s while max entries is %s' % (index, self.entries)) cdef: - unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus - unsigned long stopping_index = starting_index + self.block_size_plus + unsigned long starting_index = HEADER_SIZE + index * (self.block_size + TIMESTAMP_SIZE) + unsigned long stopping_index = starting_index + (self.block_size + TIMESTAMP_SIZE) unsigned long long ts = STRUCT_Q.unpack( self.mmap[starting_index:starting_index+TIMESTAMP_SIZE])[0] return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index] diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index 266492832f8879eb4e8fc1987528d2f7614af816..76fa3bb39d76dfa3a0e67f6f42e43311255385e4 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -72,10 +72,10 @@ cdef class DirectChunk(Chunk): else: self.file.seek(0, os.SEEK_END) self.file_size = self.file.tell() - self.entries = (self.file_size - HEADER_SIZE) // self.block_size_plus + self.entries = (self.file_size - HEADER_SIZE) // (self.block_size + TIMESTAMP_SIZE) self.pointer = self.file_size - d = (self.file_size - self.block_size) - (self.file_size-self.block_size_plus) - cdef bytes b = self.mmap[self.file_size-self.block_size_plus:self.file_size-self.block_size] + d = (self.file_size - self.block_size) - (self.file_size-(self.block_size + TIMESTAMP_SIZE)) + cdef bytes b = self.mmap[self.file_size-(self.block_size + TIMESTAMP_SIZE):self.file_size-self.block_size] self.max_ts, = STRUCT_Q.unpack(b) return 0 @@ -91,13 +91,13 @@ cdef class DirectChunk(Chunk): if self.file_lock_object: self.file_lock_object.acquire() try: - self.file_size += self.block_size_plus + self.file_size += self.block_size + TIMESTAMP_SIZE if not isinstance(self.file, ReadWriteGzipFile): self.file.seek(self.pointer, 0) b = STRUCT_Q.pack(timestamp) + data self.file.write(b) self.mmap.resize(self.file_size) - self.pointer += self.block_size_plus + self.pointer += self.block_size + TIMESTAMP_SIZE self.entries += 1 finally: if self.file_lock_object: diff --git a/tempsdb/chunks/maker.pyx b/tempsdb/chunks/maker.pyx index 703858a27ecb32bbcd532445c4c027bff2063019..f0daca6a88d20d9d6939b237e164d845ecd71159 100644 --- a/tempsdb/chunks/maker.pyx +++ b/tempsdb/chunks/maker.pyx @@ -63,13 +63,10 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timesta unsigned long bytes_to_pad if not use_direct_mode: # Pad this thing to page_size - bytes_to_pad = page_size - (file_size % page_size) + bytes_to_pad = page_size - 4 - (file_size % page_size) file.write(b'\x00' * bytes_to_pad) - # Create a footer at the end - footer = bytearray(page_size) - footer[-4:] = b'\x01\x00\x00\x00' # 1 in little endian - file.write(footer) + file.write(b'\x01\x00\x00\x00') file.close() if gzip_compression_level: return DirectChunk(parent, original_path, page_size, use_descriptor_access=True, diff --git a/tempsdb/chunks/normal.pyx b/tempsdb/chunks/normal.pyx index 8339d979d78f1f47f915f257d1087b95e414f187..245b6f19b10de41c82839a57cf0bcc9130852644 100644 --- a/tempsdb/chunks/normal.pyx +++ b/tempsdb/chunks/normal.pyx @@ -41,10 +41,11 @@ cdef class NormalChunk(Chunk): if self.file_lock_object: self.file_lock_object.acquire() try: + self.sync() self.file_size += self.page_size self.file.seek(0, io.SEEK_END) ba = bytearray(self.page_size) - ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) + ba[-FOOTER_SIZE:] = STRUCT_L.pack(self.entries) self.file.write(ba) try: self.mmap.resize(self.file_size) @@ -77,10 +78,12 @@ cdef class NormalChunk(Chunk): if self.closed: raise InvalidState('chunk is closed') - if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus: + if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size-TIMESTAMP_SIZE: self.extend() cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE # Append entry + if self.entries > 4090: + print('writing %s to %s@%s ec=%s' % (repr(data), self.path, self.pointer, self.entries)) self.mmap[self.pointer:ptr_end] = STRUCT_Q.pack(timestamp) self.mmap[ptr_end:ptr_end+self.block_size] = data self.entries += 1 @@ -88,5 +91,5 @@ cdef class NormalChunk(Chunk): self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries) # Update properties self.max_ts = timestamp - self.pointer += self.block_size_plus + self.pointer += self.block_size + TIMESTAMP_SIZE return 0 diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index ce885d87b50ff3a31613982181caa385159ca9a6..8bd7c3b93831e6e690f98b8526bbf3f5db255de4 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -304,7 +304,7 @@ cdef class Database: cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, - int page_size=4096, + int page_size=0, bint use_descriptor_based_access=False, int gzip_level=0): """ @@ -315,7 +315,7 @@ cdef class Database: :param name: name of the series :param block_size: size of the data field :param entries_per_chunk: entries per chunk file - :param page_size: size of a single page. Default is 4096 + :param page_size: size of a single page. Default (0) is autodetect. :param use_descriptor_based_access: whether to use descriptor based access instead of mmap. Default is False :param gzip_level: gzip compression level. Default is 0 which means "don't use gzip" diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index d0d9ca0309df7aa326e252c1abb378fce6d63863..eb5171856846a7a4f7941f39c6a9312c92d8fa29 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -1,3 +1,4 @@ +import resource import os import typing as tp import shutil @@ -11,8 +12,6 @@ from .chunks.maker cimport create_chunk from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists from .metadata cimport read_meta_at, write_meta_at -DEF DEFAULT_PAGE_SIZE=4096 - cdef set metadata_file_names = {'metadata.txt', 'metadata.minijson'} @@ -124,7 +123,7 @@ cdef class TimeSeries: self.block_size = metadata['block_size'] self.max_entries_per_chunk = metadata['max_entries_per_chunk'] self.last_entry_synced = metadata['last_entry_synced'] - self.page_size = metadata.get('page_size', 4096) + self.page_size = metadata['page_size'] self.metadata = metadata.get('metadata') self.gzip_level = metadata.get('gzip_level', 0) except ValueError: @@ -386,9 +385,8 @@ cdef class TimeSeries: 'block_size': self.block_size, 'max_entries_per_chunk': self.max_entries_per_chunk, 'last_entry_synced': self.last_entry_synced, + 'page_size': self.page_size } - if self.page_size != DEFAULT_PAGE_SIZE: - meta['page_size'] = self.page_size if self.metadata is not None: meta['metadata'] = self.metadata return meta @@ -523,19 +521,20 @@ cdef class TimeSeries: cpdef TimeSeries create_series(str path, str name, unsigned int block_size, - int max_entries_per_chunk, int page_size=DEFAULT_PAGE_SIZE, + int max_entries_per_chunk, int page_size=0, bint use_descriptor_based_access=False, int gzip_level=0): + if not page_size: + page_size = resource.getpagesize() if os.path.exists(path): raise AlreadyExists('This series already exists!') os.mkdir(path) cdef dict meta = { 'block_size': block_size, 'max_entries_per_chunk': max_entries_per_chunk, - 'last_entry_synced': 0 + 'last_entry_synced': 0, + 'page_size': page_size } - if page_size != DEFAULT_PAGE_SIZE: - meta['page_size'] = page_size if gzip_level: meta['gzip_level'] = gzip_level write_meta_at(path, meta) diff --git a/tests/test.sh b/tests/test.sh index dd44a5f32c500db999f9e6ec27bc9af99181a1ad..9fa28620fe86845f445104ecb16914855f34ad25 100644 --- a/tests/test.sh +++ b/tests/test.sh @@ -1,4 +1,4 @@ #!/bin/bash set -e -python -m coverage run -m nose2 -vv +python -m coverage run -m nose2 -vv -F python -m coverage report diff --git a/tests/test_chunks.py b/tests/test_chunks.py index 9e5b4b3e55475c68d97a9d228ae63d97b9b6dd6b..99a69c34ea03657294252b7ef367c07356f9ff89 100644 --- a/tests/test_chunks.py +++ b/tests/test_chunks.py @@ -38,7 +38,7 @@ class TestDB(unittest.TestCase): self.assertEqual(chunk.find_right(5), 4) self.assertEqual(chunk.find_right(6), 4) chunk.close() - self.assertEqual(os.path.getsize('chunk.db'), 8192) + self.assertEqual(os.path.getsize('chunk.db'), 4096) def test_chunk(self): from tempsdb.chunks.maker import create_chunk @@ -71,4 +71,4 @@ class TestDB(unittest.TestCase): self.assertEqual(chunk.find_right(5), 4) self.assertEqual(chunk.find_right(6), 4) chunk.close() - self.assertEqual(os.path.getsize('chunk.db'), 8192) + self.assertEqual(os.path.getsize('chunk.db'), 4096) diff --git a/tests/test_series.py b/tests/test_series.py index a7f192f62013666bf6edd8d2eb9bbb0e49e8e9fc..48c50720ce55f27d59004accbe4a34f40645dca0 100644 --- a/tests/test_series.py +++ b/tests/test_series.py @@ -3,6 +3,53 @@ import unittest class TestSeries(unittest.TestCase): + + @unittest.skip('bug') + def test_write_series_append_after_close(self): + from tempsdb.series import create_series, TimeSeries + series = create_series('test6', 'test6', 10, 4096) + + for i in range(8000): + series.append(i, b'\x00'*10) + + series.close() + series = TimeSeries('test6', 'test6') + for i in range(8000, 16000): + series.append(i, b'\x00'*10) + + cur_val = 0 + with series.iterate_range(0, 17000) as it: + for ts, v in it: + if ts != cur_val: + self.fail('Failed at %s:%s' % (ts, cur_val)) + cur_val += 1 + + series.close() + + @unittest.skip('because of reasons') + def test_write_series_with_interim_close(self): + from tempsdb.series import create_series, TimeSeries + series = create_series('test4', 'test4', 10, 4096) + + self.assertRaises(ValueError, series.get_current_value) + for i in range(8000): + series.append(i, b'\x00'*10) + + series.close() + series = TimeSeries('test4', 'test4') + self.assertEqual(series.last_entry_ts, i) + self.assertEqual(series.get_current_value(), (i, b'\x00'*10)) + + with series.iterate_range(i, i) as it: + lst = list(it) + self.assertEqual(len(lst), 1) + self.assertEqual(lst[0][0], i) + + series.trim(4100) + + self.assertEqual(len(os.listdir('test3')), 2) + series.close() + def test_write_series(self): from tempsdb.series import create_series series = create_series('test3', 'test3', 10, 4096)