Wednesday, 25 January 2017

How to design a system that sends records and retries them if an acknowledgment is not received?

I am working on a project where I need to consume a lot of records and then send them to another system. Here is the flow:

  • Store all the records in a ConcurrentHashMap (CHM) from multiple threads. Records arrive at a very high rate.
  • From a background thread that runs every minute, send the records from the CHM to a database.
  • After sending each record to the database, also add it to a retry bucket so that it can be retried after a certain time if no acknowledgment is received for it.
  • A poller thread receives acknowledgments from the database confirming that records have been received. Once I get an acknowledgment back, I delete that record from the retry bucket so that it is not retried.
  • It is OK if some records are sent twice, but it is good to minimize this.
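
The flow above can be sketched roughly as follows. This is only a minimal illustration under stated assumptions, not the code from this post: the names RetryBucketSketch, Pending, onSent, onAck and collectDue are hypothetical, and the current time is passed in explicitly so the behavior is deterministic. Each pending record carries the time it was last sent, so a periodic sweep resends only records whose acknowledgment is overdue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: all names here are illustrative.
final class RetryBucketSketch {
  // One pending record: the encoded payload plus the time it was last sent.
  static final class Pending {
    final byte[] payload;
    final long lastSentMillis;

    Pending(byte[] payload, long lastSentMillis) {
      this.payload = payload;
      this.lastSentMillis = lastSentMillis;
    }
  }

  private final Map<Long, Pending> pending = new ConcurrentHashMap<>();
  private final long retryAfterMillis;

  RetryBucketSketch(long retryAfterMillis) {
    this.retryAfterMillis = retryAfterMillis;
  }

  // Called right after a record has been sent.
  void onSent(long address, byte[] payload, long nowMillis) {
    pending.put(address, new Pending(payload, nowMillis));
  }

  // Called by the poller when an acknowledgment arrives.
  void onAck(long address) {
    pending.remove(address);
  }

  // Called periodically: returns the addresses whose acknowledgment is overdue
  // and stamps them with the new send time so they remain tracked.
  List<Long> collectDue(long nowMillis) {
    List<Long> due = new ArrayList<>();
    for (Map.Entry<Long, Pending> e : pending.entrySet()) {
      Pending p = e.getValue();
      if (nowMillis - p.lastSentMillis >= retryAfterMillis) {
        // replace() succeeds only if the entry was not acknowledged or updated meanwhile
        if (pending.replace(e.getKey(), p, new Pending(p.payload, nowMillis))) {
          due.add(e.getKey());
        }
      }
    }
    return due;
  }

  int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    RetryBucketSketch bucket = new RetryBucketSketch(1000);
    bucket.onSent(1L, new byte[] {1}, 0);
    bucket.onSent(2L, new byte[] {2}, 0);
    bucket.onAck(1L);
    System.out.println(bucket.collectDue(1000));
  }
}
```

In this sketch an acknowledged record is never returned by collectDue, and an unacknowledged record stays tracked after each resend, so it can be retried more than once.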

Below is my Processor class. Its add method is called by multiple threads to populate the dataHolderByPartitionReference CHM. In the constructor of Processor, I start the background thread that runs every minute to push records from the CHM to the database.

In the same constructor, I start the ResponsePoller thread, which receives acknowledgments from the database confirming that records have been received. When an acknowledgment arrives, I delete the record from the retry bucket so that it is not retried; otherwise the record is retried.

The sendToDatabase method is where I send the records to the database. In the same method, I also add those records to the retry bucket, which is retried from a different thread depending on the acknowledgment.

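import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

// DataHolder, Frame, ZMQObj and PoolManager are project classes not shown here
public class Processor {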
  private final ScheduledExecutorService executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  // creating a ListeningExecutorService (Guava) by wrapping a normal ExecutorService (Java)
  private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors
      .newCachedThreadPool());
  private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
      }
    }, 0, 1, TimeUnit.MINUTES);
  }

  // calling validateAndSend in parallel for each partition
  // generally there will be only 5-6 unique partitions max
  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
    List<ListenableFuture<Void>> list = new ArrayList<ListenableFuture<Void>>();
    // For each partition, create an independent thread that will
    // validate the eventpackets and send it to the database
    for (Entry<Integer, ConcurrentLinkedQueue<DataHolder>> entry : dataHolderByPartition
        .entrySet()) {
      final int partition = entry.getKey();
      final ConcurrentLinkedQueue<DataHolder> dataHolders = entry.getValue();
      ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
        public Void call() throws Exception {
          validateAndSend(partition, dataHolders);
          return null;
        }
      });
      // Add the future to the list
      list.add(future);
    }
    // We want to know when ALL the threads have completed,
    // so we use a Guava function to turn a list of ListenableFutures
    // into a single ListenableFuture
    ListenableFuture<List<Void>> combinedFutures = Futures.allAsList(list);

    // The get on the combined ListenableFuture will now block until
    // ALL the individual threads have completed work.
    try {
      combinedFutures.get();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      // log error
    } catch (ExecutionException ex) {
      // log error
    }
  }

  private void validateAndSend(final int partition,
      final ConcurrentLinkedQueue<DataHolder> dataHolders) {
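    // byte[] keys hash by identity, so equal keys are never merged;
    // the map effectively acts as a list of (clientKey, processBytes) pairs
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();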
    int totalSize = 0;
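    DataHolder dataHolder;
    // poll() returns null once the queue is drained
    while ((dataHolder = dataHolders.poll()) != null) {
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      // skip records whose client key does not fit in 255 bytes
      if (clientKeyBytes.length > 255) {
        continue;
      }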
      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 64000) {
        sendToDatabase(partition, clientKeyBytesAndProcessBytesHolder);
        clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values
    sendToDatabase(partition, clientKeyBytesAndProcessBytesHolder);
  }

  // in this method, I am sending to database
  // and also adding it to retry bucket
  private void sendToDatabase(final int partition, final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    if (clientKeyBytesAndProcessBytesHolder.isEmpty()) {
      return;
    }
    // this address will be unique always
    long address = getUniqueAddress(....);
    Frame frame = new Frame(.......);
    byte[] packedByteArray = frame.serialize();
    ZMsg msg = new ZMsg();
    msg.add(packedByteArray);

    ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
    Socket socket = zmqObj.getSocket();
    msg.send(socket);
    msg.destroy();

    // after sending, add to retry bucket
    RetryTask.getInstance().addToRetryQueue(address, packedByteArray);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition, final DataHolder holder) {
    ConcurrentMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition =
        dataHolderByPartitionReference.get();
    ConcurrentLinkedQueue<DataHolder> dataHolder =
        dataHolderByPartition.get(partition);
    if (dataHolder == null) {
      dataHolder = Queues.newConcurrentLinkedQueue();
      ConcurrentLinkedQueue<DataHolder> currentDataHolder =
          dataHolderByPartition.putIfAbsent(partition, dataHolder);
      if (currentDataHolder != null)
        dataHolder = currentDataHolder;
    }
    dataHolder.add(holder);
  }
}
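
The size-limited batching done in validateAndSend can be isolated into a small runnable example. This is a sketch under assumptions rather than the code above: BatchSketch and split are hypothetical names, records are plain byte arrays, and each batch is a fresh list instead of a map that is cleared and reused. A batch is closed as soon as adding the next record would push it past the byte limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: names are illustrative.
final class BatchSketch {
  // Splits records into batches whose combined length stays within maxBytes.
  // A single record larger than maxBytes ends up alone in its own batch.
  static List<List<byte[]>> split(List<byte[]> records, int maxBytes) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentSize = 0;
    for (byte[] record : records) {
      // close the current batch before it would exceed the limit
      if (!current.isEmpty() && currentSize + record.length > maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(record);
      currentSize += record.length;
    }
    // flush the remaining records, if any
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<byte[]> records = Arrays.asList(new byte[40], new byte[40], new byte[40]);
    System.out.println(split(records, 100).size());
  }
}
```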

And below is my RetryTask class:

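import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class RetryTask {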
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Long, byte[]>> retry_bucket =
      new AtomicReference<>(new ConcurrentHashMap<Long, byte[]>());

  private static class Holder {
    private static final RetryTask INSTANCE = new RetryTask();
  }

  public static RetryTask getInstance() {
    return Holder.INSTANCE;
  }

  private RetryTask() {
    executorService.scheduleAtFixedRate(new Runnable() {
      public void run() {
        retryEvents(retry_bucket
            .getAndSet(new ConcurrentHashMap<Long, byte[]>()));
      }
    }, 0, 2, TimeUnit.MINUTES);
  }

  public void addToRetryQueue(final long address, final byte[] encodedRecord) {
    retry_bucket.get().put(address, encodedRecord);
  }

  public void removeFromRetryQueue(final long address) {
    retry_bucket.get().remove(address);
  }

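  // retrying the events here
  // note: as written, the swapped-out bucket is resent once and then dropped;
  // its records are not added back to the retry bucket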
  private void retryEvents(final ConcurrentMap<Long, byte[]> bucket) {
    ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
    Socket socket = zmqObj.getSocket();

    for (Map.Entry<Long, byte[]> entry : bucket.entrySet()) {
      long address = entry.getKey();
      byte[] serializeEvent = entry.getValue();
      ZMsg msg = new ZMsg();
      msg.add(serializeEvent);
      msg.send(socket);
      msg.destroy();
    }
  }
}
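
The bucket swap used by RetryTask (getAndSet on an AtomicReference) can be reduced to a small runnable example. This is a sketch with hypothetical names (SwapSketch, add, remove, swap), not the class above. It shows one property of the pattern: a removal issued after the swap looks only at the new, empty map, so it does not find a record that is sitting in the swapped-out map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: names are illustrative.
final class SwapSketch {
  private final AtomicReference<ConcurrentHashMap<Long, byte[]>> bucket =
      new AtomicReference<>(new ConcurrentHashMap<Long, byte[]>());

  void add(long address, byte[] record) {
    bucket.get().put(address, record);
  }

  // Returns true only if the address was present in the *current* bucket.
  boolean remove(long address) {
    return bucket.get().remove(address) != null;
  }

  // Atomically hands out the current bucket and installs an empty one.
  ConcurrentHashMap<Long, byte[]> swap() {
    return bucket.getAndSet(new ConcurrentHashMap<Long, byte[]>());
  }

  public static void main(String[] args) {
    SwapSketch sketch = new SwapSketch();
    sketch.add(7L, new byte[] {1});
    ConcurrentHashMap<Long, byte[]> old = sketch.swap();
    // the late removal misses, and the record is still in the swapped-out map
    System.out.println(sketch.remove(7L) + " " + old.containsKey(7L));
  }
}
```

In terms of the flow in this post, an acknowledgment that arrives after the swap would not stop the retry of a record that is already in the swapped-out bucket.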

And here is my ResponsePoller class, which waits for acknowledgments for the records already sent by the other background thread. If an acknowledgment is received, the record is deleted from the retry bucket so that it is not retried.

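import java.util.Iterator;
import java.util.Random;

import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

// TestUtils is a project class not shown here
public class ResponsePoller implements Runnable {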
  private static final Random random = new Random();
  private static final int listenerPort = 8084;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getLocation(frame.getData());
              // remove from retry bucket since we got the acknowledgment for this record
              RetryTask.getInstance().removeFromRetryQueue(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

The only code that sends data to the database is the following:

ZMsg msg = new ZMsg();
msg.add(packedByteArray);

ZMQObj zmqObj = PoolManager.getInstance().getNextSocket();
Socket socket = zmqObj.getSocket();
msg.send(socket);
msg.destroy();

Question:

From a design perspective, what is the best way to structure this so that all of the logic works together reliably? I am fairly sure there is a better design than what I have.
