Monday, June 28, 2021

One producer with several long-running consumers using a BlockingQueue

I have to create a message-sending application which sends SMS messages to our customers. Since generating and sending the messages takes considerable time, I decided to implement it with the producer-consumer pattern, so that it does not affect the original flow of execution.

Once I put the raw data on the queue as an object, it is picked up by one of the consumer threads in the thread pool, which then generates the message and sends the SMS. This flow should continue for as long as the application is up and running.

My application works fine, but I found that every thread created by the consumer and producer thread pools stays alive in a waiting state even after it has finished the SMS-sending task. Is this a problem for a long-running application? Or can I keep using the same consumer and producer thread pools the whole time, instead of creating new thread pools each time the initializingSendMessage(RawData trxn) method is invoked?
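To make the second option concrete, here is a minimal sketch of what I mean by keeping one queue and one consumer pool for the whole lifetime of the application. Everything in it is hypothetical and not my real code: the SmsDispatcher class, the String payload standing in for Message, the counter standing in for the actual SMS send, and the poison-pill shutdown are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the queue and the consumer pool are created once and reused.
class SmsDispatcher {
    // Unique sentinel, compared by identity, that tells a consumer to exit.
    private static final String POISON = new String("POISON");

    private final int consumerCount;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService consumers;
    private final AtomicInteger sent = new AtomicInteger();

    SmsDispatcher(int consumerCount) {
        this.consumerCount = consumerCount;
        this.consumers = Executors.newFixedThreadPool(consumerCount);
        // Start the long-running consumers exactly once.
        for (int i = 0; i < consumerCount; i++) {
            consumers.execute(this::consumeLoop);
        }
    }

    private void consumeLoop() {
        try {
            while (true) {
                String message = queue.take(); // blocks (WAITING) while the queue is empty
                if (message == POISON) {
                    return; // orderly exit requested by stop()
                }
                sent.incrementAndGet(); // stand-in for generating and sending the SMS
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread end
        }
    }

    // Called from the normal application flow instead of creating new pools.
    void enqueue(String message) {
        queue.add(message);
    }

    int sentCount() {
        return sent.get();
    }

    // Called once at application shutdown; returns true if all consumers finished in time.
    boolean stop(long timeoutMillis) {
        for (int i = 0; i < consumerCount; i++) {
            queue.add(POISON); // one pill per consumer, queued behind all real messages
        }
        consumers.shutdown();
        try {
            return consumers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With something like this, the idle consumers would still sit in a waiting state between messages, but their number would stay fixed at consumerCount no matter how many times enqueue is called.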

In my current code, the SendMessageUtil class creates the common queue and initializes the tasks.

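import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class SendMessageUtil {
    // Not shown in my original snippet; assumed here to match the consumer pool size of 5.
    private static final int MESSAGE_CONSUME_POOL_SIZE = 5;

    public static void initializingSendMessage(RawData trxn) {
        // A new queue and two new thread pools are created on every call.
        BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>();
        ExecutorService produceMessagePool = Executors.newFixedThreadPool(1);
        ExecutorService consumerMessagePool = Executors.newFixedThreadPool(5);
        try {
            produceMessagePool.submit(new Producer(sharedQueue));
            for (int i = 0; i < MESSAGE_CONSUME_POOL_SIZE; i++) {
                consumerMessagePool.submit(new Consumer(sharedQueue));
            }
            // Stop accepting new tasks; tasks already submitted keep running.
            produceMessagePool.shutdown();
            consumerMessagePool.shutdown();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
}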

My Producer and Consumer classes look like this.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private final BlockingQueue<Message> sharedQueue;

    public Producer(BlockingQueue<Message> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        // Build a batch of test messages, then publish them to the shared queue in one go.
        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            Message message = new Message();
            message.setMessage("Test message sending");
            messageList.add(message);
        }
        sharedQueue.addAll(messageList);
    }
}

import java.util.Objects;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private final BlockingQueue<Message> sharedQueue;
    private MessageBroadcaster messageBroadcaster;

    public Consumer(BlockingQueue<Message> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        initializeMessageBroadcaster();

        // Send messages to customers; take() blocks while the queue is empty.
        while (true) {
            try {
                Message message = sharedQueue.take();
                messageBroadcaster.sendMessage(message);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

    // Lazily create the broadcaster the first time this consumer runs.
    private void initializeMessageBroadcaster() {
        if (Objects.isNull(messageBroadcaster)) {
            messageBroadcaster = new MessageBroadcasterImpl();
        }
    }
}

After invoking initializingSendMessage(RawData trxn) several times, the live threads look like this. [image: live thread count]
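The growth in live threads can also be reproduced outside my application with a small sketch like the one below. It is only an approximation under stated assumptions: ThreadLeakDemo and the "sms-consumer-" thread name prefix are hypothetical, each call uses a single consumer instead of five, a String queue stands in for my Message queue, and the threads are marked as daemon threads only so that the demo JVM can exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: counts consumer threads left behind by each call.
class ThreadLeakDemo {
    static final String PREFIX = "sms-consumer-";

    // Mimics one call to initializingSendMessage: new queue, new pool, then shutdown().
    static void startOnce() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable, PREFIX + System.nanoTime());
            thread.setDaemon(true); // assumption for the demo only, so the JVM can exit
            return thread;
        });
        pool.execute(() -> {
            try {
                while (true) {
                    queue.take(); // nothing is ever added, so this waits forever
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // rejects new tasks but does not interrupt the running consumer
    }

    // Counts live threads whose name carries the demo prefix.
    static long liveConsumerThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().startsWith(PREFIX))
                .count();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            startOnce();
            System.out.println("live consumer threads: " + liveConsumerThreads());
        }
    }
}
```

In this sketch every call to startOnce leaves one more consumer blocked in take(), which matches the pattern I see in the live thread count of my application.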
