Skip to content
Snippets Groups Projects
Commit 8a7b8a1c authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

write chunk

parent 2c7ac975
No related branches found
No related tags found
No related merge requests found
Exceptions
==========
The base TempsDB exception is
.. autoclass:: tempsdb.exceptions.TempsDBError
The exceptions that inherit from it are:
.. autoclass:: tempsdb.exceptions.DoesNotExist
.. autoclass:: tempsdb.exceptions.Corruption
Welcome to tempsdb's documentation!
===================================

.. toctree::
   :maxdepth: 2
   :caption: Contents:

   time-series
   exceptions

It tries to use mmap where possible, and in general to be as zero-copy as possible (i.e., the
only time data is deserialized is when a particular entry is read).

Indices and tables
Time series
===========
The name of the series must be a valid name for a directory on your filesystem.
...@@ -13,7 +13,7 @@ def find_pyx(*path) -> tp.List[str]: ...@@ -13,7 +13,7 @@ def find_pyx(*path) -> tp.List[str]:
setup(name='tempsdb', setup(name='tempsdb',
version='0.1_a1', version='0.1_a1',
packages=find_packages(include=['tempsdb', 'tempsdb.*']), packages=find_packages(include=['tempsdb', 'tempsdb.*']),
install_requires=['satella'], install_requires=['satella', 'ujson'],
ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ], ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
compiler_directives={ compiler_directives={
'language_level': '3', 'language_level': '3',
......
cdef class Chunk:
    # Declaration of a single on-disk chunk of a time series.
    # See chunks.pyx for the implementation and the file layout.
    cdef:
        readonly str path                       # path to the backing file
        readonly unsigned long long min_ts      # timestamp of the earliest entry
        readonly unsigned long long max_ts      # timestamp of the latest entry
        readonly unsigned long block_size       # size of a single data payload, in bytes
        unsigned long pointer                   # internal read pointer
        readonly unsigned long entries          # number of entries stored in this chunk
        object file                             # underlying file object, opened 'rb'
        object mmap                             # mmap view over the whole file
        bint closed                             # True once close() has released resources
    cpdef void close(self)
    cpdef tuple get_piece_at(self, unsigned int index)
cpdef Chunk create_chunk(str path, list data)
import os
import typing as tp
import struct
import mmap
from .exceptions import Corruption
# Chunk header: min_ts (uint64), max_ts (uint64), block_size (uint32), big-endian; 20 bytes total
STRUCT_QQL = struct.Struct('>QQL')
# Per-entry timestamp prefix: a single big-endian uint64; 8 bytes
STRUCT_Q = struct.Struct('>Q')
cdef class Chunk:
    """
    Represents a single chunk of a time series.

    On-disk layout (all integers big-endian):

    * a 20-byte header: min_ts (uint64), max_ts (uint64), block_size (uint32)
    * followed by the entries, each being an 8-byte timestamp (uint64)
      plus block_size bytes of data

    :param path: path to the chunk file
    :type path: str
    :raises Corruption: the file is empty or its header cannot be read

    :ivar path: path to the chunk (str)
    :ivar min_ts: timestamp of the first entry stored (int)
    :ivar max_ts: timestamp of the last entry stored (int)
    :ivar block_size: size of the data entries (int)
    :ivar entries: number of entries stored in this chunk (int)
    """
    def __init__(self, path: str):
        self.closed = False
        self.path = path
        self.file = open(self.path, 'rb')
        try:
            self.mmap = mmap.mmap(self.file.fileno(), 0)
        except OSError:
            # Don't leak the file handle when mmap-ing fails
            self.file.close()
            raise Corruption('Empty chunk file!')
        try:
            # The header is 20 bytes (8+8+4); reading fewer raises struct.error
            self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(
                self.file.read(STRUCT_QQL.size))
        except (IOError, struct.error):
            raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
        self.pointer = 8    # NOTE(review): likely meant to be the header size (20) - confirm
        # Each entry occupies 8 bytes of timestamp plus block_size bytes of data
        self.entries = (os.path.getsize(self.path) - 20) // (8 + self.block_size)

    cpdef void close(self):
        """
        Close the chunk and release the allocated resources.

        Idempotent - subsequent calls are no-ops.
        """
        if self.closed:
            return
        self.mmap.close()
        self.file.close()
        self.closed = True      # was missing: close() was never marked done

    def __del__(self):
        self.close()

    cpdef tuple get_piece_at(self, unsigned int index):
        """
        Return a piece of data at a particular index, numbered from 0.

        :param index: index of the entry to fetch
        :type index: int
        :return: the (timestamp, data) pair at the given index
        :rtype: tp.Tuple[int, bytes]
        :raises IndexError: index is past the last entry
        """
        if index >= self.entries:
            raise IndexError('Index too large')
        cdef:
            # Stride is 8 (timestamp) + block_size (payload), matching create_chunk
            unsigned long starting_index = 20 + index * (8 + self.block_size)
            unsigned long stopping_index = starting_index + 8 + self.block_size
            unsigned long long ts = STRUCT_Q.unpack(
                self.mmap[starting_index:starting_index+8])[0]
        return ts, self.mmap[starting_index+8:stopping_index]
cpdef Chunk create_chunk(str path, list data):
    """
    Creates a new chunk on disk.

    :param path: path to the new chunk file
    :type path: str
    :param data: data to write, must be nonempty
    :type data: tp.List[tp.Tuple[int, bytes]]
    :return: a Chunk opened over the freshly written file
    :rtype: Chunk
    :raises ValueError: entries in data were not of equal size, or data was empty
    """
    if not data:
        raise ValueError('Data is empty')
    file = open(path, 'wb')
    cdef:
        unsigned long long min_ts = 0xFFFFFFFFFFFFFFFF
        unsigned long long max_ts = 0
        bytes b
        unsigned long long ts
        unsigned long block_size = len(data[0][1])
    for ts, b in data:
        # Independent checks: with elif, a single entry (or strictly decreasing
        # timestamps) would leave max_ts at 0, i.e. max_ts < min_ts
        if ts < min_ts:
            min_ts = ts
        if ts > max_ts:
            max_ts = ts
    try:
        file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size))
        for ts, b in data:
            if len(b) != block_size:
                raise ValueError('Block size has entries of not equal length')
            file.write(STRUCT_Q.pack(ts))
            file.write(b)
        file.close()
    except ValueError:
        # Never leave a half-written chunk behind
        file.close()
        os.unlink(path)
        raise
    return Chunk(path)
class TempsDBError(Exception):
    """Base class for all errors raised by TempsDB."""


class DoesNotExist(TempsDBError):
    """The required resource (e.g. a time series) does not exist."""


class Corruption(TempsDBError):
    """Corruption was detected in the on-disk dataset."""
from .database cimport Database


cdef class TimeSeries:
    # Declaration of a single time series stored inside a Database.
    # See series.pyx for the implementation.
    cdef:
        str path                            # path to this series' directory on disk
        Database parent                     # owning database
        str name                            # name of this series
        int block_size                      # size of a single data entry, in bytes
        unsigned long long last_entry_ts    # timestamp of the newest entry
        list chunks                         # chunk identifiers (ints) found on disk

    cpdef void sync(self)
import os

import ujson
from satella.files import read_in_file

from .chunks cimport Chunk
from .database cimport Database
from .exceptions import Corruption, DoesNotExist


cdef class TimeSeries:
    """
    A single time series stored inside a :class:`Database`.

    :param parent: database this series belongs to
    :type parent: Database
    :param name: name of the series; must be a valid directory name
    :type name: str
    :raises DoesNotExist: no such series on disk
    :raises Corruption: the metadata is invalid JSON, is missing required keys,
        or a non-chunk file was found in the series directory
    """
    def __init__(self, parent: Database, name: str):
        self.parent = parent
        self.name = name
        # os.path.isdir takes a single path - join parent dir and name first
        if not os.path.isdir(os.path.join(self.parent.path, name)):
            raise DoesNotExist('Chosen time series does not exist')
        self.path = os.path.join(self.parent.path, self.name)

        cdef str metadata_s = read_in_file(os.path.join(self.path, 'metadata.txt'),
                                           'utf-8', 'invalid json')
        cdef dict metadata
        try:
            metadata = ujson.loads(metadata_s)
        except ValueError:
            raise Corruption('Corrupted series')

        # Everything in the directory except metadata.txt must be a chunk,
        # and chunk files are named with bare integers
        cdef list files = os.listdir(self.path)
        cdef set files_s = set(files)
        files_s.discard('metadata.txt')     # discard: don't raise if it's absent

        self.chunks = []
        cdef str chunk
        for chunk in files_s:
            try:
                self.chunks.append(int(chunk))
            except ValueError:
                raise Corruption('Detected invalid file "%s"' % (chunk, ))

        try:
            self.last_entry_ts = metadata['last_entry_ts']
            self.block_size = metadata['block_size']
        except KeyError:
            # Missing keys are a corruption of the series, not a programming error
            raise Corruption('Series metadata is missing a required key')

    cpdef void sync(self):
        """
        Synchronize the data kept in the memory with these kept on disk
        """
import os
import unittest

from tempsdb.chunks import create_chunk


class TestDB(unittest.TestCase):
    def test_chunk(self):
        """Write a chunk to disk and read every entry back."""
        data = [(0, b'ala '), (1, b'ma '), (4, b'kota')]
        chunk = create_chunk('chunk.db', data)
        try:
            self.assertEqual(chunk.min_ts, 0)
            self.assertEqual(chunk.max_ts, 4)
            # The attribute is called block_size, not bs
            self.assertEqual(chunk.block_size, 4)
            self.assertEqual(chunk.get_piece_at(0), (0, b'ala '))
            self.assertEqual(chunk.get_piece_at(1), (1, b'ma '))
            self.assertEqual(chunk.get_piece_at(2), (4, b'kota'))
        finally:
            # Always release the mmap and remove the temporary file
            chunk.close()
            os.unlink('chunk.db')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment