Skip to content
Snippets Groups Projects
Commit fb0c80d1 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

refactored for iterator

parent afc78609
No related branches found
No related tags found
No related merge requests found
from .chunks cimport Chunk
from .series cimport TimeSeries
cdef class Iterator:
cdef:
unsigned long long start
unsigned long long stop
object chunks # type: collections.deque
bint is_first, is_last
TimeSeries parent
unsigned int i, limit
bint closed
Chunk current_chunk
cpdef void close(self)
cdef int get_next(self) except -1
cpdef tuple next(self)
import typing as tp
from .chunks cimport Chunk
from .series cimport TimeSeries
import collections
cdef class Iterator:
"""
Iterator that allows iterating through result.
At most basic this implements an iterator interface, iterating over
tp.Tuple[int, bytes] - timestamp and data
When you're done call :meth:`~tempsdb.iterators.Iterator.close` to release the resources.
"""
def __init__(self, parent: TimeSeries, start: int, stop: int, chunks: tp.List[Chunk]):
self.start = start
self.stop = stop
self.current_chunk = None
self.chunks = collections.deque(chunks)
self.parent = parent
self.i = 0
self.limit = 0
self.closed = False
self.is_first = False
self.is_last = False
def __del__(self):
self.close()
cpdef void close(self):
"""
Close this iterator, release chunks
"""
if self.closed:
return
self.closed = True
cdef Chunk chunk
for chunk in self.chunks:
self.parent.done_chunk(chunk.name())
self.chunks = None
cdef int get_next(self) except -1:
"""
Fetch next chunk, set i, is_first, is_last and limit appropriately
"""
if self.current_chunk is not None:
self.parent.done_chunk(self.current_chunk.name())
self.is_first = False
else:
self.is_first = True
try:
self.current_chunk = self.chunks.popleft()
except IndexError:
raise StopIteration()
self.is_last = not self.chunks
if self.is_last and self.is_first:
self.i = self.current_chunk.find_left(self.start)
self.limit = self.current_chunk.find_right(self.stop)
elif self.is_first:
self.i = self.current_chunk.find_left(self.start)
self.limit = self.current_chunk.length()
elif self.is_last:
self.i = 0
self.limit = self.current_chunk.find_right(self.stop)
else:
self.i = 0
self.limit = self.current_chunk.length()
return 0
def __next__(self) -> tp.Tuple[int, bytes]:
return self.next()
cpdef tuple next(self):
"""
Return next element
:return: next element
:rtype: tp.Tuple[int, bytes]
"""
if self.current_chunk is None:
self.get_next()
if self.i == self.limit:
self.get_next()
try:
return self.current_chunk.get_piece_at(self.i)
finally:
self.i += 1
def __iter__(self) -> Iterator:
return self
from .chunks cimport Chunk
from .iterators cimport Iterator
cdef class TimeSeries:
......@@ -13,6 +14,7 @@ cdef class TimeSeries:
readonly unsigned long long last_entry_ts
unsigned int page_size
list chunks
dict refs_chunks # tp.Dict[int, int]
dict open_chunks
list data_in_memory
Chunk last_chunk
......@@ -22,14 +24,14 @@ cdef class TimeSeries:
cpdef int delete(self) except -1
cdef dict _get_metadata(self)
cpdef void close(self)
cpdef void done_chunk(self, unsigned long long name)
cpdef Chunk open_chunk(self, unsigned long long name)
cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1
cpdef int append(self, unsigned long long timestamp, bytes data) except -1
cpdef int sync(self) except -1
cpdef int close_chunks(self) except -1
cpdef object iterate_range(self, unsigned long long start, unsigned long long stop)
cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop)
cpdef unsigned int get_index_of_chunk_for(self, unsigned long long timestamp)
cpdef TimeSeries create_series(str path, unsigned int block_size,
int max_entries_per_chunk, int page_size=*)
......@@ -29,6 +29,7 @@ cdef class TimeSeries:
self.mpm = None
self.lock = threading.Lock()
self.open_lock = threading.Lock()
self.refs_chunks = {}
self.closed = False
self.path = path
......@@ -74,9 +75,19 @@ cdef class TimeSeries:
self.open_chunks[self.last_chunk.min_ts] = self.last_chunk
self.last_entry_ts = self.last_chunk.max_ts
cpdef void done_chunk(self, unsigned long long name):
"""
Signal that we are done with given chunk and that it can be freed.
Releases the reference to a chunk.
"""
self.refs_chunks[name] -= 1
cpdef Chunk open_chunk(self, unsigned long long name):
"""
Opens a provided chunk
Opens a provided chunk.
Acquires a reference to the chunk.
:param name: name of the chunk
:type name: int
......@@ -94,6 +105,10 @@ cdef class TimeSeries:
self.open_chunks[name] = Chunk(self,
os.path.join(self.path, str(name)),
self.page_size)
if name not in self.refs_chunks:
self.refs_chunks[name] = 1
else:
self.refs_chunks[name] += 1
return self.open_chunks[name]
cpdef void close(self):
......@@ -141,7 +156,7 @@ cdef class TimeSeries:
except IndexError:
return len(self.chunks)-1
cpdef object iterate_range(self, unsigned long long start, unsigned long long stop):
cpdef Iterator iterate_range(self, unsigned long long start, unsigned long long stop):
"""
Return an iterator through collected data with given timestamps.
......@@ -165,25 +180,15 @@ cdef class TimeSeries:
cdef:
unsigned int ch_start = self.get_index_of_chunk_for(start)
unsigned int ch_stop = self.get_index_of_chunk_for(stop)
list iterator = []
list chunks = []
bint is_first
bint is_last
unsigned int chunk_index
Chunk chunk
for chunk_index in range(ch_start, ch_stop+1):
chunk = self.open_chunk(self.chunks[chunk_index])
is_first = chunk_index == ch_start
is_last = chunk_index == ch_stop
if is_first and is_last:
return chunk.iterate_indices(chunk.find_left(start), chunk.find_right(stop))
elif is_first:
iterator.append(chunk.iterate_indices(chunk.find_left(start), chunk.entries))
elif is_last:
iterator.append(chunk.iterate_indices(0, chunk.find_right(stop)))
else:
iterator.append(chunk.iterate_indices(0, chunk.entries))
return itertools.chain(*iterator)
chunks.append(self.open_chunk(self.chunks[chunk_index]))
return Iterator(self, start, stop, chunks)
cpdef int mark_synced_up_to(self, unsigned long long timestamp) except -1:
"""
......@@ -247,9 +252,10 @@ cdef class TimeSeries:
for chunk_name in chunks:
if chunk_name != last_chunk_name:
continue
else:
elif not self.refs_chunks[chunk_name]:
self.open_chunks[chunk_name].close()
del self.open_chunks[chunk_name]
del self.refs_chunks[chunk_name]
return 0
cpdef int append(self, unsigned long long timestamp, bytes data) except -1:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment