diff --git a/CHANGELOG.md b/CHANGELOG.md index 3628e62b2db8dcdfbfb32f9d47630f804ce2c4e0..eb6b91c645440535c903bd1a8f46fe3cd3069b80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,3 +2,4 @@ * added `time_us` * updated `stringify` to correctly handle None-cases +* added `satella.cassandra.parallel_for` diff --git a/docs/cassandra.rst b/docs/cassandra.rst new file mode 100644 index 0000000000000000000000000000000000000000..0842a51cae1e8b106a7dc05fd21d94b4e1048126 --- /dev/null +++ b/docs/cassandra.rst @@ -0,0 +1,13 @@ +**This module is available only if you have cassandra-driver installed** + +Cassandra +========= + +parallel_for +------------ + +If you have multiple async requests that would hit multiple nodes +and you would prefer to make use of `execute_async` and want to better +make use of them, here's the routine to help you out. + +.. autofunction:: satella.cassandra.parallel_for diff --git a/docs/index.rst b/docs/index.rst index 7e5d5ad88e9409d50773b26e23914317b19f2fe1..4c0f60fcc5a8ed14818e7f1578ca6df0474445ad 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -29,6 +29,7 @@ Visit the project's page at GitHub_! time exceptions processes + cassandra Indices and tables diff --git a/satella/__init__.py b/satella/__init__.py index 0934f03c99a07db6f6e28f29dfdd3a74aee17d8c..7039ccb2922ee420cbbd5b9e2a24c848eb3b9522 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.8.6_a3' +__version__ = '2.8.6' diff --git a/satella/cassandra/__init__.py b/satella/cassandra/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..f3d0093763d3a0006fc878ab760fd436c4498940 --- /dev/null +++ b/satella/cassandra/__init__.py @@ -0,0 +1,24 @@ +import typing as tp +from collections import namedtuple + + +def parallel_for(cursor, query: str, arguments: tp.Iterable[tuple]) -> tp.Iterator[namedtuple]: + """ + Syntactic sugar for + + >>> futures = [] + >>> for args in arguments: + >>> futures.append(cur.execute_async(query, args)) + >>> for future in futures: + >>> yield future.result() + + :param cursor: the Cassandra cursor to use + :param query: base query + :param arguments: iterable yielding arguments to use in execute_async + """ + futures = [] + for args in arguments: + futures.append(cursor.execute_async(query, args)) + + for future in futures: + yield future.result() diff --git a/setup.py b/setup.py index ef8a18a6bb6982c42bd6b45ad83b4cbec1c837a6..dbf6e556a3953cc4e00fc0d94c404d9c4b51347c 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ setup(keywords=['ha', 'high availability', 'scalable', 'scalability', 'server', extras_require={ 'HTTPJSONSource': ['requests'], 'YAMLSource': ['pyyaml'], - 'TOMLSource': ['toml'] + 'TOMLSource': ['toml'], + 'satella.cassandra': ['cassandra-driver'] } ) diff --git a/tests/test_cassandra.py b/tests/test_cassandra.py new file mode 100644 index 0000000000000000000000000000000000000000..98bdf0b26c2e9249b303f795ed07aa68e68f23ec --- /dev/null +++ b/tests/test_cassandra.py @@ -0,0 +1,25 @@ +from satella.cassandra import parallel_for +import unittest + + +class TestCassandra(unittest.TestCase): + def test_parallel_for(self): + class Cursor: + def __init__(self): + self.execute_times_called = 0 + self.result_times_called = 0 + + def result(self): + self.result_times_called += 1 + return [] + + def execute_async(self, query, args): + self.execute_times_called += 1 + return self + + cur = Cursor() + for row in parallel_for(cur, 'SELECT * FROM table', [(1,), (2, ), (3, )]): + pass + + self.assertEqual(cur.execute_times_called, 3) + self.assertEqual(cur.result_times_called, 3) \ No newline at end of file