Commit afc78609 authored by Piotr Maślanka

added correct reading

parent 19766cf5
@@ -20,7 +20,7 @@ def find_pyx(*path) -> tp.List[str]:
 setup(name='tempsdb',
-      version='0.1_a3',
+      version='0.1_a4',
       packages=['tempsdb'],
       install_requires=['satella>=2.14.21', 'ujson'],
       ext_modules=build([Multibuild('tempsdb', find_pyx('tempsdb')), ],
@@ -23,7 +23,7 @@ cdef class Chunk:
     readonly bint writable
     object write_lock

-    cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry)
+    cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry)
     cpdef void close(self)
     cpdef tuple get_piece_at(self, unsigned int index)
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
@@ -49,4 +49,5 @@ cdef class Chunk:
     cdef unsigned long long get_timestamp_at(self, unsigned int index)

-cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size)
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size)
@@ -11,6 +11,7 @@ DEF TIMESTAMP_SIZE = 8
 DEF FOOTER_SIZE = 4

 STRUCT_Q = struct.Struct('<Q')
 STRUCT_L = struct.Struct('<L')
+STRUCT_LQ = struct.Struct('<LQ')

 cdef class Chunk:
@@ -44,23 +45,23 @@ cdef class Chunk:
         self.file = open(self.path, 'rb+' if self.writable else 'rb')
         try:
             if self.writable:
-                self.mmap = mmap.mmap(self.file.fileno(), self.file_size)
+                self.mmap = mmap.mmap(self.file.fileno(), 0)
             else:
-                self.mmap = mmap.mmap(self.file.fileno(), self.file_size, access=mmap.ACCESS_READ)
+                self.mmap = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)
         except OSError as e:
             self.file.close()
             self.closed = True
             raise Corruption(f'Empty chunk file!')
         try:
-            self.block_size, = STRUCT_L.unpack(self.mmap[:HEADER_SIZE])
+            self.block_size, self.min_ts = STRUCT_LQ.unpack(self.mmap[0:HEADER_SIZE+TIMESTAMP_SIZE])
             self.block_size_plus = self.block_size + TIMESTAMP_SIZE
         except struct.error:
             self.close()
             raise Corruption('Could not read the header of the chunk file %s' % (self.path, ))
         self.entries, = STRUCT_L.unpack(self.mmap[self.file_size-FOOTER_SIZE:self.file_size])
         self.pointer = self.entries*self.block_size_plus+HEADER_SIZE
         self.max_ts = self.get_timestamp_at(self.entries-1)
-        self.min_ts = self.get_timestamp_at(0)

     cpdef unsigned int find_left(self, unsigned long long timestamp):
         """
@@ -161,7 +162,9 @@ cdef class Chunk:
         if self.closed or not self.writable:
             raise InvalidState('chunk is closed')
         if len(data) != self.block_size:
-            raise ValueError('data not equal in length to block size!')
+            raise ValueError('data (%s) not equal in length to block size (%s)!' % (
+                len(data), self.block_size
+            ))
         if timestamp <= self.max_ts:
             raise ValueError('invalid timestamp')
@@ -180,13 +183,13 @@ cdef class Chunk:
             self.pointer += self.block_size_plus
         return 0

-    cpdef object iterate_range(self, unsigned long starting_entry, unsigned long stopping_entry):
+    cpdef object iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry):
         """
-        Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive)
+        Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive).

-        :param starting_entry: number of starting entry
+        :param starting_entry: index of starting entry
         :type starting_entry: int
-        :param stopping_entry: number of stopping entry
+        :param stopping_entry: index of stopping entry
         :type stopping_entry: int
         :return: an iterator
         :rtype: tp.Iterator[tp.Tuple[int, bytes]]
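
The rename from iterate_range to iterate_indices makes the unit explicit: the method walks entry indices, not timestamps. A hypothetical usage sketch, pairing it with the find_left/find_right lookups that appear elsewhere in this diff (`chunk` is assumed to be an open, populated Chunk):

```python
# Slice a chunk by timestamp: find_left/find_right translate timestamps
# into entry indices, iterate_indices walks the half-open index range.
for ts, blob in chunk.iterate_indices(chunk.find_left(100),
                                      chunk.find_right(400)):
    print(ts, blob)
```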
@@ -239,7 +242,8 @@ cdef class Chunk:
         return ts, self.mmap[starting_index+TIMESTAMP_SIZE:stopping_index]

-cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
+cpdef Chunk create_chunk(TimeSeries parent, str path, unsigned long long timestamp,
+                         bytes data, int page_size):
     """
     Creates a new chunk on disk
@@ -247,9 +251,10 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     :type parent: TimeSeries
     :param path: path to the new chunk file
     :type path: str
-    :param data: data to write, list of tuple (timestamp, entry to write).
-        Must be nonempty and sorted by timestamp.
-    :type data: tp.List[tp.Tuple[int, bytes]]
+    :param timestamp: timestamp of the first entry to contain
+    :type timestamp: int
+    :param data: data of the first entry
+    :type data: bytes
     :param page_size: size of a single page for storage
     :type page_size: int
     :raises ValueError: entries in data were not of equal size, or data was empty or data
@@ -264,28 +269,14 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     cdef:
         bytes b
         unsigned long long ts
-        unsigned long block_size = len(data[0][1])
+        unsigned long block_size = len(data)
         unsigned long file_size = 0
         unsigned long long last_ts = 0
-        unsigned int entries = len(data)
+        unsigned int entries = 1
-        bint first_element = True
     file_size += file.write(STRUCT_L.pack(block_size))
-    try:
-        for ts, b in data:
-            if not first_element:
-                if ts <= last_ts:
-                    raise ValueError('Timestamp appeared twice or data was not sorted')
-            if len(b) != block_size:
-                raise ValueError('Block size has entries of not equal length')
-            file_size += file.write(STRUCT_Q.pack(ts))
-            file_size += file.write(b)
-            last_ts = ts
-            first_element = False
-    except ValueError:
-        file.close()
-        os.unlink(path)
-        raise
+    file_size += file.write(STRUCT_Q.pack(timestamp))
+    file_size += file.write(data)

     # Pad this thing to page_size
     cdef unsigned long bytes_to_pad = page_size - (file_size % page_size)
@@ -293,9 +284,8 @@ cpdef Chunk create_chunk(TimeSeries parent, str path, list data, int page_size):
     # Create a footer at the end
     cdef bytearray footer = bytearray(page_size)
-    footer[-4:] = STRUCT_L.pack(entries)
+    footer[-4:] = b'\x01\x00\x00\x00'   # 1 in little endian
     file.write(footer)
     file.close()
-    print('Finished creating chunk')
     return Chunk(parent, path, page_size)
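
create_chunk now writes exactly one entry: header, first timestamp, first block, padding up to page_size, then a footer page whose last 4 bytes hold the entry count (hard-coded to 1 here). A sketch that checks the resulting file, assuming the single-entry chunk from the test below and that the computed padding is actually written out:

```python
import struct

# After create_chunk(None, 'chunk.db', 0, b'ala ', 4096) and before any
# append, the file should decode as follows (layout assumed, see above):
with open('chunk.db', 'rb') as f:
    raw = f.read()

block_size, ts0 = struct.unpack_from('<LQ', raw, 0)     # header + first timestamp
entries, = struct.unpack_from('<L', raw, len(raw) - 4)  # footer: entry count
assert block_size == 4 and ts0 == 0 and entries == 1
assert len(raw) % 4096 == 0                             # padded to whole pages
```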
@@ -27,6 +27,9 @@ cdef class TimeSeries:
     cpdef int append(self, unsigned long long timestamp, bytes data) except -1
     cpdef int sync(self) except -1
     cpdef int close_chunks(self) except -1
+    cpdef object iterate_range(self, unsigned long long start, unsigned long long stop)
+    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)

 cpdef TimeSeries create_series(str path, unsigned int block_size,
                                int max_entries_per_chunk, int page_size=*)
+import itertools
 import shutil
 import threading
 import time
@@ -11,6 +12,10 @@ import os

 DEF METADATA_FILE_NAME = 'metadata.txt'

 cdef class TimeSeries:
     """
     This is thread-safe
@@ -39,9 +44,16 @@ cdef class TimeSeries:
             set files_s = set(files)
             str chunk

         try:
-            metadata = ujson.loads(metadata_s)
+            metadata = ujson.loads(metadata_s)      # raises ValueError
+            # raises KeyError
+            self.block_size = metadata['block_size']
+            self.max_entries_per_chunk = metadata['max_entries_per_chunk']
+            self.last_entry_synced = metadata['last_entry_synced']
+            self.page_size = metadata['page_size']
         except ValueError:
             raise Corruption('Corrupted series')
+        except KeyError:
+            raise Corruption('Could not read metadata item')

         self.open_chunks = {}       # tp.Dict[int, Chunk]
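
Hoisting the key lookups into the same try block lets one except clause per failure mode report the problem. The four keys imply a metadata.txt along these lines (values are illustrative only):

```python
import ujson  # the parser used above

# Malformed JSON trips the ValueError branch; a missing key trips KeyError.
metadata = ujson.loads('{"block_size": 8, "max_entries_per_chunk": 10, '
                       '"last_entry_synced": 0, "page_size": 4096}')
```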
@@ -57,16 +69,7 @@ cdef class TimeSeries:
                     self.chunks.append(int(chunk))
                 except ValueError:
                     raise Corruption('Detected invalid file "%s"' % (chunk, ))
             self.chunks.sort()
-            try:
-                self.block_size = metadata['block_size']
-                self.max_entries_per_chunk = metadata['max_entries_per_chunk']
-                self.last_entry_synced = metadata['last_entry_synced']
-                self.page_size = metadata['page_size']
-            except KeyError:
-                raise Corruption('Could not read metadata item')

             self.last_chunk = Chunk(self, os.path.join(self.path, str(max(self.chunks))))
             self.open_chunks[self.last_chunk.min_ts] = self.last_chunk
             self.last_entry_ts = self.last_chunk.max_ts
@@ -88,7 +91,9 @@ cdef class TimeSeries:
             raise DoesNotExist('Invalid chunk!')
         with self.open_lock:
             if name not in self.open_chunks:
-                self.open_chunks[name] = Chunk(self, os.path.join(self.path, str(name)))
+                self.open_chunks[name] = Chunk(self,
+                                               os.path.join(self.path, str(name)),
+                                               self.page_size)
         return self.open_chunks[name]

     cpdef void close(self):
@@ -107,6 +112,79 @@ cdef class TimeSeries:
             self.mpm = None
         self.closed = True

+    cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp):
+        """
+        Return the index of the chunk that should contain the given timestamp.
+
+        :param timestamp: timestamp to check; must be larger than the first stored
+            timestamp and smaller than or equal to the current timestamp
+        :type timestamp: int
+        :return: index of the chunk in self.chunks
+        :rtype: int
+        """
+        cdef:
+            unsigned int lo = 0
+            unsigned int hi = len(self.chunks)
+            unsigned int mid
+        while lo < hi:
+            mid = (lo+hi)//2
+            if self.chunks[mid] < timestamp:
+                lo = mid+1
+            else:
+                hi = mid
+
+        try:
+            if self.chunks[lo] == timestamp:
+                return lo
+            else:
+                return lo-1
+        except IndexError:
+            return len(self.chunks)-1
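
The loop is a hand-rolled binary search; an illustrative pure-Python equivalent using the standard library (not part of tempsdb):

```python
import bisect

def get_index_of_chunk_for(chunks, timestamp):
    # `chunks` is the sorted list of chunk start-timestamps kept by the
    # series; the docstring's precondition is timestamp >= chunks[0].
    lo = bisect.bisect_left(chunks, timestamp)
    if lo == len(chunks):          # past the start of the newest chunk
        return len(chunks) - 1
    return lo if chunks[lo] == timestamp else lo - 1
```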
+    cpdef object iterate_range(self, unsigned long long start, unsigned long long stop):
+        """
+        Return an iterator over the data collected between the given timestamps.
+
+        :param start: timestamp to start at
+        :type start: int
+        :param stop: timestamp to stop at
+        :type stop: int
+        :return: an iterator with the data
+        :rtype: tp.Iterator[tp.Tuple[int, bytes]]
+        :raises ValueError: start is larger than stop
+        """
+        if self.last_chunk is None:
+            return iter([])
+        if start > stop:
+            raise ValueError('start larger than stop')
+        if start < self.chunks[0]:
+            start = self.chunks[0]
+        if stop > self.last_entry_ts:
+            stop = self.last_entry_ts
+
+        cdef:
+            unsigned int ch_start = self.get_index_of_chunk_for(start)
+            unsigned int ch_stop = self.get_index_of_chunk_for(stop)
+            list iterator = []
+            bint is_first
+            bint is_last
+            unsigned int chunk_index
+            Chunk chunk
+
+        for chunk_index in range(ch_start, ch_stop+1):
+            chunk = self.open_chunk(self.chunks[chunk_index])
+            is_first = chunk_index == ch_start
+            is_last = chunk_index == ch_stop
+            if is_first and is_last:
+                return chunk.iterate_indices(chunk.find_left(start), chunk.find_right(stop))
+            elif is_first:
+                iterator.append(chunk.iterate_indices(chunk.find_left(start), chunk.entries))
+            elif is_last:
+                iterator.append(chunk.iterate_indices(0, chunk.find_right(stop)))
+            else:
+                iterator.append(chunk.iterate_indices(0, chunk.entries))
+        return itertools.chain(*iterator)

     cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
         """
         Mark the series as synced up to a particular timestamp
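
The first and last chunks are trimmed with find_left/find_right and everything in between is passed through whole, so the caller sees one flat iterator. A hypothetical usage sketch, assuming `series` is an open TimeSeries:

```python
# Bounds are clamped to the stored range; entries arrive in timestamp order
# as (timestamp, bytes) pairs, possibly spanning several chunk files.
for ts, blob in series.iterate_range(500, 2000):
    assert 500 <= ts <= 2000
```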
@@ -188,20 +266,18 @@ cdef class TimeSeries:
         if self.closed:
             raise InvalidState('series is closed')
         if len(data) != self.block_size:
-            raise ValueError('Invalid block size')
+            raise ValueError('Invalid block size, was %s should be %s' % (len(data), self.block_size))
         if timestamp <= self.last_entry_ts:
             raise ValueError('Timestamp not larger than previous timestamp')

-        with self.lock:
-            if self.last_chunk is None:
+        with self.lock, self.open_lock:
+            # If this is indeed our first chunk, or we've exceeded the limit of entries per chunk
+            if self.last_chunk is None or self.last_chunk.length() >= self.max_entries_per_chunk:
+                # Create the next chunk
                 self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)], self.page_size)
+                                               timestamp, data, self.page_size)
+                self.open_chunks[timestamp] = self.last_chunk
                 self.chunks.append(timestamp)
-            elif self.last_chunk.length() >= self.max_entries_per_chunk:
-                self.last_chunk = create_chunk(self, os.path.join(self.path, str(timestamp)),
-                                               [(timestamp, data)], self.page_size)
-                self.chunks.append(timestamp)
             else:
                 self.last_chunk.append(timestamp, data)
             self.last_entry_ts = timestamp
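
Merging the two create_chunk branches means a single rule now governs rollover: start a new chunk whenever there is none yet or the current one is full. An illustrative walk-through (path and sizes are made up; create_series is the factory declared in series.pxd above):

```python
from tempsdb.series import create_series

# block_size=4, at most 2 entries per chunk: every third append must
# open a fresh chunk file, named after its first timestamp.
series = create_series('demo', 4, 2)
series.append(100, b'aaaa')   # no chunk yet -> creates chunk file "100"
series.append(200, b'bbbb')   # fits -> appended to chunk "100"
series.append(300, b'cccc')   # chunk "100" full -> creates chunk "300"
```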
@@ -6,12 +6,32 @@ class TestDB(unittest.TestCase):
     def test_create_series(self):
         from tempsdb.series import create_series

-        series = create_series('test', 8, 10)
+        series = create_series('test', 1, 10)
+        start, ts = 127, 100
+        for i in range(20):
+            series.append(ts, bytes(bytearray([start])))
+            start -= 1
+            ts += 100
+
+        self.do_verify_series(series, 0, 2000)
+        self.do_verify_series(series, 500, 2000)
+        self.do_verify_series(series, 1000, 2000)
+        self.do_verify_series(series, 1500, 2000)
+        self.do_verify_series(series, 0, 500)
+        self.do_verify_series(series, 0, 1200)
+        self.do_verify_series(series, 0, 1800)
+
+    def do_verify_series(self, series, start, stop):
+        items = list(series.iterate_range(start, stop))
+        self.assertGreaterEqual(items[0][0], start)
+        self.assertLessEqual(items[-1][0], stop)

     def test_chunk(self):
         from tempsdb.chunks import create_chunk
-        data = [(0, b'ala '), (1, b'ma '), (4, b'kota')]
-        chunk = create_chunk(None, 'chunk.db', data, 4096)
+        chunk = create_chunk(None, 'chunk.db', 0, b'ala ', 4096)
+        chunk.append(1, b'ma ')
+        chunk.append(4, b'kota')

         self.assertEqual(chunk.min_ts, 0)
         self.assertEqual(chunk.max_ts, 4)
         self.assertEqual(chunk.block_size, 4)
@@ -2,8 +2,12 @@ FROM python:3.8
 RUN pip install satella snakehouse nose2 wheel ujson

-ADD . /app
+ADD tempsdb /app/tempsdb
+ADD setup.py /app/setup.py
+ADD setup.cfg /app/setup.cfg
 WORKDIR /app
 RUN python setup.py build_ext --inplace
+ADD tests /app/tests

 CMD ["nose2", "-vv"]