diff --git a/CHANGELOG.md b/CHANGELOG.md index cdc9ff3f2e9854fd728789518039a72ab04558f2..e048783a204866246ff68e054efdadb395b847b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ # v2.21.4 -* moar tests for CPManager \ No newline at end of file +* moar tests for CPManager +* added tainting \ No newline at end of file diff --git a/docs/debug/tainting.rst b/docs/debug/tainting.rst new file mode 100644 index 0000000000000000000000000000000000000000..5fdcc9a64dd740030eaa2ceafd743eacaabb23e7 --- /dev/null +++ b/docs/debug/tainting.rst @@ -0,0 +1,26 @@ +Tainting +======== + +Some times you need to register a handle +for your thread to monitor whether some +values have been used or not. All interactions with this +variable will infect your source, so you need a way to find them +and elliminate them. Satella's tainting is at your disposal. + +.. warning:: Tainting won't work correctly on earlier versions + than Python 3.7 due to the fact that per-opcode tracing was + added there. + +.. autoclass:: satella.debug.TaintingEnvironment + :members: + +.. autofunction:: satella.debug.taint + +.. autofunction:: satella.debug.access_tainted + +There is a class that is a thin proxy at the objects that +you taint, namely: + +.. autoclass:: satella.debug.TaintedObject + +This works rather similar to :class:`satella.coding.structures.Proxy`. diff --git a/docs/index.rst b/docs/index.rst index d8d58cad0768e7b0c0bf1fcfc8ea849e0e788c85..2ffd07412087b8384ad899c24d8bcfb46e6f50f0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -22,6 +22,7 @@ Visit the project's page at GitHub_! coding/sequences coding/transforms coding/typing + debug/tainting instrumentation/cpu_time instrumentation/traceback instrumentation/memory diff --git a/satella/coding/concurrent/monitor.py b/satella/coding/concurrent/monitor.py index c2ae7259970ed09b4c8be36b4df60baeee4a21ed..2dc44cec3e8599c740bc23a1969f33f87f285bb1 100644 --- a/satella/coding/concurrent/monitor.py +++ b/satella/coding/concurrent/monitor.py @@ -99,7 +99,7 @@ class Monitor: >>> def protected_function(self): >>> .. do some stuff that needs mutual exclusion .. >>> with Monitor.release(self): - >>> .. do some I/O that doesn't need mutual exclusion .. + >>> .. do some I/O that does not need mutual exclusion .. >>> .. back to protected stuff .. """ __slots__ = ('foo',) diff --git a/satella/coding/sequences/iterators.py b/satella/coding/sequences/iterators.py index f3800cdb5dbe9a0d843e74deca33250d15f46c9f..0981486eefded653b2227168a261f20d5e06ff20 100644 --- a/satella/coding/sequences/iterators.py +++ b/satella/coding/sequences/iterators.py @@ -448,7 +448,7 @@ def zip_shifted(*args: tp.Union[Iteratable, tp.Tuple[Iteratable, int]]) -> \ warnings.warn('This is deprecated and will be removed in Satella 3.0. ' 'Use zip(shift(...)) instead!', DeprecationWarning) - iterators = [] # type: tp.List[tp.Union[tp.Tuple[tp.Iterator[T], tp.List[T]], tp.Iterator[T]] + iterators = [] # type: tp.List[tp.Union[tp.Tuple[tp.Iterator[T], tp.List[T]], tp.Iterator[T]]] for row in args: if not isinstance(row, tuple): iterators.append(row) diff --git a/satella/debug/__init__.py b/satella/debug/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1de7fcd9350f82a6777e25d8e0f30194d45bd940 --- /dev/null +++ b/satella/debug/__init__.py @@ -0,0 +1,3 @@ +from .tainting import TaintedObject, taint, access_tainted, TaintingEnvironment + +__all__ = ['TaintingEnvironment', 'TaintedObject', 'taint', 'access_tainted'] diff --git a/satella/debug/tainting/__init__.py b/satella/debug/tainting/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..342dc390da88aa4188124254daf0d87396aa0547 --- /dev/null +++ b/satella/debug/tainting/__init__.py @@ -0,0 +1,4 @@ +from .environment import TaintingEnvironment +from .tainteds import TaintedObject, taint, access_tainted + +__all__ = ['TaintingEnvironment', 'TaintedObject', 'taint', 'access_tainted'] diff --git a/satella/debug/tainting/environment.py b/satella/debug/tainting/environment.py new file mode 100644 index 0000000000000000000000000000000000000000..57d2cdde81d58352b31df3e71055fe69426951cd --- /dev/null +++ b/satella/debug/tainting/environment.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import dis +import inspect +import sys +import threading +import gc +import typing as tp +import warnings + +from satella.coding.typing import T + +from .tainteds import TaintedObject, access_tainted, taint + +local = threading.local() + +RET_VAL = dis.opmap['RETURN_VALUE'] + + +class TaintingEnvironment: + """ + A round of tainting. Taints will be invalidated at the end. + Basically any Python opcode that relates to tainting will + be tainted itself. + + As an added extra, function values will be tainted as well if at least one of the parameters has been tainted + + Use like this: + + >>> with TaintingEnvironment() as env: + >>> a = taint(a) + >>> b = a + 5 + >>> ... + + .. warning:: Using environment tainting will slow down your Python scripts since they install a per opcode handler. + + .. warning:: Using functions that take at least one tainted argument is supposed to return a tainted result, + but I couldn't get that shipped in time at this involved some serious tweaking with the Python bytecode. + """ + __slots__ = 'enabled', 'old_python' + + def __init__(self): + vi = sys.version_info + self.old_python = False + if vi.major >= 3 and vi.minor < 7: + warnings.warn( + 'Due to an old Python being used, altering function return values to be tainted will not work', + UserWarning) + self.old_python = True + elif sys.implementation.name != 'cpython': + warnings.warn( + 'You are not using CPython. Since this library will eventually involve some tweaking with the Python ' + 'bytecode, if your runtime does not provide this level of compatibility, it will not work.', + UserWarning) + + self.enabled = False + + def __enter__(self) -> TaintingEnvironment: + """ + Register itself as the current tainting environment + + :raises RuntimeError: there is already a tainting session in progress + """ + self.enabled = True + global local + if hasattr(local, 'satella_tainting'): + raise RuntimeError('Other tainting session in progress') + local.satella_tainting = self + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + del local.satella_tainting + self.enabled = False + return False + + @staticmethod + def get_session_in_progress() -> TaintingEnvironment: + """ + Obtain current tainting session in progress + + :raises RuntimeError: no tainting session started yet + """ + if not hasattr(local, 'satella_tainting'): + raise RuntimeError('You require a session in progress to do that') + return local.satella_tainting + + def get_tainted_variables(self) -> tp.Iterator[T]: + """ + Return all, of the tainted variables + """ + for obj in gc.get_referrers(self): + if not isinstance(obj, TaintedObject): + continue + yield access_tainted(obj) diff --git a/satella/debug/tainting/tainteds.py b/satella/debug/tainting/tainteds.py new file mode 100644 index 0000000000000000000000000000000000000000..d268900c081fc8a7d71b8872ed30cd9639ed2111 --- /dev/null +++ b/satella/debug/tainting/tainteds.py @@ -0,0 +1,189 @@ +from __future__ import annotations +import functools +import typing as tp + +from satella.coding.typing import T + + +def might_accept_tainted(taint_result: bool = False): + """ + For methods that might accept a tainted value during execution + + This both unpacks your first argument if is was a TaintedObject, you'll receive it's value + :param taint_result: result will be tainted + """ + def outer(fun): + @functools.wraps(fun) + def inner(self, *args): + if len(args) > 0: + args = access_tainted(args[0]), *args[1:] + p = fun(self, *args) + if taint_result: + return taint(p) + else: + return p + return inner + return outer + + +class TaintedObject(tp.Generic[T]): + __slots__ = '__environment', '__v' + + def __new__(cls, v): + if isinstance(v, TaintedObject): + return v + return object.__new__(cls) + + def __init__(self, v: T): + from .environment import TaintingEnvironment + if isinstance(v, TaintedObject): + # all is already set correctly for this object and it will be returned + return + self.__environment = TaintingEnvironment.get_session_in_progress() + self.__v = v + if isinstance(v, tp.MutableSequence): + for i in range(len(v)): + self.__v[i] = taint(v[i]) + else: + try: + if isinstance(v, tp.MutableMapping): + dict_to_transform = v + else: + dict_to_transform = v.__dict__ + for key in dict_to_transform: + dict_to_transform[key] = taint(dict_to_transform[key]) + except AttributeError: + pass # must have been a primitive type + + def __setattr__(self, key: str, value: tp.Any): + if key == '_TaintedObject__environment' or key == '_TaintedObject__v': + super().__setattr__(key, value) + else: + super().__setattr__(key, taint(value)) + + def __iter__(self) -> tp.Iterator[TaintedObject[T]]: + for item in self.__v: + yield taint(item) + + def __next__(self) -> TaintedObject[T]: + return taint(next(self.__v)) + + @might_accept_tainted() + def __str__(self) -> str: # it must be a str otherwise Python will complain + return str(self.__v) + + def __call__(self, *args, **kwargs): + return taint(self.__v(*args, **kwargs)) + + @might_accept_tainted(taint_result=True) + def __repr__(self) -> str: + return '%s TAINTED' % str(self.__v) + + @might_accept_tainted(taint_result=True) + def __getattr__(self, item: str) -> TaintedObject[T]: + return getattr(self.__v, item) + + def __len__(self) -> int: # This has to return an int otherwise Python will break + return len(self.__v) + + def __bool__(self) -> bool: # This has to return a bool otherwise Python will break + return bool(self.__v) + + def __int__(self) -> int: + return int(self.__v) + + def __float__(self) -> float: + return float(self.__v) + + def __enter__(self): + return taint(self.__v.__enter__()) + + def __exit__(self, exc_type, exc_val, exc_tb): + return self.__v.__exit__(exc_type, exc_val, exc_tb) + + @might_accept_tainted(taint_result=True) + def __eq__(self, other) -> bool: + return self.__v == other + + @might_accept_tainted(taint_result=True) + def __gt__(self, other) -> bool: + return self.__v > other + + @might_accept_tainted(taint_result=True) + def __ge__(self, other) -> bool: + return self.__v >= other + + @might_accept_tainted(taint_result=True) + def __lt__(self, other) -> bool: + return self.__v < other + + @might_accept_tainted(taint_result=True) + def __le__(self, other) -> bool: + return self.__v <= other + + @might_accept_tainted(taint_result=True) + def __add__(self, other) -> TaintedObject: + return self.__v + other + + @might_accept_tainted(taint_result=True) + def __radd__(self, other) -> TaintedObject: + return other + self.__v + + @might_accept_tainted() + def __iadd__(self, other) -> TaintedObject: + self.__v += other + return self + + @might_accept_tainted(taint_result=True) + def __sub__(self, other) -> TaintedObject: + return self.__v - other + + @might_accept_tainted() + def __isub__(self, other) -> TaintedObject: + self.__v -= other + return self + + @might_accept_tainted(taint_result=True) + def __mul__(self, other) -> TaintedObject: + return self.__v * other + + @might_accept_tainted() + def __imul__(self, other) -> TaintedObject: + self.__v *= other + return self + + def __dir__(self) -> tp.Iterable[str]: + return dir(self.__v) + + def __hash__(self) -> int: + return hash(self.__v) + + def __getitem__(self, item: int) -> TaintedObject[T]: + return taint(self.__v[item]) + + def __setitem__(self, key, value: T) -> None: + self.__v[key] = taint(value) + + def __delitem__(self, key: str) -> None: + del self.__v[key] + + +def access_tainted(v: tp.Union[T, TaintedObject[T]]) -> T: + """ + If v is tainted, this will extract it's value. + + If it is not, v will be returned + """ + if not isinstance(v, TaintedObject): + return v + return getattr(v, '_TaintedObject__v') + + +def taint(v: T) -> TaintedObject[T]: + """ + Taints the object if necessary. If already tainted will leave it as is + + :raises RuntimeError: no tainting session in progress + """ + return v if isinstance(v, TaintedObject) else TaintedObject(v) + diff --git a/satella/time/measure.py b/satella/time/measure.py index 7b0433f3ed60f92e15029afecbd5c0d2fe5d9c2c..b56dcba4637883514faaacd6a5ed134c39648d7f 100644 --- a/satella/time/measure.py +++ b/satella/time/measure.py @@ -1,3 +1,4 @@ +from __future__ import annotations from ..exceptions import WouldWaitMore import typing as tp import time diff --git a/tests/test_tainting.py b/tests/test_tainting.py new file mode 100644 index 0000000000000000000000000000000000000000..ee89e3e3300fe94cbde706f30d4b57e766283ec3 --- /dev/null +++ b/tests/test_tainting.py @@ -0,0 +1,21 @@ +import unittest + +from satella.debug import TaintedObject, TaintingEnvironment + + +class TestTainting(unittest.TestCase): + def test_declare(self): + self.assertRaises(RuntimeError, lambda: TaintedObject(5)) + + with TaintingEnvironment() as env: + a = 5 + b = TaintedObject(a) + c = TaintedObject(b) + self.assertIs(b, c) + + def taint_me(a, b, c): + return a+b+c + d = taint_me(a, b, c) + self.assertIsInstance(d, TaintedObject) + self.assertEqual(d, 15) + self.assertIn(15, list(env.get_tainted_variables()))