Skip to content
Snippets Groups Projects
Commit 28b19fc6 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

refactor parallel_for

parent 05857731
No related branches found
No related tags found
No related merge requests found
......@@ -2,3 +2,4 @@
* added for_argument, attach_arguments and auto_adapt_to_methods to __all__ in `satella.coding`
* added time_us to __all__ in `satella.time`
* refactored `satella.cassandra.parallel_for`
......@@ -11,3 +11,6 @@ and you would prefer to make use of `execute_async` and want to better
make use of them, here's the routine to help you out.
.. autofunction:: satella.cassandra.parallel_for
Note that if you specify an iterable instead of a string or a `cassandra.query.Statement`,
different query will be applied to those arguments, as stemming from their ordering.
__version__ = '2.8.15_a4'
__version__ = '2.8.15_a5'
import itertools
import typing as tp
from collections import namedtuple
def parallel_for(cursor, query: str, arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]:
def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List['Statement']],
arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]:
"""
Syntactic sugar for
>>> futures = []
>>> for args in arguments:
>>> futures.append(cur.execute_async(query, args))
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
If query is a string, or else
>>> futures = []
>>> for query, args in zip(query, arguments):
>>> futures.append(cursor.execute_async(query, args))
>>> for future in futures:
>>> yield future.result()
:param cursor: the Cassandra cursor to use (obtained using connection.session())
:param query: base query
:param query: base query or a list of queries, if a different one is to be used
:param arguments: iterable yielding arguments to use in execute_async
"""
futures = []
for args in arguments:
futures.append(cursor.execute_async(query, args))
from cassandra.query import Statement
if isinstance(query, (str, Statement)):
query = itertools.repeat(query)
futures = [cursor.execute_async(query, args) for query, args in zip(query, arguments)]
for future in futures:
yield future.result()
......@@ -18,8 +18,14 @@ class TestCassandra(unittest.TestCase):
return self
cur = Cursor()
for row in parallel_for(cur, 'SELECT * FROM table', [(1,), (2, ), (3, )]):
pass
list(parallel_for(cur, 'SELECT * FROM table', [(1,), (2, ), (3, )]))
self.assertEqual(cur.execute_times_called, 3)
self.assertEqual(cur.result_times_called, 3)
list(parallel_for(cur, ['SELECT * FROM table',
'SELECT * FROM table2',
'SELECT * FROM table3'], [(1,), (2, ), (3, )]))
self.assertEqual(cur.execute_times_called, 6)
self.assertEqual(cur.result_times_called, 6)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment