diff --git a/README.md b/README.md index 2767752af0ddc8d4d41d58060370289cfc748d72..59d2bff1cbde6c18b956d7febb7d51706bdaecf8 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,8 @@ So no variable encoding for you! * added `get_first_entry_for` * added `close_all_open_series` * added `TimeSeries.name` +* added option to use descriptor based access instead of mmap +* added `TimeSeries.open_chunks_ram_size` ## v0.1 diff --git a/docs/index.rst b/docs/index.rst index 9112bd49d0eb89fd49ed57924298569bbfc0710c..f0f6d89ccb98230cfa7a8311e2af85ac2cdf7e64 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,6 +19,10 @@ iterators. Stored time series with a 8-bit timestamp and a fixed length of data. So no variable encoding for you! +.. versionadded:: 0.2 + +When mmap fails due to memory issues, this falls back to a slower fwrite()/fread() implementation. +You can also manually select the descriptor-based implementation if you want to. Indices and tables ================== diff --git a/setup.py b/setup.py index b1d66c22de8942c31e8d0c1c72e743323ca30d2f..dfb59d010c14343ba91563347d922ed468b1e0a9 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ def find_pyx(*path) -> tp.List[str]: # setup(name='tempsdb', - version='0.2_a6', + version='0.2_a7', packages=['tempsdb'], install_requires=['satella>=2.14.21', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], diff --git a/tempsdb/chunks.pxd b/tempsdb/chunks.pxd index b9d02b9c0e61c08e450dde7f5599283f69119be6..321881738589340fd1fb4441c61bf0de8ceddae7 100644 --- a/tempsdb/chunks.pxd +++ b/tempsdb/chunks.pxd @@ -1,6 +1,12 @@ from .series cimport TimeSeries +cdef class AlternativeMMap: + cdef: + object io + unsigned long size + + cdef class Chunk: cdef: TimeSeries parent @@ -25,6 +31,7 @@ cdef class Chunk: cpdef unsigned int find_right(self, unsigned long long timestamp) cdef int extend(self) except -1 cpdef int delete(self) except -1 + cpdef int switch_to_descriptor_based_access(self) except -1 cdef inline 
unsigned long long name(self): """ @@ -44,4 +51,5 @@ cdef class Chunk: cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, - bytes data, int page_size) + bytes data, int page_size, + bint descriptor_based_access=*) diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx index 794ce5263ffe3acdc4e6c669b53528c8468124e2..3adc96b1efd6d0a1e0514afcf2e40e9f25aad9fb 100644 --- a/tempsdb/chunks.pyx +++ b/tempsdb/chunks.pyx @@ -1,3 +1,4 @@ +import io import os import typing as tp import struct @@ -13,17 +14,58 @@ STRUCT_L = struct.Struct('<L') STRUCT_LQ = struct.Struct('<LQ') +cdef class AlternativeMMap: + """ + An alternative mmap implementation used when mmap cannot allocate due to memory issues + """ + def flush(self): + self.io.flush() + + def madvise(self, a, b, c): + ... + + def resize(self, file_size: int): + self.size = file_size + + def __init__(self, io_file: io.BinaryIO): + self.io = io_file + self.io.seek(0, 2) + self.size = self.io.tell() + + def __getitem__(self, item: slice) -> bytes: + cdef: + unsigned long start = item.start + unsigned long stop = item.stop + + self.io.seek(start, 0) + return self.io.read(stop-start) + + def __setitem__(self, key: slice, value: bytes): + cdef: + unsigned long start = key.start + + self.io.seek(start, 0) + self.io.write(value) + + def close(self): + pass + + cdef class Chunk: """ Represents a single chunk of time series. This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes], - as well as a sequence protocol + as well as a sequence protocol. + + This will try to mmap opened files, but if mmap fails due to not enough memory this + will use descriptor-based access. 
:param parent: parent time series :type parent: tp.Optional[TimeSeries] :param path: path to the chunk file :type path: str + :param use_descriptor_access: whether to use descriptor based access instead of mmap :ivar path: path to the chunk (str) :ivar min_ts: timestamp of the first entry stored (int) @@ -32,7 +74,16 @@ cdef class Chunk: :ivar entries: amount of entries in this chunk (int) :ivar page_size: size of the page (int) """ - def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int): + cpdef int switch_to_descriptor_based_access(self) except -1: + """ + Switch self to descriptor-based access instead of mmap + """ + self.mmap.close() + self.mmap = AlternativeMMap(self.file) + return 0 + + def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, + use_descriptor_access: bool = False): cdef bytes b self.file_size = os.path.getsize(path) self.page_size = page_size @@ -40,12 +91,20 @@ cdef class Chunk: self.closed = False self.path = path self.file = open(self.path, 'rb+') - try: - self.mmap = mmap.mmap(self.file.fileno(), 0) - except OSError as e: - self.file.close() - self.closed = True - raise Corruption(f'Empty chunk file!') + + if use_descriptor_access: + self.mmap = AlternativeMMap(self.file) + else: + try: + self.mmap = mmap.mmap(self.file.fileno(), 0) + except OSError as e: + if e.errno == 12: # Cannot allocate memory + self.mmap = AlternativeMMap(self.file) + else: + self.file.close() + self.closed = True + raise Corruption(f'Failed to mmap chunk file: {e}') + try: self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE]) self.block_size_plus = self.block_size + TIMESTAMP_SIZE @@ -251,7 +310,7 @@ cdef class Chunk: cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, - bytes data, int page_size): + bytes data, int page_size, bint descriptor_based_access=False): """ Creates a new chunk on disk @@ -263,8 +322,11 @@ cpdef Chunk create_chunk(TimeSeries 
parent, str path, unsigned long long timesta :type timestamp: int :param data: data of the first entry :type data: bytes - :param page_size: size of a single page for storage + :param page_size: size of a single page for storage :type page_size: int + :param descriptor_based_access: whether to use descriptor based access instead of mmap. + Default is False + :type descriptor_based_access: bool :raises ValueError: entries in data were not of equal size, or data was empty or data was not sorted by timestamp or same timestamp appeared twice :raises AlreadyExists: chunk already exists @@ -295,5 +357,5 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timesta footer[-4:] = b'\x01\x00\x00\x00' # 1 in little endian file.write(footer) file.close() - return Chunk(parent, path, page_size) + return Chunk(parent, path, page_size, use_descriptor_access=descriptor_based_access) diff --git a/tempsdb/database.pxd b/tempsdb/database.pxd index d3701ca708dd6e51b53913326de1d4145ed7a3ad..41418cf2be66457bbbdb2ee2dfa2885f47e8a8a0 100644 --- a/tempsdb/database.pxd +++ b/tempsdb/database.pxd @@ -9,11 +9,13 @@ cdef class Database: object mpm cpdef int close(self) except -1 - cpdef TimeSeries get_series(self, str name) + cpdef TimeSeries get_series(self, str name, + bint use_descriptor_based_access=*) cpdef int register_memory_pressure_manager(self, object mpm) except -1 cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, - int page_size=*) + int page_size=*, + bint use_descriptor_based_access=*) cpdef list get_open_series(self) cpdef list get_all_series(self) cpdef int close_all_open_series(self) except -1 diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index c7714b72572ccb922114cc4701df361f44f7aac2..58e0bbf9f9e3f94c8eb55c1ad4121f1c9ff1d1fd 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -44,12 +44,18 @@ cdef class Database: output.append(series) return series - cpdef TimeSeries get_series(self, name: str): + cpdef TimeSeries 
get_series(self, name: str, bint use_descriptor_based_access = False): """ Load and return an existing series :param name: name of the series :type name: str + + .. versionadded:: 0.2 + + :param use_descriptor_based_access: whether to use descriptor based access instead of mmap, + default is False + :type use_descriptor_based_access: bool :return: a loaded time series :rtype: TimeSeries :raises DoesNotExist: series does not exist @@ -70,7 +76,8 @@ cdef class Database: return self.open_series[name] if not os.path.isdir(path): raise DoesNotExist('series %s does not exist' % (name, )) - self.open_series[name] = result = TimeSeries(path, name) + self.open_series[name] = result = TimeSeries(path, name, + use_descriptor_based_access=use_descriptor_based_access) if self.mpm is not None: result.register_memory_pressure_manager(self.mpm) return result @@ -106,7 +113,6 @@ cdef class Database: raise DoesNotExist('series does not exist') cdef: unsigned long long minimum_ts = 0xFFFFFFFFFFFFFFFF - str name list files = os.listdir(path) unsigned long long candidate_ts if len(files) == 1: @@ -133,7 +139,8 @@ cdef class Database: cpdef TimeSeries create_series(self, str name, int block_size, unsigned long entries_per_chunk, - int page_size=4096): + int page_size=4096, + bint use_descriptor_based_access=False): """ Create a new series @@ -143,8 +150,14 @@ cdef class Database: :type block_size: int :param entries_per_chunk: entries per chunk file :type entries_per_chunk: int - :param page_size: size of a single page + :param page_size: size of a single page. Default is 4096 :type page_size: int + + .. versionadded:: 0.2 + + :param use_descriptor_based_access: whether to use descriptor based access instead of mmap. 
+ Default is False + :type use_descriptor_based_access: bool :return: new series :rtype: TimeSeries :raises ValueError: block size was larger than page_size plus a timestamp @@ -156,7 +169,8 @@ cdef class Database: raise AlreadyExists('Series already exists') cdef TimeSeries series = create_series(os.path.join(self.name, name), name, block_size, - entries_per_chunk, page_size=page_size) + entries_per_chunk, page_size=page_size, + use_descriptor_based_access=use_descriptor_based_access) self.open_series[name] = series return series diff --git a/tempsdb/series.pxd b/tempsdb/series.pxd index e4f9b8f292e946ba96a4f3f01605053a9d021c67..57eda5d77c5bac55eebcd4482f6886be03cf825c 100644 --- a/tempsdb/series.pxd +++ b/tempsdb/series.pxd @@ -14,6 +14,7 @@ cdef class TimeSeries: readonly unsigned int block_size readonly unsigned long long last_entry_ts unsigned int page_size + bint descriptor_based_access list chunks dict refs_chunks # type: tp.Dict[int, int] dict open_chunks # type: tp.Dict[int, Chunk] @@ -35,9 +36,11 @@ cdef class TimeSeries: cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop) cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp) cpdef int trim(self, unsigned long long timestamp) except -1 + cpdef unsigned long open_chunks_ram_size(self) cdef inline int get_references_for(self, unsigned long long timestamp): return self.refs_chunks.get(timestamp, 0) cpdef TimeSeries create_series(str path, str name, unsigned int block_size, - int max_entries_per_chunk, int page_size=4096): + int max_entries_per_chunk, int page_size=*, + bint use_descriptor_based_access=*) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index 326d318394e21c2477428fa2bbfe06498460b6ec..3bcff38da47c8d2b758cae233a77512dd8bfa2c9 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -10,9 +10,6 @@ import os DEF METADATA_FILE_NAME = 'metadata.txt' - - - cdef class TimeSeries: """ This is thread-safe @@ -26,7 +23,8 @@ cdef class 
TimeSeries: :ivar name: name of the series (str) """ - def __init__(self, path: str, name: str): + def __init__(self, path: str, name: str, use_descriptor_based_access: bool = False): + self.descriptor_based_access = use_descriptor_based_access self.mpm = None self.name = name self.lock = threading.RLock() @@ -114,7 +112,8 @@ cdef class TimeSeries: if name not in self.open_chunks: self.open_chunks[name] = chunk = Chunk(self, os.path.join(self.path, str(name)), - self.page_size) + self.page_size, + use_descriptor_access=self.descriptor_based_access) else: chunk = self.open_chunks[name] self.incref_chunk(name) @@ -339,7 +338,8 @@ cdef class TimeSeries: if self.last_chunk is not None: self.decref_chunk(self.last_chunk.name()) self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)), - timestamp, data, self.page_size) + timestamp, data, self.page_size, + descriptor_based_access=self.descriptor_based_access) self.open_chunks[timestamp] = self.last_chunk self.incref_chunk(timestamp) self.chunks.append(timestamp) @@ -360,9 +360,23 @@ cdef class TimeSeries: self.close() shutil.rmtree(self.path) + cpdef unsigned long open_chunks_ram_size(self): + """ + .. versionadded:: 0.2 + + :return: how much RAM do the opened chunks consume? 
+ :rtype: int + """ + cdef: + unsigned long ram = 0 + Chunk chunk + for chunk in self.open_chunks.values(): + ram += chunk.file_size + return ram cpdef TimeSeries create_series(str path, str name, unsigned int block_size, - int max_entries_per_chunk, int page_size=4096): + int max_entries_per_chunk, int page_size=4096, + bint use_descriptor_based_access=False): if os.path.exists(path): raise AlreadyExists('This series already exists!') diff --git a/tests/test_db.py b/tests/test_db.py index d669e7415b65338cae9e9084c798c082bc36fc08..29f19e95893654d31fa4a61e1e2ecf696c8bcd78 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,6 +1,8 @@ import os import unittest +from tempsdb.chunks import Chunk + class TestDB(unittest.TestCase): def test_write_series(self): @@ -39,6 +41,40 @@ class TestDB(unittest.TestCase): self.assertGreaterEqual(items[0][0], start) self.assertLessEqual(items[-1][0], stop) + def test_chunk_alternative(self): + from tempsdb.chunks import create_chunk + data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] + chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096) + chunk.close() + chunk = Chunk(None, 'chunk_a.db', 4096, use_descriptor_access=True) + chunk.append(1, b'ma ') + chunk.append(4, b'kota') + self.assertEqual(chunk.min_ts, 0) + self.assertEqual(chunk.max_ts, 4) + self.assertEqual(chunk.block_size, 4) + self.assertEqual(chunk[0], (0, b'ala ')) + self.assertEqual(chunk[1], (1, b'ma ')) + self.assertEqual(chunk[2], (4, b'kota')) + self.assertEqual(len(chunk), 3) + self.assertEqual(list(iter(chunk)), data) + chunk.append(5, b'test') + self.assertEqual(chunk.find_left(0), 0) + self.assertEqual(chunk.find_left(1), 1) + self.assertEqual(chunk.find_left(2), 2) + self.assertEqual(chunk.find_left(3), 2) + self.assertEqual(chunk.find_left(4), 2) + self.assertEqual(chunk.find_left(5), 3) + self.assertEqual(chunk.find_left(6), 4) + self.assertEqual(chunk.find_right(0), 1) + self.assertEqual(chunk.find_right(1), 2) + 
self.assertEqual(chunk.find_right(2), 2) + self.assertEqual(chunk.find_right(3), 2) + self.assertEqual(chunk.find_right(4), 3) + self.assertEqual(chunk.find_right(5), 4) + self.assertEqual(chunk.find_right(6), 4) + chunk.close() + self.assertEqual(os.path.getsize('chunk_a.db'), 8192) + def test_chunk(self): from tempsdb.chunks import create_chunk data = [(0, b'ala '), (1, b'ma '), (4, b'kota')]