My system looks like this:
[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue]
The queues are RabbitMQ 3.5.6. I'm using Spring Integration 4.2.1, Spring AMQP 1.5.1 and Spring Integration Java DSL 1.1.0.
I would like to throttle the consumption of messages from the queue Q1 by the Service1 depending on how many messages are currently in processing and haven't reached the Outbound queue - eg. I want to have max. 10 messages being processed at a time. That is because the processing part consumes a lot of resources and I don't want to overload the system.
My current configuration of the initial part of the flow is simply as follows:
IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, "Q1"))
.handle(message -> service1.process(message.getPayload())
.get();
Service1 and ServiceN can communicate (it's the same JVM), so I am able to implement a locking mechanism between them so that service1.process() blocks before proceeding with the execution if the "in processing" message limit is reached. That is what - if I understand correctly - @Gary Russell suggested in this comment. It will however result in the messages being picked up from the broker and hanging there for a while in an unacked state. Is there a way to just not pick up the messages from the queue at all?
The answer of @Artem Bilan to use SimpleMessageListenerContainer.stop()/.start() seems quite heavyweight looking at the implementation and all the shutdown/startup logic that would be invoked.
Also both answers are two years old now. Any better suggestions?
Aucun commentaire:
Enregistrer un commentaire