Skip to content
Snippets Groups Projects
Commit b0681fef authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

v2.11.17_a2

* added `wrap_future`
* refactored `satella.cassandra`
* deprecated tracing Cassandra's ResponseFutures directly
parent bc690d76
No related branches found
No related tags found
No related merge requests found
# v2.11.17 # v2.11.17
* added `wrap_future`
* refactored `satella.cassandra`
* deprecated tracing Cassandra's ResponseFutures directly
...@@ -3,6 +3,11 @@ Cassandra ...@@ -3,6 +3,11 @@ Cassandra
**This module is available only if you have cassandra-driver installed** **This module is available only if you have cassandra-driver installed**
wrap_future
-----------
.. autofunction:: satella.cassandra.wrap_future
parallel_for parallel_for
------------ ------------
......
__version__ = '2.11.17_a1' __version__ = '2.11.17_a2'
import itertools from .parallel import parallel_for
import typing as tp from .future import wrap_future
from collections import namedtuple
__all__ = ['wrap_future', 'parallel_for']
def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List['Statement']],
arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]:
"""
Syntactic sugar for
>>> futures = []
>>> for args in arguments:
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
If query is a string or a Cassandra Statement, or else
>>> futures = []
>>> for query, args in zip(query, arguments):
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
Note that if None is encountered in the argument iterable, session.execute() will
be called with a single argument. You better have it as a BoundStatement then!
:param cursor: the Cassandra cursor to use (obtained using connection.session())
:param query: base query or a list of queries, if a different one is to be used
:param arguments: iterable yielding arguments to use in execute_async
"""
try:
from cassandra.query import Statement
query_classes = (str, Statement)
except ImportError:
query_classes = str
if isinstance(query, query_classes):
query = itertools.repeat(query)
futures = []
for query, args in zip(query, arguments):
if args is None:
future = cursor.execute_async(query)
else:
future = cursor.execute_async(query, args)
futures.append(future)
for future in futures:
yield future.result()
try:
from cassandra.cluster import ResponseFuture
except ImportError:
class ResponseFuture:
pass
from concurrent.futures import Future
from .common import ResponseFuture
def wrap_future(future: ResponseFuture) -> Future:
"""
Convert a Cassandra's future to a normal Python future.
:param future: cassandra future to wrap
:return: a standard Python future
"""
fut = Future()
fut.set_running_or_notify_cancel()
future.add_callback(lambda result: fut.set_result(result))
future.add_errback(lambda exception: fut.set_exception(exception))
return fut
import itertools
import typing as tp
from collections import namedtuple
def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List['Statement']],
arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]:
"""
Syntactic sugar for
>>> futures = []
>>> for args in arguments:
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
If query is a string or a Cassandra Statement, or else
>>> futures = []
>>> for query, args in zip(query, arguments):
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
Note that if None is encountered in the argument iterable, session.execute() will
be called with a single argument. You better have it as a BoundStatement then!
:param cursor: the Cassandra cursor to use (obtained using connection.session())
:param query: base query or a list of queries, if a different one is to be used
:param arguments: iterable yielding arguments to use in execute_async
"""
try:
from cassandra.query import Statement
query_classes = (str, Statement)
except ImportError:
query_classes = str
if isinstance(query, query_classes):
query = itertools.repeat(query)
futures = []
for query, args in zip(query, arguments):
if args is None:
future = cursor.execute_async(query)
else:
future = cursor.execute_async(query, args)
futures.append(future)
for future in futures:
yield future.result()
import typing as tp import typing as tp
import sys
import warnings
from concurrent.futures import Future from concurrent.futures import Future
from ..cassandra.future import wrap_future
from ..cassandra.common import ResponseFuture
from satella.coding.decorators import wraps from satella.coding.decorators import wraps
try: try:
...@@ -9,12 +13,6 @@ except ImportError: ...@@ -9,12 +13,6 @@ except ImportError:
class Span: class Span:
pass pass
try:
from cassandra.cluster import ResponseFuture
except ImportError:
class ResponseFuture:
pass
def trace_function(tracer, name: str, tags: tp.Optional[dict] = None): def trace_function(tracer, name: str, tags: tp.Optional[dict] = None):
""" """
...@@ -43,18 +41,17 @@ def trace_future(future: tp.Union[ResponseFuture, Future], span: Span): ...@@ -43,18 +41,17 @@ def trace_future(future: tp.Union[ResponseFuture, Future], span: Span):
:param span: span to close on future's completion :param span: span to close on future's completion
""" """
if isinstance(future, ResponseFuture): if isinstance(future, ResponseFuture):
def close_exception(exc): warnings.warn('Tracing Cassandra futures is deprecated. Use wrap_future() to '
'convert it to a standard Python future. This feature will be '
'deprecated in Satella 3.x', DeprecationWarning)
future = wrap_future(future)
def close_future(fut):
exc = fut.exception()
if exc is not None:
# noinspection PyProtectedMember # noinspection PyProtectedMember
Span._on_error(span, type(exc), exc, '<unavailable>') exc_type, value, traceback = sys.exc_info()
span.finish() Span._on_error(span, exc_type, value, traceback)
span.finish()
future.add_callback(span.finish)
future.add_errback(close_exception) future.add_done_callback(close_future)
else:
def close_future(fut):
exc = fut.exception()
if exc is not None:
# noinspection PyProtectedMember
Span._on_error(span, type(exc), exc, '<unavailable>')
span.finish()
future.add_done_callback(close_future)
from satella.cassandra import parallel_for from satella.coding.concurrent import CallableGroup
from satella.cassandra import parallel_for, wrap_future
import unittest import unittest
class TestCassandra(unittest.TestCase): class TestCassandra(unittest.TestCase):
def test_wrap_future(self):
class MockCassandraFuture:
def __init__(self):
self.value = None
self.callbacks = CallableGroup()
self.errbacks = CallableGroup()
def add_callback(self, callback):
self.callbacks.add(callback)
def add_errback(self, errback):
self.errbacks.add(errback)
def set_result(self, x):
self.value = x
if isinstance(x, Exception):
self.errbacks(x)
else:
self.callbacks(x)
mcf = MockCassandraFuture()
wrapped = wrap_future(mcf)
a = {}
def on_done(fut):
if fut.exception() is None:
a['success'] = True
else:
a['failure'] = True
wrapped.add_done_callback(on_done)
mcf.set_result(None)
self.assertTrue(a['success'])
mcf = MockCassandraFuture()
wrapped = wrap_future(mcf)
wrapped.add_done_callback(on_done)
mcf.set_result(Exception())
self.assertTrue(a['failure'])
def test_parallel_for(self): def test_parallel_for(self):
class Cursor: class Cursor:
def __init__(self): def __init__(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment