satella.coding.concurrent package

Subpackages

Submodules

satella.coding.concurrent.atomic module

class satella.coding.concurrent.atomic.AtomicNumber(v=0)

Bases: Monitor

An atomic number. Note that the class is not hashable and for a reason, since it’s value might change in time. So in this case this is more of like a container for numbers.

Treat it like a normal number, except all operations are executed atomically.

You can also wait for it to change it’s value, via wait().

You change it’s value in the following way:

>>> a = AtomicNumber()
>>> a += 2

Note that if the number if used in an expression, such as

>>> b = a + 2

Then a normal number will be returned

You need to invoke this at your constructor You can also use it to release locks of other objects.

Parameters:

v (Union[int, float]) –

condition
value
wait(timeout=None, throw_exception=True)

Block until the atomic number changes it’s value.

Parameters:
  • timeout (Optional[float]) – maximum time to wait. None means wait indefinitely

  • throw_exception (bool) – whether to throw WouldWaitMore on timeout

Raises:

WouldWaitMore – the value hasn’t changed within the timeout

wait_until_equal(v, timeout=None)

Wait until the value of this number equals v.

Parameters:
  • v (Union[int, float]) – value to compare this number against

  • timeout (Optional[float]) – maximum time to wait

Raises:

WouldWaitMore – timeout expired without the value becoming equal to target

Return type:

None

satella.coding.concurrent.callablegroup module

class satella.coding.concurrent.callablegroup.CallNoOftenThan(interval, callable_)

Bases: object

A class that will ensure that calls to given callable are made no more often than some interval.

Even if it’s call is called more often than specified value, the callable just won’t be called and None will be returned.

Parameters:
  • interval (float) – interval in seconds

  • callable – callable to call

  • callable_ (Callable) –

callable: Callable
interval: float
last_called: float
class satella.coding.concurrent.callablegroup.CallableGroup(gather=True, swallow_exceptions=False)

Bases: Generic[T]

This behaves like a function, but allows to add other functions to call when invoked, eg.

c1 = Callable()

c1.add(foo) c1.add(bar)

c1(2, 3)

Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.

Parameters:
  • gather (bool) –

  • swallow_exceptions (bool) –

add(callable_, one_shot=False)

Add a callable.

Note

Same callable can’t be added twice. It will silently fail.

Can be a CancellableCallback, in that case method remove_cancelled() might be useful.

Basically every callback is cancellable.

Parameters:
  • callable – callable

  • one_shot (bool) – if True, callable will be unregistered after single call

  • callable_ (Union[CancellableCallback, Callable[[], T]]) –

Returns:

callable_ if it was a cancellable callback, else one constructed after it

Return type:

CancellableCallback

Deprecated since version v2.25.5: Do not pass a CancellableCallback, you’ll get your own

callables: Dict[Callable, tuple[bool, int]]
gather: bool
property has_cancelled_callbacks: bool

Check whether this has any CancellableCallback instances and whether any of them was cancelled

remove_cancelled()

Remove it’s entries that are CancelledCallbacks and that were cancelled

Return type:

None

swallow_exceptions: bool
class satella.coding.concurrent.callablegroup.CancellableCallback(callback_fun)

Bases: object

A callback that you can cancel.

Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.

If called, the function itself won’t be called as well if this was cancelled. In this case a None will be returned instead of the result of callback_fun()

This short circuits __bool__ to return not .cancelled. So, the bool value of this callback depends on whether it has been cancelled or not.

Hashable and __eq__-able by identity. Equal only to itself.

Parameters:

callback_fun (Callable) – function to call

Variables:

cancelled – whether this callback was cancelled (bool)

callback_fun
cancel()

Cancel this callback.

Return type:

None

cancelled

satella.coding.concurrent.functions module

satella.coding.concurrent.functions.parallel_execute(callable_, args, kwargs=<generator object infinite_iterator>)

Execute a number of calls to callable in parallel.

Callable must be a function that accepts arguments and returns a plain Python future.

Return will be an iterator that will yield every value of the iterator, or return an instance of exception, if any of the calls excepted.

Parameters:
  • callable – a callable that returns futures

  • args (Iterable[T]) – an iterable of arguments to provide to the callable

  • kwargs (Iterable[dict]) – an iterable of keyword arguments to provide to the callable

  • callable_ (Callable[[T], Future]) –

Returns:

an iterator yielding every value (or exception instance if thew) of the future

satella.coding.concurrent.functions.run_as_future(fun)

A decorator that accepts a function that should be executed in a separate thread, and a Future returned instead of it’s result, that will enable to watch the function for completion.

The created thread will be non-demonic

Example usage:

>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()

satella.coding.concurrent.id_allocator module

class satella.coding.concurrent.id_allocator.IDAllocator(start_at=0, top_limit=None)

Bases: Monitor

Reusable ID allocator

You can use it to requisition ints from a pool, and then free their ints, permitting them to be reused.

Thread-safe.

Parameters:
  • start_at (int) – the lowest integer that the allocator will return

  • top_limit (Optional[int]) – the maximum value that will not be allocated. If used, subsequent calls to allocate_int() will raise Empty

You need to invoke this at your constructor You can also use it to release locks of other objects.

allocate_int()

Return a previously unallocated int, and mark it as allocated

Returns:

an allocated int

Raises:

Empty – could not allocate an int due to top limit

Return type:

int

bound
free_ints
ints_allocated
mark_as_allocated(x)

Mark given x as allocated

Parameters:

x (int) – x to mark as allocated

Raises:
  • AlreadyAllocated – x was already allocated

  • ValueError – x is less than start_at

mark_as_free(x)

Mark x as free

Parameters:

x (int) – int to free

Raises:

ValueError – x was not allocated or less than start_at

start_at
top_limit
class satella.coding.concurrent.id_allocator.SequentialIssuer(start=0)

Bases: Monitor

A classs that issues an monotonically increasing value.

Parameters:

start (int) – start issuing IDs from this value

Variables:

start – next value to be issued

You need to invoke this at your constructor You can also use it to release locks of other objects.

issue()

Just issue a next identifier

Returns:

a next identifier

Return type:

int

no_less_than(no_less_than)

Issue an int, which is no less than a given value

Parameters:

no_less_than (int) – value that the returned id will not be less than this

Returns:

an identifier, no less than no_less_than

Return type:

int

start

satella.coding.concurrent.list_processor module

satella.coding.concurrent.list_processor.parallel_construct(iterable, function, thread_pool, span_title=None)

Construct a list from executing given function in a thread pool executor.

If opentracing is installed, and tracing is enabled, current span will be passed to child threads.

Parameters:
  • iterable (Iterable[V]) – iterable to apply

  • function (Callable[[V], Optional[U]]) – function to apply. If that function returns None, no element will be added

  • thread_pool (ThreadPoolExecutor) – thread pool to execute

  • span_title (Optional[str]) – span title to create. For each execution a child span will be returned

Returns:

list that is the result of parallel application of function on each element

Return type:

List[U]

satella.coding.concurrent.locked_dataset module

class satella.coding.concurrent.locked_dataset.LockedDataset

Bases: object

A locked dataset. Subclass like

>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>    @LockedDataset.locked
>>>    def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')

If no lock is held, this class that derives from such will raise ResourceNotLocked upon element access while a lock is not being held.

Note that __enter__ will raise WouldWaitMore if timeout was given.

class InternalDataset

Bases: object

args
lock
locked
static locked(blocking=True, timeout=-1)

Decorator to use for annotating methods that would lock :param blocking: whether to block at all :param timeout: optional timeout. Default, or -1 means “return ASAP”

Return type:

Callable[[Callable], Callable]

satella.coding.concurrent.locked_structure module

class satella.coding.concurrent.locked_structure.LockedStructure(obj_to_wrap, lock=None)

Bases: Proxy, Generic[T]

A wizard to make every Python structure thread-safe.

It wraps obj_to_wrap, passing on all calls, settings and so on to the object wrapper, from lock exposing only the context manager protocol.

Example:

>>> locked_dict = LockedStructure(dict)
>>> with locked_dict:
>>>     locked_dict[5] = 2

Also, please note that operations such as addition will strip this object of being a locked structure, ie. they will return object that participated in locked structure plus some other.

Note that in-place operations return the locked structure.

Parameters:
  • obj_to_wrap (T) –

  • lock (Optional[allocate_lock]) –

satella.coding.concurrent.monitor module

class satella.coding.concurrent.monitor.Monitor

Bases: object

Base utility class for creating monitors (the synchronization thingies!)

These are NOT re-entrant!

Use it like that:

>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..

You need to invoke this at your constructor You can also use it to release locks of other objects.

class acquire(foo)

Bases: object

Returns a context manager object that can lock another object, as long as that object is a monitor.

Consider foo, which is a monitor. If you needed to lock it from outside, you would do:

>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..
Parameters:

foo (Monitor) –

foo
class release(foo)

Bases: object

Returns a context manager object that can release another object as long as that object is a monitor.

Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:

>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..
Parameters:

foo (Monitor) –

foo
classmethod synchronize_on(monitor)

A decorator for locking on non-self Monitor objects

Use it like:

>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass
Parameters:

monitor (Monitor) –

Return type:

Callable[[Callable], Callable]

static synchronize_on_attribute(attr_name)

When a Monitor is an attribute of a class, and you have a method instance that you would like secure by acquiring that monitor, use this.

The first argument taken by that method instance must be self.

Parameters:

attr_name (str) – name of the attribute that is the monitor

static synchronized(fun)

This is a decorator. Class method decorated with that will lock the global lock of given instance, making it threadsafe. Depending on usage pattern of your class and it’s data semantics, your performance may vary

Parameters:

fun (Callable) –

Return type:

Callable

class satella.coding.concurrent.monitor.MonitorDict(*args, **kwargs)

Bases: Generic[K, V], UserDict, Monitor

A dict that is also a monitor.

Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.monitor.MonitorList(*args)

Bases: Generic[T], UserList, Monitor

A list that is also a monitor.

Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.monitor.MonitorSet(*args)

Bases: set, Monitor

A set that allows atomic insert-if-not-already-there operation

insert_and_check(item)

Perform an atomic insert if not already in set

Parameters:

item – item to insert

Returns:

whether the item was successfully inserted

Return type:

bool

class satella.coding.concurrent.monitor.RMonitor

Bases: Monitor

Monitor, but using an reentrant lock instead of a normal one

You need to invoke this at your constructor You can also use it to release locks of other objects.

satella.coding.concurrent.queue module

class satella.coding.concurrent.queue.PeekableQueue

Bases: Generic[T]

A thread-safe FIFO queue that supports peek()ing for elements.

get(timeout=None)

Get an element.

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

Empty – queue was empty

Return type:

T

inserted_condition
lock
peek(timeout=None)

Get an element without removing it from the top of the queue.

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

WouldWaitMore – timeout has expired

Return type:

T

put(item)

Add an element to the queue

Parameters:

item (T) – element to add

Return type:

None

put_many(items)

Put multiple items

Parameters:

items (Sequence[T]) – sequence of items to put

Return type:

None

qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block. :return: approximate size of the queue

Return type:

int

queue

satella.coding.concurrent.sync module

satella.coding.concurrent.sync.sync_threadpool(tpe, max_wait=None)

Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.

Make sure that other tasks do not submit anything to this thread pool executor.

Parameters:
  • tpe (Union[ExecutorWrapper, ThreadPoolExecutor]) – thread pool executor to sync. Can be also a ExecutorWrapper.

  • max_wait (Optional[float]) – maximum time to wait. Default, None, means wait forever

Raises:

WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.

Return type:

None

satella.coding.concurrent.thread module

class satella.coding.concurrent.thread.BogusTerminableThread

Bases: object

A mock object that implements threading interface but does nothing

Variables:
  • started – bool, if it’s running

  • terminated – bool, if terminated

  • daemon – bool, if daemon

daemon
is_alive()
Returns:

if this thread is alive

Return type:

bool

join(timeout=None)

Wait for the pseudo-thread. Sets running to False if thread was terminated.

Parameters:

timeout – maximum number of seconds to wait for termination

Raises:
  • WouldWaitMore – thread did not terminate within that many seconds

  • RuntimeError – tried to join() before start()!

Return type:

None

start()

Set running to True :raises RuntimeError: thread already terminated or already running

Return type:

None

started
terminate()

Set terminated to True.

Note that to set running to False you need to invoke join() afterwards.

Return type:

None

terminated
class satella.coding.concurrent.thread.Condition(lock=None)

Bases: Condition

A wrapper to faciliate easier usage of Pythons’ threading.Condition.

There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.

This happens to sorta not work on PyPy. Use at your own peril. You have been warned.

notify(n=1)

Notify n threads waiting on this Condition

Parameters:

n (int) – amount of threads to notify

Return type:

None

notifyAll()

Deprecated alias for notify_all

Deprecated since version 2.14.22.

Return type:

None

notify_all()

Notify all threads waiting on this Condition

Return type:

None

wait(timeout=None, dont_raise=False)

Wait for condition to become true.

Parameters:
  • timeout (Optional[Union[str, float]]) – timeout to wait. None is default and means infinity. Can be also a time string.

  • dont_raise (bool) – if True, then WouldWaitMore won’t be raised

Raises:
Return type:

None

class satella.coding.concurrent.thread.IntervalTerminableThread(seconds, *args, **kwargs)

Bases: TerminableThread

A TerminableThread that calls .loop() once per x seconds, taking into account the length of loop() runtime.

If executing .loop() takes more than x seconds, on_overrun() will be called. If executing .process() takes more than x seconds, it will be called immediately after it returns (and on_overrun() executes)

Parameters:

seconds (Union[str, float]) – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.

Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be ran in new thread.

Parameters:
  • terminate_on – if provided, and loop() throws one of it, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to list of terminable exceptions.

  • seconds (Union[str, float]) –

abstract loop()

Override me!

Return type:

None

on_overrun(time_taken)

Called when executing .loop() takes more than x seconds.

Called each cycle.

You are meant to override this, as by default this does nothing.

Parameters:

time_taken (float) – how long did calling .loop() take

Return type:

None

run()

Calls self.loop() indefinitely, until terminating condition is met

class satella.coding.concurrent.thread.SingleStartThread(*args, **kwargs)

Bases: Thread

A thread that keeps track of whether it’s .start() method was called, and does nothing if it’s called second or so time.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

start()

No-op when called second or so time. The first time it starts the thread.

Returns:

self

Return type:

SingleStartThread

class satella.coding.concurrent.thread.TerminableThread(*args, terminate_on=None, **kwargs)

Bases: Thread

Class that will execute something in a loop unless terminated. Use like:

>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()

Property to check whether to terminate is stored in self.terminating.

If you decide to override run(), you got to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.

You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:

>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> self.assertFalse(a.is_alive())

If prepare() throws one of the terminate_on exceptions, loop() even won’t be called. However, terminate() will be automatically called then.

Same applies for IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.

Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be ran in new thread.

Parameters:

terminate_on (Optional[Union[Type[Exception], Tuple[Type[Exception], ...]]]) – if provided, and loop() throws one of it, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to list of terminable exceptions.

cleanup()

Called after thread non-forced termination, in the thread’s context.

The default implementation does nothing.

loop()

Run one iteration of the loop. Meant to be overrided. You do not need to override it if you decide to override run() through.

This should block for as long as a single check will take, as termination checks take place between calls.

Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.

Return type:

None

prepare()

This is called before the .loop() looping loop is entered.

This is invoked already in a separate thread.

Return type:

None

run()

Calls self.loop() indefinitely, until terminating condition is met

Return type:

None

safe_sleep(interval, wake_up_each=2)

Sleep for interval, waking up each wake_up_each seconds to check if terminating, finish earlier if is terminating.

This will do the right thing when passed a negative interval.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • interval (float) – Time to sleep in total

  • wake_up_each (float) – Amount of seconds to wake up each

Raises:

SystemExit – thread is terminating

Return type:

None

safe_wait_condition(condition, timeout, wake_up_each=2, dont_raise=False)

Wait for a condition, checking periodically if the thread is being terminated.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • condition (Condition) – condition to wait on

  • timeout (Union[str, float]) – maximum time to wait in seconds. Can be also a time string

  • wake_up_each (Union[str, float]) – amount of seconds to wake up each to check for termination. Can be also a time string.

  • dont_raise (bool) – if set to True, WouldWaitMore will not be raised

Raises:
  • WouldWaitMore – timeout has passed and Condition has not happened

  • SystemExit – thread is terminating

Return type:

None

start()

Start the execution of this thread :return: this thread

Return type:

TerminableThread

terminate(force=False)

Signal this thread to terminate.

Forcing, if requested, will be done by injecting a SystemExit exception into target thread, so the thread must acquire GIL. For example, following would not be interruptable:

>>> time.sleep(1000000)

Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.

Parameters:

force (bool) – Whether to force a quit

Returns:

self

Raises:
  • RuntimeError – when something goes wrong with the underlying Python machinery

  • NotImplementedError – force=True was used on PyPy

Return type:

TerminableThread

property terminating: bool
Returns:

Is this thread either alive and trying to terminate or dead and after termination?

satella.coding.concurrent.thread.call_in_separate_thread(*t_args, no_thread_attribute=False, delay=0, **t_kwargs)

Decorator to mark given routine as callable in a separate thread.

The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.

The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.

Warning

calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument

The arguments given here will be passed to thread’s constructor, so use like:

Parameters:
  • no_thread_attribute (bool) – if set to True, future won’t have a link returned to it’s thread. The thread will have attribute of “future” anyways.

  • delay (float) – seconds to wait before launching function

>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...

satella.coding.concurrent.thread_collection module

class satella.coding.concurrent.thread_collection.ThreadCollection(threads=None)

Bases: object

A collection of threads.

Create like:

>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()

This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.

Parameters:

threads (Optional[Sequence[Thread]]) –

add(thread)

Add a thread to the collection

Parameters:

thread (Thread) – thread to add

Returns:

this thread collection instance

Return type:

ThreadCollection

append(thread)

Alias for add()

Parameters:

thread (Thread) – thread to add

Returns:

this thread collection instance

Return type:

ThreadCollection

property daemon: bool

Is any of the threads a daemon?

Also, when used a setter sets daemon attribute.

classmethod from_class(cls_to_use, iteratable, **kwargs)

Build a thread collection

Parameters:
  • cls_to_use – class to instantiate with

  • iteratable – an iterable with the sole argument to this class

Return type:

ThreadCollection

classmethod get_currently_running(include_main_thread=True)

Get all currently running threads as thread collection

Parameters:

include_main_thread (bool) – whether to include the main thread

Returns:

a thread collection representing all currently running threads

Return type:

ThreadCollection

is_alive()

Is at least one thread alive?

Return type:

bool

join(timeout=None)

Join all threads

Parameters:

timeout (Optional[float]) – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.

Returns:

this thread collection instance

Raises:

WouldWaitMore – one of the threads failed to terminate

Return type:

ThreadCollection

start()

Start all threads

Returns:

this thread collection instance

Return type:

ThreadCollection

terminate(*args, **kwargs)

Call terminate() on all threads that have this method

Returns:

this thread collection instance

Return type:

ThreadCollection

threads

satella.coding.concurrent.timer module

class satella.coding.concurrent.timer.Timer(interval, function, args=None, kwargs=None, spawn_separate=False)

Bases: object

A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.

There might be up to a second of delay before the timer is picked up.

n If spawn_separate is False, exceptions will be logged.

param interval:

amount of seconds that should elapse between calling start() and function executing. Can be also a time string.

param function:

function to execute

param args:

argument for function

param kwargs:

kwargs for function

param spawn_separate:

whether to call the function in a separate thread

Parameters:

interval (Union[str, float]) –

args
cancel()

Do not execute this timer

Return type:

None

cancelled
execute_at
function
interval
kwargs
spawn_separate
start()

Order this timer task to be executed in interval seconds

Return type:

None

class satella.coding.concurrent.timer.TimerBackgroundThread

Bases: Thread, Monitor

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

satella.coding.concurrent.value module

class satella.coding.concurrent.value.DeferredValue

Bases: Generic[T]

A class that allows you to pass arguments that will be available later during runtime.

Usage:

>>> def thread1(value):
 >>>     print(value.value())
>>> val = DeferredValue()
>>> threading.Thread(target=thread1, args=(val, )).start()
>>> time.sleep(10)
>>> val.set_value(3)
lock
result(timeout=None)

An alias for value()

Return type:

T

set_value(va)

Set a value and wake up all the threads waiting on it.

Parameters:

va (T) – value to set

Raises:

ValueError – value is already set

Return type:

None

val
value(timeout=None)

Wait until value is available, and return it.

Parameters:

timeout (Optional[float]) – number of seconds to wait. If None is given, this will take as long as necessary.

Returns:

a value

Raises:

WouldWaitMore – timeout was given and it has expired

Return type:

T

Module contents

class satella.coding.concurrent.AtomicNumber(v=0)

Bases: Monitor

An atomic number. Note that the class is not hashable and for a reason, since it’s value might change in time. So in this case this is more of like a container for numbers.

Treat it like a normal number, except all operations are executed atomically.

You can also wait for it to change it’s value, via wait().

You change it’s value in the following way:

>>> a = AtomicNumber()
>>> a += 2

Note that if the number if used in an expression, such as

>>> b = a + 2

Then a normal number will be returned

You need to invoke this at your constructor You can also use it to release locks of other objects.

Parameters:

v (Union[int, float]) –

condition
value
wait(timeout=None, throw_exception=True)

Block until the atomic number changes it’s value.

Parameters:
  • timeout (Optional[float]) – maximum time to wait. None means wait indefinitely

  • throw_exception (bool) – whether to throw WouldWaitMore on timeout

Raises:

WouldWaitMore – the value hasn’t changed within the timeout

wait_until_equal(v, timeout=None)

Wait until the value of this number equals v.

Parameters:
  • v (Union[int, float]) – value to compare this number against

  • timeout (Optional[float]) – maximum time to wait

Raises:

WouldWaitMore – timeout expired without the value becoming equal to target

Return type:

None

class satella.coding.concurrent.BogusTerminableThread

Bases: object

A mock object that implements threading interface but does nothing

Variables:
  • started – bool, if it’s running

  • terminated – bool, if terminated

  • daemon – bool, if daemon

daemon
is_alive()
Returns:

if this thread is alive

Return type:

bool

join(timeout=None)

Wait for the pseudo-thread. Sets running to False if thread was terminated.

Parameters:

timeout – maximum number of seconds to wait for termination

Raises:
  • WouldWaitMore – thread did not terminate within that many seconds

  • RuntimeError – tried to join() before start()!

Return type:

None

start()

Set running to True :raises RuntimeError: thread already terminated or already running

Return type:

None

started
terminate()

Set terminated to True.

Note that to set running to False you need to invoke join() afterwards.

Return type:

None

terminated
class satella.coding.concurrent.CallNoOftenThan(interval, callable_)

Bases: object

A class that will ensure that calls to given callable are made no more often than some interval.

Even if it’s call is called more often than specified value, the callable just won’t be called and None will be returned.

Parameters:
  • interval (float) – interval in seconds

  • callable – callable to call

  • callable_ (Callable) –

callable: Callable
interval: float
last_called: float
class satella.coding.concurrent.CallableGroup(gather=True, swallow_exceptions=False)

Bases: Generic[T]

This behaves like a function, but allows to add other functions to call when invoked, eg.

c1 = Callable()

c1.add(foo) c1.add(bar)

c1(2, 3)

Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.

Parameters:
  • gather (bool) –

  • swallow_exceptions (bool) –

add(callable_, one_shot=False)

Add a callable.

Note

Same callable can’t be added twice. It will silently fail.

Can be a CancellableCallback, in that case method remove_cancelled() might be useful.

Basically every callback is cancellable.

Parameters:
  • callable – callable

  • one_shot (bool) – if True, callable will be unregistered after single call

  • callable_ (Union[CancellableCallback, Callable[[], T]]) –

Returns:

callable_ if it was a cancellable callback, else one constructed after it

Return type:

CancellableCallback

Deprecated since version v2.25.5: Do not pass a CancellableCallback, you’ll get your own

callables: Dict[Callable, tuple[bool, int]]
gather: bool
property has_cancelled_callbacks: bool

Check whether this has any CancellableCallback instances and whether any of them was cancelled

remove_cancelled()

Remove it’s entries that are CancelledCallbacks and that were cancelled

Return type:

None

swallow_exceptions: bool
class satella.coding.concurrent.CancellableCallback(callback_fun)

Bases: object

A callback that you can cancel.

Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.

If called, the function itself won’t be called as well if this was cancelled. In this case a None will be returned instead of the result of callback_fun()

This short circuits __bool__ to return not .cancelled. So, the bool value of this callback depends on whether it has been cancelled or not.

Hashable and __eq__-able by identity. Equal only to itself.

Parameters:

callback_fun (Callable) – function to call

Variables:

cancelled – whether this callback was cancelled (bool)

callback_fun
cancel()

Cancel this callback.

Return type:

None

cancelled
class satella.coding.concurrent.Condition(lock=None)

Bases: Condition

A wrapper to faciliate easier usage of Pythons’ threading.Condition.

There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.

This happens to sorta not work on PyPy. Use at your own peril. You have been warned.

notify(n=1)

Notify n threads waiting on this Condition

Parameters:

n (int) – amount of threads to notify

Return type:

None

notifyAll()

Deprecated alias for notify_all

Deprecated since version 2.14.22.

Return type:

None

notify_all()

Notify all threads waiting on this Condition

Return type:

None

wait(timeout=None, dont_raise=False)

Wait for condition to become true.

Parameters:
  • timeout (Optional[Union[str, float]]) – timeout to wait. None is default and means infinity. Can be also a time string.

  • dont_raise (bool) – if True, then WouldWaitMore won’t be raised

Raises:
Return type:

None

class satella.coding.concurrent.DeferredValue

Bases: Generic[T]

A class that allows you to pass arguments that will be available later during runtime.

Usage:

>>> def thread1(value):
 >>>     print(value.value())
>>> val = DeferredValue()
>>> threading.Thread(target=thread1, args=(val, )).start()
>>> time.sleep(10)
>>> val.set_value(3)
lock
result(timeout=None)

An alias for value()

Return type:

T

set_value(va)

Set a value and wake up all the threads waiting on it.

Parameters:

va (T) – value to set

Raises:

ValueError – value is already set

Return type:

None

val
value(timeout=None)

Wait until value is available, and return it.

Parameters:

timeout (Optional[float]) – number of seconds to wait. If None is given, this will take as long as necessary.

Returns:

a value

Raises:

WouldWaitMore – timeout was given and it has expired

Return type:

T

class satella.coding.concurrent.Future

Bases: Future, Generic[T]

A future that allows it’s callback handlers to change it’s result before presenting it to the user.

Use like this:

>>> fut = Future()
>>> fut.set_running_or_notify_cancel()
>>> def transform_future(future):
>>>     future.set_result(future.result() + 2)
>>> fut.add_pre_done_callback(transform_future)
>>> fut.set_result(2)
>>> assert fut.result() == 4

Initializes the future. Should not be called by clients.

add_pre_done_callback(fn)

Attaches a callable that will be called just before the future finishes and can change the future’s result (or insert an Exception).

Args:
fn: A callable that will be called with this future as its only

argument just before the future completes or is cancelled.

chain(fun)

Schedule function to be called with the result of this future as it’s argument (or exception value if the future excepted).

Parameters:

fun – function to call

Returns:

self

Return type:

Future

exception(timeout)

Return the exception raised by the call that the future represents.

Args:
timeout: The number of seconds to wait for the exception if the

future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:

CancelledError: If the future was cancelled. TimeoutError: If the future didn’t finish executing before the given

timeout.

Parameters:

timeout (None) –

Return type:

Type[Exception]

on_failure(fun)

Schedule function to be called with the exception value that befall this future

Parameters:

fun – function to call

Returns:

self

on_success(fun)

Schedule function to be called with the result of this future as it’s argument only if this future succeeds.

Parameters:

fun – function to call

Returns:

self

Return type:

Future

result(timeout=None)

Return the result of the call that the future represents.

Args:
timeout: The number of seconds to wait for the result if the future

isn’t done. If None, then there is no limit on the wait time.

Returns:

The result of the call that the future represents.

Raises:

CancelledError: If the future was cancelled. TimeoutError: If the future didn’t finish executing before the given

timeout.

Exception: If the call raised then that exception will be raised.

Return type:

T

set_exception(exception)

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.

set_result(result)

Sets the return value of work associated with the future.

Should only be used by Executor implementations and unit tests.

Parameters:

result (T) –

Return type:

None

class satella.coding.concurrent.FutureCollection(futures=())

Bases: object

A set of futures sharing a common result, or a common exception.

This overloads the operator + for making an union of futures. It can be used with either instances of FutureCollection or normal futures.

Also supports the indexing operator to get n-th future.

Parameters:

futures (Sequence[Future]) –

add(future)

Add a future

Parameters:

future (Future) – a Future to add

Returns:

self

Return type:

FutureCollection

add_done_callback(callback, only_one=False)

Add a callback to a Future to be called on it’s completion.

By default, this will add the callback to all futures.

Parameters:
  • callback – callback that takes the completed Future as argument

  • only_one (bool) – callback will be added only to a single Future. False by default

Raises:

IndexError – only_one was given and no Futures in collection!

Return type:

None

cancel()

Cancel all futures

Returns:

True if all sections were cancelled

Return type:

bool

exception(timeout=None)

Return first exception raised by any of the futures

This will block until the results are available. This call proceeding does not mean that results for all are available, since this will return the first exception encountered!

Parameters:

timeout (Optional[float]) – a timeout in seconds for a single result. Default value None means wait as long as necessary

Returns:

the first exception, or None if there were no exceptions

Raises:

WouldWaitMore – timeout while waiting for result

Return type:

Optional[Exception]

futures
result(timeout=None)

Return the result of all futures, as a list.

This will block until the results are available.

Parameters:

timeout (Optional[float]) – a timeout in seconds for a single result. Default value None means wait as long as necessary

Returns:

list containing results of all futures

Raises:

WouldWaitMore – timeout while waiting for result

Return type:

list

set_exception(exc)

Set an exception for all futures

Parameters:

exc – exception instance to set

Return type:

None

set_result(result)

Set a result for all futures

Parameters:

result – result to set

Return type:

None

set_running_or_notify_cancel()

Call set_running_or_notify_cancel on the futures

This will return True if at least one future was not cancelled

Return type:

bool

class satella.coding.concurrent.IDAllocator(start_at=0, top_limit=None)

Bases: Monitor

Reusable ID allocator

You can use it to requisition ints from a pool, and then free their ints, permitting them to be reused.

Thread-safe.

Parameters:
  • start_at (int) – the lowest integer that the allocator will return

  • top_limit (Optional[int]) – the maximum value that will not be allocated. If used, subsequent calls to allocate_int() will raise Empty

You need to invoke this at your constructor You can also use it to release locks of other objects.

allocate_int()

Return a previously unallocated int, and mark it as allocated

Returns:

an allocated int

Raises:

Empty – could not allocate an int due to top limit

Return type:

int

bound
free_ints
ints_allocated
mark_as_allocated(x)

Mark given x as allocated

Parameters:

x (int) – x to mark as allocated

Raises:
  • AlreadyAllocated – x was already allocated

  • ValueError – x is less than start_at

mark_as_free(x)

Mark x as free

Parameters:

x (int) – int to free

Raises:

ValueError – x was not allocated or less than start_at

start_at
top_limit
class satella.coding.concurrent.IntervalTerminableThread(seconds, *args, **kwargs)

Bases: TerminableThread

A TerminableThread that calls .loop() once per x seconds, taking into account the length of loop() runtime.

If executing .loop() takes more than x seconds, on_overrun() will be called. If executing .process() takes more than x seconds, it will be called immediately after it returns (and on_overrun() executes)

Parameters:

seconds (Union[str, float]) – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.

Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be ran in new thread.

Parameters:
  • terminate_on – if provided, and loop() throws one of it, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to list of terminable exceptions.

  • seconds (Union[str, float]) –

abstract loop()

Override me!

Return type:

None

on_overrun(time_taken)

Called when executing .loop() takes more than x seconds.

Called each cycle.

You are meant to override this, as by default this does nothing.

Parameters:

time_taken (float) – how long did calling .loop() take

Return type:

None

run()

Calls self.loop() indefinitely, until terminating condition is met

exception satella.coding.concurrent.InvalidStateError

Bases: Error

The operation is not allowed in this state.

class satella.coding.concurrent.LockedDataset

Bases: object

A locked dataset. Subclass like

>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>    @LockedDataset.locked
>>>    def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')

If no lock is held, this class that derives from such will raise ResourceNotLocked upon element access while a lock is not being held.

Note that __enter__ will raise WouldWaitMore if timeout was given.

class InternalDataset

Bases: object

args
lock
locked
static locked(blocking=True, timeout=-1)

Decorator to use for annotating methods that would lock :param blocking: whether to block at all :param timeout: optional timeout. Default, or -1 means “return ASAP”

Return type:

Callable[[Callable], Callable]

class satella.coding.concurrent.LockedStructure(obj_to_wrap, lock=None)

Bases: Proxy, Generic[T]

A wizard to make every Python structure thread-safe.

It wraps obj_to_wrap, passing on all calls, settings and so on to the object wrapper, from lock exposing only the context manager protocol.

Example:

>>> locked_dict = LockedStructure(dict)
>>> with locked_dict:
>>>     locked_dict[5] = 2

Also, please note that operations such as addition will strip this object of being a locked structure, ie. they will return object that participated in locked structure plus some other.

Note that in-place operations return the locked structure.

Parameters:
  • obj_to_wrap (T) –

  • lock (Optional[allocate_lock]) –

class satella.coding.concurrent.Monitor

Bases: object

Base utility class for creating monitors (the synchronization thingies!)

These are NOT re-entrant!

Use it like that:

>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..

You need to invoke this at your constructor You can also use it to release locks of other objects.

class acquire(foo)

Bases: object

Returns a context manager object that can lock another object, as long as that object is a monitor.

Consider foo, which is a monitor. If you needed to lock it from outside, you would do:

>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..
Parameters:

foo (Monitor) –

foo
class release(foo)

Bases: object

Returns a context manager object that can release another object as long as that object is a monitor.

Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:

>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..
Parameters:

foo (Monitor) –

foo
classmethod synchronize_on(monitor)

A decorator for locking on non-self Monitor objects

Use it like:

>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass
Parameters:

monitor (Monitor) –

Return type:

Callable[[Callable], Callable]

static synchronize_on_attribute(attr_name)

When a Monitor is an attribute of a class, and you have a method instance that you would like secure by acquiring that monitor, use this.

The first argument taken by that method instance must be self.

Parameters:

attr_name (str) – name of the attribute that is the monitor

static synchronized(fun)

This is a decorator. Class method decorated with that will lock the global lock of given instance, making it threadsafe. Depending on usage pattern of your class and it’s data semantics, your performance may vary

Parameters:

fun (Callable) –

Return type:

Callable

class satella.coding.concurrent.MonitorDict(*args, **kwargs)

Bases: Generic[K, V], UserDict, Monitor

A dict that is also a monitor.

Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.MonitorList(*args)

Bases: Generic[T], UserList, Monitor

A list that is also a monitor.

Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.MonitorSet(*args)

Bases: set, Monitor

A set that allows atomic insert-if-not-already-there operation

insert_and_check(item)

Perform an atomic insert if not already in set

Parameters:

item – item to insert

Returns:

whether the item was successfully inserted

Return type:

bool

class satella.coding.concurrent.PeekableQueue

Bases: Generic[T]

A thread-safe FIFO queue that supports peek()ing for elements.

get(timeout=None)

Get an element.

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

Empty – queue was empty

Return type:

T

inserted_condition
lock
peek(timeout=None)

Get an element without removing it from the top of the queue.

Parameters:

timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

WouldWaitMore – timeout has expired

Return type:

T

put(item)

Add an element to the queue

Parameters:

item (T) – element to add

Return type:

None

put_many(items)

Put multiple items

Parameters:

items (Sequence[T]) – sequence of items to put

Return type:

None

qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block. :return: approximate size of the queue

Return type:

int

queue
class satella.coding.concurrent.RMonitor

Bases: Monitor

Monitor, but using an reentrant lock instead of a normal one

You need to invoke this at your constructor You can also use it to release locks of other objects.

class satella.coding.concurrent.SequentialIssuer(start=0)

Bases: Monitor

A classs that issues an monotonically increasing value.

Parameters:

start (int) – start issuing IDs from this value

Variables:

start – next value to be issued

You need to invoke this at your constructor You can also use it to release locks of other objects.

issue()

Just issue a next identifier

Returns:

a next identifier

Return type:

int

no_less_than(no_less_than)

Issue an int, which is no less than a given value

Parameters:

no_less_than (int) – value that the returned id will not be less than this

Returns:

an identifier, no less than no_less_than

Return type:

int

start
class satella.coding.concurrent.SingleStartThread(*args, **kwargs)

Bases: Thread

A thread that keeps track of whether it’s .start() method was called, and does nothing if it’s called second or so time.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

start()

No-op when called second or so time. The first time it starts the thread.

Returns:

self

Return type:

SingleStartThread

class satella.coding.concurrent.TerminableThread(*args, terminate_on=None, **kwargs)

Bases: Thread

Class that will execute something in a loop unless terminated. Use like:

>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()

Property to check whether to terminate is stored in self.terminating.

If you decide to override run(), you got to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.

You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:

>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> self.assertFalse(a.is_alive())

If prepare() throws one of the terminate_on exceptions, loop() even won’t be called. However, terminate() will be automatically called then.

Same applies for IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.

Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be ran in new thread.

Parameters:

terminate_on (Optional[Union[Type[Exception], Tuple[Type[Exception], ...]]]) – if provided, and loop() throws one of it, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to list of terminable exceptions.

cleanup()

Called after thread non-forced termination, in the thread’s context.

The default implementation does nothing.

loop()

Run one iteration of the loop. Meant to be overrided. You do not need to override it if you decide to override run() through.

This should block for as long as a single check will take, as termination checks take place between calls.

Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.

Return type:

None

prepare()

This is called before the .loop() looping loop is entered.

This is invoked already in a separate thread.

Return type:

None

run()

Calls self.loop() indefinitely, until terminating condition is met

Return type:

None

safe_sleep(interval, wake_up_each=2)

Sleep for interval, waking up each wake_up_each seconds to check if terminating, finish earlier if is terminating.

This will do the right thing when passed a negative interval.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • interval (float) – Time to sleep in total

  • wake_up_each (float) – Amount of seconds to wake up each

Raises:

SystemExit – thread is terminating

Return type:

None

safe_wait_condition(condition, timeout, wake_up_each=2, dont_raise=False)

Wait for a condition, checking periodically if the thread is being terminated.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • condition (Condition) – condition to wait on

  • timeout (Union[str, float]) – maximum time to wait in seconds. Can be also a time string

  • wake_up_each (Union[str, float]) – amount of seconds to wake up each to check for termination. Can be also a time string.

  • dont_raise (bool) – if set to True, WouldWaitMore will not be raised

Raises:
  • WouldWaitMore – timeout has passed and Condition has not happened

  • SystemExit – thread is terminating

Return type:

None

start()

Start the execution of this thread :return: this thread

Return type:

TerminableThread

terminate(force=False)

Signal this thread to terminate.

Forcing, if requested, will be done by injecting a SystemExit exception into target thread, so the thread must acquire GIL. For example, following would not be interruptable:

>>> time.sleep(1000000)

Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.

Parameters:

force (bool) – Whether to force a quit

Returns:

self

Raises:
  • RuntimeError – when something goes wrong with the underlying Python machinery

  • NotImplementedError – force=True was used on PyPy

Return type:

TerminableThread

property terminating: bool
Returns:

Is this thread either alive and trying to terminate or dead and after termination?

class satella.coding.concurrent.ThreadCollection(threads=None)

Bases: object

A collection of threads.

Create like:

>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()

This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.

Parameters:

threads (Optional[Sequence[Thread]]) –

add(thread)

Add a thread to the collection

Parameters:

thread (Thread) – thread to add

Returns:

this thread collection instance

Return type:

ThreadCollection

append(thread)

Alias for add()

Parameters:

thread (Thread) – thread to add

Returns:

this thread collection instance

Return type:

ThreadCollection

property daemon: bool

Is any of the threads a daemon?

Also, when used a setter sets daemon attribute.

classmethod from_class(cls_to_use, iteratable, **kwargs)

Build a thread collection

Parameters:
  • cls_to_use – class to instantiate with

  • iteratable – an iterable with the sole argument to this class

Return type:

ThreadCollection

classmethod get_currently_running(include_main_thread=True)

Get all currently running threads as thread collection

Parameters:

include_main_thread (bool) – whether to include the main thread

Returns:

a thread collection representing all currently running threads

Return type:

ThreadCollection

is_alive()

Is at least one thread alive?

Return type:

bool

join(timeout=None)

Join all threads

Parameters:

timeout (Optional[float]) – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.

Returns:

this thread collection instance

Raises:

WouldWaitMore – one of the threads failed to terminate

Return type:

ThreadCollection

start()

Start all threads

Returns:

this thread collection instance

Return type:

ThreadCollection

terminate(*args, **kwargs)

Call terminate() on all threads that have this method

Returns:

this thread collection instance

Return type:

ThreadCollection

threads
class satella.coding.concurrent.Timer(interval, function, args=None, kwargs=None, spawn_separate=False)

Bases: object

A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.

There might be up to a second of delay before the timer is picked up.

n If spawn_separate is False, exceptions will be logged.

param interval:

amount of seconds that should elapse between calling start() and function executing. Can be also a time string.

param function:

function to execute

param args:

argument for function

param kwargs:

kwargs for function

param spawn_separate:

whether to call the function in a separate thread

Parameters:

interval (Union[str, float]) –

args
cancel()

Do not execute this timer

Return type:

None

cancelled
execute_at
function
interval
kwargs
spawn_separate
start()

Order this timer task to be executed in interval seconds

Return type:

None

class satella.coding.concurrent.WrappingFuture(source_future)

Bases: Future

A Satella future wrapping an existing Python future.

Use like:

>> wrapped = WrappingFuture(existing_python_future)

Initializes the future. Should not be called by clients.

Parameters:

source_future (Future) –

cancel()

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

Return type:

bool

set_running_or_notify_cancel()

Mark the future as running or process any cancel notifications.

Should only be used by Executor implementations and unit tests.

If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (though calls to as_completed() or wait()) are notified and False is returned.

If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.

This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.

Returns:

False if the Future was cancelled, True otherwise.

Raises:
RuntimeError: if this method was already called or if set_result()

or set_exception() was called.

Return type:

bool

satella.coding.concurrent.call_in_separate_thread(*t_args, no_thread_attribute=False, delay=0, **t_kwargs)

Decorator to mark given routine as callable in a separate thread.

The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.

The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.

Warning

calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument

The arguments given here will be passed to thread’s constructor, so use like:

Parameters:
  • no_thread_attribute (bool) – if set to True, future won’t have a link returned to it’s thread. The thread will have attribute of “future” anyways.

  • delay (float) – seconds to wait before launching function

>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...
satella.coding.concurrent.parallel_construct(iterable, function, thread_pool, span_title=None)

Construct a list from executing given function in a thread pool executor.

If opentracing is installed, and tracing is enabled, current span will be passed to child threads.

Parameters:
  • iterable (Iterable[V]) – iterable to apply

  • function (Callable[[V], Optional[U]]) – function to apply. If that function returns None, no element will be added

  • thread_pool (ThreadPoolExecutor) – thread pool to execute

  • span_title (Optional[str]) – span title to create. For each execution a child span will be returned

Returns:

list that is the result of parallel application of function on each element

Return type:

List[U]

satella.coding.concurrent.parallel_execute(callable_, args, kwargs=<generator object infinite_iterator>)

Execute a number of calls to callable in parallel.

Callable must be a function that accepts arguments and returns a plain Python future.

Return will be an iterator that will yield every value of the iterator, or return an instance of exception, if any of the calls excepted.

Parameters:
  • callable – a callable that returns futures

  • args (Iterable[T]) – an iterable of arguments to provide to the callable

  • kwargs (Iterable[dict]) – an iterable of keyword arguments to provide to the callable

  • callable_ (Callable[[T], Future]) –

Returns:

an iterator yielding every value (or exception instance if thew) of the future

satella.coding.concurrent.run_as_future(fun)

A decorator that accepts a function that should be executed in a separate thread, and a Future returned instead of it’s result, that will enable to watch the function for completion.

The created thread will be non-demonic

Example usage:

>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()
satella.coding.concurrent.sync_threadpool(tpe, max_wait=None)

Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.

Make sure that other tasks do not submit anything to this thread pool executor.

Parameters:
  • tpe (Union[ExecutorWrapper, ThreadPoolExecutor]) – thread pool executor to sync. Can be also a ExecutorWrapper.

  • max_wait (Optional[float]) – maximum time to wait. Default, None, means wait forever

Raises:

WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.

Return type:

None