diff --git a/README.md b/README.md index 0523f9a9338eb313c1da871d768d6b83156137ca..9bcc7aaa80dd8dfd89d7eb96adfcf097c645b027 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ Then copy your resulting wheel and install it via pip on the target system. ## v0.4.3 +* improved handling of mmap failures when memory is too low * slightly reduced `metadata.txt` by defaulting `page_size` * moved `Chunk` * added support for gzipping diff --git a/setup.py b/setup.py index bf588d252b1e9e6c37519798c10df9a979ee2b11..587c9e7f35286747446780b0b4184067487560da 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,6 @@ from distutils.core import setup from setuptools import Extension from snakehouse import Multibuild, build -from satella.distutils import monkey_patch_parallel_compilation def find_pyx(*path) -> tp.List[str]: @@ -28,9 +27,9 @@ if 'CI' in os.environ: setup(name='tempsdb', - version='0.5a4', + version='0.5.0_a1', packages=['tempsdb'], - install_requires=['satella', 'ujson'], + install_requires=['satella>=2.14.21', 'ujson'], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], compiler_directives=directives), # ext_modules=cythonize(extensions, diff --git a/tempsdb/chunks/base.pyx b/tempsdb/chunks/base.pyx index 9071c0a23a5605284ad63902e342c7bc9ba65126..59d43772c57b79e6821b2a3452ef86efe1a59fa7 100644 --- a/tempsdb/chunks/base.pyx +++ b/tempsdb/chunks/base.pyx @@ -1,3 +1,4 @@ +import gzip import io import os import threading @@ -32,7 +33,10 @@ cdef class AlternativeMMap: def __init__(self, io_file: io.BinaryIO, file_lock_object): self.io = io_file - self.io.seek(0, 2) + if isinstance(io_file, gzip.GzipFile): + self.size = self.io.size + else: + self.io.seek(self.file_size, 0) self.size = self.io.tell() self.file_lock_object = file_lock_object @@ -121,7 +125,7 @@ cdef class Chunk: return 0 cpdef object open_file(self, str path): - self.file = open(self.path, 'rb+') + return open(self.path, 'rb+') def __init__(self, parent: tp.Optional[TimeSeries], path: str, 
page_size: int, use_descriptor_access: bool = False): @@ -141,7 +145,7 @@ cdef class Chunk: try: self.mmap = mmap.mmap(self.file.fileno(), 0) except OSError as e: - if e.errno == 12: # Cannot allocate memory + if e.errno in (11, 12): # Cannot allocate memory or memory range exhausted self.file_lock_object = threading.Lock() self.mmap = AlternativeMMap(self.file, self.file_lock_object) else: @@ -282,8 +286,8 @@ cdef class Chunk: self.extend() cdef unsigned long long ptr_end = self.pointer + TIMESTAMP_SIZE # Append entry - self.mmap[self.pointer:ptr_end] = STRUCT_Q.pack(timestamp) - self.mmap[ptr_end:ptr_end+self.block_size] = data + cdef bytes b = STRUCT_Q.pack(timestamp) + data + self.mmap[self.pointer:ptr_end+self.block_size] = b self.entries += 1 # Update entries count self.mmap[self.file_size-FOOTER_SIZE:self.file_size] = STRUCT_L.pack(self.entries) diff --git a/tempsdb/chunks/direct.pyx b/tempsdb/chunks/direct.pyx index 5b02429b03586e4c69f86a67212398d68cfefeb0..56d469b2f95d97e2b2a3b2de4c219291c8015597 100644 --- a/tempsdb/chunks/direct.pyx +++ b/tempsdb/chunks/direct.pyx @@ -1,6 +1,7 @@ import gzip import typing as tp import struct +import warnings from ..series cimport TimeSeries from .base cimport Chunk @@ -29,8 +30,19 @@ cdef class DirectChunk(Chunk): :raises ValueError: non-direct descriptor was requested and gzip was enabled """ def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int, - use_descriptor_access: bool = True, + use_descriptor_access: tp.Optional[bool] = None, gzip_compression_level: int = 0): + if path.endswith('.gz'): + warnings.warn('Please pass the path without .gz') + path = path.replace('.gz', '') + if path.endswith('.direct'): + warnings.warn('Please pass the path without .direct') + path = path.replace('.direct', '') + if use_descriptor_access is None: + use_descriptor_access = False + if gzip_compression_level: + use_descriptor_access = True + self.gzip = gzip_compression_level if gzip_compression_level: @@ -41,26 
+53,34 @@ cdef class DirectChunk(Chunk): if gzip_compression_level: if not use_descriptor_access: raise ValueError('Use descriptor access must be enabled when using gzip') - super().__init__(parent, path, page_size, use_descriptor_access) + super().__init__(parent, path, page_size, + use_descriptor_access=use_descriptor_access | bool(gzip_compression_level)) cpdef object open_file(self, str path): if self.gzip: - self.file = gzip.open(path, 'rb+', compresslevel=self.gzip) + return gzip.open(path, 'wb+', compresslevel=self.gzip) else: - self.file = open(path, 'rb+') + return super().open_file(path) cpdef int extend(self) except -1: return 0 cpdef int after_init(self) except -1: + if isinstance(self.file, gzip.GzipFile): + self.file_size = self.file.size + else: + self.io.seek(0, 2) + self.file_size = self.file.tell() self.entries = (self.file_size - HEADER_SIZE) // self.block_size_plus self.pointer = self.file_size - self.max_ts, = STRUCT_Q.unpack( - self.mmap[self.file_size-self.block_size_plus:self.file_size-self.block_size]) + d = (self.file_size - self.block_size) - (self.file_size-self.block_size_plus) + cdef bytes b = self.mmap[self.file_size-self.block_size_plus:self.file_size-self.block_size] + print(self.file, d, repr(b)) + self.max_ts, = STRUCT_Q.unpack(b) return 0 cpdef int append(self, unsigned long long timestamp, bytes data) except -1: - cdef bytearray b + cdef bytes b if self.file_lock_object: self.file_lock_object.acquire() try: diff --git a/tempsdb/chunks/maker.pxd b/tempsdb/chunks/maker.pxd index 7b79b97b44bca55e36b2c47602644e4109798199..ed6cd7b49cec72cbe9e96bf5d3044fd333019e19 100644 --- a/tempsdb/chunks/maker.pxd +++ b/tempsdb/chunks/maker.pxd @@ -4,4 +4,4 @@ from ..series cimport TimeSeries cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, bytes data, int page_size, bint descriptor_based_access=*, - bint use_direct_mode=*, int gzip_compresion_level=*) + bint use_direct_mode=*, int gzip_compression_level=*) diff 
--git a/tempsdb/chunks/maker.pyx b/tempsdb/chunks/maker.pyx index e8613078b8cbf2e77da8bb804d7d458a9d8a1f17..e08a833736b6f2f7fddd2086fc787689fb782c08 100644 --- a/tempsdb/chunks/maker.pyx +++ b/tempsdb/chunks/maker.pyx @@ -12,7 +12,7 @@ STRUCT_L = struct.Struct('<L') cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp, bytes data, int page_size, bint descriptor_based_access=False, - bint use_direct_mode = False, int gzip_compresion_level=0): + bint use_direct_mode = False, int gzip_compression_level=0): """ Creates a new chunk on disk @@ -25,22 +25,23 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timesta Default is False :param use_direct_mode: if True, chunk will be created using direct mode, without page preallocation - :param gzip_compresion_level: gzip compression level. Use 0 to disable compression. + :param gzip_compression_level: gzip compression level. Use 0 to disable compression. :raises ValueError: entries in data were not of equal size, or data was empty or data was not sorted by timestamp or same timestamp appeared twice :raises AlreadyExists: chunk already exists """ + cdef str original_path = path if os.path.exists(path): raise AlreadyExists('chunk already exists!') if not data: raise ValueError('Data is empty') - if not gzip_compresion_level and use_direct_mode: + if not gzip_compression_level and use_direct_mode: path = path + '.direct' - elif gzip_compresion_level: + elif gzip_compression_level: path = path + '.gz' - if gzip_compresion_level: - file = gzip.open(path, 'wb', compresslevel=gzip_compresion_level) + if gzip_compression_level: + file = gzip.open(path, 'wb', compresslevel=gzip_compression_level) else: file = open(path, 'wb') cdef: @@ -67,13 +68,13 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timesta footer[-4:] = b'\x01\x00\x00\x00' # 1 in little endian file.write(footer) file.close() - if gzip_compresion_level: - return DirectChunk(parent, path, 
page_size, use_descriptor_access=False, - gzip_compresion_level=gzip_compresion_level) + if gzip_compression_level: + return DirectChunk(parent, original_path, page_size, use_descriptor_access=True, + gzip_compression_level=gzip_compression_level) else: if use_direct_mode: - return DirectChunk(parent, path, page_size, + return DirectChunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) else: - return Chunk(parent, path, page_size, use_descriptor_access=descriptor_based_access) + return Chunk(parent, original_path, page_size, use_descriptor_access=descriptor_based_access) diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index d86bc31b92114558d6240c0521351de267637a6d..fcd205e5991256dc19ea5fcb3038d80b0938ece8 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -132,17 +132,16 @@ cdef class TimeSeries: raise Corruption('Could not read metadata item') self.open_chunks = {} # tp.Dict[int, Chunk] + self.chunks = [] # type: tp.List[tp.Tuple[int, bool, bool]] # sorted by ASC + #: timestamp, is_direct, is_gzip if not len(files): raise Corruption('Empty directory!') elif len(files) == 1: # empty series self.last_chunk = None - self.chunks = [] self.last_entry_ts = 0 else: - self.chunks = [] # type: tp.List[tp.Tuple[int, bool, bool]] # sorted by ASC - # timestamp, is_direct, is_gzip for filename in files: if filename == METADATA_FILE_NAME: continue @@ -185,7 +184,7 @@ cdef class TimeSeries: """ if self.closed: raise InvalidState('Series is closed') - if name not in self.chunks: + if name not in (v[0] for v in self.chunks): raise DoesNotExist('Invalid chunk!') cdef Chunk chunk with self.open_lock: @@ -195,7 +194,7 @@ cdef class TimeSeries: os.path.join(self.path, str(name)), self.page_size, use_descriptor_access=True, - gzip_compression_level=self.gzip_level) + gzip_compression_level=self.gzip_level if is_gzip else 0) else: chunk = Chunk(self, os.path.join(self.path, str(name)), @@ -292,13 +291,13 @@ cdef class TimeSeries: unsigned 
int mid while lo < hi: mid = (lo+hi)//2 - if self.chunks[mid] < timestamp: + if self.chunks[mid][0] < timestamp: lo = mid+1 else: hi = mid try: - if self.chunks[lo] == timestamp: + if self.chunks[lo][0] == timestamp: return lo else: return lo-1 @@ -319,8 +318,8 @@ cdef class TimeSeries: if start > stop: raise ValueError('start larger than stop') - if start < self.chunks[0]: - start = self.chunks[0] + if start < self.chunks[0][0]: + start = self.chunks[0][0] if stop > self.last_entry_ts: stop = self.last_entry_ts @@ -472,7 +471,7 @@ cdef class TimeSeries: timestamp, data, self.page_size, descriptor_based_access=is_descriptor, use_direct_mode=is_descriptor, - gzip_compresion_level=self.gzip_level) + gzip_compression_level=self.gzip_level) self.open_chunks[timestamp] = self.last_chunk self.incref_chunk(timestamp) self.chunks.append((timestamp, is_descriptor, bool(self.gzip_level))) diff --git a/tests/test_db.py b/tests/test_db.py index 4f0406830df6f066dab47d81892d0a2a3c76a747..76744b6a9f9fb887f7a8e8fd95933280d5f505d8 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,7 +1,8 @@ import os import unittest -from tempsdb.chunks import Chunk +from tempsdb.chunks.base import Chunk +from tempsdb.chunks.maker import create_chunk class TestDB(unittest.TestCase): @@ -42,7 +43,7 @@ class TestDB(unittest.TestCase): self.do_verify_series(series, 0, 1800) series.close() - def test_create_series(self): + def test_create_series_gzip(self): from tempsdb.series import create_series series = create_series('test.gz', 'test.gz', 1, 10, gzip_level=6) @@ -69,7 +70,6 @@ class TestDB(unittest.TestCase): self.assertLessEqual(items[-1][0], stop) def test_chunk_alternative(self): - from tempsdb.chunks import create_chunk data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096) chunk.close() @@ -103,7 +103,6 @@ class TestDB(unittest.TestCase): self.assertEqual(os.path.getsize('chunk.db'), 8192) def test_chunk(self): - from tempsdb.chunks 
import create_chunk data = [(0, b'ala '), (1, b'ma '), (4, b'kota')] chunk = create_chunk(None, 'chunk.db', 0, b'ala ', 4096) chunk.append(1, b'ma ')