Skip to content
Snippets Groups Projects
Commit 6d515d7c authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

further improve parallel_for

parent 28b19fc6
No related branches found
No related tags found
No related merge requests found
__version__ = '2.8.15_a5'
__version__ = '2.8.15_a6'
......@@ -14,7 +14,7 @@ def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List
>>> for future in futures:
>>> yield future.result()
If query is a string, or else
If query is a string or a Cassandra Statement, or else
>>> futures = []
>>> for query, args in zip(query, arguments):
......@@ -22,6 +22,9 @@ def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List
>>> for future in futures:
>>> yield future.result()
Note that if None is encountered in the argument iterable, session.execute() will
be called with a single argument. You better have it as a BoundStatement then!
:param cursor: the Cassandra cursor to use (obtained using connection.session())
:param query: base query or a list of queries, if a different one is to be used
:param arguments: iterable yielding arguments to use in execute_async
......@@ -30,8 +33,14 @@ def parallel_for(cursor, query: tp.Union[tp.List[str], str, 'Statement', tp.List
if isinstance(query, (str, Statement)):
query = itertools.repeat(query)
futures = [cursor.execute_async(query, args) for query, args in zip(query, arguments)]
futures = []
for query, args in zip(query, arguments):
if args is None:
future = cursor.execute_async(query)
else:
future = cursor.execute_async(query, args)
futures.append(future)
for future in futures:
yield future.result()
......@@ -8,12 +8,15 @@ class TestCassandra(unittest.TestCase):
def __init__(self):
self.execute_times_called = 0
self.result_times_called = 0
self.execute_without_args_called = 0
def result(self):
self.result_times_called += 1
return []
def execute_async(self, query, args):
def execute_async(self, query, args=None):
if args is None:
self.execute_without_args_called += 1
self.execute_times_called += 1
return self
......@@ -29,3 +32,10 @@ class TestCassandra(unittest.TestCase):
self.assertEqual(cur.execute_times_called, 6)
self.assertEqual(cur.result_times_called, 6)
list(parallel_for(cur, ['SELECT * FROM table',
'SELECT * FROM table2',
'SELECT * FROM table3'], [None, None, None]))
self.assertEqual(cur.execute_without_args_called, 3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment