I am working on a project where I need to consume a lot of records and then send them on to another system. Here is the flow:
- Store all the records in a CHM (ConcurrentHashMap) from multiple threads. Records arrive at very high speed.
- A background thread that runs every minute sends the records from the CHM to a database.
- After sending each record to the database, add it to a retry bucket as well so that it can be retried after a particular time if no acknowledgment is received for it.
- We also have a poller thread that receives acknowledgments from the database telling us which records have been received; once I get an acknowledgment back, I delete that record from the retry bucket so that it doesn't get retried.
- Even if some records are sent twice it's OK, but it's good to minimize this.
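The buffering in both classes below relies on the same swap-and-flush pattern: keep the map inside an AtomicReference and atomically swap in a fresh, empty map at each flush. A minimal sketch of just that pattern (the key/value types here are placeholders, not the real ones):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class SwapAndFlushSketch {
    private final AtomicReference<ConcurrentHashMap<Long, byte[]>> buffer =
            new AtomicReference<>(new ConcurrentHashMap<Long, byte[]>());

    // producers write into whichever map is currently installed
    public void add(long key, byte[] value) {
        buffer.get().put(key, value);
    }

    // the flusher atomically replaces the map and drains the old one;
    // a writer that fetched the old reference just before the swap may
    // still be adding to it while it is drained
    public void flush() {
        Map<Long, byte[]> snapshot = buffer.getAndSet(new ConcurrentHashMap<Long, byte[]>());
        for (Map.Entry<Long, byte[]> entry : snapshot.entrySet()) {
            // send entry.getValue() downstream here
        }
    }
}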
Below is my Processor class. Its add method is called by multiple threads to populate the dataHolderByPartitionReference CHM. In the constructor of Processor I start the background thread that runs every minute to push records from the CHM to the database. In the same constructor I also start the ResponsePoller thread, which receives acknowledgments from the database telling us which records have been received; when an acknowledgment arrives, I delete that record from the retry bucket so that it doesn't get retried, otherwise it will be retried. The sendToDatabase method is where I send the records to the database, and in the same method I add those records to the retry bucket as well, from which they will be retried by a different thread depending on the acknowledgment.
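(DataHolder itself isn't shown here; judging from how the code uses it, it is presumably a simple immutable carrier along these lines. The exact shape is an assumption:)

// assumed shape of DataHolder, inferred from the getClientKey()/getProcessBytes() calls below
public class DataHolder {
    private final String clientKey;    // keys longer than 255 bytes (UTF-8) are skipped by validateAndSend
    private final byte[] processBytes; // opaque payload forwarded to the database

    public DataHolder(String clientKey, byte[] processBytes) {
        this.clientKey = clientKey;
        this.processBytes = processBytes;
    }

    public String getClientKey() {
        return clientKey;
    }

    public byte[] getProcessBytes() {
        return processBytes;
    }
}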
public class Processor {
    private final ScheduledExecutorService executorServicePoller =
            Executors.newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    // a ListeningExecutorService (Guava) wrapping a normal ExecutorService (Java)
    private final ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }
    private Processor() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // atomically swap in a fresh empty map and flush the old one
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 0, 1, TimeUnit.MINUTES);
    }
    // calling validateAndSend in parallel for each partition;
    // generally there will be only 5-6 unique partitions at most
    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        List<ListenableFuture<Void>> list = new ArrayList<ListenableFuture<Void>>();
        // for each partition, submit an independent task that validates
        // the event packets and sends them to the database
        for (Entry<Integer, ConcurrentLinkedQueue<DataHolder>> entry : dataHolderByPartition.entrySet()) {
            final int partition = entry.getKey();
            final ConcurrentLinkedQueue<DataHolder> dataHolders = entry.getValue();
            ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    validateAndSend(partition, dataHolders);
                    return null;
                }
            });
            list.add(future);
        }
        // we want to know when ALL the tasks have completed, so we use a Guava
        // function to turn a list of ListenableFutures into a single ListenableFuture
        ListenableFuture<List<Void>> combinedFutures = Futures.allAsList(list);
        // the get() on the combined future blocks until all individual tasks have completed
        try {
            combinedFutures.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            // log error
        } catch (ExecutionException ex) {
            // log error
        }
    }
    private void validateAndSend(final int partition,
            final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            DataHolder dataHolder = dataHolders.poll();
            byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
            // skip records whose client key exceeds 255 bytes
            if (clientKeyBytes.length > 255) {
                continue;
            }
            byte[] processBytes = dataHolder.getProcessBytes();
            int clientKeyLength = clientKeyBytes.length;
            int processBytesLength = processBytes.length;
            int additionalLength = clientKeyLength + processBytesLength;
            // flush the current batch before it exceeds the 64 KB payload limit
            if (totalSize + additionalLength > 64000) {
                sendToDatabase(partition, clientKeyBytesAndProcessBytesHolder);
                clientKeyBytesAndProcessBytesHolder.clear(); // watch out for GC
                totalSize = 0;
            }
            clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
            totalSize += additionalLength;
        }
        // send whatever remains in the final, partially filled batch
        sendToDatabase(partition, clientKeyBytesAndProcessBytesHolder);
    }
    // in this method I send the batch to the database
    // and also add each record to the retry bucket
    private void sendToDatabase(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        if (clientKeyBytesAndProcessBytesHolder.isEmpty()) {
            return;
        }
        // this address will always be unique
        long address = getUniqueAddress(....);
        Frame frame = new Frame(.......);
        byte[] packedByteArray = frame.serialize();

        ZMsg msg = new ZMsg();
        msg.add(packedByteArray);
        ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
        Socket socket = zmqObj.getSocket();
        msg.send(socket);
        msg.destroy();

        // after sending, add to the retry bucket
        RetryTask.getInstance().addToRetryQueue(address, packedByteArray);
    }
    // called by multiple threads to populate the dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        ConcurrentMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition =
                dataHolderByPartitionReference.get();
        ConcurrentLinkedQueue<DataHolder> dataHolder = dataHolderByPartition.get(partition);
        if (dataHolder == null) {
            dataHolder = Queues.newConcurrentLinkedQueue();
            // another thread may have installed a queue first; keep the winner
            ConcurrentLinkedQueue<DataHolder> currentDataHolder =
                    dataHolderByPartition.putIfAbsent(partition, dataHolder);
            if (currentDataHolder != null) {
                dataHolder = currentDataHolder;
            }
        }
        dataHolder.add(holder);
    }
}
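For reference, the producer side is just many threads calling add on the singleton. A minimal hypothetical driver (newRecordBytes() and the thread counts are made up, and DataHolder is the assumed shape from above):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// hypothetical driver: several threads feeding the same Processor singleton
public class ProducerDemo {
    public static void main(String[] args) {
        ExecutorService producers = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 8; i++) {
            final int partition = i % 6; // the post mentions only 5-6 unique partitions
            producers.submit(new Runnable() {
                @Override
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        Processor.getInstance().add(partition,
                                new DataHolder("client-" + partition, newRecordBytes()));
                    }
                }
            });
        }
    }

    private static byte[] newRecordBytes() {
        return new byte[] {1, 2, 3}; // placeholder payload
    }
}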
And below is my RetryTask class:
public class RetryTask {
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Long, byte[]>> retryBucket =
            new AtomicReference<>(new ConcurrentHashMap<Long, byte[]>());

    private static class Holder {
        private static final RetryTask INSTANCE = new RetryTask();
    }

    public static RetryTask getInstance() {
        return Holder.INSTANCE;
    }

    private RetryTask() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // atomically swap in an empty bucket and retry the old one
                retryEvents(retryBucket.getAndSet(new ConcurrentHashMap<Long, byte[]>()));
            }
        }, 0, 2, TimeUnit.MINUTES);
    }

    public void addToRetryQueue(final long address, final byte[] encodedRecord) {
        retryBucket.get().put(address, encodedRecord);
    }

    public void removeFromRetryQueue(final long address) {
        retryBucket.get().remove(address);
    }

    // re-send everything still in the swapped-out bucket; note that
    // entries are not put back into the new bucket here
    private void retryEvents(final ConcurrentMap<Long, byte[]> bucket) {
        ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
        Socket socket = zmqObj.getSocket();
        for (Map.Entry<Long, byte[]> entry : bucket.entrySet()) {
            byte[] serializedEvent = entry.getValue();
            ZMsg msg = new ZMsg();
            msg.add(serializedEvent);
            msg.send(socket);
            msg.destroy();
        }
    }
}
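To make the interplay explicit, the two public methods are called from opposite ends of the round trip described earlier:

// in Processor.sendToDatabase, right after the ZMQ send:
RetryTask.getInstance().addToRetryQueue(address, packedByteArray);

// in ResponsePoller, once the acknowledgment carrying that address arrives:
RetryTask.getInstance().removeFromRetryQueue(address);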
And here is my ResponsePoller class, which waits for acknowledgments for the records already sent by the other background thread. If an acknowledgment is received, the record is deleted from the retry bucket so that it doesn't get retried.
public class ResponsePoller implements Runnable {
    private static final Random random = new Random();
    private static final int listenerPort = 8084;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);
        // set a random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
        while (!Thread.currentThread().isInterrupted()) {
            // tick once per second (100 polls of 10 ms), pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getLocation(frame.getData());
                            // we got the acknowledgment for this record,
                            // so remove it from the retry bucket
                            RetryTask.getInstance().removeFromRetryQueue(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
The code that actually sends data to the database is just these lines:
ZMsg msg = new ZMsg();
msg.add(packedByteArray);
ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
Socket socket = zmqObj.getSocket();
msg.send(socket);
msg.destroy();
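PoolManager and ZMQObj aren't shown in the post either; for context, a minimal hypothetical version that would satisfy these call sites could look like the sketch below (the PUSH socket type, pool size, and endpoint are all assumptions):

import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

// hypothetical thin wrapper matching the getSocket() calls above
class ZMQObj {
    private final Socket socket;

    ZMQObj(Socket socket) {
        this.socket = socket;
    }

    Socket getSocket() {
        return socket;
    }
}

// hypothetical pool matching the getInstance()/getNextSocket() call sites
public class PoolManager {
    private static final PoolManager INSTANCE = new PoolManager();
    private final ZContext ctx = new ZContext();
    private final ZMQObj[] pool = new ZMQObj[4]; // pool size assumed
    private final AtomicInteger next = new AtomicInteger();

    private PoolManager() {
        for (int i = 0; i < pool.length; i++) {
            Socket socket = ctx.createSocket(ZMQ.PUSH); // socket type assumed
            socket.connect("tcp://database-host:8085"); // endpoint assumed
            pool[i] = new ZMQObj(socket);
        }
    }

    public static PoolManager getInstance() {
        return INSTANCE;
    }

    // hand sockets out round-robin
    public ZMQObj getNextSocket() {
        return pool[Math.abs(next.getAndIncrement() % pool.length)];
    }
}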
Question:
From a design perspective, what is the best way to structure this so that all of my logic works seamlessly? I am pretty sure there is a better way to design this than what I have.