From 374539de621a440ee6a0cb866b5a703c9bd43a5f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Sat, 28 Nov 2020 21:33:55 +0100
Subject: [PATCH] switch to little endian

---
 docs/chunks.rst    | 10 ++++++++++
 tempsdb/chunks.pxd |  5 +++--
 tempsdb/chunks.pyx | 30 ++++++++++++++++++++----------
 tempsdb/series.pxd |  4 ++--
 tempsdb/series.pyx | 15 +++++++++------
 tests/test_db.py   |  2 +-
 6 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/docs/chunks.rst b/docs/chunks.rst
index cdf78eb..640ace6 100644
--- a/docs/chunks.rst
+++ b/docs/chunks.rst
@@ -6,3 +6,13 @@ it directly:
 
 .. autoclass:: tempsdb.chunks.Chunk
     :members:
+
+Data stored in files is little-endian.
+
+
+A file storing a chunk is laid out as follows:
+
+* 4 bytes unsigned int - block size
+* repeated:
+    * 8 bytes unsigned long long - timestamp
+    * block_size bytes of data
diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd
index d5a1eaf..6d3bb0b 100644
--- a/tempsdb/chunks.pxd
+++ b/tempsdb/chunks.pxd
@@ -6,7 +6,8 @@ cdef class Chunk:
         readonly str path
         readonly unsigned long long min_ts
         readonly unsigned long long max_ts
-        readonly unsigned long block_size
+        unsigned int block_size_plus
+        readonly unsigned int block_size
         readonly unsigned long entries
         object file
         object mmap
@@ -17,7 +18,7 @@ cdef class Chunk:
     cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry)
     cpdef void close(self)
     cpdef tuple get_piece_at(self, unsigned int index)
-    cpdef int put(self, unsigned long long timestamp, bytes data) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cdef inline int length(self):
         return self.entries
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 0514bdb..a8cbb94 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -6,8 +6,8 @@ import mmap
 from .exceptions import Corruption, InvalidState, AlreadyExists
 from .series cimport TimeSeries
 
-STRUCT_L = struct.Struct('>L')
-STRUCT_Q = struct.Struct('>Q')
+STRUCT_L = struct.Struct('<L')
+STRUCT_Q = struct.Struct('<Q')
 
 DEF HEADER_SIZE = 4
 DEF TIMESTAMP_SIZE = 8
@@ -16,8 +16,11 @@ cdef class Chunk:
     """
    Represents a single chunk of time series.

-    This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes].
+    This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes],
+    as well as the sequence protocol.
 
+    :param parent: parent time series
+    :type parent: tp.Optional[TimeSeries]
     :param path: path to the chunk file
     :type path: str
 
@@ -49,13 +52,20 @@ cdef class Chunk:
             raise Corruption(f'Empty chunk file!')
         try:
             self.block_size, = STRUCT_L.unpack(self.mmap[:HEADER_SIZE])
+            self.block_size_plus = self.block_size + TIMESTAMP_SIZE
         except struct.error:
             self.close()
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
-        self.entries = (file_size-HEADER_SIZE) // (self.block_size+TIMESTAMP_SIZE)
-        self.max_ts, = STRUCT_Q.unpack(self.mmap[-TIMESTAMP_SIZE-self.block_size:-self.block_size])
+        self.entries = (file_size-HEADER_SIZE) // (self.block_size_plus)
+        self.max_ts, = STRUCT_Q.unpack(self.mmap[file_size-self.block_size_plus:file_size-self.block_size])
         self.min_ts, = STRUCT_Q.unpack(self.mmap[HEADER_SIZE:HEADER_SIZE+TIMESTAMP_SIZE])
 
+    def __getitem__(self, index: tp.Union[int, slice]):
+        if isinstance(index, slice):
+            return self.iterate_range(index.start, index.stop)
+        else:
+            return self.get_piece_at(index)
+
     cpdef int sync(self) except -1:
         """
         Synchronize the mmap
@@ -63,7 +73,7 @@ cdef class Chunk:
         self.mmap.flush()
         return 0
 
-    cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append a record to this chunk
 
@@ -80,10 +90,10 @@ cdef class Chunk:
             raise ValueError('data not equal in length to block size!')
         if timestamp <= self.max_ts:
             raise ValueError('invalid timestamp')
-        cdef unsigned long long pointer_at_end = (self.entries+1)*(TIMESTAMP_SIZE+self.block_size) + HEADER_SIZE
+        cdef unsigned long long pointer_at_end = (self.entries+1)*self.block_size_plus + HEADER_SIZE
         with self.write_lock:
             self.mmap.resize(pointer_at_end)
-            self.mmap[pointer_at_end-self.block_size-TIMESTAMP_SIZE:pointer_at_end-self.block_size] = STRUCT_Q.pack(timestamp)
+            self.mmap[pointer_at_end-self.block_size_plus:pointer_at_end-self.block_size] = STRUCT_Q.pack(timestamp)
             self.mmap[pointer_at_end-self.block_size:pointer_at_end] = data
             self.entries += 1
             self.max_ts = timestamp
@@ -141,8 +151,8 @@ cdef class Chunk:
         if index >= self.entries:
             raise IndexError('Index too large')
         cdef:
-            unsigned long starting_index = HEADER_SIZE + index * (self.block_size+TIMESTAMP_SIZE)
-            unsigned long stopping_index = starting_index + self.block_size+TIMESTAMP_SIZE
+            unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus
+            unsigned long stopping_index = starting_index + self.block_size_plus
             unsigned long long ts = STRUCT_Q.unpack(
                 self.mmap[starting_index:starting_index+TIMESTAMP_SIZE])[0]
         return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]
diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd
index da4f3db..66e9084 100644
--- a/tempsdb/series.pxd
+++ b/tempsdb/series.pxd
@@ -5,7 +5,7 @@ cdef class TimeSeries:
     cdef:
         bint closed
         object lock, fopen_lock
-        str path
+        readonly str path
         unsigned int max_entries_per_chunk
         readonly unsigned long long last_entry_synced
         readonly unsigned int block_size
@@ -20,7 +20,7 @@ cdef class TimeSeries:
     cpdef void close(self)
     cpdef Chunk open_chunk(self, unsigned long long name)
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
-    cpdef int put(self, unsigned long long timestamp, bytes data) except -1
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
 
 cpdef TimeSeries create_series(str path, unsigned int block_size, int max_entries_per_chunk)
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index c034488..135ae6e 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -17,7 +17,8 @@ cdef class TimeSeries:
 
     :ivar last_entry_ts: timestamp of the last entry added or 0 if no entries yet (int)
     :ivar last_entry_synced: timestamp of the last synchronized entry (int)
-    :ivar block_size: size of the writable block of data
+    :ivar block_size: size of the writable block of data (int)
+    :ivar path: path to the directory containing the series (str)
     """
     def __init__(self, path: str):
         self.lock = threading.Lock()
@@ -120,11 +121,13 @@ cdef class TimeSeries:
         """
         if self.closed:
             raise InvalidState('series is closed')
-        cdef:
-            unsigned long long min_ts = self.data_in_memory[0][0]
-            str path = os.path.join(self.path, str(min_ts))
+
         with self.lock, open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
             ujson.dump(self._get_metadata(), f_out)
+
+        if self.last_chunk:
+            self.last_chunk.sync()
+
         return 0
 
     cdef dict _get_metadata(self):
@@ -134,7 +137,7 @@ cdef class TimeSeries:
             'last_entry_synced': self.last_entry_synced
         }
 
-    cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
+    cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
         """
         Append an entry.
 
@@ -162,7 +165,7 @@ cdef class TimeSeries:
                                                  [(timestamp, data)])
             self.chunks.append(timestamp)
         else:
-            self.last_chunk.put(timestamp, data)
+            self.last_chunk.append(timestamp, data)
 
         self.last_entry_ts = timestamp
 
diff --git a/tests/test_db.py b/tests/test_db.py
index 7c5bbd7..5127d5b 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -20,6 +20,6 @@ class TestDB(unittest.TestCase):
         self.assertEqual(chunk.get_piece_at(2), (4, b'kota'))
         self.assertEqual(len(chunk), 3)
         self.assertEqual(list(iter(chunk)), data)
-        chunk.put(5, b'test')
+        chunk.append(5, b'test')
         chunk.close()
         self.assertEqual(os.path.getsize('chunk.db'), 4+4*12)
-- 
GitLab
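
The chunk layout documented in docs/chunks.rst above (a 4-byte little-endian block-size header followed by fixed-size records of an 8-byte little-endian timestamp plus block_size bytes of payload) can be exercised outside of Cython with the standard-library struct module. The sketch below is illustrative only and not part of tempsdb: the helpers write_chunk and read_chunk are made-up names, but their offset arithmetic mirrors what Chunk.append() and Chunk.get_piece_at() do in this patch.

# Illustrative sketch, not part of tempsdb: write_chunk/read_chunk are
# hypothetical helpers that reproduce the documented chunk layout with the
# little-endian '<L'/'<Q' formats this patch switches to.
import os
import struct
import typing as tp

HEADER_SIZE = 4      # 4-byte unsigned int: block size
TIMESTAMP_SIZE = 8   # 8-byte unsigned long long: timestamp
STRUCT_L = struct.Struct('<L')
STRUCT_Q = struct.Struct('<Q')


def write_chunk(path: str, block_size: int,
                entries: tp.Iterable[tp.Tuple[int, bytes]]) -> None:
    """Write the header followed by (timestamp, data) records."""
    with open(path, 'wb') as f:
        f.write(STRUCT_L.pack(block_size))
        for timestamp, data in entries:
            if len(data) != block_size:
                raise ValueError('data not equal in length to block size!')
            f.write(STRUCT_Q.pack(timestamp))
            f.write(data)


def read_chunk(path: str) -> tp.List[tp.Tuple[int, bytes]]:
    """Read every (timestamp, data) record back, little-endian."""
    with open(path, 'rb') as f:
        raw = f.read()
    block_size, = STRUCT_L.unpack(raw[:HEADER_SIZE])
    block_size_plus = block_size + TIMESTAMP_SIZE   # size of one record on disk
    entries = (len(raw) - HEADER_SIZE) // block_size_plus
    out = []
    for index in range(entries):
        start = HEADER_SIZE + index * block_size_plus
        ts, = STRUCT_Q.unpack(raw[start:start + TIMESTAMP_SIZE])
        out.append((ts, raw[start + TIMESTAMP_SIZE:start + block_size_plus]))
    return out


if __name__ == '__main__':
    # Sample records chosen to match the record sizes used in tests/test_db.py;
    # only the third record, (4, b'kota'), is actually asserted there.
    data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
    write_chunk('chunk.db', 4, data)
    assert read_chunk('chunk.db') == data
    # 4-byte header plus three 12-byte records, i.e. 4 + 3*12 bytes; the test
    # then appends a fourth record, giving the 4 + 4*12 size it checks.
    assert os.path.getsize('chunk.db') == HEADER_SIZE + 3 * (TIMESTAMP_SIZE + 4)

The block_size_plus field introduced by the patch is simply block_size + TIMESTAMP_SIZE, the size of one on-disk record; precomputing it once avoids repeating that sum in every offset calculation, which is what the local block_size_plus stands in for above.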