diff --git a/CHANGELOG.md b/CHANGELOG.md index 26b9cfc1e6ea7d8b12c58d251c0e389083c07dc4..fd274a66b39a7a9cbc75492310d40a47ed88b935 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,3 @@ -# v2.12.8 +# v2.13 + +* added the call hierarchy diff --git a/docs/coding/call_hierarchy.rst b/docs/coding/call_hierarchy.rst new file mode 100644 index 0000000000000000000000000000000000000000..3fd2a832db8bb411cfaa591b865b2d93707014fc --- /dev/null +++ b/docs/coding/call_hierarchy.rst @@ -0,0 +1,54 @@ +============== +Call hierarchy +============== + +Satella enables you to define function call and their trees programmatically, so you can +for example express such a condition "call a function C with given args when either call of function A with given args or function B with given args returned True". + +You can specify different argument sets for execution of such a tree, ie. you can provide different argument sets +and just tell your function to use the _i_-th one. + +It additionally supports optional parallelization of function calls, if given an Executor. + +.. autoclass:: satella.coding.call_hierarchy.Call + :members: + +.. autoclass:: satella.coding.call_hierarchy.CallWithArgumentSet + :members: + +.. autoclass:: satella.coding.call_hierarchy.CallIf + :members: + +.. autoclass:: satella.coding.call_hierarchy.Reduce + :members: + +.. autoclass:: satella.coding.call_hierarchy.ExecutionEnvironment + :members: + +You should run the callables in such a way + + :: + + def add(a, b): + print(a+b) + + call_1 = CallWithArgumentSet(add, 0) + ee = ExecutionEnvironment([((1, 2), {})]) + ee(call_1) + +but in a pinch you can just type + + :: + + def add(a, b): + print(a+b) + + call_1 = CallWithArgumentSet(add, 0) + call_1(1, 2) + +Note that you need to go through ExecutionEnvironment if you want to make use of parallelism. + +.. autofunction:: satella.coding.call_hierarchy.call_with_ee + + + diff --git a/docs/index.rst b/docs/index.rst index 164df20824ba7047123be81b1c14310afc90854a..d67537b5e090fd5572fa224cdfc3f88d971e7de8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,6 +16,7 @@ Visit the project's page at GitHub_! coding/futures coding/structures coding/decorators + coding/call_hierarchy coding/predicates coding/concurrent coding/sequences diff --git a/satella/__init__.py b/satella/__init__.py index f2846c43f4d74daba474738bd7ff4c36c9438b42..945ac83d040c915ccb7ebf6c221c796fc9f41ef4 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.12.8_a1' +__version__ = '2.13' diff --git a/satella/coding/call_hierarchy.py b/satella/coding/call_hierarchy.py new file mode 100644 index 0000000000000000000000000000000000000000..7e3d0d13b52f3a9aff3f3debd5fc2e8f42db0307 --- /dev/null +++ b/satella/coding/call_hierarchy.py @@ -0,0 +1,182 @@ +import threading +import logging +import typing as tp +from concurrent.futures import Executor + +from satella.coding.decorators.decorators import wraps + +local_ee = threading.local() +logger = logging.getLogger(__name__) + + +__all__ = ['Call', 'CallIf', 'CallWithArgumentSet', 'ExecutionEnvironment', 'call_with_ee'] + + +def before_call(fun): + @wraps(fun) + def inner(self, *args, **kwargs): + if not hasattr(local_ee, 'ee'): + v = call_with_ee(fun, ExecutionEnvironment([(args, kwargs)])) + return v(self, *args, **kwargs) + else: + return fun(self, *args, **kwargs) + return inner + + +class Call: + """ + A call to given function with a given set of arguments + """ + + def __init__(self, fn, *args, **kwargs): + self.fn = fn + self.args = args + self.kwargs = kwargs + + @before_call + def __call__(self, *args, **kwargs): + """ + Call this callable. + + If an execution environment is already defined, it will be used. If not, + a new execution environment will be defined with the 0-th set of arguments + as args, kwargs. + + :param args: args to use as the 0-th set of arguments + :param kwargs: kwargs to use as the 0-th set of arguments + :return: return value + """ + return self.fn(*self.args, **self.kwargs) + + +class CallWithArgumentSet(Call): + """ + Call a function with a set of arguments provided by the environment + """ + def __init__(self, fn, arg_set_no: int = 0): + self.fn = fn + self.arg_set_no = arg_set_no + + @before_call + def __call__(self, *args, **kwargs): + try: + ee = local_ee.ee + except AttributeError: + raise RuntimeError('Execution environment is required!') + args, kwargs = ee[self.arg_set_no] + v = self.fn(*args, **kwargs) + return v + + +class CallIf(Call): + """ + Call a function only if fn_if_call returned True + """ + def __init__(self, fn_if_call: Call, fn_to_call: Call): + self.fn_to_call = fn_to_call + self.fn_call_if = fn_if_call + + @before_call + def __call__(self, *args, **kwargs): + if self.fn_call_if(): + return self.fn_to_call() + + +class Reduce(Call): + """ + A call consisting of calling other calls (possibly in parallel). + + It's result will be the combination of some other function calls by a given operator, + starting with a starting value. + + By default the starting operator just discards the results. + + :param callables: callables to call in parallel + :param reducing_op: a callable/2 that takes previous result (or the starting value) and current + callable result, returning a new starting value + :param starting_value: starting value + :param do_parallel: whether try to execute these calls in parallel, if possible. + Parallel execution will be done only if an executor is given in the execution environment. + """ + def __init__(self, *callables: Call, + reducing_op: tp.Callable[[tp.Any, tp.Any], tp.Any] = lambda a, b: None, + starting_value: tp.Any = 0, + do_parallel: bool = True): + self.reducing_op = reducing_op + self.starting_value = starting_value + self.do_parallel = do_parallel + self.callables = callables + + @before_call + def __call__(self, *args, **kwargs): + if self.do_parallel: + if local_ee.ee.executor is not None: + executor = local_ee.ee.executor + sv = self.starting_value + futures = [executor.submit(call_with_ee(callable_, local_ee.ee)) + for callable_ in self.callables] + for future in futures: + sv = self.reducing_op(sv, future.result()) + return sv + sv = self.starting_value + for callable_ in self.callables: + b = callable_() + sv = self.reducing_op(sv, b) + return sv + + +class ExecutionEnvironment: + __slots__ = ('arg_sets', 'executor') + + def __init__(self, argument_sets: tp.Iterable[tp.Tuple[tp.Tuple[tp.Any], tp.Dict]], + executor: tp.Optional[Executor] = None): + self.arg_sets = [] + for args, kwargs in argument_sets: + self.arg_sets.append((args, kwargs)) + self.executor = executor + + def __call__(self, callable_: Call, *args, **kwargs): + """ + Run a given callable within the current EE. + + :param callable_: callable to run + :return: value returned by that callable + """ + had_ee = hasattr(local_ee, 'ee') + if had_ee: + prev_ee = local_ee.ee + local_ee.ee = self + v = callable_(*args, **kwargs) + if had_ee: + local_ee.ee = prev_ee + else: + del local_ee.ee + return v + + def __getitem__(self, item: int) -> tp.Tuple[tp.Tuple, tp.Dict]: + """Return the n-th argument set""" + v = self.arg_sets[item] + return v + + +def call_with_ee(callable_: tp.Callable, ee: ExecutionEnvironment) -> tp.Callable: + """ + Return a callable that will invoke the target callable with specified execution environment, + but only if an EE is not defined right now. + + To explicitly provide an execution environment use this: + + >>> call_1 = Call(...) + >>> ee = ExecutionEnvironment() + >>> ee(call_1) + + :param callable_: callable to invoke + :param ee: execution environment to use + :return: a new callable + """ + def inner(*args, **kwargs): + if not hasattr(local_ee, 'ee'): + return ee(callable_, *args, **kwargs) + else: + return callable_(*args, **kwargs) + return inner diff --git a/tests/test_coding/test_call_hierarchy.py b/tests/test_coding/test_call_hierarchy.py new file mode 100644 index 0000000000000000000000000000000000000000..f21ffb2f29f3ce7bee3ddda7cb3383f10ea44263 --- /dev/null +++ b/tests/test_coding/test_call_hierarchy.py @@ -0,0 +1,57 @@ +import unittest +from concurrent.futures.thread import ThreadPoolExecutor + +from satella.coding.call_hierarchy import Call, CallWithArgumentSet, Reduce, CallIf, \ + ExecutionEnvironment + + +class TestCallHierarchy(unittest.TestCase): + def test_call(self): + call = Call(lambda: 5) + self.assertEqual(call(), 5) + + def test_exec_parallel(self): + arg_sets = [] + + def mult(y): + return y*2 + + for value in [1, 2, 3, 4, 5, 6, 7]: + arg_sets.append(((value, ), {})) + + calls = [] + for i, _ in enumerate(arg_sets): + calls.append(CallWithArgumentSet(mult, i)) + call_v = Reduce(*calls, reducing_op=lambda a, b: a+b, starting_value=0) + tpe = ThreadPoolExecutor(max_workers=4) + ee = ExecutionEnvironment(arg_sets, tpe) + self.assertEqual(ee(call_v), 56) + + def test_arg_sets(self): + def add(a, b): + return a+b + + call1 = CallWithArgumentSet(add, 0) + call2 = CallWithArgumentSet(add, 1) + call_v = Reduce(call1, call2, reducing_op=lambda a, b: a+b, starting_value=0) + ee = ExecutionEnvironment([((1, 2), {}), ((3, 4), {})]) + self.assertEqual(ee(call_v), 10) + + def test_call_if(self): + a = {'test': True, 'b': 0} + + def a_is_true(): + nonlocal a + return a['test'] + + def incr_b(): + nonlocal a + a['b'] += 1 + + call_if_true = CallIf(a_is_true, incr_b) + + call_if_true() + self.assertEqual(a['b'], 1) + a['test'] = False + call_if_true() + self.assertEqual(a['b'], 1)