Skip to content
Snippets Groups Projects
Commit 1b57d6ec authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

writing done

parent ebc7e338
No related branches found
No related tags found
No related merge requests found
...@@ -10,3 +10,5 @@ The exceptions that inherit from it are: ...@@ -10,3 +10,5 @@ The exceptions that inherit from it are:
.. autoclass:: tempsdb.exceptions.DoesNotExist .. autoclass:: tempsdb.exceptions.DoesNotExist
.. autoclass:: tempsdb.exceptions.Corruption .. autoclass:: tempsdb.exceptions.Corruption
.. autoclass:: tempsdb.exceptions.InvalidState
...@@ -7,10 +7,11 @@ cdef class Chunk: ...@@ -7,10 +7,11 @@ cdef class Chunk:
readonly unsigned long entries readonly unsigned long entries
object file object file
object mmap object mmap
bint closed bint closed, writable
object write_lock
cpdef void close(self) cpdef void close(self)
cpdef tuple get_piece_at(self, unsigned int index) cpdef tuple get_piece_at(self, unsigned int index)
cpdef int put(self, unsigned long long timestamp, bytes data) except -1
cpdef Chunk create_chunk(str path, list data) cpdef Chunk create_chunk(str path, list data)
import os import os
import threading
import typing as tp import typing as tp
import struct import struct
import mmap import mmap
from .exceptions import Corruption from .exceptions import Corruption, InvalidState
STRUCT_QQL = struct.Struct('>QQL') STRUCT_L = struct.Struct('>L')
STRUCT_Q = struct.Struct('>Q') STRUCT_Q = struct.Struct('>Q')
DEF HEADER_SIZE = 20 DEF HEADER_SIZE = 4
DEF TIMESTAMP_SIZE = 8 DEF TIMESTAMP_SIZE = 8
...@@ -14,7 +15,7 @@ cdef class Chunk: ...@@ -14,7 +15,7 @@ cdef class Chunk:
""" """
Represents a single chunk of time series. Represents a single chunk of time series.
This also implements an iterator interface. This will iterate with tp.Tuple[int, bytes]. This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes].
:param path: path to the chunk file :param path: path to the chunk file
:type path: str :type path: str
...@@ -25,26 +26,58 @@ cdef class Chunk: ...@@ -25,26 +26,58 @@ cdef class Chunk:
:ivar block_size: size of the data entries (int) :ivar block_size: size of the data entries (int)
:ivar entries: amount of entries in this chunk (int) :ivar entries: amount of entries in this chunk (int)
""" """
def __init__(self, path: str, writable: bool = True):
    """
    Open a chunk file and map it into memory.

    :param path: path to the chunk file
    :type path: str
    :param writable: whether this chunk may be appended to (default True)
    :type writable: bool
    :raises Corruption: the file is empty or its header cannot be parsed
    """
    cdef:
        unsigned long long file_size = os.path.getsize(path)
        bytes b
    self.writable = writable
    # Serializes appends performed via put()
    self.write_lock = threading.Lock()
    self.closed = False
    self.path = path
    self.file = open(self.path, 'rb+' if self.writable else 'rb')
    try:
        if self.writable:
            self.mmap = mmap.mmap(self.file.fileno(), file_size)
        else:
            self.mmap = mmap.mmap(self.file.fileno(), file_size, access=mmap.ACCESS_READ)
    except OSError:
        # mmap raises on zero-length files; release the descriptor before reporting.
        # Note: was an f-string with no placeholders (and an unused "as e" binding).
        self.file.close()
        self.closed = True
        raise Corruption('Empty chunk file!')
    try:
        # Header is a single big-endian unsigned long: the size of each data block
        self.block_size, = STRUCT_L.unpack(self.mmap[:HEADER_SIZE])
    except struct.error:
        self.close()
        raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
    # Leftover debug print() removed here
    self.entries = (file_size-HEADER_SIZE) // (self.block_size+TIMESTAMP_SIZE)
    # The timestamp of the newest entry sits TIMESTAMP_SIZE+block_size bytes before EOF
    self.max_ts, = STRUCT_Q.unpack(self.mmap[-TIMESTAMP_SIZE-self.block_size:-self.block_size])
cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
    """
    Append a record to this chunk.

    :param timestamp: timestamp of the entry; must be strictly greater than the
        current maximum timestamp in this chunk
    :type timestamp: int
    :param data: data to write, exactly block_size bytes long
    :type data: bytes
    :raises InvalidState: chunk is closed or not writable
    :raises ValueError: invalid timestamp or data
    """
    if self.closed or not self.writable:
        raise InvalidState('chunk is closed')
    if len(data) != self.block_size:
        raise ValueError('data not equal in length to block size!')
    if timestamp <= self.max_ts:
        raise ValueError('invalid timestamp')
    cdef bytearray data_to_write = bytearray(TIMESTAMP_SIZE+self.block_size)
    data_to_write[0:TIMESTAMP_SIZE] = STRUCT_Q.pack(timestamp)
    data_to_write[TIMESTAMP_SIZE:] = data
    with self.write_lock:
        self.file.seek(0, 2)
        self.file.write(data_to_write)
        # Flush the buffered write so the enlarged mapping sees the new bytes
        self.file.flush()
        self.entries += 1
        # Was a magic 8 -- use the named constant for the timestamp width
        self.mmap.resize(self.entries*(TIMESTAMP_SIZE+self.block_size)+HEADER_SIZE)
        # Keep the cached maximum in sync so subsequent put() validation works
        self.max_ts = timestamp
    return 0
def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]: def __iter__(self) -> tp.Iterator[tp.Tuple[int, bytes]]:
cdef unsigned long i = 0 cdef unsigned long i = 0
...@@ -99,21 +132,13 @@ cpdef Chunk create_chunk(str path, list data): ...@@ -99,21 +132,13 @@ cpdef Chunk create_chunk(str path, list data):
raise ValueError('Data is empty') raise ValueError('Data is empty')
file = open(path, 'wb') file = open(path, 'wb')
cdef: cdef:
unsigned long long min_ts = 0xFFFFFFFFFFFFFFFF
unsigned long long max_ts = 0
bytes b bytes b
unsigned long long ts unsigned long long ts
unsigned long block_size = len(data[0][1]) unsigned long block_size = len(data[0][1])
unsigned long long last_ts = 0 unsigned long long last_ts = 0
bint first_element = True bint first_element = True
for ts, b in data: file.write(STRUCT_L.pack(block_size))
if ts < min_ts:
min_ts = ts
elif ts > max_ts:
max_ts = ts
file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size))
try: try:
for ts, b in data: for ts, b in data:
if not first_element: if not first_element:
......
cdef class Database: cdef class Database:
cdef: cdef:
str path str path
bint closed
cpdef void close(self)
...@@ -4,4 +4,12 @@ cdef class Database: ...@@ -4,4 +4,12 @@ cdef class Database:
""" """
def __init__(self, path: str): def __init__(self, path: str):
self.path = path self.path = path
self.closed = False
cpdef void close(self):
    """
    Close this TempsDB database.

    Idempotent: calling it on an already-closed database is a no-op.
    """
    if not self.closed:
        self.closed = True
...@@ -8,3 +8,6 @@ class DoesNotExist(TempsDBError): ...@@ -8,3 +8,6 @@ class DoesNotExist(TempsDBError):
class Corruption(TempsDBError): class Corruption(TempsDBError):
"""Corruption was detected in the dataset""" """Corruption was detected in the dataset"""
# Raised e.g. when putting to a closed/read-only chunk or operating on a closed series
class InvalidState(TempsDBError):
    """An attempt was made to write to a resource that's closed"""
from .database cimport Database from .database cimport Database
from .chunks cimport Chunk
cdef class TimeSeries: cdef class TimeSeries:
cdef: cdef:
object lock bint closed
object lock, fopen_lock
str path str path
Database parent Database parent
str name str name
...@@ -15,8 +18,11 @@ cdef class TimeSeries: ...@@ -15,8 +18,11 @@ cdef class TimeSeries:
list chunks list chunks
dict open_chunks dict open_chunks
list data_in_memory list data_in_memory
Chunk last_chunk
cdef dict _get_metadata(self) cdef dict _get_metadata(self)
cpdef void close(self)
cpdef Chunk open_chunk(self, unsigned long long name)
cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1 cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
cpdef int try_sync(self) except -1 cpdef int try_sync(self) except -1
cpdef int _sync_metadata(self) except -1 cpdef int _sync_metadata(self) except -1
......
...@@ -3,9 +3,9 @@ import time ...@@ -3,9 +3,9 @@ import time
import ujson import ujson
from satella.files import read_in_file from satella.files import read_in_file
from .chunks cimport create_chunk from .chunks cimport create_chunk, Chunk
from .database cimport Database from .database cimport Database
from .exceptions import DoesNotExist, Corruption from .exceptions import DoesNotExist, Corruption, InvalidState
import os import os
DEF METADATA_FILE_NAME = 'metadata.txt' DEF METADATA_FILE_NAME = 'metadata.txt'
...@@ -20,28 +20,30 @@ cdef class TimeSeries: ...@@ -20,28 +20,30 @@ cdef class TimeSeries:
""" """
def __init__(self, parent: Database, name: str): def __init__(self, parent: Database, name: str):
self.lock = threading.Lock() self.lock = threading.Lock()
self.fopen_lock = threading.Lock()
self.parent = parent self.parent = parent
self.name = name self.name = name
self.closed = False
if not os.path.isdir(self.parent.path, name): if not os.path.isdir(self.parent.path, name):
raise DoesNotExist('Chosen time series does not exist') raise DoesNotExist('Chosen time series does not exist')
self.path = os.path.join(self.parent.path, self.name) self.path = os.path.join(self.parent.path, self.name)
cdef:
cdef str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME), str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
'utf-8', 'invalid json') 'utf-8', 'invalid json')
cdef dict metadata dict metadata
list files = os.path.listdir(self.path)
set files_s = set(files)
str chunk
try: try:
metadata = ujson.loads(metadata_s) metadata = ujson.loads(metadata_s)
except ValueError: except ValueError:
raise Corruption('Corrupted series') raise Corruption('Corrupted series')
cdef list files = os.path.listdir(self.path)
cdef set files_s = set(files)
files_s.remove('metadata.txt') files_s.remove('metadata.txt')
self.chunks = [] # type: tp.List[int] # sorted by ASC self.chunks = [] # type: tp.List[int] # sorted by ASC
cdef str chunk
for chunk in files: for chunk in files:
try: try:
self.chunks.append(int(chunk)) self.chunks.append(int(chunk))
...@@ -61,6 +63,40 @@ cdef class TimeSeries: ...@@ -61,6 +63,40 @@ cdef class TimeSeries:
self.data_in_memory = [] self.data_in_memory = []
self.open_chunks = {} # tp.Dict[int, Chunk] self.open_chunks = {} # tp.Dict[int, Chunk]
self.last_synced = time.monotonic() self.last_synced = time.monotonic()
self.last_chunk = Chunk(os.path.join(self.path, str(max(self.chunks))))
cpdef Chunk open_chunk(self, unsigned long long name):
    """
    Opens a provided chunk

    :param name: name of the chunk
    :type name: int
    :return: chunk
    :rtype: Chunk
    :raises DoesNotExist: chunk not found
    :raises InvalidState: resource closed
    """
    cdef Chunk chunk
    # Guard clauses first: closed series, then unknown chunk name
    if self.closed:
        raise InvalidState('Series is closed')
    if name not in self.chunks:
        raise DoesNotExist('Invalid chunk!')
    with self.fopen_lock:
        # EAFP: reuse an already-opened chunk, otherwise open and cache it
        try:
            chunk = self.open_chunks[name]
        except KeyError:
            chunk = Chunk(os.path.join(self.path, str(name)))
            self.open_chunks[name] = chunk
    return chunk
cpdef void close(self):
    """
    Close the series, closing every chunk opened through it.

    No further operations can be executed on it afterwards.
    Idempotent: subsequent calls are no-ops.
    """
    if self.closed:
        return
    cdef Chunk chunk
    # Bug fix: data_in_memory is a list of (timestamp, data) pairs and has no
    # .values() -- the opened Chunk objects live in the open_chunks dict.
    for chunk in self.open_chunks.values():
        chunk.close()
    # last_chunk is tracked separately from open_chunks; close it as well
    # (presumably Chunk.close() is idempotent if it also appears in the dict -- verify)
    self.last_chunk.close()
    self.closed = True
cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1: cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
""" """
...@@ -76,7 +112,11 @@ cdef class TimeSeries: ...@@ -76,7 +112,11 @@ cdef class TimeSeries:
cpdef int sync(self) except -1: cpdef int sync(self) except -1:
""" """
Synchronize the data kept in the memory with these kept on disk Synchronize the data kept in the memory with these kept on disk
:raises InvalidState: the resource is closed
""" """
if self.closed:
raise InvalidState('series is closed')
cdef: cdef:
unsigned long long min_ts = self.data_in_memory[0][0] unsigned long long min_ts = self.data_in_memory[0][0]
str path = os.path.join(self.path, str(min_ts)) str path = os.path.join(self.path, str(min_ts))
...@@ -125,15 +165,23 @@ cdef class TimeSeries: ...@@ -125,15 +165,23 @@ cdef class TimeSeries:
:param data: data to write :param data: data to write
:type data: bytes :type data: bytes
:raises ValueError: Timestamp not larger than previous timestamp or invalid block size :raises ValueError: Timestamp not larger than previous timestamp or invalid block size
:raises InvalidState: the resource is closed
""" """
if self.closed:
raise InvalidState('series is closed')
if len(data) != self.block_size: if len(data) != self.block_size:
raise ValueError('Invalid block size') raise ValueError('Invalid block size')
if timestamp <= self.last_entry_ts: if timestamp <= self.last_entry_ts:
raise ValueError('Timestamp not larger than previous timestamp') raise ValueError('Timestamp not larger than previous timestamp')
with self.lock: with self.lock:
self.data_in_memory.append((timestamp, data)) if len(self.last_chunk) >= self.max_entries_per_block:
self.last_chunk.close()
self.last_chunk = create_chunk(os.path.join(self.path, str(timestamp)),
[(timestamp, data)])
else:
self.last_chunk.put(timestamp, data)
self.last_entry_ts = timestamp self.last_entry_ts = timestamp
if len(self.data_in_memory) >= self.max_entries_per_block:
self.sync()
return 0 return 0
import os
import unittest import unittest
from tempsdb.chunks import create_chunk from tempsdb.chunks import create_chunk
...@@ -14,3 +15,6 @@ class TestDB(unittest.TestCase): ...@@ -14,3 +15,6 @@ class TestDB(unittest.TestCase):
self.assertEqual(chunk.get_piece_at(2), (4, b'kota')) self.assertEqual(chunk.get_piece_at(2), (4, b'kota'))
self.assertEqual(len(chunk), 3) self.assertEqual(len(chunk), 3)
self.assertEqual(list(iter(chunk)), data) self.assertEqual(list(iter(chunk)), data)
chunk.put(5, b'test')
chunk.close()
self.assertEqual(os.path.getsize('chunk.db'), 4+4*12)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment