Skip to content
Snippets Groups Projects
Commit f665125d authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

v2.9.0rc1 - added CPManager

parent 716c21af
No related branches found
No related tags found
No related merge requests found
# v2.18.11 # v2.19.0
* unit tests migrated to CircleCI * unit tests migrated to CircleCI
* added __len__ to FutureCollection * added __len__ to FutureCollection
* fixed a bug in DictionaryEQAble * fixed a bug in DictionaryEQAble
* fixed a bug in ListDeleter * fixed a bug in ListDeleter
* minor breaking change: changed semantics of ListDeleter * minor breaking change: changed semantics of ListDeleter
* added `CPManager`
\ No newline at end of file
...@@ -46,16 +46,3 @@ These tests run on Python 3.8 ...@@ -46,16 +46,3 @@ These tests run on Python 3.8
They pass on Windows too, but some tests They pass on Windows too, but some tests
requiring POSIX-like functionality are skipped. requiring POSIX-like functionality are skipped.
## How to adjust thread count.
Start from 13000. If the process manages t hit a single core 100%, start a new program, or just reduce
the amount of threads. The process is network-bound.
## Valid targets
* tass.com
* kremlin,ru
* *eng.putin.kremlin.ru
* eng.constitution.kremlin.ru
* eng.flag.kremlin.ru
\ No newline at end of file
...@@ -87,6 +87,11 @@ SequentialIssuer ...@@ -87,6 +87,11 @@ SequentialIssuer
.. autoclass:: satella.coding.concurrent.SequentialIssuer .. autoclass:: satella.coding.concurrent.SequentialIssuer
:members: :members:
CPManager
=========
.. autoclass:: satella.coding.resources.CPManager
:members:
IDAllocator IDAllocator
=========== ===========
......
__version__ = '2.19.0a1' __version__ = '2.19.0rc1'
from .cp_manager import CPManager
import abc
import sys
import typing as tp
import queue
import logging
import warnings
from ..concurrent import Monitor
from ..misc import Closeable
from ..recast_exceptions import silence_excs
from ..typing import T
logger = logging.getLogger(__name__)
class CPManager(Monitor, Closeable, tp.Generic[T]):
"""
A thread-safe no-hassle connection-pool manager.
This supports automatic connection recycling, connection will be cycled each
max_cycle_no takings and deposits.
Note that you have to overload :meth:`~satella.coding.resources.CPManager.teardown_connection`
and :meth:`~satella.coding.resources.CPManager.create_connection`.
:param max_number: maximum number of connections
:param max_cycle_no: maximum number of get/put connection cycles.
.. warning:: May not work under PyPy for reasons having to do with id's semantics.
"""
def __init__(self, max_number: int, max_cycle_no: int):
super().__init__()
Closeable.__init__(self)
if sys.implementation.name != 'cpython':
warnings.warn(f'This may run bad on {sys.implementation.name}', UserWarning)
self.connections = queue.Queue(max_number)
self.spawned_connections = 0
self.max_number = max_number
self.max_cycle_no = max_cycle_no
self.id_to_times = {} # type: tp.Dict[int, int]
def close(self) -> bool:
if super().close():
while self.spawned_connections:
self.teardown_connection(self.connections.get())
self.spawned_connections -= 1
def acquire_connection(self) -> T:
"""
Either acquire a new connection, or just establish it in the background
"""
try:
conn = self.connections.get(False)
except queue.Empty:
while True:
with silence_excs(queue.Empty), Monitor.acquire(self):
if self.spawned_connections == self.max_number:
conn = self.connections.get(False, 5)
break
elif self.spawned_connections < self.max_number:
conn = self.create_connection()
self.connections.put(conn)
self.spawned_connections += 1
break
obj_id = id(conn)
try:
self.id_to_times[obj_id] += 1
except KeyError:
self.id_to_times[obj_id] = 1
return conn
def release_connection(self, connection: T) -> None:
"""
Release a connection
:param connection: connection to release
"""
obj_id = id(connection)
if self.id_to_times[obj_id] == self.max_cycle_no:
with Monitor.acquire(self), silence_excs(KeyError):
self.spawned_connections -= 1
del self.id_to_times[connection]
self.teardown_connection(connection)
else:
try:
self.connections.put(connection, False)
except queue.Full:
with Monitor.acquire(self), silence_excs(KeyError):
self.spawned_connections -= 1
del self.id_to_times[obj_id]
self.teardown_connection(connection)
def fail_connection(self, connection: T) -> None:
"""
Signal that a given connection has been failed
:param connection: connection to fail
"""
obj_id = id(connection)
self.id_to_times[obj_id] = self.max_cycle_no
@abc.abstractmethod
def teardown_connection(self, connection: T) -> None:
"""
Close the connection.
Is safe to block.
:param connection: connection to tear down
"""
@abc.abstractmethod
def create_connection(self) -> T:
"""
Create a new connection.
Is safe to block.
:return: a new connection instance
"""
\ No newline at end of file
import time
import unittest
from concurrent.futures import Future
from satella.coding.concurrent import call_in_separate_thread
from satella.coding.resources import CPManager
class TestResources(unittest.TestCase):
def test_something(self):
class InheritCPManager(CPManager):
def __init__(self, *args):
super().__init__(*args)
self.resources = 0
def create_connection(self):
time.sleep(3)
self.resources += 1
return lambda: self.resources + 1
def teardown_connection(self, connection) -> None:
assert self.resources
self.resources -= 1
cp = InheritCPManager(5, 2)
conns = [cp.acquire_connection() for _ in range(5)]
@call_in_separate_thread()
def do_call():
conn = cp.acquire_connection()
cp.release_connection(conn)
ret = do_call() # type: Future
cp.release_connection(conns.pop())
ret.result(timeout=5)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment