diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6aab57c374581db126d5c960d4247a01ef6c1b..527cc3a59b85a442ea40be1e5de7086091da65bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,6 @@ # v2.11.17 + +* added `wrap_future` +* refactored `satella.cassandra` +* deprecated tracing Cassandra's ResponseFutures directly + diff --git a/docs/cassandra.rst b/docs/cassandra.rst index 4f8552e81b5afa54b55178b0ca2449ba1f75e437..051c6eadca998dae465ac27025c55cd362c7bbef 100644 --- a/docs/cassandra.rst +++ b/docs/cassandra.rst @@ -3,6 +3,11 @@ Cassandra **This module is available only if you have cassandra-driver installed** +wrap_future +----------- + +.. autofunction:: satella.cassandra.wrap_future + parallel_for ------------ diff --git a/satella/__init__.py b/satella/__init__.py index 2e93560851716fcff0e30b26353ff152539a19e7..7a3edefea6b97fe9c261aa0050672aed1ab52beb 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.11.17_a1' +__version__ = '2.11.17_a2' diff --git a/satella/cassandra/__init__.py b/satella/cassandra/__init__.py index 9791250702cb47a1e98419237429d356d03e5a6b..f5afbe58ed33f079b6d771b3af3bbef0f64c7d79 100644 --- a/satella/cassandra/__init__.py +++ b/satella/cassandra/__init__.py @@ -1,50 +1,4 @@ -import itertools -import typing as tp -from collections import namedtuple +from .parallel import parallel_for +from .future import wrap_future - -def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List['Statement']], - arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]: - """ - Syntactic sugar for - - >>> futures = [] - >>> for args in arguments: - >>> futures.append(cursor.execute_async(query, args)) - >>> for future in futures: - >>> yield future.result() - - If query is a string or a Cassandra Statement, or else - - >>> futures = [] - >>> for query, args in zip(query, arguments): - >>> futures.append(cursor.execute_async(query, args)) - >>> for future in futures: - >>> yield future.result() - - Note that if None is encountered in the argument iterable, session.execute() will - be called with a single argument. You better have it as a BoundStatement then! - - :param cursor: the Cassandra cursor to use (obtained using connection.session()) - :param query: base query or a list of queries, if a different one is to be used - :param arguments: iterable yielding arguments to use in execute_async - """ - try: - from cassandra.query import Statement - query_classes = (str, Statement) - except ImportError: - query_classes = str - - if isinstance(query, query_classes): - query = itertools.repeat(query) - - futures = [] - for query, args in zip(query, arguments): - if args is None: - future = cursor.execute_async(query) - else: - future = cursor.execute_async(query, args) - futures.append(future) - - for future in futures: - yield future.result() +__all__ = ['wrap_future', 'parallel_for'] diff --git a/satella/cassandra/common.py b/satella/cassandra/common.py new file mode 100644 index 0000000000000000000000000000000000000000..448f63edaf5cd05c0fe0c891fbb398b4511e4593 --- /dev/null +++ b/satella/cassandra/common.py @@ -0,0 +1,5 @@ +try: + from cassandra.cluster import ResponseFuture +except ImportError: + class ResponseFuture: + pass diff --git a/satella/cassandra/future.py b/satella/cassandra/future.py new file mode 100644 index 0000000000000000000000000000000000000000..1a3f9212a2432b4669866ce66234257783c58ae3 --- /dev/null +++ b/satella/cassandra/future.py @@ -0,0 +1,19 @@ +from concurrent.futures import Future + +from .common import ResponseFuture + + +def wrap_future(future: ResponseFuture) -> Future: + """ + Convert a Cassandra's future to a normal Python future. + + :param future: cassandra future to wrap + :return: a standard Python future + """ + + fut = Future() + fut.set_running_or_notify_cancel() + future.add_callback(lambda result: fut.set_result(result)) + future.add_errback(lambda exception: fut.set_exception(exception)) + return fut + diff --git a/satella/cassandra/parallel.py b/satella/cassandra/parallel.py new file mode 100644 index 0000000000000000000000000000000000000000..9791250702cb47a1e98419237429d356d03e5a6b --- /dev/null +++ b/satella/cassandra/parallel.py @@ -0,0 +1,50 @@ +import itertools +import typing as tp +from collections import namedtuple + + +def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List['Statement']], + arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]: + """ + Syntactic sugar for + + >>> futures = [] + >>> for args in arguments: + >>> futures.append(cursor.execute_async(query, args)) + >>> for future in futures: + >>> yield future.result() + + If query is a string or a Cassandra Statement, or else + + >>> futures = [] + >>> for query, args in zip(query, arguments): + >>> futures.append(cursor.execute_async(query, args)) + >>> for future in futures: + >>> yield future.result() + + Note that if None is encountered in the argument iterable, session.execute() will + be called with a single argument. You better have it as a BoundStatement then! + + :param cursor: the Cassandra cursor to use (obtained using connection.session()) + :param query: base query or a list of queries, if a different one is to be used + :param arguments: iterable yielding arguments to use in execute_async + """ + try: + from cassandra.query import Statement + query_classes = (str, Statement) + except ImportError: + query_classes = str + + if isinstance(query, query_classes): + query = itertools.repeat(query) + + futures = [] + for query, args in zip(query, arguments): + if args is None: + future = cursor.execute_async(query) + else: + future = cursor.execute_async(query, args) + futures.append(future) + + for future in futures: + yield future.result() diff --git a/satella/opentracing/trace.py b/satella/opentracing/trace.py index fd11edfffdea7a5975553133bb804819483b8cd5..52ae0d6c4b6cd0f5866c38be03f2d1e9225a0eeb 100644 --- a/satella/opentracing/trace.py +++ b/satella/opentracing/trace.py @@ -1,6 +1,10 @@ import typing as tp +import sys +import warnings from concurrent.futures import Future +from ..cassandra.future import wrap_future +from ..cassandra.common import ResponseFuture from satella.coding.decorators import wraps try: @@ -9,12 +13,6 @@ except ImportError: class Span: pass -try: - from cassandra.cluster import ResponseFuture -except ImportError: - class ResponseFuture: - pass - def trace_function(tracer, name: str, tags: tp.Optional[dict] = None): """ @@ -43,18 +41,17 @@ def trace_future(future: tp.Union[ResponseFuture, Future], span: Span): :param span: span to close on future's completion """ if isinstance(future, ResponseFuture): - def close_exception(exc): + warnings.warn('Tracing Cassandra futures is deprecated. Use wrap_future() to ' + 'convert it to a standard Python future. This feature will be ' + 'deprecated in Satella 3.x', DeprecationWarning) + future = wrap_future(future) + + def close_future(fut): + exc = fut.exception() + if exc is not None: # noinspection PyProtectedMember - Span._on_error(span, type(exc), exc, '<unavailable>') - span.finish() - - future.add_callback(span.finish) - future.add_errback(close_exception) - else: - def close_future(fut): - exc = fut.exception() - if exc is not None: - # noinspection PyProtectedMember - Span._on_error(span, type(exc), exc, '<unavailable>') - span.finish() - future.add_done_callback(close_future) + exc_type, value, traceback = sys.exc_info() + Span._on_error(span, exc_type, value, traceback) + span.finish() + + future.add_done_callback(close_future) diff --git a/tests/test_cassandra.py b/tests/test_cassandra.py index 1e59041a5c9187be06366f0754178210f280f278..fac49118896373876dcee0ba42574164af8ff1a2 100644 --- a/tests/test_cassandra.py +++ b/tests/test_cassandra.py @@ -1,8 +1,51 @@ -from satella.cassandra import parallel_for +from satella.coding.concurrent import CallableGroup + +from satella.cassandra import parallel_for, wrap_future import unittest class TestCassandra(unittest.TestCase): + def test_wrap_future(self): + class MockCassandraFuture: + def __init__(self): + self.value = None + self.callbacks = CallableGroup() + self.errbacks = CallableGroup() + + def add_callback(self, callback): + self.callbacks.add(callback) + + def add_errback(self, errback): + self.errbacks.add(errback) + + def set_result(self, x): + self.value = x + if isinstance(x, Exception): + self.errbacks(x) + else: + self.callbacks(x) + + mcf = MockCassandraFuture() + wrapped = wrap_future(mcf) + a = {} + + def on_done(fut): + if fut.exception() is None: + a['success'] = True + else: + a['failure'] = True + + wrapped.add_done_callback(on_done) + + mcf.set_result(None) + self.assertTrue(a['success']) + + mcf = MockCassandraFuture() + wrapped = wrap_future(mcf) + wrapped.add_done_callback(on_done) + mcf.set_result(Exception()) + self.assertTrue(a['failure']) + def test_parallel_for(self): class Cursor: def __init__(self):