Sunday, 10 July 2016

Producer/consumer pattern with multiple Consumers

I ran into a problem with the producer/consumer pattern when there are multiple consumers. I intend to create a single producer that passes data to consumers (as Workers), which do some work on a data object. The problem is that I have no idea how to pass data to the producer.

Let's say we have a main function that gets data from somewhere:

public void foo() {
    DataObject dataObject = new DataObject();
    String data = dataObject.get();
}

This data should then be passed to a queue and processed, so the final function should look like this:

public void foo() {
    QueueService queue = new QueueService();
    DataObject dataObject = new DataObject();
    String data = dataObject.get();
    queue.send(data);
}

So my case is: Producer -> Queue <- Consumer_1 ... Consumer_n.
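
For comparison, a minimal sketch of that layout with a BlockingQueue might look like the following. The class name QueueSketch, the POISON end-of-stream marker, and the sample data are illustrative assumptions and not part of the code in this post; the producer is simply the calling thread here.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueSketch {
    // Hypothetical end-of-stream marker; compared by identity so it cannot collide with real data.
    private static final String POISON = new String("POISON");

    // Puts every item on a shared queue read by `consumers` threads; returns what they handled.
    static List<String> run(List<String> items, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        // Consumers: each one blocks on take() until data (or the marker) arrives.
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String data = queue.take();
                        if (data == POISON) {
                            return; // no more data for this consumer
                        }
                        processed.add(Thread.currentThread().getName() + " handled " + data);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer: pass the data in, then one marker per consumer so each of them stops.
        for (String item : items) {
            queue.put(item);
        }
        for (int i = 0; i < consumers; i++) {
            queue.put(POISON);
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        run(List.of("a", "b", "c", "d"), 3).forEach(System.out::println);
    }
}
```

Which consumer handles which item is not deterministic; the queue only guarantees that each item is taken exactly once.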

Instead of using a BlockingQueue I went with a thread pool, and now I am stuck because I have no idea how to pass data to the consumers:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService service = null;
    String threadName = Thread.currentThread().getName();
    try {
        service = Executors.newFixedThreadPool(6); // +1 thread for producer
        service.submit(new Producer(service)).get(); // Wait until producer exits
    } finally {
        if (null != service) {
            service.shutdown();
            try {
                service.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                //handle
            }
        }
    }
    System.out.println("Exit");
}

That's fine: we spawn several threads and start working with them. There is no problem with the Worker either; here is the implementation:

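class Worker implements Runnable {
    private final String message;

    public Worker(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        try {
            // Do stuff with message; a short random sleep stands in for the real work here
            Thread.sleep(random.nextInt(100));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}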

And finally, the Producer:

class Producer implements Runnable {
    private final ExecutorService service;

    Producer(ExecutorService service) {
        this.service = service;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();

        try {
            // `input` is not defined anywhere yet: this is the data I don't know how to pass in
            service.submit(new Worker(input));
        } catch (IOException e) {
            //handle
        }
        System.out.printf("[%s] Producer shutdown%n", threadName);
    }
}

As I mentioned earlier, I have no idea how to pass data to the Producer. I also see two possible problems on my side:

  • I don't quite understand the concept of threads in Java;
  • This implementation is the wrong architectural solution.
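
One possible way to get data into the Producer, sketched under assumptions: hand the data source over through the constructor, the same way the ExecutorService is passed in, and submit one Worker per item. The Iterable<String> source, the results queue, the PoolSketch and process names, and the upper-casing "work" are all hypothetical stand-ins and not part of the code in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSketch {

    static class Worker implements Runnable {
        private final String message;
        private final Queue<String> results; // hypothetical sink so the outcome is observable

        Worker(String message, Queue<String> results) {
            this.message = message;
            this.results = results;
        }

        @Override
        public void run() {
            results.add(message.toUpperCase()); // placeholder for the real work
        }
    }

    static class Producer implements Runnable {
        private final ExecutorService service;
        private final Iterable<String> source; // the data is handed to the Producer here
        private final Queue<String> results;

        Producer(ExecutorService service, Iterable<String> source, Queue<String> results) {
            this.service = service;
            this.source = source;
            this.results = results;
        }

        @Override
        public void run() {
            // One Worker task per item; the pool's internal queue plays the role of the queue.
            for (String input : source) {
                service.submit(new Worker(input, results));
            }
        }
    }

    static List<String> process(Iterable<String> source) throws Exception {
        Queue<String> results = new ConcurrentLinkedQueue<>();
        ExecutorService service = Executors.newFixedThreadPool(6);
        try {
            service.submit(new Producer(service, source, results)).get(); // wait for the producer
        } finally {
            service.shutdown(); // Workers that were already submitted still run
            service.awaitTermination(1, TimeUnit.MINUTES);
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(List.of("a", "b", "c")));
    }
}
```

In this arrangement the thread pool already contains a work queue, so a separate BlockingQueue is not strictly needed; the order in which the Workers finish is not guaranteed.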
