Commit 1794489f authored by Piotr Maślanka
fix tests, some more series

parent 8a7b8a1c
.circleci/config.yml:

@@ -14,12 +14,10 @@ jobs:
       - run:
           command: |
             sudo pip install satella snakehouse
-            sudo python setup.py install
-            sudo pip install pytest-xdist pytest-cov pytest pytest-forked pluggy py mock
           name: Install necessary modules
       - run:
           command: |
-            pytest
+            sudo python setup.py test
           name: Test
 workflows:
...
.dockerignore (new file; name inferred from its contents):

+.git
+.circleci
+docs
.gitignore:

@@ -27,6 +27,8 @@ share/python-wheels/
 *.egg
 MANIFEST
 docs/_build
+*.c
+*.h
 
 # PyInstaller
 # Usually these files are written by a python script from a template
...
setup.cfg:

@@ -32,7 +32,3 @@ max-line-length = 100
 [pep8]
 max-line-length = 100
-
-[bdist_wheel]
-universal = 1
setup.py:

@@ -18,6 +18,5 @@ setup(name='tempsdb',
           compiler_directives={
               'language_level': '3',
           }),
-      python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*',
-      zip_safe=False
+      python_requires='!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*'
       )
tempsdb/__init__.py (new file; name inferred from the snakehouse bootstrap convention):

+from tempsdb.__bootstrap__ import bootstrap_cython_submodules
+bootstrap_cython_submodules()
tempsdb/chunks.pyx:

@@ -19,6 +19,7 @@ cdef class Chunk:
     :ivar min_ts: timestamp of the first entry stored (int)
     :ivar max_ts: timestamp of the last entry stored (int)
     :ivar block_size: size of the data entries (int)
+    :ivar entries: number of entries stored in this chunk (int)
     """
     def __init__(self, path: str):
         self.closed = False
@@ -31,7 +32,7 @@ cdef class Chunk:
             raise Corruption('Empty chunk file!')
         try:
             self.min_ts, self.max_ts, self.block_size = STRUCT_QQL.unpack(self.file.read(20))
-        except IOError:
+        except struct.error:
            raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
         self.pointer = 8
         self.entries = (os.path.getsize(self.path)-20) // (8 + self.block_size)
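For orientation, the chunk layout implied by this constructor: a 20-byte header (min_ts, max_ts, block_size) followed by fixed-size records of an 8-byte timestamp plus block_size bytes of payload. Below is a minimal pure-Python reader, assuming the format strings are '<QQL' and '<Q'; the actual STRUCT_QQL/STRUCT_Q definitions are outside this diff.

    import struct

    STRUCT_QQL = struct.Struct('<QQL')  # assumed: min_ts, max_ts, block_size -> 20 bytes
    STRUCT_Q = struct.Struct('<Q')      # assumed: per-record timestamp -> 8 bytes

    def read_chunk(path):
        # Parse a chunk file into (min_ts, max_ts, block_size, [(ts, data), ...])
        with open(path, 'rb') as f:
            raw = f.read()
        min_ts, max_ts, block_size = STRUCT_QQL.unpack(raw[:20])
        entries = []
        pos = 20
        while pos + 8 + block_size <= len(raw):
            ts, = STRUCT_Q.unpack(raw[pos:pos + 8])
            entries.append((ts, raw[pos + 8:pos + 8 + block_size]))
            pos += 8 + block_size
        return min_ts, max_ts, block_size, entries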
@@ -65,16 +66,17 @@ cdef class Chunk:
         return ts, self.mmap[starting_index+8:stopping_index]
 
 
 cpdef Chunk create_chunk(str path, list data):
     """
     Creates a new chunk on disk
 
     :param path: path to the new chunk file
     :type path: str
-    :param data: data to write, must be nonempty
+    :param data: data to write, a list of (timestamp, data) tuples.
+        Must be nonempty and sorted by timestamp.
     :type data: tp.List[tp.Tuple[int, bytes]]
-    :raises ValueError: entries in data were not of equal size, or data was empty
+    :raises ValueError: entries in data were not of equal size, data was empty, data
+        was not sorted by timestamp, or the same timestamp appeared twice
     """
     if not data:
         raise ValueError('Data is empty')
@@ -85,20 +87,27 @@ cpdef Chunk create_chunk(str path, list data):
         bytes b
         unsigned long long ts
         unsigned long block_size = len(data[0][1])
+        unsigned long long last_ts = 0
+        bint first_element = True
     for ts, b in data:
         if ts < min_ts:
             min_ts = ts
         elif ts > max_ts:
             max_ts = ts
-    file.write(STRUCT_QQL.pack(min_ts, max_ts, len(data[0][1])))
+    file.write(STRUCT_QQL.pack(min_ts, max_ts, block_size))
     try:
         for ts, b in data:
+            if not first_element:
+                if ts <= last_ts:
+                    raise ValueError('Timestamp appeared twice or data was not sorted')
             if len(b) != block_size:
                 raise ValueError('Entries in data have unequal lengths')
             file.write(STRUCT_Q.pack(ts))
             file.write(b)
-        file.close()
+            last_ts = ts
+            first_element = False
+        file.close()
     except ValueError:
         file.close()
         os.unlink(path)
...
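The contract that create_chunk now enforces is easiest to see as a standalone checker. A sketch follows; validate_chunk_data is an illustrative name, not a library function:

    def validate_chunk_data(data):
        # Raise ValueError under the same conditions create_chunk does
        if not data:
            raise ValueError('Data is empty')
        block_size = len(data[0][1])
        last_ts = None
        for ts, b in data:
            if len(b) != block_size:
                raise ValueError('Entries in data have unequal lengths')
            if last_ts is not None and ts <= last_ts:
                raise ValueError('Timestamp appeared twice or data was not sorted')
            last_ts = ts

    validate_chunk_data([(0, b'ala '), (1, b'ma  '), (4, b'kota')])  # passes
    # validate_chunk_data([(1, b'ala '), (1, b'kota')])  would raise ValueError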
tempsdb/series.pxd:

@@ -2,11 +2,24 @@ from .database cimport Database
 cdef class TimeSeries:
     cdef:
+        object lock
         str path
         Database parent
         str name
-        int block_size
-        unsigned long long last_entry_ts
+        unsigned int max_entries_per_block
+        double last_synced
+        readonly double interval_between_synces
+        readonly unsigned long long last_entry_synced
+        readonly unsigned int block_size
+        readonly unsigned long long last_entry_ts
         list chunks
+        dict open_chunks
+        list data_in_memory
 
+    cdef dict _get_metadata(self)
+    cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
+    cpdef int try_sync(self) except -1
+    cpdef int _sync_metadata(self) except -1
+    cpdef int put(self, unsigned long long timestamp, bytes data) except -1
-    cpdef void sync(self)
+    cpdef int sync(self) except -1
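Several of the readonly fields declared above are persisted in the series' metadata.txt via _get_metadata()/_sync_metadata(), shown in series.pyx below. A sketch of that JSON document, with illustrative values:

    import ujson

    metadata = {
        'last_entry_ts': 4,              # timestamp of the last entry added
        'block_size': 4,                 # size of each data block, in bytes
        'max_entries_per_block': 100,    # flush threshold for data held in memory
        'last_entry_synced': 4,          # watermark set by mark_synced_up_to()
        'interval_between_synces': 5.0,  # seconds between time-based syncs
    }
    with open('metadata.txt', 'w') as f_out:
        ujson.dump(metadata, f_out)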
tempsdb/series.pyx:

+import threading
+import time
 import ujson
 from satella.files import read_in_file
-from .chunks cimport Chunk
+from .chunks cimport Chunk, create_chunk
 from .database cimport Database
 from .exceptions import DoesNotExist, Corruption
 import os
 
+DEF METADATA_FILE_NAME = 'metadata.txt'
+
 
 cdef class TimeSeries:
+    """
+    This class is thread-safe
+
+    :ivar last_entry_ts: timestamp of the last entry added (int)
+    :ivar last_entry_synced: timestamp of the last synchronized entry (int)
+    :ivar block_size: size of the writable block of data
+    """
     def __init__(self, parent: Database, name: str):
+        self.lock = threading.Lock()
         self.parent = parent
         self.name = name
@@ -18,7 +30,7 @@ cdef class TimeSeries:
         self.path = os.path.join(self.parent.path, self.name)
 
-        cdef str metadata_s = read_in_file(os.path.join(self.path, 'metadata.txt'),
+        cdef str metadata_s = read_in_file(os.path.join(self.path, METADATA_FILE_NAME),
                                            'utf-8', 'invalid json')
         cdef dict metadata
         try:
@@ -29,18 +41,100 @@ cdef class TimeSeries:
         cdef list files = os.listdir(self.path)
         cdef set files_s = set(files)
         files_s.remove('metadata.txt')
 
-        self.chunks = []
+        self.chunks = []    # type: tp.List[int]  # sorted ascending
         cdef str chunk
         for chunk in files_s:
             try:
                 self.chunks.append(int(chunk))
             except ValueError:
                 raise Corruption('Detected invalid file "%s"' % (chunk, ))
-        self.last_entry_ts = metadata['last_entry_ts']
-        self.block_size = metadata['block_size']
+        self.chunks.sort()
+        try:
+            self.last_entry_ts = metadata['last_entry_ts']
+            self.block_size = metadata['block_size']
+            self.max_entries_per_block = metadata['max_entries_per_block']
+            self.last_entry_synced = metadata['last_entry_synced']
+            self.interval_between_synces = metadata['interval_between_synces']
+        except KeyError:
+            raise Corruption('Could not read metadata item')
+
+        self.data_in_memory = []
+        self.open_chunks = {}       # type: tp.Dict[int, Chunk]
+        self.last_synced = time.monotonic()
+    cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
+        """
+        Mark the series as synced up to a particular timestamp
+
+        :param timestamp: timestamp of the last synced entry
+        :type timestamp: int
+        """
+        self.last_entry_synced = timestamp
+        self._sync_metadata()
+        return 0
-    cpdef void sync(self):
+    cpdef int sync(self) except -1:
         """
         Synchronize the data kept in memory with that kept on disk
         """
+        cdef:
+            unsigned long long min_ts
+            str path
+        with self.lock:
+            self.last_synced = time.monotonic()
+            if not self.data_in_memory:
+                return 0
+            # Compute the chunk name only after the emptiness check,
+            # else data_in_memory[0] would raise IndexError on an empty buffer
+            min_ts = self.data_in_memory[0][0]
+            path = os.path.join(self.path, str(min_ts))
+            chunk = create_chunk(path, self.data_in_memory)
+            self.chunks.append(chunk.min_ts)
+            self.data_in_memory = []
+            self._sync_metadata()
+        return 0
+    cdef dict _get_metadata(self):
+        return {
+            'last_entry_ts': self.last_entry_ts,
+            'block_size': self.block_size,
+            'max_entries_per_block': self.max_entries_per_block,
+            'last_entry_synced': self.last_entry_synced,
+            'interval_between_synces': self.interval_between_synces
+        }
+
+    cpdef int _sync_metadata(self) except -1:
+        with open(os.path.join(self.path, METADATA_FILE_NAME), 'w') as f_out:
+            ujson.dump(self._get_metadata(), f_out)
+        return 0
+    cpdef int try_sync(self) except -1:
+        """
+        Check if synchronization is necessary, and if so, perform it.
+
+        Prefer this to :meth:`~tempsdb.series.TimeSeries.sync`
+        """
+        if len(self.data_in_memory) == self.max_entries_per_block or \
+                time.monotonic() - self.last_synced > self.interval_between_synces:
+            self.sync()
+        return 0
+    cpdef int put(self, unsigned long long timestamp, bytes data) except -1:
+        """
+        Append an entry.
+
+        :param timestamp: timestamp, must be larger than the current last_entry_ts
+        :type timestamp: int
+        :param data: data to write
+        :type data: bytes
+        :raises ValueError: timestamp not larger than the previous timestamp, or invalid block size
+        """
+        if len(data) != self.block_size:
+            raise ValueError('Invalid block size')
+        if timestamp <= self.last_entry_ts:
+            raise ValueError('Timestamp not larger than previous timestamp')
+        cdef bint needs_sync
+        with self.lock:
+            self.data_in_memory.append((timestamp, data))
+            self.last_entry_ts = timestamp
+            needs_sync = len(self.data_in_memory) >= self.max_entries_per_block
+        # sync() acquires self.lock itself, so call it after releasing the lock
+        if needs_sync:
+            self.sync()
+        return 0
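Taken together, the write path is: put() buffers entries under the lock, try_sync()/sync() flush them into a new chunk file named after its first timestamp, and mark_synced_up_to() records the downstream watermark. A hedged usage sketch; the Database instance db and how the series directory gets provisioned are outside this diff:

    from tempsdb.series import TimeSeries

    series = TimeSeries(db, 'temperature')   # block_size etc. come from metadata.txt

    for ts in range(10, 20):
        series.put(ts, b'\x00' * series.block_size)  # ValueError on wrong size or stale ts
        series.try_sync()    # flushes when the buffer is full or the interval has lapsed

    series.sync()                                    # force the remaining entries to disk
    series.mark_synced_up_to(series.last_entry_ts)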
tests:

 import unittest
+from tempsdb.chunks import create_chunk
 
 
 class TestDB(unittest.TestCase):
     def test_chunk(self):
-        from tempsdb.chunks import create_chunk
         data = [(0, b'ala '), (1, b'ma  '), (4, b'kota')]
         chunk = create_chunk('chunk.db', data)
         self.assertEqual(chunk.min_ts, 0)
...
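The new sortedness and duplicate-timestamp validation in create_chunk suggests companion tests. A sketch, not part of this commit:

    import unittest
    from tempsdb.chunks import create_chunk

    class TestChunkValidation(unittest.TestCase):
        def test_unsorted_data_rejected(self):
            # Timestamps out of order must raise ValueError
            self.assertRaises(ValueError, create_chunk, 'bad.db',
                              [(4, b'kota'), (1, b'ala ')])

        def test_duplicate_timestamp_rejected(self):
            # The same timestamp appearing twice must raise ValueError
            self.assertRaises(ValueError, create_chunk, 'bad.db',
                              [(1, b'ala '), (1, b'kota')])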
Dockerfile (new file):

+FROM python:3.8
+RUN pip install satella snakehouse nose2
+ADD . /app
+WORKDIR /app
+CMD ["python", "setup.py", "test"]