I ran into a problem with the producer-consumer pattern when there are multiple consumers. I want to create a single producer that passes data to consumers (workers), which then do some work on a data object. The problem is that I have no idea how to pass data to the producer.
Let's say we have a main function that gets data from somewhere:
public void foo() {
    DataObject dataObject = new DataObject();
    String data = dataObject.get();
}
Then I want to pass this data to a queue and process it, so the final function should look like this:
public void foo() {
    QueueService queue = new QueueService();
    DataObject dataObject = new DataObject();
    String data = dataObject.get();
    queue.send(data);
}
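QueueService in the snippet above is not a library class. A minimal sketch of what it might look like, assuming it simply wraps a fixed thread pool (so the pool's internal work queue acts as the queue) and that the processing step is passed in as a Consumer<String>. The names handler, shutdownAndWait and QueueServiceDemo are made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical QueueService: one possible shape for the queue.send(data) call.
class QueueService {
    private final ExecutorService pool;
    private final Consumer<String> handler; // what each consumer does with an item

    QueueService(int consumers, Consumer<String> handler) {
        this.pool = Executors.newFixedThreadPool(consumers);
        this.handler = handler;
    }

    // Producer side: enqueue one item for whichever pool thread is free next.
    void send(String data) {
        pool.submit(() -> handler.accept(data));
    }

    // Stop accepting items and wait for the already queued ones to finish.
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}

class QueueServiceDemo {
    static List<String> run(List<String> inputs) {
        List<String> processed = new CopyOnWriteArrayList<>();
        // Upper-casing stands in for the real work (assumption).
        QueueService queue = new QueueService(3, s -> processed.add(s.toUpperCase()));
        try {
            for (String data : inputs) {
                queue.send(data);
            }
        } finally {
            queue.shutdownAndWait();
        }
        return processed;
    }

    public static void main(String[] args) {
        // The order of the printed items may differ between runs.
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

With this shape, foo() only needs a reference to a shared QueueService instance; creating a new one on every call would also create a new thread pool each time.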
And here is my case: Producer -> Queue <- Consumer_1, ..., Consumer_n.
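For comparison, this layout maps fairly directly onto java.util.concurrent.BlockingQueue. The following is only a sketch under assumptions: the producer is the calling thread, the consumers are plain threads, and a sentinel value (POISON_PILL, an illustrative name) tells each consumer to stop:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingQueueDemo {
    // Sentinel compared by identity, so it cannot be confused with real data.
    private static final String POISON_PILL = new String("<stop>");

    static List<String> run(List<String> inputs, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();

        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // blocks until an item is available
                        if (item == POISON_PILL) {
                            return; // no more data for this consumer
                        }
                        // Placeholder for the real work (assumption).
                        processed.add(Thread.currentThread().getName() + ":" + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            consumers[i].start();
        }

        try {
            for (String data : inputs) {
                queue.put(data); // producer side
            }
            for (int i = 0; i < consumerCount; i++) {
                queue.put(POISON_PILL); // one pill per consumer
            }
            for (Thread consumer : consumers) {
                consumer.join(); // wait until every consumer has stopped
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        run(List.of("a", "b", "c", "d"), 3).forEach(System.out::println);
    }
}
```

Which consumer handles which item, and in what order the results appear, varies between runs.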
Instead of using a BlockingQueue I went with a thread pool, and now I'm stuck because I have no idea how to pass data to the consumers:
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService service = null;
    String threadName = Thread.currentThread().getName();
    try {
        service = Executors.newFixedThreadPool(6); // +1 thread for producer
        service.submit(new Producer(service)).get(); // Wait until producer exits
    } finally {
        if (null != service) {
            service.shutdown();
            try {
                service.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                // handle
            }
        }
    }
    System.out.println("Exit");
}
That's fine: we spawn several threads and start working with them. The Worker is not a problem either; here is the implementation:
class Worker implements Runnable {
    private String message;
    public Worker(String message) {
        this.message = message;
    }
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        try {
            // Do stuff with message; the sleep is only a placeholder for the real work
            Thread.sleep(random.nextInt(100, 500));
        } catch (InterruptedException e) {
            // handle
        }
    }
}
And finally, the Producer:
class Producer implements Runnable {
    private ExecutorService service;
    Producer(ExecutorService service) {
        this.service = service;
    }
    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            // `input` is not defined anywhere: this is the data I don't know how to supply
            service.submit(new Worker(input));
        } catch (IOException e) {
            // handle
        }
        System.out.printf("[%s] Producer shutdown%n", threadName);
    }
}
As I mentioned earlier, I don't know how to pass data to the Producer, and I also see two possible problems on my side:
- I don't quite understand the concept of threads in Java;
- This implementation is the wrong architectural solution.
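One possible way to hand data to the Producer with the thread-pool approach is through its constructor. The sketch below assumes the data is available as an Iterable<String> and adds a hypothetical results queue so the output is visible. Worker and Producer mirror the classes above in simplified form, and PoolDemo is an invented name:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Worker implements Runnable {
    private final String message;
    private final Queue<String> results; // hypothetical sink, only for the demo

    Worker(String message, Queue<String> results) {
        this.message = message;
        this.results = results;
    }

    @Override
    public void run() {
        results.add(message.toUpperCase()); // placeholder for the real work
    }
}

class Producer implements Runnable {
    private final ExecutorService service;
    private final Iterable<String> source; // the data, handed in from outside
    private final Queue<String> results;

    Producer(ExecutorService service, Iterable<String> source, Queue<String> results) {
        this.service = service;
        this.source = source;
        this.results = results;
    }

    @Override
    public void run() {
        for (String input : source) {
            // The pool's internal queue holds the task until a thread is free.
            service.submit(new Worker(input, results));
        }
    }
}

class PoolDemo {
    static Queue<String> run(List<String> inputs) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        Queue<String> results = new ConcurrentLinkedQueue<>();
        try {
            new Producer(service, inputs, results).run(); // runs on the calling thread
        } finally {
            service.shutdown(); // already submitted workers still run to completion
            try {
                service.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // The order of the results may differ between runs.
        System.out.println(run(List.of("x", "y", "z")));
    }
}
```

Here the Producer runs on the calling thread, so no pool thread is reserved for it. Submitting it to the pool and waiting on the returned Future, as in the main method shown earlier, would also work.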