Commit b7e49e70 authored by Piotr Maślanka

add descriptor-based access instead of only mmap, and add open_chunks_ram_size

parent 40872e61
@@ -21,6 +21,8 @@ So no variable encoding for you!
 * added `get_first_entry_for`
 * added `close_all_open_series`
 * added `TimeSeries.name`
+* added option to use descriptor-based access instead of mmap
+* added `TimeSeries.open_chunks_ram_size`

## v0.1
...
@@ -19,6 +19,10 @@ iterators.
Stored time series with a 8-bit timestamp and a fixed length of data.
So no variable encoding for you!

+.. versionadded:: 0.2
+    When mmap fails due to memory issues, this falls back to a slower fwrite()/fread()-based implementation.
+    You can also select the descriptor-based implementation manually if you prefer.

Indices and tables
==================
...
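The note above describes the new fallback behaviour in general terms. A minimal, hypothetical sketch of that pattern (not tempsdb code; `open_buffer` is an illustrative name) could look like this:

```python
# Hypothetical sketch of the fallback idea described in the docs note above:
# try to mmap a file, and on ENOMEM fall back to plain descriptor-based access.
import errno
import mmap


def open_buffer(path: str, force_descriptor: bool = False):
    # Assumes the file exists and is non-empty (mmap of length 0 needs that).
    f = open(path, 'rb+')
    if not force_descriptor:
        try:
            return f, mmap.mmap(f.fileno(), 0)
        except OSError as e:
            if e.errno != errno.ENOMEM:
                f.close()
                raise
    # Caller falls back to f.seek()/f.read()/f.write() instead of the mmap.
    return f, None
```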
@@ -21,7 +21,7 @@ def find_pyx(*path) -> tp.List[str]:
#
setup(name='tempsdb',
-      version='0.2_a6',
+      version='0.2_a7',
      packages=['tempsdb'],
      install_requires=['satella>=2.14.21', 'ujson'],
      ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
...
from .series cimport TimeSeries

+cdef class AlternativeMMap:
+    cdef:
+        object io
+        unsigned long size

cdef class Chunk:
    cdef:
        TimeSeries parent

@@ -25,6 +31,7 @@ cdef class Chunk:
    cpdef unsigned int find_right(self, unsigned long long timestamp)
    cdef int extend(self) except -1
    cpdef int delete(self) except -1
+    cpdef int switch_to_descriptor_based_access(self) except -1
    cdef inline unsigned long long name(self):
        """

@@ -44,4 +51,5 @@ cdef class Chunk:
cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
-                         bytes data, int page_size)
+                         bytes data, int page_size,
+                         bint descriptor_based_access=*)
+import io
import os
import typing as tp
import struct

@@ -13,17 +14,58 @@ STRUCT_L = struct.Struct('<L')
STRUCT_LQ = struct.Struct('<LQ')

+cdef class AlternativeMMap:
+    """
+    An alternative mmap implementation used when mmap cannot allocate due to memory issues
+    """
+    def flush(self):
+        self.io.flush()
+
+    def madvise(self, a, b, c):
+        ...
+
+    def resize(self, file_size: int):
+        self.size = file_size
+
+    def __init__(self, io_file: io.BinaryIO):
+        self.io = io_file
+        self.io.seek(0, 2)
+        self.size = self.io.tell()
+
+    def __getitem__(self, item: slice) -> bytes:
+        cdef:
+            unsigned long start = item.start
+            unsigned long stop = item.stop
+        self.io.seek(start, 0)
+        return self.io.read(stop-start)
+
+    def __setitem__(self, key: slice, value: bytes):
+        cdef:
+            unsigned long start = key.start
+        self.io.seek(start, 0)
+        self.io.write(value)
+
+    def close(self):
+        pass
cdef class Chunk:
    """
    Represents a single chunk of time series.

    This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes],
-    as well as a sequence protocol
+    as well as a sequence protocol.
+
+    This will try to mmap opened files, but if mmap fails due to insufficient memory it
+    will fall back to descriptor-based access.

    :param parent: parent time series
    :type parent: tp.Optional[TimeSeries]
    :param path: path to the chunk file
    :type path: str
+    :param use_descriptor_access: whether to use descriptor-based access instead of mmap

    :ivar path: path to the chunk (str)
    :ivar min_ts: timestamp of the first entry stored (int)

@@ -32,7 +74,16 @@ cdef class Chunk:
    :ivar entries: amount of entries in this chunk (int)
    :ivar page_size: size of the page (int)
    """
-    def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int):
+    cpdef int switch_to_descriptor_based_access(self) except -1:
+        """
+        Switch self to descriptor-based access instead of mmap
+        """
+        self.mmap.close()
+        self.mmap = AlternativeMMap(self.file)
+        return 0
+
+    def __init__(self, parent: tp.Optional[TimeSeries], path: str, page_size: int,
+                 use_descriptor_access: bool = False):
        cdef bytes b
        self.file_size = os.path.getsize(path)
        self.page_size = page_size
@@ -40,12 +91,20 @@
        self.closed = False
        self.path = path
        self.file = open(self.path, 'rb+')
-        try:
-            self.mmap = mmap.mmap(self.file.fileno(), 0)
-        except OSError as e:
-            self.file.close()
-            self.closed = True
-            raise Corruption(f'Empty chunk file!')
+
+        if use_descriptor_access:
+            self.mmap = AlternativeMMap(self.file)
+        else:
+            try:
+                self.mmap = mmap.mmap(self.file.fileno(), 0)
+            except OSError as e:
+                if e.errno == 12:   # Cannot allocate memory
+                    self.mmap = AlternativeMMap(self.file)
+                else:
+                    self.file.close()
+                    self.closed = True
+                    raise Corruption(f'Failed to mmap chunk file: {e}')
+
        try:
            self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE])
            self.block_size_plus = self.block_size + TIMESTAMP_SIZE
@@ -251,7 +310,7 @@ cdef class Chunk:
cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
-                         bytes data, int page_size):
+                         bytes data, int page_size, bint descriptor_based_access=False):
    """
    Creates a new chunk on disk

@@ -263,8 +322,11 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
    :type timestamp: int
    :param data: data of the first entry
    :type data: bytes
    :param page_size: size of a single page for storage
    :type page_size: int
+    :param descriptor_based_access: whether to use descriptor-based access instead of mmap.
+        Default is False
+    :type descriptor_based_access: bool
    :raises ValueError: entries in data were not of equal size, or data was empty or data
        was not sorted by timestamp or same timestamp appeared twice
    :raises AlreadyExists: chunk already exists

@@ -295,5 +357,5 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
    footer[-4:] = b'\x01\x00\x00\x00'   # 1 in little endian
    file.write(footer)
    file.close()
-    return Chunk(parent, path, page_size)
+    return Chunk(parent, path, page_size, use_descriptor_access=descriptor_based_access)
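
Based on the chunk API added above, usage might look like the following sketch; the file name and the 4-byte payloads are illustrative and mirror the new test added at the bottom of this commit:

```python
# Hedged usage sketch of the new chunk options added in this commit.
from tempsdb.chunks import Chunk, create_chunk

# Create a chunk without mmap from the start (the file must not exist yet):
chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096,
                     descriptor_based_access=True)
chunk.close()

# Or open it mmap-ed (the default) and drop to descriptor-based access later,
# e.g. under memory pressure:
chunk = Chunk(None, 'chunk_a.db', 4096)
chunk.switch_to_descriptor_based_access()
chunk.append(1, b'ma  ')
chunk.close()
```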
@@ -9,11 +9,13 @@ cdef class Database:
        object mpm
    cpdef int close(self) except -1
-    cpdef TimeSeries get_series(self, str name)
+    cpdef TimeSeries get_series(self, str name,
+                                bint use_descriptor_based_access=*)
    cpdef int register_memory_pressure_manager(self, object mpm) except -1
    cpdef TimeSeries create_series(self, str name, int block_size,
                                   unsigned long entries_per_chunk,
-                                   int page_size=*)
+                                   int page_size=*,
+                                   bint use_descriptor_based_access=*)
    cpdef list get_open_series(self)
    cpdef list get_all_series(self)
    cpdef int close_all_open_series(self) except -1
...
@@ -44,12 +44,18 @@ cdef class Database:
            output.append(series)
        return series

-    cpdef TimeSeries get_series(self, name: str):
+    cpdef TimeSeries get_series(self, name: str, bint use_descriptor_based_access = False):
        """
        Load and return an existing series

        :param name: name of the series
        :type name: str
+
+        .. versionadded:: 0.2
+
+        :param use_descriptor_based_access: whether to use descriptor-based access instead of mmap,
+            default is False
+        :type use_descriptor_based_access: bool
        :return: a loaded time series
        :rtype: TimeSeries
        :raises DoesNotExist: series does not exist

@@ -70,7 +76,8 @@ cdef class Database:
                return self.open_series[name]
            if not os.path.isdir(path):
                raise DoesNotExist('series %s does not exist' % (name, ))
-            self.open_series[name] = result = TimeSeries(path, name)
+            self.open_series[name] = result = TimeSeries(path, name,
+                                                         use_descriptor_based_access=use_descriptor_based_access)
            if self.mpm is not None:
                result.register_memory_pressure_manager(self.mpm)
            return result

@@ -106,7 +113,6 @@ cdef class Database:
            raise DoesNotExist('series does not exist')
        cdef:
            unsigned long long minimum_ts = 0xFFFFFFFFFFFFFFFF
-            str name
            list files = os.listdir(path)
            unsigned long long candidate_ts
        if len(files) == 1:

@@ -133,7 +139,8 @@ cdef class Database:
    cpdef TimeSeries create_series(self, str name, int block_size,
                                   unsigned long entries_per_chunk,
-                                   int page_size=4096):
+                                   int page_size=4096,
+                                   bint use_descriptor_based_access=False):
        """
        Create a new series

@@ -143,8 +150,14 @@ cdef class Database:
        :type block_size: int
        :param entries_per_chunk: entries per chunk file
        :type entries_per_chunk: int
-        :param page_size: size of a single page
+        :param page_size: size of a single page. Default is 4096
        :type page_size: int
+
+        .. versionadded:: 0.2
+
+        :param use_descriptor_based_access: whether to use descriptor-based access instead of mmap.
+            Default is False
+        :type use_descriptor_based_access: bool
        :return: new series
        :rtype: TimeSeries
        :raises ValueError: block size was larger than page_size plus a timestamp

@@ -156,7 +169,8 @@ cdef class Database:
            raise AlreadyExists('Series already exists')
        cdef TimeSeries series = create_series(os.path.join(self.name, name), name,
                                               block_size,
-                                               entries_per_chunk, page_size=page_size)
+                                               entries_per_chunk, page_size=page_size,
+                                               use_descriptor_based_access=use_descriptor_based_access)
        self.open_series[name] = series
        return series
...
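The new `Database` keyword arguments above could be used as in the sketch below; `db` is assumed to be an already-open tempsdb `Database`, and `'temperature'` is a placeholder series name:

```python
# Hedged sketch of the new use_descriptor_based_access keyword arguments.
# `db` is assumed to be an already-open tempsdb Database instance.
series = db.create_series('temperature', block_size=4, entries_per_chunk=1000,
                          use_descriptor_based_access=True)
series.close()

# Re-opening the same series later, again without mmap:
series = db.get_series('temperature', use_descriptor_based_access=True)
```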
@@ -14,6 +14,7 @@ cdef class TimeSeries:
        readonly unsigned int block_size
        readonly unsigned long long last_entry_ts
        unsigned int page_size
+        bint descriptor_based_access
        list chunks
        dict refs_chunks        # type: tp.Dict[int, int]
        dict open_chunks        # type: tp.Dict[int, Chunk]

@@ -35,9 +36,11 @@ cdef class TimeSeries:
    cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop)
    cdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)
    cpdef int trim(self, unsigned long long timestamp) except -1
+    cpdef unsigned long open_chunks_ram_size(self)
    cdef inline int get_references_for(self, unsigned long long timestamp):
        return self.refs_chunks.get(timestamp, 0)

cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
-                               int max_entries_per_chunk, int page_size=4096):
+                               int max_entries_per_chunk, int page_size=*,
+                               bint use_descriptor_based_access=*)
@@ -10,9 +10,6 @@ import os
DEF METADATA_FILE_NAME = 'metadata.txt'

cdef class TimeSeries:
    """
    This is thread-safe

@@ -26,7 +23,8 @@ cdef class TimeSeries:
    :ivar name: name of the series (str)
    """
-    def __init__(self, path: str, name: str):
+    def __init__(self, path: str, name: str, use_descriptor_based_access: bool = False):
+        self.descriptor_based_access = use_descriptor_based_access
        self.mpm = None
        self.name = name
        self.lock = threading.RLock()

@@ -114,7 +112,8 @@ cdef class TimeSeries:
                if name not in self.open_chunks:
                    self.open_chunks[name] = chunk = Chunk(self,
                                                           os.path.join(self.path, str(name)),
-                                                           self.page_size)
+                                                           self.page_size,
+                                                           use_descriptor_access=self.descriptor_based_access)
                else:
                    chunk = self.open_chunks[name]
                self.incref_chunk(name)

@@ -339,7 +338,8 @@ cdef class TimeSeries:
            if self.last_chunk is not None:
                self.decref_chunk(self.last_chunk.name())
            self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                           timestamp, data, self.page_size)
+                                           timestamp, data, self.page_size,
+                                           descriptor_based_access=self.descriptor_based_access)
            self.open_chunks[timestamp] = self.last_chunk
            self.incref_chunk(timestamp)
            self.chunks.append(timestamp)

@@ -360,9 +360,23 @@ cdef class TimeSeries:
            self.close()
            shutil.rmtree(self.path)
+
+    cpdef unsigned long open_chunks_ram_size(self):
+        """
+        .. versionadded:: 0.2
+
+        :return: the amount of RAM, in bytes, that the currently open chunks consume
+        :rtype: int
+        """
+        cdef:
+            unsigned long ram = 0
+            Chunk chunk
+        for chunk in self.open_chunks.values():
+            ram += chunk.file_size
+        return ram
+

cpdef TimeSeries create_series(str path, str name, unsigned int block_size,
-                               int max_entries_per_chunk, int page_size=4096):
+                               int max_entries_per_chunk, int page_size=4096,
+                               bint use_descriptor_based_access=False):
    if os.path.exists(path):
        raise AlreadyExists('This series already exists!')
...
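A possible use of the new `TimeSeries.open_chunks_ram_size` added above, sketched under the assumption that `series` is an open `TimeSeries`; the 512 MiB threshold is illustrative:

```python
# Hedged sketch: open_chunks_ram_size() sums file_size over the currently open
# chunks, which approximates resident memory while those chunks are mmap-ed.
LIMIT = 512 * 1024 * 1024   # illustrative 512 MiB budget

if series.open_chunks_ram_size() > LIMIT:
    # Either close the series to release the mappings, or reopen it with
    # use_descriptor_based_access=True so new chunks avoid mmap altogether.
    series.close()
```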
import os
import unittest

+from tempsdb.chunks import Chunk

class TestDB(unittest.TestCase):
    def test_write_series(self):

@@ -39,6 +41,40 @@ class TestDB(unittest.TestCase):
        self.assertGreaterEqual(items[0][0], start)
        self.assertLessEqual(items[-1][0], stop)

+    def test_chunk_alternative(self):
+        from tempsdb.chunks import create_chunk
+        data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
+        chunk = create_chunk(None, 'chunk_a.db', 0, b'ala ', 4096)
+        chunk.close()
+        chunk = Chunk(None, 'chunk_a.db', 4096, use_descriptor_access=True)
+        chunk.append(1, b'ma  ')
+        chunk.append(4, b'kota')
+        self.assertEqual(chunk.min_ts, 0)
+        self.assertEqual(chunk.max_ts, 4)
+        self.assertEqual(chunk.block_size, 4)
+        self.assertEqual(chunk[0], (0, b'ala '))
+        self.assertEqual(chunk[1], (1, b'ma  '))
+        self.assertEqual(chunk[2], (4, b'kota'))
+        self.assertEqual(len(chunk), 3)
+        self.assertEqual(list(iter(chunk)), data)
+        chunk.append(5, b'test')
+        self.assertEqual(chunk.find_left(0), 0)
+        self.assertEqual(chunk.find_left(1), 1)
+        self.assertEqual(chunk.find_left(2), 2)
+        self.assertEqual(chunk.find_left(3), 2)
+        self.assertEqual(chunk.find_left(4), 2)
+        self.assertEqual(chunk.find_left(5), 3)
+        self.assertEqual(chunk.find_left(6), 4)
+        self.assertEqual(chunk.find_right(0), 1)
+        self.assertEqual(chunk.find_right(1), 2)
+        self.assertEqual(chunk.find_right(2), 2)
+        self.assertEqual(chunk.find_right(3), 2)
+        self.assertEqual(chunk.find_right(4), 3)
+        self.assertEqual(chunk.find_right(5), 4)
+        self.assertEqual(chunk.find_right(6), 4)
+        chunk.close()
+        self.assertEqual(os.path.getsize('chunk_a.db'), 8192)

    def test_chunk(self):
        from tempsdb.chunks import create_chunk
        data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
...