From 730dc4bd875db0f6613790237e7d7b40ddf5e810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 14 Dec 2020 17:39:27 +0100 Subject: [PATCH] properly merged the files --- README.md | 2 + docs/chunks.rst | 36 +++++- tempsdb/chunks/base.pxd | 1 + tempsdb/chunks/base.pyx | 108 ++++++++++------- tempsdb/chunks/direct.pyx | 9 +- tempsdb/chunks/maker.pyx | 5 +- tempsdb/chunks/normal.pxd | 50 +------- tempsdb/chunks/normal.pyx | 245 +------------------------------------- tempsdb/series.pyx | 5 +- tests/test_db.py | 10 +- 10 files changed, 126 insertions(+), 345 deletions(-) diff --git a/README.md b/README.md index 633ee56..92f62b3 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,8 @@ Then copy your resulting wheel and install it via pip on the target system. ## v0.5 +* if mmap is used, the kernel will be informed after loading the chunk that we + don't need it's memory right now * both `Database`, `TimeSeries` and `Chunk` destructor will close and emit a warning if the user forgot to * if page_size is default, it won't be written as part of the metadata diff --git a/docs/chunks.rst b/docs/chunks.rst index 14bb6e2..d45a93c 100644 --- a/docs/chunks.rst +++ b/docs/chunks.rst @@ -1,16 +1,34 @@ Chunk ===== -For your convenience the class :class:`~tempsdb.chunks.Chunk` was also documented, but don't use +.. versionadded:: 0.5 + +There are two kinds of chunk - a "normal" chunk and "direct" chunk. + +The difference is that a normal chunk will preallocate a page ahead, in order for writes to be fast, +while direct chunk will write only as much data as is strictly required. + +Only "direct" chunks are capable to be gzipped, since one page is preallocated for normal chunk, which +would prevent modifications made post-factum to it. + +For your convenience the class :class:`~tempsdb.chunks.base.Chunk` was also documented, but don't use it directly: -.. autoclass:: tempsdb.chunks.Chunk +.. 
autoclass:: tempsdb.chunks.base.Chunk :members: Data stored in files is little endian. +A way to tell which chunk we are dealing with is to look at its extension. +Chunks that have: + +* no extension - are normal chunks +* `.direct` extension - are direct chunks +* `.gz` extension - are direct and gzipped chunks -A file storing a chunk consists as follows: +Normal chunk +------------ +A file storing a normal chunk is laid out as follows: * 4 bytes unsigned int - block size * repeated @@ -18,3 +36,15 @@ A file storing a chunk consists as follows: * block_size bytes of data It's padded to `page_size` with zeros, and four last bytes is the `unsigned long` amount of entries + +Direct chunk +------------ +A file storing a direct chunk is laid out as follows: + +* 4 bytes unsigned int - block size +* repeated + * 8 bytes unsigned long long - timestamp + * block_size bytes of data + +Note that a direct chunk can be gzipped. If its file name ends with .gz, then it's +a direct chunk which is gzipped. 
diff --git a/tempsdb/chunks/base.pxd b/tempsdb/chunks/base.pxd index a220b7c..83ccb8a 100644 --- a/tempsdb/chunks/base.pxd +++ b/tempsdb/chunks/base.pxd @@ -38,6 +38,7 @@ cdef class Chunk: cpdef int append(self, unsigned long long timestamp, bytes data) except -1 cpdef int delete(self) except -1 cpdef int switch_to_descriptor_based_access(self) except -1 + cpdef int switch_to_mmap_based_access(self) except -1 cpdef unsigned long get_mmap_size(self) cdef void incref(self) cdef int decref(self) except -1 diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 6e23429..2a69104 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -4,6 +4,7 @@ import threading import typing as tp import struct import mmap +import warnings from .gzip cimport ReadWriteGzipFile from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen @@ -43,25 +44,35 @@ cdef class AlternativeMMap: self.size = self.io.tell() self.file_lock_object = file_lock_object - def __getitem__(self, item: slice) -> bytes: + def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]: cdef: unsigned long start = item.start unsigned long stop = item.stop bytes b - with self.file_lock_object: - if start != self.io.tell(): + if isinstance(item, int): + self.io.seek(item, 0) + b = self.io.read(1) + return b[0] + else: + start = item.start + stop = item.stop self.io.seek(start, 0) - return self.io.read(stop-start) + return self.io.read(stop-start) - def __setitem__(self, key: slice, value: bytes): + def __setitem__(self, key: tp.Union[int, slice], value: tp.Union[int, bytes]) -> None: cdef: unsigned long start = key.start + if isinstance(key, int): + self[key:key+1] = bytes([value]) + else: + with self.file_lock_object: + if not isinstance(self.io, ReadWriteGzipFile): + self.io.seek(start, 0) + self.io.write(value) - with self.file_lock_object: - if not isinstance(self.io, ReadWriteGzipFile): - self.io.seek(0, 2) - self.io.write(value) + def close(self): + pass 
def close(self): pass @@ -102,6 +113,30 @@ cdef class Chunk: else: return self.file_size + cpdef int switch_to_mmap_based_access(self) except -1: + """ + Switch self to mmap-based access instead of descriptor-based. + + No-op if already in mmap mode. + + :raises Corruption: unable to mmap file due to an unrecoverable error + """ + if isinstance(self.mmap, AlternativeMMap): + try: + self.mmap = mmap.mmap(self.file.fileno(), 0) + self.file_lock_object = None + except OSError as e: + if e.errno in (11, # EAGAIN - memory is too low + 12, # ENOMEM - no memory space available + 19, # ENODEV - fs does not support mmapping + 75): # EOVERFLOW - too many pages would have been used + pass + else: + self.file.close() + self.closed = True + raise Corruption(f'Failed to mmap chunk file: {e}') + return 0 + cpdef int switch_to_descriptor_based_access(self) except -1: """ Switch self to descriptor-based access instead of mmap. @@ -172,6 +207,10 @@ cdef class Chunk: self.after_init() + if self.pointer >= self.page_size: + # Inform the OS that we don't need the header anymore + self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) + cpdef int get_byte_of_piece(self, unsigned int index, int byte_index) except -1: """ Return a particular byte of given element at given index. 
@@ -291,22 +330,8 @@ cdef class Chunk: return 0 cpdef int extend(self) except -1: - """ - Adds PAGE_SIZE bytes to this file - """ - cdef bytearray ba - if self.file_lock_object: - self.file_lock_object.acquire() - try: - self.mmap.seek(self.file_size) - self.file_size += self.page_size - ba = bytearray(self.page_size) - ba[self.page_size-FOOTER_SIZE:self.page_size] = STRUCT_L.pack(self.entries) - self.file.write(ba) - self.mmap.resize(self.file_size) - finally: - if self.file_lock_object: - self.file_lock_object.release() + raise NotImplementedError('Abstract method!') + return 0 cpdef int delete(self) except -1: """ @@ -332,21 +357,7 @@ cdef class Chunk: :param data: data to write :raises InvalidState: chunk is closed """ - if self.closed: - raise InvalidState('chunk is closed') - - if self.pointer >= self.file_size-FOOTER_SIZE-self.block_size_plus: - self.extend() - cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE - # Append entry - cdef bytes b = STRUCT_Q.pack(timestamp) + data - self.mmap[self.pointer:ptr_end+self.block_size] = b - self.entries += 1 - # Update entries count - self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries) - # Update properties - self.max_ts = timestamp - self.pointer += self.block_size_plus + raise NotImplementedError('Abstract method!') return 0 cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry): @@ -398,21 +409,29 @@ cdef class Chunk: cpdef int close(self, bint force=False) except -1: """ Close the chunk and close the allocated resources + + :param force: whether to close the chunk even if it's open somewhere + :raises StillOpen: this chunk has a parent attached and the parent + says that this chunk is still being referred to """ if self.closed: return 0 cdef unsigned long long name = self.name() if self.parent: with self.parent.open_lock: - if self.parent.get_references_for(name) and not force: - raise StillOpen('chunk still open!') + if not force 
and self.parent.get_references_for(name) > 0: + raise StillOpen('this chunk is opened') + del self.parent.refs_chunks[name] del self.parent.open_chunks[name] self.parent = None self.mmap.close() self.file.close() return 0 - def __del__(self): + def __del__(self) -> None: + if self.closed: + return + warnings.warn('You forgot to close a Chunk') self.close() cdef tuple get_piece_at(self, unsigned int index): @@ -423,7 +442,8 @@ cdef class Chunk: :rtype: tp.Tuple[int, bytes] """ if index >= self.entries: - raise IndexError('Index too large') + raise IndexError('Index too large, got %s while max entries is %s' % (index, + self.entries)) cdef: unsigned long starting_index = HEADER_SIZE + index * self.block_size_plus unsigned long stopping_index = starting_index + self.block_size_plus diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index 90c2d4c..763804e 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -1,4 +1,3 @@ -import gzip import os import typing as tp import struct @@ -69,8 +68,11 @@ cdef class DirectChunk(Chunk): return 0 cpdef int after_init(self) except -1: - self.file.seek(0, os.SEEK_END) - self.file_size = self.file.tell() + if isinstance(self.file, ReadWriteGzipFile): + self.file_size = self.file.size() + else: + self.file.seek(0, os.SEEK_END) + self.file_size = self.file.tell() self.entries = (self.file_size - HEADER_SIZE) // self.block_size_plus self.pointer = self.file_size d = (self.file_size - self.block_size) - (self.file_size-self.block_size_plus) @@ -90,6 +92,7 @@ cdef class DirectChunk(Chunk): self.file.write(b) self.mmap.resize(self.file_size) self.pointer += self.block_size_plus + self.entries += 1 finally: if self.file_lock_object: self.file_lock_object.release() diff --git a/tempsdb/chunks/maker.pyx b/tempsdb/chunks/maker.pyx index e08a833..703858a 100644 --- a/tempsdb/chunks/maker.pyx +++ b/tempsdb/chunks/maker.pyx @@ -4,12 +4,15 @@ import struct from tempsdb.exceptions import AlreadyExists from .base 
cimport Chunk +from .normal cimport NormalChunk from .direct cimport DirectChunk from ..series cimport TimeSeries + STRUCT_Q = struct.Struct('<Q') STRUCT_L = struct.Struct('<L') + cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, bytes data, int page_size, bint descriptor_based_access=False, bint use_direct_mode = False, int gzip_compression_level=0): @@ -76,5 +79,5 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timesta return DirectChunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) else: - return Chunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) + return NormalChunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) diff --git a/tempsdb/chunks/normal.pxd b/tempsdb/chunks/normal.pxd index abeba72..3d6d836 100644 --- a/tempsdb/chunks/normal.pxd +++ b/tempsdb/chunks/normal.pxd @@ -1,50 +1,6 @@ -from ..series cimport TimeSeries +from .base cimport Chunk -cdef class AlternativeMMap: - cdef: - object io, file_lock_object - unsigned long size - - -cdef class Chunk: - cdef: - TimeSeries parent - readonly str path - readonly unsigned long long min_ts - readonly unsigned long long max_ts - unsigned int block_size_plus # block size plus timestamp length - readonly unsigned int block_size - readonly unsigned long entries - unsigned long file_size - unsigned long pointer # position to write next entry at - readonly unsigned long page_size - object file, mmap, file_lock_object - bint closed - - cpdef int close(self, bint force=*) except -1 +cdef class NormalChunk(Chunk): cpdef int append(self, unsigned long long timestamp, bytes data) except -1 - cdef int sync(self) except -1 - cdef int extend(self) except -1 - cpdef int switch_to_descriptor_based_access(self) except -1 - cpdef int switch_to_mmap_based_access(self) except -1 - cpdef unsigned long get_mmap_size(self) - - cdef inline unsigned long long name(self): - 
""" - :return: the name of this chunk - :rtype: int - """ - return self.min_ts - - cdef inline int length(self): - """ - :return: amount of entries in this chunk - :rtype: int - """ - return self.entries - - -cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, - bytes data, int page_size, - bint descriptor_based_access=*) + cpdef int extend(self) except -1 diff --git a/tempsdb/chunks/normal.pyx b/tempsdb/chunks/normal.pyx index acd662c..8339d97 100644 --- a/tempsdb/chunks/normal.pyx +++ b/tempsdb/chunks/normal.pyx @@ -1,72 +1,17 @@ import io -import os -import threading -import typing as tp import struct -import mmap -import warnings from ..exceptions import Corruption, InvalidState, AlreadyExists, StillOpen -from ..series cimport TimeSeries +from .base cimport Chunk DEF HEADER_SIZE = 4 DEF TIMESTAMP_SIZE = 8 DEF FOOTER_SIZE = 4 STRUCT_Q = struct.Struct('<Q') STRUCT_L = struct.Struct('<L') -STRUCT_LQ = struct.Struct('<LQ') -cdef class AlternativeMMap: - """ - An alternative mmap implementation used when mmap cannot allocate due to memory issues - """ - def flush(self): - self.io.flush() - - def madvise(self, a, b, c): - ... 
- - def resize(self, int file_size): - self.size = file_size - - def __init__(self, io_file: io.BinaryIO, file_lock_object): - self.io = io_file - self.io.seek(0, io.SEEK_END) - self.size = self.io.tell() - self.file_lock_object = file_lock_object - - def __getitem__(self, item: tp.Union[int, slice]) -> tp.Union[int, bytes]: - cdef: - unsigned long start = item.start - unsigned long stop = item.stop - bytes b - with self.file_lock_object: - if isinstance(item, int): - self.io.seek(item, 0) - b = self.io.read(1) - return b[0] - else: - start = item.start - stop = item.stop - self.io.seek(start, 0) - return self.io.read(stop-start) - - def __setitem__(self, key: tp.Union[int, slice], value: tp.Union[int, bytes]) -> None: - cdef: - unsigned long start = key.start - if isinstance(key, int): - self[key:key+1] = bytes([value]) - else: - with self.file_lock_object: - self.io.seek(start, 0) - self.io.write(value) - - def close(self): - pass - - -cdef class Chunk: +cdef class NormalChunk(Chunk): """ Represents a single chunk of time series. @@ -87,111 +32,8 @@ cdef class Chunk: :ivar entries: amount of entries in this chunk (int) :ivar page_size: size of the page (int) """ - cpdef unsigned long get_mmap_size(self): - """ - :return: how many bytes are mmaped? - :rtype: int - """ - if isinstance(self.mmap, AlternativeMMap): - return 0 - else: - return self.file_size - - cpdef int switch_to_mmap_based_access(self) except -1: - """ - Switch self to mmap-based access instead of descriptor-based. - - No-op if already in mmap mode. 
- - :raises Corruption: unable to mmap file due to an unrecoverable error - """ - if isinstance(self.mmap, AlternativeMMap): - try: - self.mmap = mmap.mmap(self.file.fileno(), 0) - self.file_lock_object = None - except OSError as e: - if e.errno in (11, # EAGAIN - memory is too low - 12, # ENOMEM - no memory space available - 19, # ENODEV - fs does not support mmapping - 75): # EOVERFLOW - too many pages would have been used - pass - else: - self.file.close() - self.closed = True - raise Corruption(f'Failed to mmap chunk file: {e}') - return 0 - cpdef int switch_to_descriptor_based_access(self) except -1: - """ - Switch self to descriptor-based access instead of mmap. - - No-op if already in descriptor based mode. - """ - if isinstance(self.mmap, AlternativeMMap): - return 0 - self.mmap.close() - self.file_lock_object = threading.Lock() - self.mmap = AlternativeMMap(self.file, self.file_lock_object) - return 0 - - def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, - use_descriptor_access: bool = False): - cdef bytes b - self.file_size = os.path.getsize(path) - self.page_size = page_size - self.parent = parent - self.closed = False - self.path = path - self.file = open(self.path, 'rb+') - self.file_lock_object = None - - if use_descriptor_access: - self.file_lock_object = threading.Lock() - self.mmap = AlternativeMMap(self.file, self.file_lock_object) - else: - try: - self.mmap = mmap.mmap(self.file.fileno(), 0) - except OSError as e: - if e.errno in (11, # EAGAIN - memory is too low - 12, # ENOMEM - no memory space available - 19, # ENODEV - fs does not support mmapping - 75): # EOVERFLOW - too many pages would have been used - self.file_lock_object = threading.Lock() - self.mmap = AlternativeMMap(self.file, self.file_lock_object) - else: - self.file.close() - self.closed = True - raise Corruption(f'Failed to mmap chunk file: {e}') - - try: - self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE]) - 
self.block_size_plus = self.block_size + TIMESTAMP_SIZE - except struct.error: - self.close() - raise Corruption('Could not read the header of the chunk file %s' % (self.path, )) - - self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size]) - self.pointer = self.entries*self.block_size_plus+HEADER_SIZE - self.max_ts = self.get_timestamp_at(self.entries-1) - - if self.pointer >= self.page_size: - # Inform the OS that we don't need the header anymore - self.mmap.madvise(mmap.MADV_DONTNEED, 0, HEADER_SIZE+TIMESTAMP_SIZE) - - def __getitem__(self, index: tp.Union[int, slice]): - if isinstance(index, slice): - return self.iterate_range(index.start, index.stop) - else: - return self.get_piece_at(index) - - cdef int sync(self) except -1: - """ - Synchronize the mmap - """ - self.mmap.flush() - return 0 - - cdef int extend(self) except -1: + cpdef int extend(self) except -1: """ Adds PAGE_SIZE bytes to this file """ @@ -214,6 +56,7 @@ cdef class Chunk: finally: if self.file_lock_object: self.file_lock_object.release() + return 0 cpdef int append(self, unsigned long long timestamp, bytes data) except -1: """ @@ -247,83 +90,3 @@ cdef class Chunk: self.max_ts = timestamp self.pointer += self.block_size_plus return 0 - - def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]: - return self._iterate(0, self.entries) - - def __len__(self): - return self.length() - - cpdef int close(self, bint force=False) except -1: - """ - Close the chunk and close the allocated resources - - :param force: whether to close the chunk even if it's open somewhere - :raises StillOpen: this chunk has a parent attached and the parent - says that this chunk is still being referred to - """ - if self.closed: - return 0 - cdef unsigned long long name = self.name() - if self.parent: - with self.parent.open_lock: - if not force and self.parent.get_references_for(name) > 0: - raise StillOpen('this chunk is opened') - del self.parent.refs_chunks[name] - del 
self.parent.open_chunks[name] - self.parent = None - self.mmap.close() - self.file.close() - return 0 - - def __del__(self) -> None: - if self.closed: - return - warnings.warn('You forgot to close a Chunk') - self.close() - - -cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, - bytes data, int page_size, bint descriptor_based_access=False): - """ - Creates a new chunk on disk - - :param parent: parent time series - :param path: path to the new chunk file - :param timestamp: timestamp for first entry to contain - :param data: data of the first entry - :param page_size: size of a single page for storage - :param descriptor_based_access: whether to use descriptor based access instead of mmap. - Default is False - :raises ValueError: entries in data were not of equal size, or data was empty or data - was not sorted by timestamp or same timestamp appeared twice - :raises AlreadyExists: chunk already exists - """ - if os.path.exists(path): - raise AlreadyExists('chunk already exists!') - if not data: - raise ValueError('Data is empty') - file = open(path, 'wb') - cdef: - bytes b - unsigned long long ts - unsigned long block_size = len(data) - unsigned long file_size = 0 - unsigned long long last_ts = 0 - unsigned int entries = 1 - bint first_element = True - file_size += file.write(STRUCT_L.pack(block_size)) - file_size += file.write(STRUCT_Q.pack(timestamp)) - file_size += file.write(data) - - # Pad this thing to page_size - cdef unsigned long bytes_to_pad = page_size - (file_size % page_size) - file.write(b'\x00' * bytes_to_pad) - - # Create a footer at the end - cdef bytearray footer = bytearray(page_size) - footer[-4:] = b'\x01\x00\x00\x00' # 1 in little endian - file.write(footer) - file.close() - return Chunk(parent, path, page_size, use_descriptor_access=descriptor_based_access) - diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index cab2074..26ee3fd 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -6,8 +6,9 @@ 
import warnings from satella.json import write_json_to_file, read_json_from_file from .chunks.base cimport Chunk -from .chunks.maker cimport create_chunk +from .chunks.normal cimport NormalChunk from .chunks.direct cimport DirectChunk +from .chunks.maker cimport create_chunk from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists import os @@ -194,7 +195,7 @@ cdef class TimeSeries: use_descriptor_access=True, gzip_compression_level=self.gzip_level if is_gzip else 0) else: - chunk = Chunk(self, + chunk = NormalChunk(self, os.path.join(self.path, str(name)), self.page_size, use_descriptor_access=self.descriptor_based_access) diff --git a/tests/test_db.py b/tests/test_db.py index 76744b6..91716d1 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,9 +1,6 @@ import os import unittest -from tempsdb.chunks.base import Chunk -from tempsdb.chunks.maker import create_chunk - class TestDB(unittest.TestCase): def test_write_series(self): @@ -70,10 +67,13 @@ class TestDB(unittest.TestCase): self.assertLessEqual(items[-1][0], stop) def test_chunk_alternative(self): + from tempsdb.chunks.normal import NormalChunk + from tempsdb.chunks.maker import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096) chunk.close() - chunk = Chunk(None, 'chunk_a.db', 4096, use_descriptor_access=True) + chunk = NormalChunk(None, 'chunk_a.db', 4096, use_descriptor_access=True) chunk.append(1, b'ma ') chunk.append(4, b'kota') self.assertEqual(chunk.min_ts, 0) @@ -103,6 +103,8 @@ class TestDB(unittest.TestCase): self.assertEqual(os.path.getsize('chunk.db'), 8192) def test_chunk(self): + from tempsdb.chunks.maker import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk.db', 0, b'ala ', 4096) chunk.append(1, b'ma ') -- GitLab