Skip to content
Snippets Groups Projects
Commit f92ec3c6 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

* added new things to `ExponentialBackoff`

* fixed longer first waits in `ExponentialBackoff`
parent 34b23d0e
No related branches found
No related tags found
No related merge requests found
__version__ = '2.17.7a2'
__version__ = '2.17.7a3'
......@@ -4,13 +4,11 @@ import threading
import time
import typing as tp
import warnings
import weakref
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from threading import Condition as PythonCondition
from satella.coding.decorators import wraps
from satella.time import measure, parse_time_string
from ..typing import ExceptionList
from ...exceptions import ResourceLocked, WouldWaitMore
......@@ -102,6 +100,8 @@ class Condition(PythonCondition):
:raises ResourceLocked: unable to acquire the underlying lock within specified timeout.
:raises WouldWaitMore: wait's timeout has expired
"""
from satella.time import measure, parse_time_string
if timeout is not None:
timeout = parse_time_string(timeout)
if timeout < 0:
......@@ -368,6 +368,8 @@ class TerminableThread(threading.Thread):
:raises WouldWaitMore: timeout has passed and Condition has not happened
:raises SystemExit: thread is terminating
"""
from satella.time import parse_time_string
timeout = parse_time_string(timeout)
wake_up_each = parse_time_string(wake_up_each)
t = 0
......@@ -456,6 +458,8 @@ class IntervalTerminableThread(TerminableThread, metaclass=ABCMeta):
"""
def __init__(self, seconds: tp.Union[str, float], *args, **kwargs):
from satella.time import parse_time_string
self.seconds = parse_time_string(seconds)
super().__init__(*args, **kwargs)
......
......@@ -9,6 +9,7 @@ from functools import wraps # import from functools to prevent circular import
__all__ = ['measure', 'time_as_int', 'time_ms', 'sleep', 'time_us', 'ExponentialBackoff',
'parse_time_string']
from satella.coding.concurrent.thread import Condition
from satella.exceptions import WouldWaitMore
TimeSignal = tp.Callable[[], float]
......@@ -353,14 +354,16 @@ class ExponentialBackoff:
>>> time.sleep(2)
>>> self.assertTrue(eb.available)
Note that this structure is not thread safe.
Note that this structure is thread safe only when a single object is doing
the :code:`success` or :code:`failed` calls, and other utilize
:meth:`~satella.time.ExponentialBackoff.wait_until_available`.
:param start: value at which to start
:param limit: maximum sleep timeout
:param sleep_fun: function used to sleep. Will accept a single argument - number of
seconds to wait
"""
__slots__ = 'start', 'limit', 'counter', 'sleep_fun', 'unavailable_until'
__slots__ = 'start', 'limit', 'counter', 'sleep_fun', 'unavailable_until', 'condition'
def __init__(self, start: float = 1, limit: float = 30,
sleep_fun: tp.Callable[[float], None] = sleep):
......@@ -368,6 +371,7 @@ class ExponentialBackoff:
self.limit = limit
self.counter = start
self.sleep_fun = sleep_fun
self.condition = Condition()
self.unavailable_until = None
def sleep(self):
......@@ -386,8 +390,7 @@ class ExponentialBackoff:
self.counter = min(self.limit, self.counter * 2)
self.unavailable_until = time.monotonic() + self.counter
def wait_until_available(self, timeout: tp.Optional[float] = None,
sleep_function: tp.Callable[[float], None] = time.sleep) -> None:
def wait_until_available(self, timeout: tp.Optional[float] = None) -> None:
"""
Waits until the service is available
......@@ -402,7 +405,7 @@ class ExponentialBackoff:
tn = self.time_until_next_check()
if tn is None:
return
sleep_function(tn)
self.condition.wait(timeout=tn, dont_raise=True)
raise WouldWaitMore('timeouted while waiting for service to become healthy')
def time_until_next_check(self) -> tp.Optional[float]:
......@@ -432,6 +435,7 @@ class ExponentialBackoff:
"""
self.counter = 0
self.unavailable_until = None
self.condition.notify_all()
TIME_MODIFIERS = [
......
......@@ -5,6 +5,7 @@ import multiprocessing
import os
import sys
from satella.coding.concurrent import call_in_separate_thread
from satella.time import measure, time_as_int, time_ms, sleep, ExponentialBackoff, \
parse_time_string
from concurrent.futures import Future
......@@ -12,6 +13,20 @@ from concurrent.futures import Future
class TestTime(unittest.TestCase):
def test_exponential_backoff_earlier_wakeup(self):
eb = ExponentialBackoff(start=5, limit=30)
@call_in_separate_thread()
def do_test():
with measure() as m:
eb.wait_until_available(2.5)
self.assertTrue(eb.available)
eb.failed()
do_test()
time.sleep(1)
eb.success()
def test_exponential_backoff_waiting_for_service_healthy(self):
eb = ExponentialBackoff(start=2, limit=30)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment