Skip to content
Snippets Groups Projects
Commit a4cfb281 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added `IntervalTerminableThread`, 2.11.24

parent 74f9f2c1
No related branches found
Tags v2.11.24
No related merge requests found
# v2.11.24
* added `IntervalTerminableThread`
......@@ -32,6 +32,12 @@ It means that if it's hanging on I/O, for example, it won't be affected.
.. autoclass:: satella.coding.concurrent.TerminableThread
:members:
IntervalTerminableThread
------------------------
.. autoclass:: satella.coding.concurrent.IntervalTerminableThread
:members:
BogusTerminableThread
=====================
......
__version__ = '2.11.24_a1'
__version__ = '2.11.24'
......@@ -4,7 +4,7 @@ from .locked_dataset import LockedDataset
from .locked_structure import LockedStructure
from .monitor import MonitorList, Monitor, MonitorDict, RMonitor
from .thread import TerminableThread, Condition, SingleStartThread, call_in_separate_thread, \
BogusTerminableThread
BogusTerminableThread, IntervalTerminableThread
from .id_allocator import IDAllocator
from .timer import Timer
from .functions import parallel_execute, run_as_future
......@@ -14,4 +14,4 @@ __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableT
'MonitorDict', 'MonitorList', 'Condition', 'LockedStructure', 'AtomicNumber',
'CallNoOftenThan', 'SingleStartThread', 'IDAllocator', 'call_in_separate_thread',
'BogusTerminableThread', 'Timer', 'parallel_execute', 'run_as_future',
'sync_threadpool']
'sync_threadpool', 'IntervalTerminableThread']
import ctypes
import platform
from abc import ABCMeta, abstractmethod
import threading
import time
import typing as tp
......@@ -296,3 +297,50 @@ class TerminableThread(threading.Thread):
raise RuntimeError('Multiple threads killed!')
return self
class IntervalTerminableThread(TerminableThread, metaclass=ABCMeta):
"""
A TerminableThread that calls .loop() once per x seconds.
If executing .loop() takes more than x seconds, on_overrun() will be called.
:param seconds: time that a single looping through should take. This will
include the time spent on calling .loop(), the rest of this time will
be spent safe_sleep()ing.
"""
def __init__(self, seconds: float, *args, **kwargs):
self.seconds = seconds
super().__init__(*args, **kwargs)
@abstractmethod
def loop(self) -> tp.Optional[bool]:
"""
Override me!
If True is returned, the thread will not sleep and .loop() will be executed
once more.
"""
def on_overrun(self, time_taken: float) -> None:
"""
Called when executing .loop() takes more than x seconds.
Called each cycle.
:param time_taken: how long did calling .loop() take
"""
def run(self):
self.prepare()
while not self._terminating:
with measure() as measurement:
v = self.loop()
if not v:
time_taken = measurement()
time_to_sleep = self.seconds - time_taken
if time_to_sleep < 0:
self.on_overrun(time_taken)
else:
self.safe_sleep(time_to_sleep)
self.cleanup()
......@@ -9,7 +9,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
from satella.coding.concurrent import TerminableThread, CallableGroup, Condition, MonitorList, \
LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \
parallel_execute, run_as_future, sync_threadpool
parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread
from satella.coding.sequences import unique
from satella.exceptions import WouldWaitMore, AlreadyAllocated
......@@ -271,6 +271,37 @@ class TestConcurrent(unittest.TestCase):
mtt.start()
mtt.terminate().join()
def test_interval_terminable_thread(self):
class MyTerminableThread(IntervalTerminableThread):
def __init__(self):
super().__init__(1)
self.a = 0
self.overrun = False
def prepare(self) -> None:
self.a = 1
def on_overrun(self, time_taken: float) -> None:
self.overrun = True
def loop(self) -> None:
if self.a == 3:
time.sleep(3)
self.a += 1
mtt = MyTerminableThread()
mtt.start()
a = mtt.a
time.sleep(0.3)
self.assertEqual(mtt.a, a)
self.assertFalse(mtt.overrun)
time.sleep(1.2)
self.assertEqual(mtt.a, a+1)
self.assertFalse(mtt.overrun)
time.sleep(4)
self.assertTrue(mtt.overrun)
mtt.terminate().join()
@unittest.skipIf(platform.python_implementation() != 'PyPy', 'this requires PyPy')
def test_terminable_thread_force_notimplementederror(self):
class MyTerminableThread(TerminableThread):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment