diff --git a/CHANGELOG.md b/CHANGELOG.md index c1eb1279bb0d59a8ca63e59acae5e444f75df4e0..082feff71a1bccb1ca7aa4e1a346fb77594774ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ -# v2.18.11 +# v2.19.0 * unit tests migrated to CircleCI * added __len__ to FutureCollection * fixed a bug in DictionaryEQAble * fixed a bug in ListDeleter * minor breaking change: changed semantics of ListDeleter +* added `CPManager` \ No newline at end of file diff --git a/README.md b/README.md index f831ad67ee1c7eacd903a0fa0b65f2c127199a30..542be0fa4346a5e9bbdbdfd2fd662458480b1ef6 100644 --- a/README.md +++ b/README.md @@ -46,16 +46,3 @@ These tests run on Python 3.8 They pass on Windows too, but some tests requiring POSIX-like functionality are skipped. - -## How to adjust thread count. - -Start from 13000. If the process manages t hit a single core 100%, start a new program, or just reduce -the amount of threads. The process is network-bound. - -## Valid targets - -* tass.com -* kremlin,ru -* *eng.putin.kremlin.ru -* eng.constitution.kremlin.ru -* eng.flag.kremlin.ru \ No newline at end of file diff --git a/docs/coding/concurrent.rst b/docs/coding/concurrent.rst index ebb65b89b4bdfeacc25f3394606b03f13599c024..beca4865372236b84111045fa663b6c9d18d9d08 100644 --- a/docs/coding/concurrent.rst +++ b/docs/coding/concurrent.rst @@ -87,6 +87,11 @@ SequentialIssuer .. autoclass:: satella.coding.concurrent.SequentialIssuer :members: +CPManager +========= + +.. autoclass:: satella.coding.resources.CPManager + :members: IDAllocator =========== diff --git a/satella/__init__.py b/satella/__init__.py index 471f3a84ec9c8b7eae4741b775d1adaa704d1dae..e63b9e302ed2994d15955f1eb1847e953c52f533 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.19.0a1' +__version__ = '2.19.0rc1' diff --git a/satella/coding/resources/__init__.py b/satella/coding/resources/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..fe964b2d18d0a8ed47a8a282ec0d23a9127c2ac5 --- /dev/null +++ b/satella/coding/resources/__init__.py @@ -0,0 +1 @@ +from .cp_manager import CPManager diff --git a/satella/coding/resources/cp_manager.py b/satella/coding/resources/cp_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..b1b1a9f84334cd1b74aa3c8cd49c1d0b87d0949c --- /dev/null +++ b/satella/coding/resources/cp_manager.py @@ -0,0 +1,123 @@ +import abc +import sys +import typing as tp +import queue +import logging +import warnings + +from ..concurrent import Monitor +from ..misc import Closeable +from ..recast_exceptions import silence_excs +from ..typing import T + + +logger = logging.getLogger(__name__) + + +class CPManager(Monitor, Closeable, tp.Generic[T]): + """ + A thread-safe no-hassle connection-pool manager. + + This supports automatic connection recycling, connection will be cycled each + max_cycle_no takings and deposits. + + Note that you have to overload :meth:`~satella.coding.resources.CPManager.teardown_connection` + and :meth:`~satella.coding.resources.CPManager.create_connection`. + + :param max_number: maximum number of connections + :param max_cycle_no: maximum number of get/put connection cycles. + + .. warning:: May not work under PyPy for reasons having to do with id's semantics. + """ + + def __init__(self, max_number: int, max_cycle_no: int): + super().__init__() + Closeable.__init__(self) + if sys.implementation.name != 'cpython': + warnings.warn(f'This may run bad on {sys.implementation.name}', UserWarning) + self.connections = queue.Queue(max_number) + self.spawned_connections = 0 + self.max_number = max_number + self.max_cycle_no = max_cycle_no + self.id_to_times = {} # type: tp.Dict[int, int] + + def close(self) -> bool: + if super().close(): + while self.spawned_connections: + self.teardown_connection(self.connections.get()) + self.spawned_connections -= 1 + + def acquire_connection(self) -> T: + """ + Either acquire a new connection, or just establish it in the background + """ + try: + conn = self.connections.get(False) + except queue.Empty: + while True: + with silence_excs(queue.Empty), Monitor.acquire(self): + if self.spawned_connections == self.max_number: + conn = self.connections.get(False, 5) + break + elif self.spawned_connections < self.max_number: + conn = self.create_connection() + self.connections.put(conn) + self.spawned_connections += 1 + break + obj_id = id(conn) + try: + self.id_to_times[obj_id] += 1 + except KeyError: + self.id_to_times[obj_id] = 1 + return conn + + def release_connection(self, connection: T) -> None: + """ + Release a connection + + :param connection: connection to release + """ + obj_id = id(connection) + if self.id_to_times[obj_id] == self.max_cycle_no: + with Monitor.acquire(self), silence_excs(KeyError): + self.spawned_connections -= 1 + del self.id_to_times[connection] + + self.teardown_connection(connection) + else: + try: + self.connections.put(connection, False) + except queue.Full: + with Monitor.acquire(self), silence_excs(KeyError): + self.spawned_connections -= 1 + del self.id_to_times[obj_id] + self.teardown_connection(connection) + + def fail_connection(self, connection: T) -> None: + """ + Signal that a given connection has been failed + + :param connection: connection to fail + """ + obj_id = id(connection) + self.id_to_times[obj_id] = self.max_cycle_no + + @abc.abstractmethod + def teardown_connection(self, connection: T) -> None: + """ + Close the connection. + + Is safe to block. + + :param connection: connection to tear down + """ + + @abc.abstractmethod + def create_connection(self) -> T: + """ + Create a new connection. + + Is safe to block. + + :return: a new connection instance + """ \ No newline at end of file diff --git a/tests/test_coding/test_resources.py b/tests/test_coding/test_resources.py new file mode 100644 index 0000000000000000000000000000000000000000..abb622b631a599b0cce8111f012677b40a25decb --- /dev/null +++ b/tests/test_coding/test_resources.py @@ -0,0 +1,37 @@ +import time +import unittest +from concurrent.futures import Future + +from satella.coding.concurrent import call_in_separate_thread + +from satella.coding.resources import CPManager + + +class TestResources(unittest.TestCase): + def test_something(self): + class InheritCPManager(CPManager): + def __init__(self, *args): + super().__init__(*args) + self.resources = 0 + + def create_connection(self): + time.sleep(3) + self.resources += 1 + return lambda: self.resources + 1 + + def teardown_connection(self, connection) -> None: + assert self.resources + self.resources -= 1 + + cp = InheritCPManager(5, 2) + + conns = [cp.acquire_connection() for _ in range(5)] + + @call_in_separate_thread() + def do_call(): + conn = cp.acquire_connection() + cp.release_connection(conn) + + ret = do_call() # type: Future + cp.release_connection(conns.pop()) + ret.result(timeout=5)