The project has the following structure (one of my questions is also: are these all necessary? I am quite new to the Pythonic ways):
- a singleton (Utils) which has a resource pool inside
- a resource pool which creates resources with a factory
- a factory which returns the resource (in this case it is an RPC connection)
I would expect that if I write a test requiring many resources in parallel (e.g. with import multiprocessing as mp), the resource pool would start filling up, but it always remains at size 1, which is quite worrying because it looks like something is forcing synchronous execution.
Here are some lines describing the objects; the worrying test case is at the end of the post.
Utils:
from flask import current_app
from app.main.util.serviceauthproxyfactory import ServiceAuthProxyFactory
from app.main.util.resourcepool import LazyPool

class Singleton(object):
    _instance = None  # keep instance reference

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = object.__new__(cls, *args, **kwargs)
        return cls._instance

class CUtils(Singleton):
    factory = ServiceAuthProxyFactory()
    lazypool = None

    def __init__(self):
        self.lazypool = LazyPool(
            factory=self.factory.create(
                'any',
                rpc_user=current_app.config['RPC_USER'],
                rpc_password=current_app.config['RPC_PASSWORD'],
                rpc_host=current_app.config['RPC_HOST'],
                rpc_port=current_app.config['RPC_PORT']),
            pool_size=5)

    def some_method(self, arg1):
        try:
            with self.lazypool.reserve(timeout=10) as rpc_connection:
                raw_out = rpc_connection.dothings(arg1)
                ...
        except Exception as general_exception:
            print("An exception occurred: " + str(general_exception))
            return general_exception
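As a sanity check on the Singleton base class above, here is a minimal standalone sketch (with a toy subclass instead of CUtils, since CUtils needs a Flask app context). Note one subtlety it surfaces: __new__ returns the same instance every time, but __init__ still runs on every constructor call.

```python
class Singleton(object):
    _instance = None  # single shared instance, stored on the class

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = object.__new__(cls)
        return cls._instance

class ToyUtils(Singleton):
    def __init__(self):
        # __init__ runs on every ToyUtils() call,
        # even though __new__ keeps returning the same object
        self.counter = getattr(self, "counter", 0) + 1

a = ToyUtils()
b = ToyUtils()
print(a is b)     # True: both names point at the same instance
print(b.counter)  # 2: __init__ ran twice on that one instance
```

This means a class like CUtils, whose __init__ builds a fresh LazyPool, reassigns that attribute each time the constructor is invoked, even though the instance itself is shared.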
Factory:
class ServiceAuthProxyFactory:
    def __init__(self):
        self._builders = {}

    def register_builder(self, key, builder):
        self._builders[key] = builder

    def create(self, key, **kwargs):
        builder = self._builders.get(key)
        if not builder:
            # fall back to the default builder
            builder = DefaultAuthServiceProxyBuilder(**kwargs)
        return builder(**kwargs)
    ...etc..
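To make the register/create flow concrete, here is a self-contained sketch with toy stand-ins (FakeRPCConnection and the lambda builder are hypothetical; the default-builder fallback is simplified away):

```python
class ServiceAuthProxyFactory:
    def __init__(self):
        self._builders = {}

    def register_builder(self, key, builder):
        self._builders[key] = builder

    def create(self, key, **kwargs):
        builder = self._builders.get(key)
        if not builder:
            raise KeyError(key)  # toy fallback instead of a default builder
        return builder(**kwargs)

class FakeRPCConnection:
    """Stand-in for the real RPC proxy object."""
    def __init__(self, rpc_user, rpc_host):
        self.rpc_user = rpc_user
        self.rpc_host = rpc_host

factory = ServiceAuthProxyFactory()
# a builder is anything callable that accepts the keyword arguments
factory.register_builder(
    'any', lambda **kw: FakeRPCConnection(kw['rpc_user'], kw['rpc_host']))

conn = factory.create('any', rpc_user='alice', rpc_host='localhost')
print(type(conn).__name__)  # FakeRPCConnection
```

One thing worth noticing: create returns an already-built resource, not a callable, so whatever consumes its return value as a "factory" receives a single concrete object.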
Resource Pool (LazyPool, tweaked from this project but conceptually the same, so we can refer to it: https://github.com/Bogdanp/resource_pool):
from contextlib import contextmanager
from threading import Condition
from typing import Callable, Generator, Generic, List, Optional, TypeVar

# type aliases as in the linked resource_pool project
ResourceT = TypeVar("ResourceT")
ResourceFactory = Callable[[], ResourceT]

class LazyPool(Generic[ResourceT]):
    _factory: ResourceFactory
    _cond: Condition
    _pool: List[ResourceT]
    _pool_size: int
    _used_size: int

    def __init__(self, factory: ResourceFactory, *, pool_size: int, min_instances: int = 0) -> None:
        assert pool_size > min_instances, "pool_size must be larger than min_instances"
        self._factory = factory
        self._cond = Condition()
        self._pool = []
        self._pool_size = pool_size
        self._used_size = 0
        for _ in range(min_instances):
            self._used_size += 1
            self.put(factory())

    @contextmanager
    def reserve(self, timeout: Optional[float] = None) -> Generator[ResourceT, None, None]:
        """Reserve a resource and then put it back."""
        resource = self.get(timeout=timeout)
        try:
            yield resource
        finally:
            self.put(resource)
    ...

    def __len__(self) -> int:
        return len(self._pool)
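Since get/put are elided above, here is a simplified, single-process sketch of how reserve is meant to grow the pool on demand. The toy get/put below only approximates the behavior of the linked resource_pool project, it is not the real implementation:

```python
from contextlib import contextmanager
from threading import Condition

class ToyLazyPool:
    def __init__(self, factory, *, pool_size):
        self._factory = factory
        self._cond = Condition()
        self._pool = []          # idle (checked-in) resources
        self._pool_size = pool_size
        self._used_size = 0      # resources created so far

    def get(self, timeout=None):
        with self._cond:
            while True:
                if self._pool:
                    return self._pool.pop()
                if self._used_size < self._pool_size:
                    # lazily create a new resource, up to pool_size
                    self._used_size += 1
                    return self._factory()
                if not self._cond.wait(timeout=timeout):
                    raise TimeoutError("no resource available")

    def put(self, resource):
        with self._cond:
            self._pool.append(resource)
            self._cond.notify()

    @contextmanager
    def reserve(self, timeout=None):
        resource = self.get(timeout=timeout)
        try:
            yield resource
        finally:
            self.put(resource)

    def __len__(self):
        return len(self._pool)

pool = ToyLazyPool(factory=lambda: object(), pool_size=5)
with pool.reserve() as r1:
    with pool.reserve() as r2:  # overlapping reservation forces a second resource
        pass
print(len(pool))  # 2: both resources returned to the idle pool
```

So len(pool) only grows past 1 when reservations actually overlap inside one process: two threads (or nested contexts) holding resources at the same time.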
I have written a list of tests which check out:
- create 2 CUtils and check that they are the same object, OK
- test that the object returned by the factory actually works, OK
- test that the method in CUtils, when called, requests an object from the pool, which calls the factory and finally returns stuff as expected, OK
The FAILING test with multiprocessing is this one:
import multiprocessing as mp
...
def test_lazypool_multiple_operations(self):
    cu = CUtils()
    data = [....]
    pool = mp.Pool(mp.cpu_count())
    results = pool.map_async(cu.do_stuff, [x for x in data]).get()
    pool.close()
    pool.join()  # is this necessary?
    for result in results:
        self.assertTrue(result == 'expected outcome')  # OK
    self.assertTrue(len(cu.lazypool) > 1)  # FAIL - size remains 1
I would expect cu to initialize the lazypool; then, every time map_async asks it to do something, it requests a new object from the pool, which requests it from the factory, up to pool_size (5). Even if some objects are released in time for the next request, it is impossible for all the calls starting in parallel to share the same single object from the pool, so I'd expect the size of the pool to grow. But it doesn't.
My questions are:
- is this a reasonably Pythonic way to structure concurrent requests to a slow resource? In this project it is not feasible to open an arbitrary number of rpc_connections; their number must be kept in check. Are there more Pythonic patterns for this?
- am I misunderstanding the Singleton class, does it behave like a global static object?
- Is the resource_pool + factory pattern necessary?