From 6e409cf2f3e07c14368e68d6e2e71e35d359f15b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 14 Dec 2020 20:16:15 +0100 Subject: [PATCH] fix getting and creating series --- tempsdb/database.pyx | 91 ++++++++++++++++++++++++-------------------- tempsdb/series.pyx | 2 +- tests/test.sh | 1 + 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/tempsdb/database.pyx b/tempsdb/database.pyx index 1689a24..d79343a 100644 --- a/tempsdb/database.pyx +++ b/tempsdb/database.pyx @@ -31,7 +31,7 @@ cdef class Database: self.closed = False self.open_series = {} self.open_varlen_series = {} - self.lock = threading.Lock() + self.lock = threading.RLock() self.mpm = None cpdef list get_open_series(self): @@ -87,10 +87,11 @@ cdef class Database: :raises DoesNotExist: series does not exist :raises StillOpen: series is open """ - cdef str path = os.path.join(self.path, 'varlen', name) + cdef: + str path = os.path.join(self.path, 'varlen', name) + VarlenSeries series if not os.path.exists(path): raise DoesNotExist('series does not exist') - cdef VarlenSeries series with self.lock: if name in self.open_varlen_series: series = self.open_varlen_series[name] @@ -113,17 +114,14 @@ cdef class Database: cdef: TimeSeries result str path - if name in self.open_series: - result = self.open_series[name] - else: - path = os.path.join(self.path, name) - with self.lock: - # Check a second time due to the lock - if name in self.open_series: - if self.open_series[name].closed: - del self.open_series[name] - return self.get_series(name) - return self.open_series[name] + with self.lock: + if name in self.open_series: + result = self.open_series[name] + if result.closed: + del self.open_series[name] + return self.get_series(name) + else: + path = os.path.join(self.path, name) if not os.path.isdir(path): raise DoesNotExist('series %s does not exist' % (name, )) self.open_series[name] = result = TimeSeries(path, name, @@ -209,14 +207,18 @@ cdef class Database: :return: new variable length series :raises AlreadyExists: series with given name already exists """ - if os.path.isdir(os.path.join(self.path, 'varlen', name)): - raise AlreadyExists('Series already exists') - cdef VarlenSeries series = create_varlen_series(os.path.join(self.path, name), name, - size_struct, - length_profile, - entries_per_chunk, - gzip_level=gzip_level) - self.open_varlen_series[name] = series + cdef: + VarlenSeries series + str path = os.path.join(self.path, 'varlen', name) + with self.lock: + if os.path.isdir(path): + raise AlreadyExists('Series already exists') + series = create_varlen_series(path, name, + size_struct, + length_profile, + entries_per_chunk, + gzip_level=gzip_level) + self.open_varlen_series[name] = series return series @@ -229,17 +231,17 @@ cdef class Database: :return: a loaded varlen series :raises DoesNotExist: series does not exist """ - if name in self.open_varlen_series: - result = self.open_varlen_series[name] - else: - path = os.path.join(self.path, 'varlen', name) - with self.lock: - # Check a second time due to the lock - if name in self.open_varlen_series: - if self.open_varlen_series[name].closed: - del self.open_varlen_series[name] - return self.get_varlen_series(name) - return self.open_varlen_series[name] + cdef: + VarlenSeries result + str path + with self.lock: + if name in self.open_varlen_series: + result = self.open_varlen_series[name] + if result.closed: + del self.open_varlen_series[name] + result = self.get_varlen_series(name) + else: + path = os.path.join(self.path, 'varlen', name) if not os.path.isdir(path): raise DoesNotExist('series %s does not exist' % (name, )) self.open_varlen_series[name] = result = VarlenSeries(path, name) @@ -275,14 +277,14 @@ cdef class Database: raise ValueError('Series cannot be named varlen') if os.path.isdir(os.path.join(self.path, name)): raise AlreadyExists('Series already exists') - if gzip_level: - warnings.warn('Gzip support is experimental') - cdef TimeSeries series = create_series(os.path.join(self.path, name), name, - block_size, - entries_per_chunk, page_size=page_size, - use_descriptor_based_access=use_descriptor_based_access, - gzip_level=gzip_level) - self.open_series[name] = series + cdef TimeSeries series + with self.lock: + series = create_series(os.path.join(self.path, name), name, + block_size, + entries_per_chunk, page_size=page_size, + use_descriptor_based_access=use_descriptor_based_access, + gzip_level=gzip_level) + self.open_series[name] = series return series cpdef int register_memory_pressure_manager(self, object mpm) except -1: @@ -312,11 +314,16 @@ cdef class Database: """ if self.closed: return 0 - cdef TimeSeries series + cdef: + TimeSeries series + VarlenSeries var_series with self.lock: for series in self.open_series.values(): series.close() # because already closed series won't close themselves self.open_series = {} + for var_series in self.open_varlen_series.values(): + var_series.close() + self.open_varlen_series = {} self.closed = True return 0 diff --git a/tempsdb/series.pyx b/tempsdb/series.pyx index c1966ff..bb8c84d 100644 --- a/tempsdb/series.pyx +++ b/tempsdb/series.pyx @@ -152,7 +152,7 @@ cdef class TimeSeries: filename = filename.replace('.direct', '') is_direct |= is_gzip try: - self.chunks.append(int(filename)) + self.chunks.append((int(filename), is_direct, is_gzip)) except ValueError: raise Corruption('Detected invalid file "%s"' % (filename, )) self.chunks.sort() diff --git a/tests/test.sh b/tests/test.sh index f6ded54..dd44a5f 100644 --- a/tests/test.sh +++ b/tests/test.sh @@ -1,3 +1,4 @@ #!/bin/bash +set -e python -m coverage run -m nose2 -vv python -m coverage report -- GitLab