Tuesday, August 7, 2018

Task delegation in Python/Redis

I'm having trouble designing an architecture that will solve the following problem:

I have a web application (producer) that receives some data on request. I also have a number of processes (consumers) that should process this data. One request generates one batch of data, which should be processed by exactly one consumer.

My current solution consists of receiving the data, caching it in Redis, and publishing a message on a channel to announce that data has been written; the consumers listen on the same channel and process the data. The issue is that I need to stop multiple consumers from working on the same batch.
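For reference, the usual way to guarantee that only one consumer handles a given batch, even when several receive the same pub/sub message, is an atomic claim: `SET key value NX` succeeds for exactly one caller. A minimal runnable sketch follows; the `FakeRedis` class is an in-memory stand-in for the small subset of the client used here (with redis-py the call would be `db.set(key, value, nx=True)`):

```python
class FakeRedis:
    """In-memory stand-in for the subset of redis.Redis used in this sketch."""
    def __init__(self):
        self._store = {}

    def set(self, name, value, nx=False):
        # With nx=True, redis-py returns True only if the key did not exist,
        # mirroring the atomic SET ... NX command; otherwise it returns None.
        if nx and name in self._store:
            return None
        self._store[name] = value
        return True


def try_claim(db, external_id, consumer_id):
    """Return True if this consumer won the right to process the batch."""
    return bool(db.set('claim:' + external_id, consumer_id, nx=True))


db = FakeRedis()
first = try_claim(db, '42', 'consumer-a')   # wins the claim
second = try_claim(db, '42', 'consumer-b')  # loses: key already claimed
```

With a real client the claim should also carry an expiry (`ex=...`), so a consumer that crashes mid-processing does not hold the batch forever.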

Producer code (flask endpoint):

    data = request.get_json()
    db = redis.Redis(connection_pool=pool)
    external_id = str(data["externalId"])
    # Redis values must be bytes/strings, not dicts, so serialize to JSON
    db.set(external_id, json.dumps(data))
    # Publish the id on the batches channel (no subscription is needed to publish)
    db.publish('batches', external_id)

    # Poll until a consumer writes the result under a separate key
    result_key = 'result:' + external_id
    results = None
    while results is None:
        results = db.get(result_key)
        if results is None:
            time.sleep(0.05)  # avoid hammering Redis in a tight loop
        else:
            results = results.decode('utf8')

    db.delete(external_id)
    db.delete(result_key)

Consumer:

    db = redis.Redis(connection_pool=pool)
    channel = db.pubsub(ignore_subscribe_messages=True)
    channel.subscribe('batches')

    while True:
        try:
            message = channel.get_message(timeout=1.0)
            if message is None:
                continue  # get_message returns None when nothing arrived
            external_id = message['data'].decode('utf8')
            data = json.loads(db.get(external_id).decode('utf8'))
            result = risic_scorer.score(data)
            # Write under a separate key so the result doesn't clobber the input
            db.set('result:' + external_id, result)
        except Exception:
            logging.exception('failed to process batch')
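An alternative that sidesteps duplicate delivery entirely is to replace the pub/sub channel with a Redis list used as a work queue: the producer `LPUSH`es each id, every consumer does a blocking `BRPOP`, and Redis hands each element to exactly one popper. A runnable sketch with an in-memory stand-in for the two list commands (with redis-py the calls would be `db.lpush('batches', external_id)` and `db.brpop('batches')`):

```python
from collections import deque


class FakeRedisQueue:
    """In-memory stand-in for LPUSH/BRPOP on a single Redis list."""
    def __init__(self):
        self._items = deque()

    def lpush(self, name, value):
        self._items.appendleft(value)

    def brpop(self, name, timeout=0):
        # Real BRPOP blocks until an element arrives; this stand-in just
        # returns None when the list is empty. Like redis-py, it returns a
        # (key, value) tuple on success.
        if not self._items:
            return None
        return (name, self._items.pop())


db = FakeRedisQueue()
for external_id in ('1', '2', '3'):
    db.lpush('batches', external_id)

# Two consumers taking turns: each batch is handed to exactly one of them.
consumer_a = db.brpop('batches')[1]  # '1'
consumer_b = db.brpop('batches')[1]  # '2'
```

Because a list element can only be popped once, no extra locking is needed to keep two consumers off the same batch.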
