I have an issue thinking of an architecture that'll solve the following problem:
I have a web application (producer) that receives some data on request. I also have a number of processes (consumers) that should process this data. 1 request generates 1 batch of data and should be processes by 1 consumer only.
My current solution consists of receiving the data, cache-ing it in memory with Redis, sending a message through a messagechannel that data has been written while the consumers are listening on the same channel, and then the data is processed by the consumers. The issue here is that I need to stop multiple consumers from working on the same data.
Producer code (flask endpoint):
data = request.get_json()
db = redis.Redis(connection_pool=pool)
db.set(data["externalId"], data)
# Subscribe to the batches channel and publish the id
db.pubsub()
db.publish('batches', request_key)
results = None
result_key = str(data["externalId"])
# Wait till the batch is processed
while results is None:
results = db.get(result_key)
if results is not None:
results = results.decode('utf8')
db.delete(data["externalId"])
db.delete(result_key)
Consumer:
db = redis.Redis(connection_pool = pool)
channel = db.pubsub()
channel.subscribe('batches')
while True:
try:
message = channel.get_message()
message_data = bytes(message['data']).decode('utf8')
external_id = message_data.split('-')[-1]
data = json.loads(db.get(external_id).decode('utf8'))
result = risic_scorer.score(data)
db.set(str(external_id), result)
except Exception:
pass
Aucun commentaire:
Enregistrer un commentaire