From 78628a411a9369c4a8c4f3c54dc43e28a92a70bc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Thu, 10 Dec 2020 19:30:40 +0100
Subject: [PATCH] if page_size is default, it won't be written as part of the
 metadata

---
 README.md           |   2 +-
 setup.py            |   2 +-
 tempsdb/chunks.pyx  |  32 ++++++------
 tempsdb/series.pyx  | 115 ++++++++++++++++++++++++--------------------
 unittest.Dockerfile |   2 +-
 5 files changed, 83 insertions(+), 70 deletions(-)

diff --git a/README.md b/README.md
index fca4c93..66cb256 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ Then copy your resulting wheel and install it via pip on the target system.
 
 ## v0.4.5
 
-* _TBA_
+* if page_size is default, it won't be written as part of the metadata
 
 ## v0.4.4
 
diff --git a/setup.py b/setup.py
index 6236fb0..dc3677a 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ if 'CI' in os.environ:
 
 
 setup(name='tempsdb',
-      version='0.4.5a1',
+      version='0.4.5a2',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.24', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
diff --git a/tempsdb/chunks.pyx b/tempsdb/chunks.pyx
index 360da4f..9704d09 100644
--- a/tempsdb/chunks.pyx
+++ b/tempsdb/chunks.pyx
@@ -91,22 +91,24 @@ cdef class Chunk:
         Switch self to mmap-based access instead of descriptor-based.
 
         No-op if already in mmap mode.
+
+        :raises Corruption: unable to mmap file due to an unrecoverable error
         """
-        if not isinstance(self.mmap, AlternativeMMap):
-            return 0
-        try:
-            self.mmap = mmap.mmap(self.file.fileno(), 0)
-            self.file_lock_object = None
-        except OSError as e:
-            if e.errno in (11,      # EAGAIN - memory is too low
-                           12,      # ENOMEM - no memory space available
-                           19,      # ENODEV - fs does not support mmapping
-                           75):     # EOVERFLOW - too many pages would have been used
-                pass
-            else:
-                self.file.close()
-                self.closed = True
-                raise Corruption(f'Failed to mmap chunk file: {e}')
+        if isinstance(self.mmap, AlternativeMMap):
+            try:
+                self.mmap = mmap.mmap(self.file.fileno(), 0)
+                self.file_lock_object = None
+            except OSError as e:
+                if e.errno in (11,      # EAGAIN - memory is too low
+                               12,      # ENOMEM - no memory space available
+                               19,      # ENODEV - fs does not support mmapping
+                               75):     # EOVERFLOW - too many pages would have been used
+                    pass
+                else:
+                    self.file.close()
+                    self.closed = True
+                    raise Corruption(f'Failed to mmap chunk file: {e}')
+        return 0
 
     cpdef int switch_to_descriptor_based_access(self) except -1:
         """
diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx
index b163237..a3fdbd4 100644
--- a/tempsdb/series.pyx
+++ b/tempsdb/series.pyx
@@ -1,13 +1,13 @@
 import shutil
 import threading
 
-import ujson
-from satella.files import read_in_file
+from satella.json import write_json_to_file, read_json_from_file
 
 from .chunks cimport create_chunk, Chunk
 from .exceptions import DoesNotExist, Corruption, InvalidState, AlreadyExists
 import os
 
 DEF METADATA_FILE_NAME = 'metadata.txt'
+DEF DEFAULT_PAGE_SIZE=4096
 
 cdef class TimeSeries:
@@ -55,6 +55,11 @@ cdef class TimeSeries:
         """
         Switches to mmap-based file access method for the entire series,
         and all chunks open inside.
+
+        This will try to enable mmap on every chunk, but if mmap fails due to recoverable
+        errors, it will remain in descriptor-based mode.
+
+        :raises Corruption: mmap failed due to an irrecoverable error
         """
         self.descriptor_based_access = False
         cdef Chunk chunk
@@ -78,22 +83,19 @@ cdef class TimeSeries:
             raise DoesNotExist('Chosen time series does not exist')
 
         cdef:
-            str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
-                                          'utf-8', 'invalid json')
             dict metadata
             str filename
             list files = os.listdir(self.path)
             unsigned long long last_chunk_name
 
         try:
-            metadata = ujson.loads(metadata_s)      # raises ValueError
-            # raises KeyError
+            metadata = read_json_from_file(os.path.join(self.path, METADATA_FILE_NAME))
             self.block_size = metadata['block_size']
             self.max_entries_per_chunk = metadata['max_entries_per_chunk']
             self.last_entry_synced = metadata['last_entry_synced']
-            self.page_size = metadata['page_size']
-        except ValueError:
-            raise Corruption('Corrupted series')
+            self.page_size = metadata.get('page_size', DEFAULT_PAGE_SIZE)
+        except (OSError, ValueError) as e:
+            raise Corruption('Corrupted series: %s' % (e, ))
         except KeyError:
             raise Corruption('Could not read metadata item')
 
@@ -159,36 +161,37 @@ cdef class TimeSeries:
 
     cpdef int trim(self, unsigned long long timestamp) except -1:
         """
-        Delete all entries earlier than timestamp.
+        Delete all entries earlier than timestamp that are closed.
 
         Note that this will drop entire chunks, so it may be possible that some entries will linger
-        on. This will not delete currently opened chunks!
+        on.
+
+        This will affect only closed chunks. Chunks ready to delete that are closed after
+        this will not be deleted, as :meth:`~tempsdb.series.TimeSeries.trim` will need
+        to be called again.
 
         :param timestamp: timestamp to delete entries earlier than
         """
-        if len(self.chunks) == 1:
-            return 0
         cdef:
             unsigned long long chunk_to_delete
             int refs
-        try:
-            with self.open_lock:
-                while len(self.chunks) >= 2 and timestamp > self.chunks[1]:
-                    chunk_to_delete = self.chunks[0]
-                    if chunk_to_delete in self.open_chunks:
-                        refs = self.refs_chunks.get(chunk_to_delete, 0)
-                        if not refs:
-                            self.open_chunks[chunk_to_delete].delete()
+        if len(self.chunks) > 1:
+            try:
+                with self.open_lock:
+                    while len(self.chunks) >= 2 and timestamp > self.chunks[1]:
+                        chunk_to_delete = self.chunks[0]
+                        if chunk_to_delete in self.open_chunks:
+                            refs = self.refs_chunks.get(chunk_to_delete, 0)
+                            if not refs:
+                                self.open_chunks[chunk_to_delete].delete()
+                            else:
+                                # I would delete it, but it's open...
+                                break
                         else:
-                            # I would delete it, but it's open...
-                            return 0
-                    else:
-                        os.unlink(os.path.join(self.path, str(chunk_to_delete)))
-                    del self.chunks[0]
-                else:
-                    return 0
-        except IndexError:
-            return 0
+                            os.unlink(os.path.join(self.path, str(chunk_to_delete)))
+                        del self.chunks[0]
+            except IndexError:
+                pass
         return 0
 
     cpdef void close(self):
@@ -197,18 +200,17 @@ cdef class TimeSeries:
 
         No further operations can be executed on it afterwards.
         """
-        if self.closed:
-            return
         cdef:
             Chunk chunk
             list open_chunks
-        open_chunks = list(self.open_chunks.values())
-        for chunk in open_chunks:
-            chunk.close()
-        if self.mpm is not None:
-            self.mpm.cancel()
-            self.mpm = None
-        self.closed = True
+        if not self.closed:
+            open_chunks = list(self.open_chunks.values())
+            for chunk in open_chunks:
+                chunk.close()
+            if self.mpm is not None:
+                self.mpm.cancel()
+                self.mpm = None
+            self.closed = True
 
     cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
         """
@@ -271,7 +273,9 @@ cdef class TimeSeries:
 
     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
         """
-        Mark the series as synced up to particular timestamp
+        Mark the series as synced up to particular timestamp.
+
+        This will additionally sync the metadata.
 
         :param timestamp: timestamp of the last synced entry
         """
@@ -280,8 +284,12 @@ cdef class TimeSeries:
         return 0
 
     cdef int sync_metadata(self) except -1:
-        with self.lock, open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
-            ujson.dump(self.get_metadata(), f_out)
+        """
+        Write the metadata to disk
+        """
+        with self.lock:
+            write_json_to_file(os.path.join(self.path, METADATA_FILE_NAME), self.get_metadata())
+        return 0
 
     cpdef int sync(self) except -1:
         """
@@ -300,12 +308,14 @@ cdef class TimeSeries:
         return 0
 
     cdef dict get_metadata(self):
-        return {
+        cdef dict meta = {
             'block_size': self.block_size,
             'max_entries_per_chunk': self.max_entries_per_chunk,
             'last_entry_synced': self.last_entry_synced,
-            'page_size': self.page_size
         }
+        if self.page_size != DEFAULT_PAGE_SIZE:
+            meta['page_size'] = self.page_size
+        return meta
 
     cdef void register_memory_pressure_manager(self, object mpm):
         """
@@ -389,6 +399,7 @@ cdef class TimeSeries:
             raise InvalidState('series is closed')
         self.close()
         shutil.rmtree(self.path)
+        return 0
 
     cpdef unsigned long open_chunks_mmap_size(self):
         """
@@ -404,18 +415,18 @@ cdef class TimeSeries:
         return ram
 
 cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
-                               int max_entries_per_chunk, int page_size=4096,
+                               int max_entries_per_chunk, int page_size=DEFAULT_PAGE_SIZE,
                                bint use_descriptor_based_access=False):
     if os.path.exists(path):
         raise AlreadyExists('This series already exists!')
 
-    os.mkdir(path)
-    with open(os.path.join(path, METADATA_FILE_NAME), 'w') as f_out:
-        ujson.dump({
+    cdef dict meta = {
             'block_size': block_size,
             'max_entries_per_chunk': max_entries_per_chunk,
-            'last_entry_synced': 0,
-            'page_size': page_size
-            }, f_out
-        )
+            'last_entry_synced': 0
+            }
+    if page_size != DEFAULT_PAGE_SIZE:
+        meta['page_size'] = page_size
+    os.mkdir(path)
+    write_json_to_file(os.path.join(path, METADATA_FILE_NAME), meta)
     return TimeSeries(path, name)
diff --git a/unittest.Dockerfile b/unittest.Dockerfile
index c567def..fc3401c 100644
--- a/unittest.Dockerfile
+++ b/unittest.Dockerfile
@@ -1,6 +1,6 @@
 FROM python:3.8
 
-RUN pip install satella snakehouse nose2 wheel ujson coverage
+RUN pip install satella>=2.14.24 snakehouse nose2 wheel ujson coverage
 
 ADD tempsdb /app/tempsdb
 ADD setup.py /app/setup.py
-- 
GitLab