diff --git a/docs/chunks.rst b/docs/chunks.rst new file mode 100644 index 0000000000000000000000000000000000000000..cdf78ebc3b96e90dd9f89ec36bcc67e07e7fbd76 --- /dev/null +++ b/docs/chunks.rst @@ -0,0 +1,8 @@ +Chunk +===== + +For your convenience the class :class:`~tempsdb.chunks.Chunk` was also documented, but don't use +it directly: + +.. autoclass:: tempsdb.chunks.Chunk + :members: diff --git a/docs/index.rst b/docs/index.rst index 0f61f694894872a5d950f48289261a7e5052b824..d4621d42f2fcf0c67e5541b1f081bc9ac0242cbe 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,8 +12,9 @@ Welcome to tempsdb's documentation! usage exceptions + chunks -It tries to use mmap for reads where possible, and in general be as zero-copy as possible (ie. the +It tries to use mmap for reads and writes, and in general is as zero-copy as possible (ie. the only time data is unserialized is when a particular entry is read). Stored time series with a 8-bit timestamp and a fixed length of data. diff --git a/docs/usage.rst b/docs/usage.rst index 3daca865753310716f502c4dadbb2d75d9506bd9..fdc95401290cee7f69ce1d9db3ca5a6737ad28e5 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -14,4 +14,3 @@ Then you can create and retrieve particular series: .. autoclass:: tempsdb.series.TimeSeries :members: - diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index 94aac247a42cdcccc7a23ae14626f973c9460707..d5a1eaf6184845ee622c54e2d5505e73c367c903 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -18,6 +18,7 @@ cdef class Chunk: cpdef void close(self) cpdef tuple get_piece_at(self, unsigned int index) cpdef int put(self, unsigned long long timestamp, bytes data) except -1 + cpdef int sync(self) except -1 cdef inline int length(self): return self.entries diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 275d11a0de410d1ba2484192186999c16a87077a..0514bdb77eeb3b3a7344cb29dca8e2ef21c66ee7 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -56,6 +56,13 @@ cdef class Chunk: self.max_ts, = STRUCT_Q.unpack(self.mmap[-TIMESTAMP_SIZE-self.block_size:-self.block_size]) self.min_ts, = STRUCT_Q.unpack(self.mmap[HEADER_SIZE:HEADER_SIZE+TIMESTAMP_SIZE]) + cpdef int sync(self) except -1: + """ + Synchronize the mmap + """ + self.mmap.flush() + return 0 + cpdef int put(self, unsigned long long timestamp, bytes data) except -1: """ Append a record to this chunk @@ -73,14 +80,11 @@ cdef class Chunk: raise ValueError('data not equal in length to block size!') if timestamp <= self.max_ts: raise ValueError('invalid timestamp') - - cdef bytearray data_to_write = bytearray(TIMESTAMP_SIZE+self.block_size) - data_to_write[0:TIMESTAMP_SIZE] = STRUCT_Q.pack(timestamp) - data_to_write[TIMESTAMP_SIZE:] = data + cdef unsigned long long pointer_at_end = (self.entries+1)*(TIMESTAMP_SIZE+self.block_size) + HEADER_SIZE with self.write_lock: - self.file.seek(0, 2) - self.file.write(data_to_write) - self.mmap.resize((self.entries+1)*(8+self.block_size)+HEADER_SIZE) + self.mmap.resize(pointer_at_end) + self.mmap[pointer_at_end-self.block_size-TIMESTAMP_SIZE:pointer_at_end-self.block_size] = STRUCT_Q.pack(timestamp) + self.mmap[pointer_at_end-self.block_size:pointer_at_end] = data self.entries += 1 self.max_ts = timestamp return 0 @@ -104,7 +108,7 @@ cdef class Chunk: yield self.get_piece_at(i) def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]: - cdef unsigned long i = 0 + cdef int i for i in range(self.entries): yield self.get_piece_at(i)