satella.time package

Submodules

satella.time.backoff module

class satella.time.backoff.ExponentialBackoff(start=1, limit=30, sleep_fun=<built-in function sleep>, grace_amount=0)

Bases: object

A class that will sleep increasingly longer on errors. Meant to be used in such a way:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> while not connect():
>>>     eb.failed()
>>>     eb.sleep()
>>> eb.success()

Also a structure that will mark an object (eg. the Internet access) as inaccessible for some duration. Usage is that case is like this:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> eb.failed()
>>> self.assertFalse(eb.available)
>>> time.sleep(2)
>>> self.assertTrue(eb.available)

Note that this structure is thread safe only when a single object is doing the success or failed calls, and other utilize wait_until_available().

Parameters:
  • start (float) – value at which to start

  • limit (float) – maximum sleep timeout

  • sleep_fun (Callable[[float], None]) – function used to sleep. Will accept a single argument - number of seconds to wait

  • grace_amount (int) – amount of fails() that this will survive before everything fails

property available: bool

Was the status of the last call success?

condition
counter
failed()

Called when something fails.

Return type:

None

grace_amount
grace_counter
launch(exceptions_on_failed=<class 'Exception'>, immediate=False)

A decorator to simplify writing doing-something loops. Basically, this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> @eb.launch(TypeError)
>>> def do_action(*args, **kwargs):
>>>     x_do_action(*args, **kwargs)
>>> do_action(5, test=True)

is equivalent to this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> while True:
>>>     try:
>>>         x_do_action(5, test=True)
>>>     except TypeError:
>>>         eb.failed()
>>>         eb.sleep()

The first example with immediate=True could skip the last call to do_action, as it will be executed automatically with zero parameters if immediate=True is set.

Parameters:
  • exceptions_on_failed (Union[Type[Exception], Tuple[Type[Exception], ...]]) – a list of a single exception of exceptions whose raising will signal that fun has failed

  • immediate (bool) – immediately execute the function, but return the wrapped function as a result of this decorator. The function will be called with zero arguments.

Returns:

a function, that called, will pass the exactly same parameters

limit
property ready_for_next_check: bool
Returns:

Has failure() been called only in the last waiting period?

sleep()

Called when sleep is expected.

Return type:

None

sleep_fun
start
success()

Called when something successes.

Return type:

None

time_until_next_check()

Return the time until next health check, or None if the service is healthy

Return type:

Optional[float]

unavailable_until
wait_until_available(timeout=None)

Waits until the service is available

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. If waited more than that, WouldWaitMore will be raised

Raises:

WouldWaitMore – waited for timeout and service still was not healthy

Return type:

None

satella.time.measure module

class satella.time.measure.measure(future_to_measure=None, stop_on_stop=True, adjust=0.0, time_getter_callable=<built-in function monotonic>, create_stopped=False, timeout=None)

Bases: object

A class used to measure time elapsed. Use for example like this:

>>> with measure() as measurement:
>>>     time.sleep(1)
>>>     print('This has taken so far', measurement(), 'seconds')
>>>     time.sleep(1)
>>> print('A total of ', measurement(), 'seconds have elapsed')

You can also use the .start() method instead of context manager. Time measurement will stop after exiting or calling .stop() depending on stop_on_stop flag.

Time elapsing starts after the counter is created. If you wish to start it manually, please specify create_stopped=True

When instantiated and called with no arguments, this will return the time elapsed:

>>> a = measure()
>>> time.sleep(0.5)
>>> assert a() >= 0.5

You can also decorate your functions to have them keep track time of their execution, like that:

>>> @measure()
>>> def measuring(measurement_object: measure, *args, **kwargs):
>>>     ...

This will also correctly work on methods, correctly inserting the measurement object after self/cls:

>>> class Test:
>>>     @measure()
>>>     def measuring(self, measurement_object: measure, arg1):
>>>         ...

You can also measure how long does executing a future take, eg.

>>> future = get_my_future()
>>> measurement = measure(future)
>>> future.result()
>>> print('Executing the future took', measurement(), 'seconds')

In case a future is passed, the measurement will stop automatically as soon as the future returns with a result (or exception).

Note that in order to reuse a single counter you must .reset() it first. Just calling .start() after .stop() will result in the timer being resumed instead!

Note that if you’re measuring generators, this will count the time passed from the measure() was called till the generator finishes.

Note that if you’re using the decorator form, this object will be first copied and then passed to the function/future. This is to prevent undefined behaviour during multithreading. Also, the timer that you pass to this function will be started/not started, depending on what you set earlier. .reset() will be called on a copy of this object.

This can also be used to write custom timeouts, eg.

>>> with measure(timeout=5) as m:
>>>     while not m.timeouted:
>>>         ... do something ...
>>>         if condition:
>>>             break
>>>     if m.timeouted:
>>>         raise WouldWaitMore('timeout hit')
Parameters:
  • stop_on_stop (bool) – stop elapsing time upon calling .stop()/exiting the context manager. If this is set to False then .start() and .stop() won’t work and calling them will raise a TypeError.

  • adjust (float) – interval to add to current time upon initialization

  • time_getter_callable (TimeSignal) – callable/0 -> float to get the time with

  • create_stopped (bool) – if this is set to True, you will manually need to call .start()

  • timeout (tp.Optional[float]) – a time limit, after exceeding which the property timeouted will be true

  • future_to_measure (tp.Optional[Future]) –

adjust(interval)

Add given value to internal started_at counter

Parameters:

interval (float) –

Return type:

None

assert_not_timeouted()

If the time elapsed exceeded timeout, throw WouldWaitMore.

Always returns if the timeout was not givne

Return type:

None

create_stopped
elapsed
get_time_elapsed()
Returns:

currently elapsed time

Return type:

float

has_exceeded(value)

Return whether the timer has exceeded provided value.

Deprecated since version 2.14.22.

Parameters:

value (float) –

Return type:

bool

raise_if_exceeded(value, exc_class=<class 'satella.exceptions.WouldWaitMore'>)

Raise provided exception, with no arguments, if timer has clocked more than provided value.

If no exc_class is provided, WouldWaitMore will be raised by default.

Deprecated since version 2.14.22.

Parameters:
  • value (float) –

  • exc_class (Type[Exception]) –

reset()

Reset the counter, enabling it to start counting after a .stop() call. This will put the counter in a STOPPED mode if it’s running already.

Return type:

None

reset_and_start()

Syntactic sugar for calling reset() and then start()

Return type:

None

reset_and_stop()

Syntactic sugar for calling stop() and then reset()

New in version 2.23.2.

Return type:

None

start()

Start measuring time or resume measuring it

Return type:

None

started_on
stop()

Stop counting time

Raises:

TypeError – stop_on_stop is enabled or the counter has already been stopped

Return type:

None

stop_on_stop
property stopped: bool

Whether this counter is already stopped

New in version 2.23.2.

stopped_on
time_getter_callable
property time_remaining: float
Returns:

the difference between provided timeout and elapsed time, or None if timeout was not given. This will never be negative.

timeout
property timeouted: bool
Returns:

Has the time elapsed exceeded timeout? Always False if timeout was not given

update()

Alias for .start()

Return type:

None

satella.time.misc module

satella.time.misc.sleep(y, abort_on_interrupt=False)

Sleep for given interval.

This won’t be interrupted by KeyboardInterrupted, and always will sleep for given time interval. This will return at once if x is negative

Parameters:
  • y (Union[str, float]) – the interval to wait in seconds, can be also a time string

  • abort_on_interrupt (bool) – whether to abort at once when KeyboardInterrupt is seen

Returns:

whether the function has completed its sleep naturally. False is seen on aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True. False will be also seen on negative y.

Return type:

bool

satella.time.misc.time_as_int()

Syntactic sugar for

>>> from time import time
>>> int(time())
Return type:

int

satella.time.misc.time_ms()

Syntactic sugar for

>>> from time import time
>>> int(time()*1000)

This will try to use time.time_ns() if available

Return type:

int

satella.time.misc.time_us()

Syntactic sugar for

>>> from time import time
>>> int(time()*1000000)

This will try to use time.time_ns() if available

Return type:

int

satella.time.parse module

satella.time.parse.parse_time_string(s)

Parse a time string into seconds, so eg. ‘30m’ will be equal to 1800, and so will be ‘30 min’.

This will correctly parse: - seconds - minutes - hours - days - weeks

Warning

This does not handle fractions of a second!

Parameters:

s (Union[int, float, str]) – time string or time value in seconds

Returns:

value in seconds

Return type:

float

Module contents

class satella.time.ExponentialBackoff(start=1, limit=30, sleep_fun=<built-in function sleep>, grace_amount=0)

Bases: object

A class that will sleep increasingly longer on errors. Meant to be used in such a way:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> while not connect():
>>>     eb.failed()
>>>     eb.sleep()
>>> eb.success()

Also a structure that will mark an object (eg. the Internet access) as inaccessible for some duration. Usage is that case is like this:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> eb.failed()
>>> self.assertFalse(eb.available)
>>> time.sleep(2)
>>> self.assertTrue(eb.available)

Note that this structure is thread safe only when a single object is doing the success or failed calls, and other utilize wait_until_available().

Parameters:
  • start (float) – value at which to start

  • limit (float) – maximum sleep timeout

  • sleep_fun (Callable[[float], None]) – function used to sleep. Will accept a single argument - number of seconds to wait

  • grace_amount (int) – amount of fails() that this will survive before everything fails

property available: bool

Was the status of the last call success?

condition
counter
failed()

Called when something fails.

Return type:

None

grace_amount
grace_counter
launch(exceptions_on_failed=<class 'Exception'>, immediate=False)

A decorator to simplify writing doing-something loops. Basically, this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> @eb.launch(TypeError)
>>> def do_action(*args, **kwargs):
>>>     x_do_action(*args, **kwargs)
>>> do_action(5, test=True)

is equivalent to this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> while True:
>>>     try:
>>>         x_do_action(5, test=True)
>>>     except TypeError:
>>>         eb.failed()
>>>         eb.sleep()

The first example with immediate=True could skip the last call to do_action, as it will be executed automatically with zero parameters if immediate=True is set.

Parameters:
  • exceptions_on_failed (Union[Type[Exception], Tuple[Type[Exception], ...]]) – a list of a single exception of exceptions whose raising will signal that fun has failed

  • immediate (bool) – immediately execute the function, but return the wrapped function as a result of this decorator. The function will be called with zero arguments.

Returns:

a function, that called, will pass the exactly same parameters

limit
property ready_for_next_check: bool
Returns:

Has failure() been called only in the last waiting period?

sleep()

Called when sleep is expected.

Return type:

None

sleep_fun
start
success()

Called when something successes.

Return type:

None

time_until_next_check()

Return the time until next health check, or None if the service is healthy

Return type:

Optional[float]

unavailable_until
wait_until_available(timeout=None)

Waits until the service is available

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. If waited more than that, WouldWaitMore will be raised

Raises:

WouldWaitMore – waited for timeout and service still was not healthy

Return type:

None

class satella.time.measure(future_to_measure=None, stop_on_stop=True, adjust=0.0, time_getter_callable=<built-in function monotonic>, create_stopped=False, timeout=None)

Bases: object

A class used to measure time elapsed. Use for example like this:

>>> with measure() as measurement:
>>>     time.sleep(1)
>>>     print('This has taken so far', measurement(), 'seconds')
>>>     time.sleep(1)
>>> print('A total of ', measurement(), 'seconds have elapsed')

You can also use the .start() method instead of context manager. Time measurement will stop after exiting or calling .stop() depending on stop_on_stop flag.

Time elapsing starts after the counter is created. If you wish to start it manually, please specify create_stopped=True

When instantiated and called with no arguments, this will return the time elapsed:

>>> a = measure()
>>> time.sleep(0.5)
>>> assert a() >= 0.5

You can also decorate your functions to have them keep track time of their execution, like that:

>>> @measure()
>>> def measuring(measurement_object: measure, *args, **kwargs):
>>>     ...

This will also correctly work on methods, correctly inserting the measurement object after self/cls:

>>> class Test:
>>>     @measure()
>>>     def measuring(self, measurement_object: measure, arg1):
>>>         ...

You can also measure how long does executing a future take, eg.

>>> future = get_my_future()
>>> measurement = measure(future)
>>> future.result()
>>> print('Executing the future took', measurement(), 'seconds')

In case a future is passed, the measurement will stop automatically as soon as the future returns with a result (or exception).

Note that in order to reuse a single counter you must .reset() it first. Just calling .start() after .stop() will result in the timer being resumed instead!

Note that if you’re measuring generators, this will count the time passed from the measure() was called till the generator finishes.

Note that if you’re using the decorator form, this object will be first copied and then passed to the function/future. This is to prevent undefined behaviour during multithreading. Also, the timer that you pass to this function will be started/not started, depending on what you set earlier. .reset() will be called on a copy of this object.

This can also be used to write custom timeouts, eg.

>>> with measure(timeout=5) as m:
>>>     while not m.timeouted:
>>>         ... do something ...
>>>         if condition:
>>>             break
>>>     if m.timeouted:
>>>         raise WouldWaitMore('timeout hit')
Parameters:
  • stop_on_stop (bool) – stop elapsing time upon calling .stop()/exiting the context manager. If this is set to False then .start() and .stop() won’t work and calling them will raise a TypeError.

  • adjust (float) – interval to add to current time upon initialization

  • time_getter_callable (TimeSignal) – callable/0 -> float to get the time with

  • create_stopped (bool) – if this is set to True, you will manually need to call .start()

  • timeout (tp.Optional[float]) – a time limit, after exceeding which the property timeouted will be true

  • future_to_measure (tp.Optional[Future]) –

adjust(interval)

Add given value to internal started_at counter

Parameters:

interval (float) –

Return type:

None

assert_not_timeouted()

If the time elapsed exceeded timeout, throw WouldWaitMore.

Always returns if the timeout was not givne

Return type:

None

create_stopped
elapsed
get_time_elapsed()
Returns:

currently elapsed time

Return type:

float

has_exceeded(value)

Return whether the timer has exceeded provided value.

Deprecated since version 2.14.22.

Parameters:

value (float) –

Return type:

bool

raise_if_exceeded(value, exc_class=<class 'satella.exceptions.WouldWaitMore'>)

Raise provided exception, with no arguments, if timer has clocked more than provided value.

If no exc_class is provided, WouldWaitMore will be raised by default.

Deprecated since version 2.14.22.

Parameters:
  • value (float) –

  • exc_class (Type[Exception]) –

reset()

Reset the counter, enabling it to start counting after a .stop() call. This will put the counter in a STOPPED mode if it’s running already.

Return type:

None

reset_and_start()

Syntactic sugar for calling reset() and then start()

Return type:

None

reset_and_stop()

Syntactic sugar for calling stop() and then reset()

New in version 2.23.2.

Return type:

None

start()

Start measuring time or resume measuring it

Return type:

None

started_on
stop()

Stop counting time

Raises:

TypeError – stop_on_stop is enabled or the counter has already been stopped

Return type:

None

stop_on_stop
property stopped: bool

Whether this counter is already stopped

New in version 2.23.2.

stopped_on
time_getter_callable
property time_remaining: float
Returns:

the difference between provided timeout and elapsed time, or None if timeout was not given. This will never be negative.

timeout
property timeouted: bool
Returns:

Has the time elapsed exceeded timeout? Always False if timeout was not given

update()

Alias for .start()

Return type:

None

satella.time.parse_time_string(s)

Parse a time string into seconds, so eg. ‘30m’ will be equal to 1800, and so will be ‘30 min’.

This will correctly parse: - seconds - minutes - hours - days - weeks

Warning

This does not handle fractions of a second!

Parameters:

s (Union[int, float, str]) – time string or time value in seconds

Returns:

value in seconds

Return type:

float

satella.time.sleep(y, abort_on_interrupt=False)

Sleep for given interval.

This won’t be interrupted by KeyboardInterrupted, and always will sleep for given time interval. This will return at once if x is negative

Parameters:
  • y (Union[str, float]) – the interval to wait in seconds, can be also a time string

  • abort_on_interrupt (bool) – whether to abort at once when KeyboardInterrupt is seen

Returns:

whether the function has completed its sleep naturally. False is seen on aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True. False will be also seen on negative y.

Return type:

bool

satella.time.time_as_int()

Syntactic sugar for

>>> from time import time
>>> int(time())
Return type:

int

satella.time.time_ms()

Syntactic sugar for

>>> from time import time
>>> int(time()*1000)

This will try to use time.time_ns() if available

Return type:

int

satella.time.time_us()

Syntactic sugar for

>>> from time import time
>>> int(time()*1000000)

This will try to use time.time_ns() if available

Return type:

int