satella.coding.concurrent package¶
Subpackages¶
Submodules¶
satella.coding.concurrent.atomic module¶
- class satella.coding.concurrent.atomic.AtomicNumber(v=0)¶
Bases:
Monitor
An atomic number. Note that the class is deliberately not hashable, since its value might change over time. In this sense it is more like a container for numbers.
Treat it like a normal number, except all operations are executed atomically.
You can also wait for it to change its value, via wait().
You change its value in the following way:
>>> a = AtomicNumber()
>>> a += 2
Note that if the number is used in an expression, such as
>>> b = a + 2
then a normal number will be returned.
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- Parameters:
v (Union[int, float]) –
- condition¶
- value¶
- wait(timeout=None, throw_exception=True)¶
Block until the atomic number changes its value.
- Parameters:
timeout (Optional[float]) – maximum time to wait. None means wait indefinitely
throw_exception (bool) – whether to throw WouldWaitMore on timeout
- Raises:
WouldWaitMore – the value hasn’t changed within the timeout
- wait_until_equal(v, timeout=None)¶
Wait until the value of this number equals v.
- Parameters:
v (Union[int, float]) – value to compare this number against
timeout (Optional[float]) – maximum time to wait
- Raises:
WouldWaitMore – timeout expired without the value becoming equal to target
- Return type:
None
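The behaviour described above (atomic in-place updates, plain numbers from expressions, waiting for a target value) can be sketched with the standard library alone. This is a minimal illustration, not satella's implementation: `SketchAtomicNumber` is a hypothetical name, and it raises the built-in `TimeoutError` where the documentation lists `WouldWaitMore`.

```python
import threading


class SketchAtomicNumber:
    """Hypothetical stand-in for illustration; not satella's AtomicNumber."""

    def __init__(self, v=0):
        self._value = v
        self._cond = threading.Condition()

    def __iadd__(self, other):
        # In-place add under the lock, then wake every waiter.
        with self._cond:
            self._value += other
            self._cond.notify_all()
        return self

    def __add__(self, other):
        # Expressions yield a plain number, not another atomic wrapper.
        with self._cond:
            return self._value + other

    def wait_until_equal(self, v, timeout=None):
        # wait_for re-checks the predicate after each notification.
        with self._cond:
            if not self._cond.wait_for(lambda: self._value == v, timeout):
                raise TimeoutError('value did not become equal in time')


a = SketchAtomicNumber()
# Another thread performs the equivalent of "a += 2" shortly after start.
threading.Timer(0.05, lambda: a.__iadd__(2)).start()
a.wait_until_equal(2, timeout=2)
b = a + 2
```

Here `b` is an ordinary `int`, which mirrors the note that expressions return a normal number.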
satella.coding.concurrent.callablegroup module¶
- class satella.coding.concurrent.callablegroup.CallNoOftenThan(interval, callable_)¶
Bases:
object
A class that will ensure that calls to given callable are made no more often than some interval.
Even if it is called more often than the specified interval, the callable simply won’t be called and None will be returned.
- Parameters:
interval (float) – interval in seconds
callable_ (Callable) – callable to call
- callable: Callable¶
- interval: float¶
- last_called: float¶
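The throttling idea can be sketched as follows. This is a stdlib-only illustration under stated assumptions, not satella's code: `SketchCallNoOftenThan` is a hypothetical name, and using `None` to mean "never called yet" is an assumption of this sketch.

```python
import time


class SketchCallNoOftenThan:
    """Hypothetical stand-in illustrating the idea; not satella's code."""

    def __init__(self, interval, callable_):
        self.interval = interval
        self.callable = callable_
        self.last_called = None  # assumption: None means "never called"

    def __call__(self, *args, **kwargs):
        now = time.monotonic()
        if self.last_called is not None and now - self.last_called < self.interval:
            return None  # called too soon: skip the underlying callable
        self.last_called = now
        return self.callable(*args, **kwargs)


calls = []
limited = SketchCallNoOftenThan(60.0, lambda x: calls.append(x) or x)
first = limited(1)   # goes through
second = limited(2)  # within the interval, so it is skipped
```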
- class satella.coding.concurrent.callablegroup.CallableGroup(gather=True, swallow_exceptions=False)¶
Bases:
Generic[T]
This behaves like a function, but allows adding other functions to call when invoked, e.g.
c1 = CallableGroup()
c1.add(foo)
c1.add(bar)
c1(2, 3)
Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.
- Parameters:
gather (bool) –
swallow_exceptions (bool) –
- add(callable_, one_shot=False)¶
Add a callable.
Note
The same callable can’t be added twice. A repeated add will silently fail.
Can be a CancellableCallback, in which case the method remove_cancelled() might be useful.
Basically every callback is cancellable.
- Parameters:
callable_ (Union[CancellableCallback, Callable[[], T]]) – callable to add
one_shot (bool) – if True, callable will be unregistered after a single call
- Returns:
callable_ if it was a cancellable callback, else one constructed after it
- Return type:
CancellableCallback
Deprecated since version v2.25.5: Do not pass a CancellableCallback; you’ll get your own.
- callables: Dict[Callable, tuple[bool, int]]¶
- gather: bool¶
- property has_cancelled_callbacks: bool¶
Check whether this has any CancellableCallback instances and whether any of them was cancelled
- remove_cancelled()¶
Remove its entries that are CancellableCallbacks and that were cancelled
- Return type:
None
- swallow_exceptions: bool¶
- class satella.coding.concurrent.callablegroup.CancellableCallback(callback_fun)¶
Bases:
object
A callback that you can cancel.
Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.
If this callback was cancelled, calling it won’t invoke the function. In this case None will be returned instead of the result of callback_fun().
This short circuits __bool__ to return not .cancelled. So, the bool value of this callback depends on whether it has been cancelled or not.
Hashable and __eq__-able by identity. Equal only to itself.
- Parameters:
callback_fun (Callable) – function to call
- Variables:
cancelled – whether this callback was cancelled (bool)
- callback_fun¶
- cancel()¶
Cancel this callback.
- Return type:
None
- cancelled¶
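The cancellation semantics described above (a cancelled callback returns None, and its truth value reflects whether it is still active) can be sketched like this. `SketchCancellableCallback` is a hypothetical stand-in written for illustration, not satella's class.

```python
class SketchCancellableCallback:
    """Hypothetical stand-in; hashable and equal by identity by default."""

    def __init__(self, callback_fun):
        self.callback_fun = callback_fun
        self.cancelled = False

    def __bool__(self):
        # Truthy only while the callback has not been cancelled.
        return not self.cancelled

    def cancel(self):
        self.cancelled = True

    def __call__(self, *args, **kwargs):
        if self.cancelled:
            return None  # cancelled: the wrapped function is not invoked
        return self.callback_fun(*args, **kwargs)


callbacks = [SketchCancellableCallback(lambda: 'a'),
             SketchCancellableCallback(lambda: 'b')]
callbacks[0].cancel()
skipped = callbacks[0]()
# Truthiness filtering drops cancelled callbacks before calling them.
results = [cb() for cb in callbacks if cb]
```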
satella.coding.concurrent.functions module¶
- satella.coding.concurrent.functions.parallel_execute(callable_, args, kwargs=<generator object infinite_iterator>)¶
Execute a number of calls to callable in parallel.
The callable must be a function that accepts arguments and returns a plain Python future.
The return value is an iterator that yields the result of each future, or an exception instance if the corresponding call raised.
- Parameters:
callable_ (Callable[[T], Future]) – a callable that returns futures
args (Iterable[T]) – an iterable of arguments to provide to the callable
kwargs (Iterable[dict]) – an iterable of keyword arguments to provide to the callable
- Returns:
an iterator yielding every value (or exception instance, if one was thrown) of the futures
- satella.coding.concurrent.functions.run_as_future(fun)¶
A decorator for a function that should be executed in a separate thread. Calling the decorated function returns a Future instead of its result, which lets you watch the function for completion.
The created thread will be non-daemonic.
Example usage:
>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()
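How such a decorator can work is sketched below using only `threading` and `concurrent.futures.Future`. This is an illustration of the technique, not satella's implementation; `sketch_run_as_future` and `count_words` are hypothetical names.

```python
import functools
import threading
from concurrent.futures import Future


def sketch_run_as_future(fun):
    """Hypothetical decorator: run fun in a new thread, return a Future."""

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        fut = Future()

        def target():
            # Move the future from pending to running before doing the work.
            fut.set_running_or_notify_cancel()
            try:
                fut.set_result(fun(*args, **kwargs))
            except Exception as e:
                fut.set_exception(e)

        # daemon=False: the thread is non-daemonic, as described above.
        threading.Thread(target=target, daemon=False).start()
        return fut

    return wrapper


@sketch_run_as_future
def count_words(text: str) -> int:
    return len(text.split())


fut = count_words('a b c')
result = fut.result(timeout=5)
```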
satella.coding.concurrent.id_allocator module¶
- class satella.coding.concurrent.id_allocator.IDAllocator(start_at=0, top_limit=None)¶
Bases:
Monitor
Reusable ID allocator
You can use it to requisition ints from a pool, and then free those ints, permitting them to be reused.
Thread-safe.
- Parameters:
start_at (int) – the lowest integer that the allocator will return
top_limit (Optional[int]) – the maximum value that will not be allocated. If used, calls to allocate_int() will raise Empty once the limit is reached.
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- allocate_int()¶
Return a previously unallocated int, and mark it as allocated
- Returns:
an allocated int
- Raises:
Empty – could not allocate an int due to top limit
- Return type:
int
- bound¶
- free_ints¶
- ints_allocated¶
- mark_as_allocated(x)¶
Mark given x as allocated
- Parameters:
x (int) – x to mark as allocated
- Raises:
AlreadyAllocated – x was already allocated
ValueError – x is less than start_at
- mark_as_free(x)¶
Mark x as free
- Parameters:
x (int) – int to free
- Raises:
ValueError – x was not allocated or less than start_at
- start_at¶
- top_limit¶
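The allocate, free and reuse cycle can be sketched as below. This is a simplified stdlib-only illustration, not satella's implementation: `SketchIDAllocator` is a hypothetical name, `top_limit` is assumed to be an exclusive bound, and `RuntimeError` stands in for the `Empty` exception that the documentation lists.

```python
import threading


class SketchIDAllocator:
    """Hypothetical stand-in illustrating a reusable ID pool."""

    def __init__(self, start_at=0, top_limit=None):
        self.start_at = start_at
        self.top_limit = top_limit
        self._lock = threading.Lock()
        self._free = set()      # previously freed ints available for reuse
        self._bound = start_at  # every int >= _bound was never handed out

    def allocate_int(self):
        with self._lock:
            if self._free:
                return self._free.pop()  # prefer reusing a freed int
            if self.top_limit is not None and self._bound >= self.top_limit:
                raise RuntimeError('pool exhausted')  # satella documents Empty
            x = self._bound
            self._bound += 1
            return x

    def mark_as_free(self, x):
        with self._lock:
            if x < self.start_at or x >= self._bound or x in self._free:
                raise ValueError('%s was not allocated' % (x,))
            self._free.add(x)


alloc = SketchIDAllocator(start_at=10, top_limit=12)
first, second = alloc.allocate_int(), alloc.allocate_int()
alloc.mark_as_free(first)
reused = alloc.allocate_int()  # the freed int is handed out again
```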
- class satella.coding.concurrent.id_allocator.SequentialIssuer(start=0)¶
Bases:
Monitor
A class that issues monotonically increasing values.
- Parameters:
start (int) – start issuing IDs from this value
- Variables:
start – next value to be issued
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- issue()¶
Just issue a next identifier
- Returns:
a next identifier
- Return type:
int
- no_less_than(no_less_than)¶
Issue an int, which is no less than a given value
- Parameters:
no_less_than (int) – value that the returned id will not be less than
- Returns:
an identifier, no less than no_less_than
- Return type:
int
- start¶
satella.coding.concurrent.list_processor module¶
- satella.coding.concurrent.list_processor.parallel_construct(iterable, function, thread_pool, span_title=None)¶
Construct a list from executing given function in a thread pool executor.
If opentracing is installed, and tracing is enabled, current span will be passed to child threads.
- Parameters:
iterable (Iterable[V]) – iterable to apply
function (Callable[[V], Optional[U]]) – function to apply. If that function returns None, no element will be added
thread_pool (ThreadPoolExecutor) – thread pool to execute
span_title (Optional[str]) – span title to create. For each execution a child span will be returned
- Returns:
list that is the result of parallel application of function on each element
- Return type:
List[U]
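The filtering behaviour (results equal to None are dropped) can be sketched with a plain `ThreadPoolExecutor`. This is an illustration, not satella's implementation: `sketch_parallel_construct` is a hypothetical name, tracing spans are omitted, and keeping results in input order is an assumption of this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def sketch_parallel_construct(iterable, function, thread_pool):
    """Hypothetical helper: apply function in the pool, skip None results."""
    futures = [thread_pool.submit(function, item) for item in iterable]
    # Collect in submission order, so the output follows the input order.
    results = (f.result() for f in futures)
    return [r for r in results if r is not None]


def square_if_even(x):
    return x * x if x % 2 == 0 else None


with ThreadPoolExecutor(max_workers=4) as pool:
    squares = sketch_parallel_construct(range(6), square_if_even, pool)
```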
satella.coding.concurrent.locked_dataset module¶
- class satella.coding.concurrent.locked_dataset.LockedDataset¶
Bases:
object
A locked dataset. Subclass like
>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>     @LockedDataset.locked()
>>>     def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')
If the elements of a class deriving from LockedDataset are accessed while no lock is held, ResourceNotLocked will be raised.
Note that __enter__ will raise WouldWaitMore if timeout was given.
- static locked(blocking=True, timeout=-1)¶
Decorator to use for annotating methods that would lock
- Parameters:
blocking – whether to block at all
timeout – optional timeout. Default, or -1 means “return ASAP”
- Return type:
Callable[[Callable], Callable]
satella.coding.concurrent.locked_structure module¶
- class satella.coding.concurrent.locked_structure.LockedStructure(obj_to_wrap, lock=None)¶
Bases:
Proxy, Generic[T]
A wizard to make every Python structure thread-safe.
It wraps obj_to_wrap, passing on all calls, settings and so on to the wrapped object. From the lock, it exposes only the context manager protocol.
Example:
>>> locked_dict = LockedStructure(dict())
>>> with locked_dict:
>>>     locked_dict[5] = 2
Also, please note that operations such as addition will strip the locked structure, i.e. they will return a plain object: the result of combining the wrapped object with the other operand.
Note that in-place operations return the locked structure.
- Parameters:
obj_to_wrap (T) –
lock (Optional[allocate_lock]) –
satella.coding.concurrent.monitor module¶
- class satella.coding.concurrent.monitor.Monitor¶
Bases:
object
Base utility class for creating monitors (the synchronization thingies!)
These are NOT re-entrant!
Use it like this:
>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- class acquire(foo)¶
Bases:
object
Returns a context manager object that can lock another object, as long as that object is a monitor.
Consider foo, which is a monitor. If you needed to lock it from outside, you would do:
>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..
- Parameters:
foo (Monitor) –
- foo¶
- class release(foo)¶
Bases:
object
Returns a context manager object that can release another object as long as that object is a monitor.
Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:
>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..
- Parameters:
foo (Monitor) –
- foo¶
- classmethod synchronize_on(monitor)¶
A decorator for locking on non-self Monitor objects
Use it like:
>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass
- Parameters:
monitor (Monitor) –
- Return type:
Callable[[Callable], Callable]
- static synchronize_on_attribute(attr_name)¶
When a Monitor is an attribute of a class, and you have an instance method that you would like to secure by acquiring that monitor, use this.
The first argument taken by that instance method must be self.
- Parameters:
attr_name (str) – name of the attribute that is the monitor
- static synchronized(fun)¶
This is a decorator. A method decorated with it will lock the global lock of the given instance, making it threadsafe. Depending on the usage pattern of your class and its data semantics, your performance may vary.
- Parameters:
fun (Callable) –
- Return type:
Callable
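The monitor pattern described in this module (one lock per instance, a decorator that takes it around a method, and `with self:` for partial protection) can be sketched as follows. `SketchMonitor` and `Counter` are hypothetical names, and this stdlib-only code is an illustration rather than satella's implementation.

```python
import functools
import threading


class SketchMonitor:
    """Hypothetical stand-in: one non-reentrant lock per instance."""

    def __init__(self):
        self._monitor_lock = threading.Lock()

    def __enter__(self):
        self._monitor_lock.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._monitor_lock.release()
        return False

    @staticmethod
    def synchronized(fun):
        # Wrap a method so that it runs while holding the instance lock.
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            with self:
                return fun(self, *args, **kwargs)
        return wrapper


class Counter(SketchMonitor):
    def __init__(self):
        super().__init__()  # the monitor constructor must be invoked
        self.count = 0

    @SketchMonitor.synchronized
    def increment(self):
        self.count += 1


counter = Counter()
threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the lock in this sketch is not re-entrant, a synchronized method must not call another synchronized method on the same instance.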
- class satella.coding.concurrent.monitor.MonitorDict(*args, **kwargs)¶
Bases:
Generic[K, V], UserDict, Monitor
A dict that is also a monitor.
Note that access to its properties is not automatically synchronized. You have to invoke the monitor yourself to implement opportunistic locking of your own choice.
- class satella.coding.concurrent.monitor.MonitorList(*args)¶
Bases:
Generic[T], UserList, Monitor
A list that is also a monitor.
Note that access to its properties is not automatically synchronized. You have to invoke the monitor yourself to implement opportunistic locking of your own choice.
- class satella.coding.concurrent.monitor.MonitorSet(*args)¶
Bases:
set, Monitor
A set that allows atomic insert-if-not-already-there operation
- insert_and_check(item)¶
Perform an atomic insert if not already in set
- Parameters:
item – item to insert
- Returns:
whether the item was successfully inserted
- Return type:
bool
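The point of an atomic insert-if-absent is that the membership check and the insertion happen under one lock, so exactly one of several racing threads wins. A minimal stdlib-only sketch, with the hypothetical names `SketchMonitorSet` and `claim` (not satella's implementation):

```python
import threading


class SketchMonitorSet(set):
    """Hypothetical stand-in: a set with an atomic insert-if-absent."""

    def __init__(self, *args):
        super().__init__(*args)
        self._lock = threading.Lock()

    def insert_and_check(self, item):
        # Check and insert under a single lock, so the pair is atomic.
        with self._lock:
            if item in self:
                return False
            self.add(item)
            return True


seen = SketchMonitorSet()
winners = []


def claim(job_id):
    # Only the thread whose insert succeeded records itself as the winner.
    if seen.insert_and_check(job_id):
        winners.append(job_id)


threads = [threading.Thread(target=claim, args=('job-1',)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```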
satella.coding.concurrent.queue module¶
- class satella.coding.concurrent.queue.PeekableQueue¶
Bases:
Generic[T]
A thread-safe FIFO queue that supports peek()ing for elements.
- get(timeout=None)¶
Get an element.
- Parameters:
timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
Empty – queue was empty
- Return type:
T
- inserted_condition¶
- lock¶
- peek(timeout=None)¶
Get an element without removing it from the top of the queue.
- Parameters:
timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
WouldWaitMore – timeout has expired
- Return type:
T
- put(item)¶
Add an element to the queue
- Parameters:
item (T) – element to add
- Return type:
None
- put_many(items)¶
Put multiple items
- Parameters:
items (Sequence[T]) – sequence of items to put
- Return type:
None
- qsize()¶
Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block.
- Returns:
approximate size of the queue
- Return type:
int
- queue¶
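The difference between peek() and get() can be sketched with a deque guarded by a condition variable. This is an illustration, not satella's implementation: `SketchPeekableQueue` is a hypothetical name, and the built-in `TimeoutError` stands in for the exceptions the documentation lists.

```python
import collections
import threading


class SketchPeekableQueue:
    """Hypothetical stand-in: FIFO queue whose head can be inspected."""

    def __init__(self):
        self.queue = collections.deque()
        self.inserted_condition = threading.Condition()

    def put(self, item):
        with self.inserted_condition:
            self.queue.append(item)
            # Wake all waiters: a peeker does not consume the item,
            # so a waiting getter must get a chance to run as well.
            self.inserted_condition.notify_all()

    def _wait_nonempty(self, timeout):
        # Must be called with inserted_condition held.
        if not self.inserted_condition.wait_for(lambda: len(self.queue) > 0, timeout):
            raise TimeoutError('no element arrived in time')

    def peek(self, timeout=None):
        with self.inserted_condition:
            self._wait_nonempty(timeout)
            return self.queue[0]  # look at the head without removing it

    def get(self, timeout=None):
        with self.inserted_condition:
            self._wait_nonempty(timeout)
            return self.queue.popleft()


q = SketchPeekableQueue()
q.put('a')
q.put('b')
peeked = q.peek()
got = q.get()
remaining = len(q.queue)
```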
satella.coding.concurrent.sync module¶
- satella.coding.concurrent.sync.sync_threadpool(tpe, max_wait=None)¶
Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.
Make sure that other tasks do not submit anything to this thread pool executor.
- Parameters:
tpe (Union[ExecutorWrapper, ThreadPoolExecutor]) – thread pool executor to sync. Can also be an ExecutorWrapper.
max_wait (Optional[float]) – maximum time to wait. Default, None, means wait forever
- Raises:
WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.
- Return type:
None
satella.coding.concurrent.thread module¶
- class satella.coding.concurrent.thread.BogusTerminableThread¶
Bases:
object
A mock object that implements threading interface but does nothing
- Variables:
started – bool, if it’s running
terminated – bool, if terminated
daemon – bool, if daemon
- daemon¶
- is_alive()¶
- Returns:
if this thread is alive
- Return type:
bool
- join(timeout=None)¶
Wait for the pseudo-thread. Sets running to False if thread was terminated.
- Parameters:
timeout – maximum number of seconds to wait for termination
- Raises:
WouldWaitMore – thread did not terminate within that many seconds
RuntimeError – tried to join() before start()!
- Return type:
None
- start()¶
Set running to True
- Raises:
RuntimeError – thread already terminated or already running
- Return type:
None
- started¶
- terminate()¶
Set terminated to True.
Note that to set running to False you need to invoke join() afterwards.
- Return type:
None
- terminated¶
- class satella.coding.concurrent.thread.Condition(lock=None)¶
Bases:
Condition
A wrapper to facilitate easier usage of Python’s threading.Condition.
There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.
This happens to sorta not work on PyPy. Use at your own peril. You have been warned.
- notify(n=1)¶
Notify n threads waiting on this Condition
- Parameters:
n (int) – amount of threads to notify
- Return type:
None
- notifyAll()¶
Deprecated alias for notify_all
Deprecated since version 2.14.22.
- Return type:
None
- notify_all()¶
Notify all threads waiting on this Condition
- Return type:
None
- wait(timeout=None, dont_raise=False)¶
Wait for condition to become true.
- Parameters:
timeout (Optional[Union[str, float]]) – timeout to wait. None is default and means infinity. Can also be a time string.
dont_raise (bool) – if True, then WouldWaitMore won’t be raised
- Raises:
ResourceLocked – unable to acquire the underlying lock within specified timeout.
WouldWaitMore – wait’s timeout has expired
- Return type:
None
- class satella.coding.concurrent.thread.IntervalTerminableThread(seconds, *args, **kwargs)¶
Bases:
TerminableThread
A TerminableThread that calls .loop() once per x seconds, taking into account the length of loop() runtime.
If executing .loop() takes more than x seconds, on_overrun() will be called, and the next .loop() will be called immediately after the previous one returns and on_overrun() executes.
Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be run in the new thread.
- Parameters:
seconds (Union[str, float]) – time that a single pass through the loop should take, in seconds. Can also be a time string. This includes the time spent calling .loop(); the rest of this time will be spent safe_sleep()ing.
terminate_on – if provided, and loop() throws one of these exceptions, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to the list of terminable exceptions.
- abstract loop()¶
Override me!
- Return type:
None
- on_overrun(time_taken)¶
Called when executing .loop() takes more than x seconds.
Called each cycle.
You are meant to override this, as by default this does nothing.
- Parameters:
time_taken (float) – how long did calling .loop() take
- Return type:
None
- run()¶
Calls self.loop() indefinitely, until terminating condition is met
- class satella.coding.concurrent.thread.SingleStartThread(*args, **kwargs)¶
Bases:
Thread
A thread that keeps track of whether its .start() method was called, and does nothing if it’s called a second or subsequent time.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
- start()¶
No-op when called a second or subsequent time. The first time it starts the thread.
- Returns:
self
- Return type:
SingleStartThread
- class satella.coding.concurrent.thread.TerminableThread(*args, terminate_on=None, **kwargs)¶
Bases:
Thread
Class that will execute something in a loop unless terminated. Use like:
>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()
Property to check whether to terminate is stored in self.terminating.
If you decide to override run(), you must check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.
You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:
>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> self.assertFalse(a.is_alive())
If prepare() throws one of the terminate_on exceptions, loop() won’t even be called. However, terminate() will be automatically called then.
The same applies to IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.
Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be run in the new thread.
- Parameters:
terminate_on (Optional[Union[Type[Exception], Tuple[Type[Exception], ...]]]) – if provided, and loop() throws one of these exceptions, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to the list of terminable exceptions.
- cleanup()¶
Called after thread non-forced termination, in the thread’s context.
The default implementation does nothing.
- loop()¶
Run one iteration of the loop. Meant to be overridden. You do not need to override it if you decide to override run(), though.
This should block for as long as a single check will take, as termination checks take place between calls.
Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.
- Return type:
None
- prepare()¶
This is called before the .loop() loop is entered.
This is invoked already in a separate thread.
- Return type:
None
- run()¶
Calls self.loop() indefinitely, until terminating condition is met
- Return type:
None
- safe_sleep(interval, wake_up_each=2)¶
Sleep for interval, waking up each wake_up_each seconds to check if terminating, finish earlier if is terminating.
This will do the right thing when passed a negative interval.
To be invoked only by the thread that’s represented by the object!
- Parameters:
interval (float) – Time to sleep in total
wake_up_each (float) – Amount of seconds to wake up each
- Raises:
SystemExit – thread is terminating
- Return type:
None
- safe_wait_condition(condition, timeout, wake_up_each=2, dont_raise=False)¶
Wait for a condition, checking periodically if the thread is being terminated.
To be invoked only by the thread that’s represented by the object!
- Parameters:
condition (Condition) – condition to wait on
timeout (Union[str, float]) – maximum time to wait in seconds. Can also be a time string.
wake_up_each (Union[str, float]) – amount of seconds to wake up each to check for termination. Can also be a time string.
dont_raise (bool) – if set to True,
WouldWaitMore
will not be raised
- Raises:
WouldWaitMore – timeout has passed and Condition has not happened
SystemExit – thread is terminating
- Return type:
None
- start()¶
Start the execution of this thread
- Returns:
this thread
- Return type:
TerminableThread
- terminate(force=False)¶
Signal this thread to terminate.
Forcing, if requested, will be done by injecting a SystemExit exception into the target thread, so the thread must acquire the GIL. For example, the following would not be interruptible:
>>> time.sleep(1000000)
Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.
- Parameters:
force (bool) – Whether to force a quit
- Returns:
self
- Raises:
RuntimeError – when something goes wrong with the underlying Python machinery
NotImplementedError – force=True was used on PyPy
- Return type:
TerminableThread
- property terminating: bool¶
- Returns:
Is this thread either alive and trying to terminate or dead and after termination?
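The loop-and-terminate pattern (termination is checked between loop() calls, and terminate() returns self so it can be chained with join()) can be sketched with `threading.Event`. This is a cooperative-termination illustration only, not satella's implementation: `SketchTerminableThread` is a hypothetical name, and forced termination, prepare() and cleanup() are left out.

```python
import threading


class SketchTerminableThread(threading.Thread):
    """Hypothetical stand-in showing cooperative termination only."""

    def __init__(self):
        super().__init__()
        self._stop_requested = threading.Event()
        self.looped = threading.Event()  # set once loop() has run at least once
        self.iterations = 0

    def loop(self):
        # One short unit of work; termination is checked between calls.
        self.iterations += 1
        self.looped.set()
        self._stop_requested.wait(0.01)

    def run(self):
        while not self._stop_requested.is_set():
            self.loop()

    def terminate(self):
        self._stop_requested.set()
        return self  # allows chaining: thread.terminate().join()


worker = SketchTerminableThread()
worker.start()
worker.looped.wait(5)
worker.terminate().join(5)
alive = worker.is_alive()
```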
- satella.coding.concurrent.thread.call_in_separate_thread(*t_args, no_thread_attribute=False, delay=0, **t_kwargs)¶
Decorator to mark given routine as callable in a separate thread.
The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.
The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.
Warning
calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument
The arguments given here will be passed to the thread’s constructor, so use it like:
>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...
- Parameters:
no_thread_attribute (bool) – if set to True, the future won’t have a link back to its thread. The thread will have the “future” attribute anyway.
delay (float) – seconds to wait before launching function
satella.coding.concurrent.thread_collection module¶
- class satella.coding.concurrent.thread_collection.ThreadCollection(threads=None)¶
Bases:
object
A collection of threads.
Create like:
>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()
This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.
- Parameters:
threads (Optional[Sequence[Thread]]) –
- add(thread)¶
Add a thread to the collection
- Parameters:
thread (Thread) – thread to add
- Returns:
this thread collection instance
- Return type:
ThreadCollection
- append(thread)¶
Alias for
add()
- Parameters:
thread (Thread) – thread to add
- Returns:
this thread collection instance
- Return type:
ThreadCollection
- property daemon: bool¶
Is any of the threads a daemon?
When used as a setter, it sets the daemon attribute.
- classmethod from_class(cls_to_use, iteratable, **kwargs)¶
Build a thread collection
- Parameters:
cls_to_use – class to instantiate with
iteratable – an iterable with the sole argument to this class
- Return type:
ThreadCollection
- classmethod get_currently_running(include_main_thread=True)¶
Get all currently running threads as thread collection
- Parameters:
include_main_thread (bool) – whether to include the main thread
- Returns:
a thread collection representing all currently running threads
- Return type:
ThreadCollection
- is_alive()¶
Is at least one thread alive?
- Return type:
bool
- join(timeout=None)¶
Join all threads
- Parameters:
timeout (Optional[float]) – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.
- Returns:
this thread collection instance
- Raises:
WouldWaitMore – one of the threads failed to terminate
- Return type:
ThreadCollection
- start()¶
Start all threads
- Returns:
this thread collection instance
- Return type:
ThreadCollection
- terminate(*args, **kwargs)¶
Call terminate() on all threads that have this method
- Returns:
this thread collection instance
- Return type:
ThreadCollection
- threads¶
satella.coding.concurrent.timer module¶
- class satella.coding.concurrent.timer.Timer(interval, function, args=None, kwargs=None, spawn_separate=False)¶
Bases:
object
A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.
There might be up to a second of delay before the timer is picked up.
If spawn_separate is False, exceptions will be logged.
- Parameters:
interval (Union[str, float]) – amount of seconds that should elapse between calling start() and function executing. Can also be a time string.
function – function to execute
args – arguments for function
kwargs – kwargs for function
spawn_separate – whether to call the function in a separate thread
- args¶
- cancel()¶
Do not execute this timer
- Return type:
None
- cancelled¶
- execute_at¶
- function¶
- interval¶
- kwargs¶
- spawn_separate¶
- start()¶
Order this timer task to be executed in interval seconds
- Return type:
None
- class satella.coding.concurrent.timer.TimerBackgroundThread¶
Bases:
Thread, Monitor
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
- run()¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
satella.coding.concurrent.value module¶
- class satella.coding.concurrent.value.DeferredValue¶
Bases:
Generic[T]
A class that allows you to pass arguments that will be available later during runtime.
Usage:
>>> def thread1(value):
>>>     print(value.value())
>>> val = DeferredValue()
>>> threading.Thread(target=thread1, args=(val, )).start()
>>> time.sleep(10)
>>> val.set_value(3)
- lock¶
- set_value(va)¶
Set a value and wake up all the threads waiting on it.
- Parameters:
va (T) – value to set
- Raises:
ValueError – value is already set
- Return type:
None
- val¶
- value(timeout=None)¶
Wait until value is available, and return it.
- Parameters:
timeout (Optional[float]) – number of seconds to wait. If None is given, this will take as long as necessary.
- Returns:
a value
- Raises:
WouldWaitMore – timeout was given and it has expired
- Return type:
T
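The set-once, wait-until-available behaviour can be sketched with an event and a lock. This is an illustration, not satella's implementation: `SketchDeferredValue` is a hypothetical name, and the built-in `TimeoutError` stands in for `WouldWaitMore`.

```python
import threading

_NOT_SET = object()  # sentinel, so that None can be a legitimate value


class SketchDeferredValue:
    """Hypothetical stand-in: a value that becomes available later."""

    def __init__(self):
        self._event = threading.Event()
        self._lock = threading.Lock()
        self._val = _NOT_SET

    def set_value(self, va):
        with self._lock:
            if self._val is not _NOT_SET:
                raise ValueError('value is already set')
            self._val = va
        self._event.set()  # wake up every thread blocked in value()

    def value(self, timeout=None):
        if not self._event.wait(timeout):
            raise TimeoutError('value was not set in time')
        return self._val


val = SketchDeferredValue()
received = []
reader = threading.Thread(target=lambda: received.append(val.value(timeout=5)))
reader.start()
val.set_value(3)
reader.join()
```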
Module contents¶
- class satella.coding.concurrent.AtomicNumber(v=0)¶
Bases:
Monitor
An atomic number. Note that the class is deliberately not hashable, since its value might change over time. In this sense it is more like a container for numbers.
Treat it like a normal number, except all operations are executed atomically.
You can also wait for it to change its value, via wait().
You change its value in the following way:
>>> a = AtomicNumber()
>>> a += 2
Note that if the number is used in an expression, such as
>>> b = a + 2
then a normal number will be returned.
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- Parameters:
v (Union[int, float]) –
- condition¶
- value¶
- wait(timeout=None, throw_exception=True)¶
Block until the atomic number changes its value.
- Parameters:
timeout (Optional[float]) – maximum time to wait. None means wait indefinitely
throw_exception (bool) – whether to throw WouldWaitMore on timeout
- Raises:
WouldWaitMore – the value hasn’t changed within the timeout
- wait_until_equal(v, timeout=None)¶
Wait until the value of this number equals v.
- Parameters:
v (Union[int, float]) – value to compare this number against
timeout (Optional[float]) – maximum time to wait
- Raises:
WouldWaitMore – timeout expired without the value becoming equal to target
- Return type:
None
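The waiting behaviour described above can be sketched with the standard library alone. The class below is a hypothetical stand-in (SketchAtomicCounter, add and its use of TimeoutError are assumptions made for illustration, not satella's implementation); it pairs a number with a threading.Condition so that waiters are woken on every change.

```python
import threading


class SketchAtomicCounter:
    """Hypothetical stand-in: a number guarded by a condition variable."""

    def __init__(self, v=0):
        self._value = v
        self._cond = threading.Condition()

    @property
    def value(self):
        with self._cond:
            return self._value

    def add(self, delta):
        # The read-modify-write happens under the lock, so it is atomic
        with self._cond:
            self._value += delta
            self._cond.notify_all()  # wake everyone waiting for a change
            return self._value

    def wait_until_equal(self, v, timeout=None):
        with self._cond:
            # wait_for re-checks the predicate after every notification
            if not self._cond.wait_for(lambda: self._value == v, timeout):
                raise TimeoutError("value did not become equal in time")


counter = SketchAtomicCounter()
workers = [
    threading.Thread(target=lambda: [counter.add(1) for _ in range(250)])
    for _ in range(4)
]
for w in workers:
    w.start()
counter.wait_until_equal(1000, timeout=10)
for w in workers:
    w.join()
```

The sketch raises the built-in TimeoutError where the documented class raises WouldWaitMore.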
- class satella.coding.concurrent.BogusTerminableThread¶
Bases:
object
A mock object that implements threading interface but does nothing
- Variables:
started – bool, if it’s running
terminated – bool, if terminated
daemon – bool, if daemon
- daemon¶
- is_alive()¶
- Returns:
if this thread is alive
- Return type:
bool
- join(timeout=None)¶
Wait for the pseudo-thread. Sets running to False if thread was terminated.
- Parameters:
timeout – maximum number of seconds to wait for termination
- Raises:
WouldWaitMore – thread did not terminate within that many seconds
RuntimeError – tried to join() before start()!
- Return type:
None
- start()¶
Set running to True.
- Raises:
RuntimeError – thread already terminated or already running
- Return type:
None
- started¶
- terminate()¶
Set terminated to True.
Note that to set running to False you need to invoke join() afterwards.
- Return type:
None
- terminated¶
- class satella.coding.concurrent.CallNoOftenThan(interval, callable_)¶
Bases:
object
A class that will ensure that calls to given callable are made no more often than some interval.
Even if it is called more often than the specified interval, the callable just won’t be called and None will be returned.
- Parameters:
interval (float) – interval in seconds
callable – callable to call
callable_ (Callable) –
- callable: Callable¶
- interval: float¶
- last_called: float¶
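A minimal sketch of the throttling idea, using only the standard library. SketchThrottle and its clock parameter are hypothetical names introduced here so the example can run deterministically; they are not part of satella.

```python
import time


class SketchThrottle:
    """Hypothetical stand-in: call `callable_` at most once per `interval`."""

    def __init__(self, interval, callable_, clock=time.monotonic):
        self.interval = interval
        self.callable = callable_
        self.clock = clock          # injectable so the example needs no sleeping
        self.last_called = None

    def __call__(self, *args, **kwargs):
        now = self.clock()
        if self.last_called is not None and now - self.last_called < self.interval:
            return None             # too soon: skip the call entirely
        self.last_called = now
        return self.callable(*args, **kwargs)


fake_now = [0.0]
throttled = SketchThrottle(5.0, lambda x: x * 2, clock=lambda: fake_now[0])
first = throttled(21)    # executed
second = throttled(21)   # within the interval, so skipped
fake_now[0] = 6.0        # pretend six seconds have passed
third = throttled(21)    # executed again
```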
- class satella.coding.concurrent.CallableGroup(gather=True, swallow_exceptions=False)¶
Bases:
Generic[T]
This behaves like a function, but allows you to add other functions to call when invoked, e.g.
c1 = CallableGroup()
c1.add(foo)
c1.add(bar)
c1(2, 3)
Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.
- Parameters:
gather (bool) –
swallow_exceptions (bool) –
- add(callable_, one_shot=False)¶
Add a callable.
Note
Same callable can’t be added twice. It will silently fail.
Can be a CancellableCallback, in that case method remove_cancelled() might be useful.
Basically every callback is cancellable.
- Parameters:
callable – callable
one_shot (bool) – if True, callable will be unregistered after single call
callable_ (Union[CancellableCallback, Callable[[], T]]) –
- Returns:
callable_ if it was a cancellable callback, else one constructed after it
- Return type:
Deprecated since version v2.25.5: Do not pass a CancellableCallback, you’ll get your own
- callables: Dict[Callable, tuple[bool, int]]¶
- gather: bool¶
- property has_cancelled_callbacks: bool¶
Check whether this has any
CancellableCallback
instances and whether any of them was cancelled
- remove_cancelled()¶
Remove its entries that are CancellableCallbacks and that were cancelled
- Return type:
None
- swallow_exceptions: bool¶
- class satella.coding.concurrent.CancellableCallback(callback_fun)¶
Bases:
object
A callback that you can cancel.
Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.
If this callback was cancelled, calling it won’t call the function itself. In this case None will be returned instead of the result of callback_fun().
This short circuits __bool__ to return not .cancelled. So, the bool value of this callback depends on whether it has been cancelled or not.
Hashable and __eq__-able by identity. Equal only to itself.
- Parameters:
callback_fun (Callable) – function to call
- Variables:
cancelled – whether this callback was cancelled (bool)
- callback_fun¶
- cancel()¶
Cancel this callback.
- Return type:
None
- cancelled¶
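The semantics above (a cancelled callback returns None and is falsy) can be illustrated with a small stand-alone class. SketchCancellableCallback is a hypothetical name used for illustration only and makes no claim about how satella implements it.

```python
class SketchCancellableCallback:
    """Hypothetical stand-in for a callback that can be cancelled."""

    def __init__(self, callback_fun):
        self.callback_fun = callback_fun
        self.cancelled = False

    def __bool__(self):
        # Truthiness mirrors "not cancelled"
        return not self.cancelled

    def __call__(self, *args, **kwargs):
        if self.cancelled:
            return None
        return self.callback_fun(*args, **kwargs)

    def cancel(self):
        self.cancelled = True


callbacks = [
    SketchCancellableCallback(lambda: "a"),
    SketchCancellableCallback(lambda: "b"),
]
callbacks[0].cancel()
results = [cb() for cb in callbacks]
# Filtering by truthiness drops the cancelled entries
live = [cb for cb in callbacks if cb]
```

Because neither __eq__ nor __hash__ is overridden, instances compare and hash by identity, matching the behaviour described above.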
- class satella.coding.concurrent.Condition(lock=None)¶
Bases:
Condition
A wrapper to facilitate easier usage of Python’s threading.Condition.
There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.
This happens to sorta not work on PyPy. Use at your own peril. You have been warned.
- notify(n=1)¶
Notify n threads waiting on this Condition
- Parameters:
n (int) – amount of threads to notify
- Return type:
None
- notifyAll()¶
Deprecated alias for notify_all
Deprecated since version 2.14.22.
- Return type:
None
- notify_all()¶
Notify all threads waiting on this Condition
- Return type:
None
- wait(timeout=None, dont_raise=False)¶
Wait for condition to become true.
- Parameters:
timeout (Optional[Union[str, float]]) – timeout to wait. None is default and means infinity. Can be also a time string.
dont_raise (bool) – if True, then WouldWaitMore won’t be raised
- Raises:
ResourceLocked – unable to acquire the underlying lock within specified timeout.
WouldWaitMore – wait’s timeout has expired
- Return type:
None
- class satella.coding.concurrent.DeferredValue¶
Bases:
Generic[T]
A class that allows you to pass arguments that will be available later during runtime.
Usage:
>>> def thread1(value):
>>>     print(value.value())
>>> val = DeferredValue()
>>> threading.Thread(target=thread1, args=(val, )).start()
>>> time.sleep(10)
>>> val.set_value(3)
- lock¶
- set_value(va)¶
Set a value and wake up all the threads waiting on it.
- Parameters:
va (T) – value to set
- Raises:
ValueError – value is already set
- Return type:
None
- val¶
- value(timeout=None)¶
Wait until value is available, and return it.
- Parameters:
timeout (Optional[float]) – number of seconds to wait. If None is given, this will take as long as necessary.
- Returns:
a value
- Raises:
WouldWaitMore – timeout was given and it has expired
- Return type:
T
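To make the set-once, wait-many behaviour concrete, here is a standard-library sketch built on threading.Event. SketchDeferredValue is a hypothetical stand-in, and it raises the built-in TimeoutError where the documented class raises WouldWaitMore.

```python
import threading


class SketchDeferredValue:
    """Hypothetical stand-in: a value set once and awaited by many threads."""

    _UNSET = object()

    def __init__(self):
        self._event = threading.Event()
        self._lock = threading.Lock()
        self._val = self._UNSET

    def set_value(self, va):
        with self._lock:
            if self._val is not self._UNSET:
                raise ValueError("value is already set")
            self._val = va
        self._event.set()  # wakes up every thread blocked in value()

    def value(self, timeout=None):
        # Event.wait returns False if the timeout expired first
        if not self._event.wait(timeout):
            raise TimeoutError("timeout expired before a value was set")
        return self._val


received = []
val = SketchDeferredValue()
reader = threading.Thread(target=lambda: received.append(val.value(timeout=5)))
reader.start()
val.set_value(3)
reader.join()
```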
- class satella.coding.concurrent.Future¶
Bases:
Future, Generic[T]
A future that allows its callback handlers to change its result before presenting it to the user.
Use like this:
>>> fut = Future()
>>> fut.set_running_or_notify_cancel()
>>> def transform_future(future):
>>>     future.set_result(future.result() + 2)
>>> fut.add_pre_done_callback(transform_future)
>>> fut.set_result(2)
>>> assert fut.result() == 4
Initializes the future. Should not be called by clients.
- add_pre_done_callback(fn)¶
Attaches a callable that will be called just before the future finishes and can change the future’s result (or insert an Exception).
- Args:
- fn: A callable that will be called with this future as its only
argument just before the future completes or is cancelled.
- chain(fun)¶
Schedule function to be called with the result of this future as its argument (or the exception value if the future raised an exception).
- Parameters:
fun – function to call
- Returns:
self
- Return type:
- exception(timeout)¶
Return the exception raised by the call that the future represents.
- Args:
- timeout: The number of seconds to wait for the exception if the
future isn’t done. If None, then there is no limit on the wait time.
- Returns:
The exception raised by the call that the future represents or None if the call completed without raising.
- Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn’t finish executing before the given timeout.
- Parameters:
timeout (None) –
- Return type:
Type[Exception]
- on_failure(fun)¶
Schedule function to be called with the exception value that befell this future
- Parameters:
fun – function to call
- Returns:
self
- on_success(fun)¶
Schedule function to be called with the result of this future as its argument only if this future succeeds.
- Parameters:
fun – function to call
- Returns:
self
- Return type:
- result(timeout=None)¶
Return the result of the call that the future represents.
- Args:
- timeout: The number of seconds to wait for the result if the future
isn’t done. If None, then there is no limit on the wait time.
- Returns:
The result of the call that the future represents.
- Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn’t finish executing before the given timeout.
Exception: If the call raised then that exception will be raised.
- Return type:
T
- set_exception(exception)¶
Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
- set_result(result)¶
Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
- Parameters:
result (T) –
- Return type:
None
- class satella.coding.concurrent.FutureCollection(futures=())¶
Bases:
object
A set of futures sharing a common result, or a common exception.
This overloads the operator + to make a union of futures. It can be used with either instances of FutureCollection or normal futures.
Also supports the indexing operator to get the n-th future.
- Parameters:
futures (Sequence[Future]) –
- add(future)¶
Add a future
- Parameters:
future (Future) – a Future to add
- Returns:
self
- Return type:
- add_done_callback(callback, only_one=False)¶
Add a callback to a Future to be called on its completion.
By default, this will add the callback to all futures.
- Parameters:
callback – callback that takes the completed Future as argument
only_one (bool) – callback will be added only to a single Future. False by default
- Raises:
IndexError – only_one was given and no Futures in collection!
- Return type:
None
- cancel()¶
Cancel all futures
- Returns:
True if all futures were cancelled
- Return type:
bool
- exception(timeout=None)¶
Return first exception raised by any of the futures
This will block until the results are available. Note that this call returning does not mean that results for all futures are available, since it returns the first exception encountered!
- Parameters:
timeout (Optional[float]) – a timeout in seconds for a single result. Default value None means wait as long as necessary
- Returns:
the first exception, or None if there were no exceptions
- Raises:
WouldWaitMore – timeout while waiting for result
- Return type:
Optional[Exception]
- futures¶
- result(timeout=None)¶
Return the result of all futures, as a list.
This will block until the results are available.
- Parameters:
timeout (Optional[float]) – a timeout in seconds for a single result. Default value None means wait as long as necessary
- Returns:
list containing results of all futures
- Raises:
WouldWaitMore – timeout while waiting for result
- Return type:
list
- set_exception(exc)¶
Set an exception for all futures
- Parameters:
exc – exception instance to set
- Return type:
None
- set_result(result)¶
Set a result for all futures
- Parameters:
result – result to set
- Return type:
None
- set_running_or_notify_cancel()¶
Call set_running_or_notify_cancel on the futures.
This will return True if at least one future was not cancelled.
- Return type:
bool
- class satella.coding.concurrent.IDAllocator(start_at=0, top_limit=None)¶
Bases:
Monitor
Reusable ID allocator
You can use it to requisition ints from a pool, and then free them, permitting them to be reused.
Thread-safe.
- Parameters:
start_at (int) – the lowest integer that the allocator will return
top_limit (Optional[int]) – the maximum value that will not be allocated. If set, calls to allocate_int() will raise Empty once the limit is reached
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- allocate_int()¶
Return a previously unallocated int, and mark it as allocated
- Returns:
an allocated int
- Raises:
Empty – could not allocate an int due to top limit
- Return type:
int
- bound¶
- free_ints¶
- ints_allocated¶
- mark_as_allocated(x)¶
Mark given x as allocated
- Parameters:
x (int) – x to mark as allocated
- Raises:
AlreadyAllocated – x was already allocated
ValueError – x is less than start_at
- mark_as_free(x)¶
Mark x as free
- Parameters:
x (int) – int to free
- Raises:
ValueError – x was not allocated or less than start_at
- start_at¶
- top_limit¶
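One possible way to build such an allocator is sketched below with a lock, a min-heap of freed IDs and a counter for never-issued IDs. SketchIDAllocator is hypothetical, it treats top_limit as an exclusive bound (an assumption), and it raises RuntimeError where the documented class raises Empty.

```python
import heapq
import threading


class SketchIDAllocator:
    """Hypothetical stand-in: hands out ints and reuses freed ones."""

    def __init__(self, start_at=0, top_limit=None):
        self._lock = threading.Lock()
        self._top_limit = top_limit   # assumed exclusive upper bound
        self._next = start_at         # lowest ID that was never issued
        self._free = []               # min-heap of IDs returned to the pool
        self._allocated = set()

    def allocate_int(self):
        with self._lock:
            if self._free:
                x = heapq.heappop(self._free)   # prefer reusing a freed ID
            else:
                if self._top_limit is not None and self._next >= self._top_limit:
                    raise RuntimeError("no free IDs below the top limit")
                x = self._next
                self._next += 1
            self._allocated.add(x)
            return x

    def mark_as_free(self, x):
        with self._lock:
            if x not in self._allocated:
                raise ValueError("%r was not allocated" % (x,))
            self._allocated.remove(x)
            heapq.heappush(self._free, x)


alloc = SketchIDAllocator(start_at=10, top_limit=13)
a, b, c = alloc.allocate_int(), alloc.allocate_int(), alloc.allocate_int()
alloc.mark_as_free(b)
reused = alloc.allocate_int()   # gets the freed ID back
```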
- class satella.coding.concurrent.IntervalTerminableThread(seconds, *args, **kwargs)¶
Bases:
TerminableThread
A TerminableThread that calls .loop() once per x seconds, taking into account the length of loop() runtime.
If executing .loop() takes more than x seconds, on_overrun() will be called, and the next .loop() call will follow immediately after on_overrun() executes.
- Parameters:
seconds (Union[str, float]) – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.
Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be run in the new thread.
- Parameters:
terminate_on – if provided, and loop() throws one of them, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to the list of terminable exceptions.
seconds (Union[str, float]) –
- abstract loop()¶
Override me!
- Return type:
None
- on_overrun(time_taken)¶
Called when executing .loop() takes more than x seconds.
Called each cycle.
You are meant to override this, as by default this does nothing.
- Parameters:
time_taken (float) – how long did calling .loop() take
- Return type:
None
- run()¶
Calls self.loop() indefinitely, until terminating condition is met
- exception satella.coding.concurrent.InvalidStateError¶
Bases:
Error
The operation is not allowed in this state.
- class satella.coding.concurrent.LockedDataset¶
Bases:
object
A locked dataset. Subclass like
>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>     @LockedDataset.locked
>>>     def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')
A class deriving from LockedDataset will raise ResourceNotLocked upon element access while the lock is not being held.
Note that __enter__ will raise WouldWaitMore if timeout was given.
- static locked(blocking=True, timeout=-1)¶
Decorator to use for annotating methods that would lock
- Parameters:
blocking – whether to block at all
timeout – optional timeout. Default, or -1 means “return ASAP”
- Return type:
Callable[[Callable], Callable]
- class satella.coding.concurrent.LockedStructure(obj_to_wrap, lock=None)¶
Bases:
Proxy, Generic[T]
A wizard to make every Python structure thread-safe.
It wraps obj_to_wrap, passing on all calls, settings and so on to the wrapped object, and exposes only the context manager protocol from the lock.
Example:
>>> locked_dict = LockedStructure(dict)
>>> with locked_dict:
>>>     locked_dict[5] = 2
Also, please note that operations such as addition will strip this object of being a locked structure, i.e. they will return a plain object: the wrapped object combined with the other operand.
Note that in-place operations return the locked structure.
- Parameters:
obj_to_wrap (T) –
lock (Optional[allocate_lock]) –
- class satella.coding.concurrent.Monitor¶
Bases:
object
Base utility class for creating monitors (the synchronization thingies!)
These are NOT re-entrant!
Use it like that:
>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- class acquire(foo)¶
Bases:
object
Returns a context manager object that can lock another object, as long as that object is a monitor.
Consider foo, which is a monitor. If you needed to lock it from outside, you would do:
>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..
- Parameters:
foo (Monitor) –
- foo¶
- class release(foo)¶
Bases:
object
Returns a context manager object that can release another object as long as that object is a monitor.
Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:
>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..
- Parameters:
foo (Monitor) –
- foo¶
- classmethod synchronize_on(monitor)¶
A decorator for locking on non-self Monitor objects
Use it like:
>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass
- Parameters:
monitor (Monitor) –
- Return type:
Callable[[Callable], Callable]
- static synchronize_on_attribute(attr_name)¶
When a Monitor is an attribute of a class, and you have a method that you would like to secure by acquiring that monitor, use this.
The first argument taken by that method instance must be self.
- Parameters:
attr_name (str) – name of the attribute that is the monitor
- static synchronized(fun)¶
This is a decorator. A method decorated with it will lock the global lock of the given instance, making it threadsafe. Depending on the usage pattern of your class and its data semantics, your performance may vary.
- Parameters:
fun (Callable) –
- Return type:
Callable
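The idea behind such a decorator can be sketched with a per-instance lock from the standard library. SketchMonitor, its _monitor_lock attribute and the Account class are hypothetical names for illustration; this is not satella's implementation.

```python
import functools
import threading


class SketchMonitor:
    """Hypothetical stand-in: one lock per instance, non-reentrant."""

    def __init__(self):
        self._monitor_lock = threading.Lock()

    @staticmethod
    def synchronized(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            # Hold the instance's lock for the whole method call
            with self._monitor_lock:
                return fun(self, *args, **kwargs)
        return wrapper


class Account(SketchMonitor):
    def __init__(self):
        super().__init__()   # the base constructor creates the lock
        self.balance = 0

    @SketchMonitor.synchronized
    def deposit(self, amount):
        current = self.balance
        self.balance = current + amount


account = Account()
workers = [
    threading.Thread(target=lambda: [account.deposit(1) for _ in range(500)])
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Forgetting to call the base constructor would leave the instance without a lock, which is why the documentation insists on invoking it.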
- class satella.coding.concurrent.MonitorDict(*args, **kwargs)¶
Bases:
Generic[K, V], UserDict, Monitor
A dict that is also a monitor.
Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice.
- class satella.coding.concurrent.MonitorList(*args)¶
Bases:
Generic[T], UserList, Monitor
A list that is also a monitor.
Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice.
- class satella.coding.concurrent.MonitorSet(*args)¶
Bases:
set, Monitor
A set that allows atomic insert-if-not-already-there operation
- insert_and_check(item)¶
Perform an atomic insert if not already in set
- Parameters:
item – item to insert
- Returns:
whether the item was successfully inserted
- Return type:
bool
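Why the check and the insert must happen under one lock can be seen in a small standard-library sketch. SketchMonitorSet and the claim helper are hypothetical names for illustration.

```python
import threading


class SketchMonitorSet(set):
    """Hypothetical stand-in: a set with an atomic insert-if-absent."""

    def __init__(self, *args):
        super().__init__(*args)
        self._lock = threading.Lock()

    def insert_and_check(self, item):
        # Membership test and insertion form one critical section,
        # so exactly one caller can ever win for a given item.
        with self._lock:
            if item in self:
                return False
            self.add(item)
            return True


seen = SketchMonitorSet()
winners = []


def claim(worker_id):
    if seen.insert_and_check("job-1"):
        winners.append(worker_id)


threads = [threading.Thread(target=claim, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```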
- class satella.coding.concurrent.PeekableQueue¶
Bases:
Generic[T]
A thread-safe FIFO queue that supports peek()ing for elements.
- get(timeout=None)¶
Get an element.
- Parameters:
timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
Empty – queue was empty
- Return type:
T
- inserted_condition¶
- lock¶
- peek(timeout=None)¶
Get an element without removing it from the top of the queue.
- Parameters:
timeout (Optional[float]) – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
WouldWaitMore – timeout has expired
- Return type:
T
- put(item)¶
Add an element to the queue
- Parameters:
item (T) – element to add
- Return type:
None
- put_many(items)¶
Put multiple items
- Parameters:
items (Sequence[T]) – sequence of items to put
- Return type:
None
- qsize()¶
Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block.
- Returns:
approximate size of the queue
- Return type:
int
- queue¶
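A peekable FIFO queue can be sketched with a deque and a condition variable. SketchPeekableQueue is a hypothetical stand-in, and it raises the built-in TimeoutError where the documented class raises Empty or WouldWaitMore.

```python
import collections
import threading


class SketchPeekableQueue:
    """Hypothetical stand-in: FIFO queue whose head can be inspected."""

    def __init__(self):
        self._items = collections.deque()
        self._inserted = threading.Condition()

    def put(self, item):
        with self._inserted:
            self._items.append(item)
            # notify_all: a woken peeker does not consume the item,
            # so waiting getters must be woken as well
            self._inserted.notify_all()

    def _wait_nonempty(self, timeout):
        if not self._inserted.wait_for(lambda: len(self._items) > 0, timeout):
            raise TimeoutError("queue stayed empty")

    def peek(self, timeout=None):
        with self._inserted:
            self._wait_nonempty(timeout)
            return self._items[0]          # leave the item in place

    def get(self, timeout=None):
        with self._inserted:
            self._wait_nonempty(timeout)
            return self._items.popleft()   # remove the head


q = SketchPeekableQueue()
q.put("first")
q.put("second")
peeked = q.peek()
taken = q.get()
remaining = q.get()
```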
- class satella.coding.concurrent.RMonitor¶
Bases:
Monitor
Monitor, but using a reentrant lock instead of a normal one
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- class satella.coding.concurrent.SequentialIssuer(start=0)¶
Bases:
Monitor
A class that issues a monotonically increasing value.
- Parameters:
start (int) – start issuing IDs from this value
- Variables:
start – next value to be issued
You need to invoke this at your constructor. You can also use it to release locks of other objects.
- issue()¶
Just issue a next identifier
- Returns:
a next identifier
- Return type:
int
- no_less_than(no_less_than)¶
Issue an int, which is no less than a given value
- Parameters:
no_less_than (int) – value that the returned ID will not be less than
- Returns:
an identifier, no less than no_less_than
- Return type:
int
- start¶
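A minimal sketch of such an issuer, assuming that no_less_than also advances the counter past the value it returns (an assumption, not confirmed by the text above). SketchSequentialIssuer is a hypothetical name.

```python
import threading


class SketchSequentialIssuer:
    """Hypothetical stand-in: issues strictly increasing integers."""

    def __init__(self, start=0):
        self._lock = threading.Lock()
        self.start = start   # next value to be issued

    def issue(self):
        with self._lock:
            value = self.start
            self.start += 1
            return value

    def no_less_than(self, floor):
        with self._lock:
            # Jump ahead if needed, but never go backwards
            value = max(self.start, floor)
            self.start = value + 1
            return value


issuer = SketchSequentialIssuer()
issued = [
    issuer.issue(),
    issuer.issue(),
    issuer.no_less_than(10),
    issuer.issue(),
    issuer.no_less_than(5),
]
```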
- class satella.coding.concurrent.SingleStartThread(*args, **kwargs)¶
Bases:
Thread
A thread that keeps track of whether its .start() method was called, and does nothing if it is called a second or subsequent time.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
- start()¶
No-op when called a second or subsequent time. The first time it starts the thread.
- Returns:
self
- Return type:
- class satella.coding.concurrent.TerminableThread(*args, terminate_on=None, **kwargs)¶
Bases:
Thread
Class that will execute something in a loop unless terminated. Use like:
>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()
Property to check whether to terminate is stored in self.terminating.
If you decide to override run(), you have to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.
You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:
>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> self.assertFalse(a.is_alive())
If prepare() throws one of the terminate_on exceptions, loop() won’t even be called. However, terminate() will be automatically called then.
Same applies for IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.
Note that this is called in the constructor’s thread. Use .prepare() to run statements that should be run in the new thread.
- Parameters:
terminate_on (Optional[Union[Type[Exception], Tuple[Type[Exception], ...]]]) – if provided, and loop() throws one of them, swallow it and terminate the thread by calling terminate(). Note that the subclass check will be done via isinstance so you can use the metaclass magic :) Note that SystemExit will be automatically added to the list of terminable exceptions.
- cleanup()¶
Called after thread non-forced termination, in the thread’s context.
The default implementation does nothing.
- loop()¶
Run one iteration of the loop. Meant to be overridden. You do not need to override it if you decide to override run() though.
This should block for as long as a single check will take, as termination checks take place between calls.
Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.
- Return type:
None
- prepare()¶
This is called before the .loop() loop is entered.
This is invoked already in a separate thread.
- Return type:
None
- run()¶
Calls self.loop() indefinitely, until terminating condition is met
- Return type:
None
- safe_sleep(interval, wake_up_each=2)¶
Sleep for interval, waking up each wake_up_each seconds to check if terminating, finish earlier if is terminating.
This will do the right thing when passed a negative interval.
To be invoked only by the thread that’s represented by the object!
- Parameters:
interval (float) – Time to sleep in total
wake_up_each (float) – Amount of seconds to wake up each
- Raises:
SystemExit – thread is terminating
- Return type:
None
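The "sleep in slices and check for termination" technique can be sketched as a plain function. sleep_unless_terminating is a hypothetical helper; unlike the documented method it returns a bool instead of raising SystemExit, and it uses a threading.Event as the termination flag (an assumption).

```python
import threading
import time


def sleep_unless_terminating(terminating, interval, wake_up_each=2.0):
    """Sleep up to `interval` seconds; return False if interrupted early."""
    # A negative interval is treated as zero
    deadline = time.monotonic() + max(interval, 0)
    while True:
        if terminating.is_set():
            return False
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        # Never sleep longer than one slice, so termination is noticed soon
        time.sleep(min(wake_up_each, remaining))


stop = threading.Event()
threading.Timer(0.05, stop.set).start()   # request termination shortly
started = time.monotonic()
completed = sleep_unless_terminating(stop, 5.0, wake_up_each=0.01)
elapsed = time.monotonic() - started
```

Smaller wake_up_each values react to termination faster at the cost of more wake-ups.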
- safe_wait_condition(condition, timeout, wake_up_each=2, dont_raise=False)¶
Wait for a condition, checking periodically if the thread is being terminated.
To be invoked only by the thread that’s represented by the object!
- Parameters:
condition (Condition) – condition to wait on
timeout (Union[str, float]) – maximum time to wait in seconds. Can be also a time string
wake_up_each (Union[str, float]) – amount of seconds to wake up each to check for termination. Can be also a time string.
dont_raise (bool) – if set to True,
WouldWaitMore
will not be raised
- Raises:
WouldWaitMore – timeout has passed and Condition has not happened
SystemExit – thread is terminating
- Return type:
None
- start()¶
Start the execution of this thread
- Returns:
this thread
- Return type:
- terminate(force=False)¶
Signal this thread to terminate.
Forcing, if requested, will be done by injecting a SystemExit exception into the target thread, so the thread must acquire the GIL. For example, the following would not be interruptible:
>>> time.sleep(1000000)
Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.
- Parameters:
force (bool) – Whether to force a quit
- Returns:
self
- Raises:
RuntimeError – when something goes wrong with the underlying Python machinery
NotImplementedError – force=True was used on PyPy
- Return type:
- property terminating: bool¶
- Returns:
Is this thread either alive and trying to terminate or dead and after termination?
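The prepare/loop/cleanup life cycle and the context-manager usage described for this class can be sketched on top of threading.Thread. SketchTerminableThread and Ticker are hypothetical; forced termination and terminate_on handling are deliberately left out.

```python
import threading
import time


class SketchTerminableThread(threading.Thread):
    """Hypothetical stand-in: runs loop() until terminate() is called."""

    def __init__(self):
        super().__init__()
        self._terminating = threading.Event()

    def prepare(self):
        pass   # runs in the new thread, before the first loop()

    def loop(self):
        raise NotImplementedError

    def cleanup(self):
        pass   # runs in the new thread, after the last loop()

    def run(self):
        self.prepare()
        # The flag is checked before every iteration
        while not self._terminating.is_set():
            self.loop()
        self.cleanup()

    def terminate(self):
        self._terminating.set()
        return self

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate().join()
        return False


class Ticker(SketchTerminableThread):
    def __init__(self):
        super().__init__()
        self.ticks = 0
        self.cleaned_up = False

    def loop(self):
        self.ticks += 1
        time.sleep(0.001)

    def cleanup(self):
        self.cleaned_up = True


ticker = Ticker()
with ticker:
    time.sleep(0.05)   # the thread keeps looping while the block runs
```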
- class satella.coding.concurrent.ThreadCollection(threads=None)¶
Bases:
object
A collection of threads.
Create like:
>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()
This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.
- Parameters:
threads (Optional[Sequence[Thread]]) –
- add(thread)¶
Add a thread to the collection
- Parameters:
thread (Thread) – thread to add
- Returns:
this thread collection instance
- Return type:
- append(thread)¶
Alias for
add()
- Parameters:
thread (Thread) – thread to add
- Returns:
this thread collection instance
- Return type:
- property daemon: bool¶
Is any of the threads a daemon?
Also, when used as a setter, sets the daemon attribute.
- classmethod from_class(cls_to_use, iteratable, **kwargs)¶
Build a thread collection
- Parameters:
cls_to_use – class to instantiate with
iteratable – an iterable of values, each passed as the sole argument to the class
- Return type:
- classmethod get_currently_running(include_main_thread=True)¶
Get all currently running threads as thread collection
- Parameters:
include_main_thread (bool) – whether to include the main thread
- Returns:
a thread collection representing all currently running threads
- Return type:
- is_alive()¶
Is at least one thread alive?
- Return type:
bool
- join(timeout=None)¶
Join all threads
- Parameters:
timeout (Optional[float]) – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.
- Returns:
this thread collection instance
- Raises:
WouldWaitMore – one of the threads failed to terminate
- Return type:
- start()¶
Start all threads
- Returns:
this thread collection instance
- Return type:
- terminate(*args, **kwargs)¶
Call terminate() on all threads that have this method
- Returns:
this thread collection instance
- Return type:
- threads¶
- class satella.coding.concurrent.Timer(interval, function, args=None, kwargs=None, spawn_separate=False)¶
Bases:
object
A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.
There might be up to a second of delay before the timer is picked up.
If spawn_separate is False, exceptions will be logged.
- Parameters:
interval (Union[str, float]) – amount of seconds that should elapse between calling start() and function executing. Can be also a time string.
function – function to execute
args – arguments for function
kwargs – kwargs for function
spawn_separate – whether to call the function in a separate thread
- args¶
- cancel()¶
Do not execute this timer
- Return type:
None
- cancelled¶
- execute_at¶
- function¶
- interval¶
- kwargs¶
- spawn_separate¶
- start()¶
Order this timer task to be executed in interval seconds
- Return type:
None
- class satella.coding.concurrent.WrappingFuture(source_future)¶
Bases:
Future
A Satella future wrapping an existing Python future.
Use like:
>>> wrapped = WrappingFuture(existing_python_future)
Initializes the future. Should not be called by clients.
- Parameters:
source_future (Future) –
- cancel()¶
Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- Return type:
bool
- set_running_or_notify_cancel()¶
Mark the future as running or process any cancel notifications.
Should only be used by Executor implementations and unit tests.
If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (through calls to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.
This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.
- Returns:
False if the Future was cancelled, True otherwise.
- Raises:
- RuntimeError: if this method was already called or if set_result()
or set_exception() was called.
- Return type:
bool
- satella.coding.concurrent.call_in_separate_thread(*t_args, no_thread_attribute=False, delay=0, **t_kwargs)¶
Decorator to mark given routine as callable in a separate thread.
The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.
The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.
Warning
calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument
The arguments given here will be passed to thread’s constructor, so use like:
>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...
- Parameters:
no_thread_attribute (bool) – if set to True, future won’t have a link returned to its thread. The thread will have attribute of “future” anyways.
delay (float) – seconds to wait before launching function
- satella.coding.concurrent.parallel_construct(iterable, function, thread_pool, span_title=None)¶
Construct a list from executing given function in a thread pool executor.
If opentracing is installed, and tracing is enabled, current span will be passed to child threads.
- Parameters:
iterable (Iterable[V]) – iterable to apply
function (Callable[[V], Optional[U]]) – function to apply. If that function returns None, no element will be added
thread_pool (ThreadPoolExecutor) – thread pool to execute
span_title (Optional[str]) – span title to create. For each execution a child span will be returned
- Returns:
list that is the result of parallel application of function on each element
- Return type:
List[U]
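The "apply in a pool, skip None results" behaviour can be approximated with the standard library as follows. sketch_parallel_construct and halve_if_even are hypothetical names; the sketch keeps results in submission order and ignores tracing spans, both of which are assumptions about scope rather than statements about satella.

```python
from concurrent.futures import ThreadPoolExecutor


def sketch_parallel_construct(iterable, function, thread_pool):
    """Apply `function` to each item in the pool, dropping None results."""
    # Submit everything first so the calls can overlap
    futures = [thread_pool.submit(function, item) for item in iterable]
    results = []
    for future in futures:
        value = future.result()   # re-raises if the call raised
        if value is not None:
            results.append(value)
    return results


def halve_if_even(n):
    # Returning None means "contribute no element"
    return n // 2 if n % 2 == 0 else None


with ThreadPoolExecutor(max_workers=4) as pool:
    halves = sketch_parallel_construct(range(6), halve_if_even, pool)
```

Note that only None is dropped; falsy values such as 0 are kept.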
- satella.coding.concurrent.parallel_execute(callable_, args, kwargs=<generator object infinite_iterator>)¶
Execute a number of calls to callable in parallel.
Callable must be a function that accepts arguments and returns a plain Python future.
The return value is an iterator that yields the result of every future, or the exception instance if the call raised.
- Parameters:
callable – a callable that returns futures
args (Iterable[T]) – an iterable of arguments to provide to the callable
kwargs (Iterable[dict]) – an iterable of keyword arguments to provide to the callable
callable_ (Callable[[T], Future]) –
- Returns:
an iterator yielding every value (or exception instance if it threw) of the future
- satella.coding.concurrent.run_as_future(fun)¶
A decorator that accepts a function that should be executed in a separate thread, and returns a Future instead of its result, which makes it possible to watch the function for completion.
The created thread will be non-daemonic
Example usage:
>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()
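How such a decorator might work internally can be sketched with threading.Thread and a plain concurrent.futures.Future. sketch_run_as_future, count_words and fail are hypothetical names; the real decorator returns a satella Future, which this sketch does not reproduce.

```python
import functools
import threading
from concurrent.futures import Future


def sketch_run_as_future(fun):
    """Run `fun` in a new non-daemon thread and return a Future for it."""

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        future = Future()

        def runner():
            # Returns False if the future was cancelled before starting
            if not future.set_running_or_notify_cancel():
                return
            try:
                future.set_result(fun(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

        threading.Thread(target=runner, daemon=False).start()
        return future

    return wrapper


@sketch_run_as_future
def count_words(text):
    return len(text.split())


@sketch_run_as_future
def fail():
    raise ValueError("boom")


word_count = count_words("to be or not to be").result(timeout=5)
error = fail().exception(timeout=5)
```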
- satella.coding.concurrent.sync_threadpool(tpe, max_wait=None)¶
Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.
Make sure that other tasks do not submit anything to this thread pool executor.
- Parameters:
tpe (Union[ExecutorWrapper, ThreadPoolExecutor]) – thread pool executor to sync. Can be also an ExecutorWrapper.
max_wait (Optional[float]) – maximum time to wait. Default, None, means wait forever
- Raises:
WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.
- Return type:
None