Thursday, December 28, 2017

How do I release a socket back to its pool depending on whether it is live or dead?

I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously. Which method I call depends on the requirement: most of the time we send data asynchronously, but sometimes I need to send it synchronously.

  • sendAsync - Sends data asynchronously without any timeout. If the acknowledgment is not received, the message is retried from the background thread that is started in the SendToQueue constructor.
  • send - Sends data synchronously on a socket. It internally calls the doSendAsync method and then waits for up to a fixed timeout (500 ms in the code below); if the acknowledgment is not received, the entry is removed from the cache so that we don't retry.

So the only difference between the two methods above is that in the async case I need to retry at all costs if the acknowledgment is not received, whereas in the sync case I don't need to retry at all. That is why I am storing extra state in the PendingMessage class.

ResponsePoller is a class that receives the acknowledgment for data sent to our messaging queue on a particular socket. It then calls the handleAckReceived method below to remove the address from the cache, so that we don't retry after receiving the acknowledgment. If the acknowledgment is received, the socket is live; otherwise it is dead.

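// Imports assumed from the types used: Guava for Cache, and the ZeroMQ Java
// binding (JeroMQ/jzmq) for ZMsg and Socket.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SendToQueue {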
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .build();

  private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final Socket _socket;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
      _address = address;
      _sendTimeMillis = System.currentTimeMillis();
      _encodedRecords = encodedRecords;
      _socket = socket;
      _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
      return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
      _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
      return _retryEnabled && !_acknowledged;
    }

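    public boolean waitForAck() {
      long deadline = System.currentTimeMillis() + 500L;
      synchronized (_monitor) {
        try {
          // Re-check the flag in a loop: this covers spurious wakeups and
          // an ack that arrived before this thread started waiting.
          while (!_acknowledged) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) break;
            _monitor.wait(remaining);
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          return false;
        }
        return _acknowledged;
      }
    }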

    public void ackReceived() {
      _acknowledged = true;
      synchronized (_monitor) {
        _monitor.notifyAll();
      }
    }

    public long getAddress() {
      return _address;
    }

    public byte[] getEncodedRecords() {
      return _encodedRecords;
    }

    public Socket getSocket() {
      return _socket;
    }
  }

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
      if (m.hasExpired()) {
        if (m.shouldRetry()) {
          m.markResent();
          doSendAsync(m, m.getSocket());
        } else {
          cache.invalidate(m.getAddress());
        }
      }
    }
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgment
                                                  // and then delete entry from the cache
                                                  // accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetries();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by acknowledgment thread which is in ResponsePoller class
  public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
      m.ackReceived();
      cache.invalidate(address);
    }
  }
}

When I send data on a socket and get an acknowledgment back for it, the socket is alive; if the data is not acknowledged, the socket is dead (although I will keep retrying the send). With the design above (or a better one, if there is one), how can I determine whether a socket is live or dead based on whether an acknowledgment was received for it? Based on that, I need to release the socket back into its pool, marked live or dead, by calling one of the methods below, in both the sync and async cases.

I also need a configurable count: a socket should be marked dead only if the acknowledgment is not received on it x times (where x is a number > 0). What is the best and most efficient way to do this?

SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);
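
To make the "x missed acknowledgments" requirement concrete, here is a minimal sketch of one possible approach: a per-socket counter of consecutive misses that is reset whenever an acknowledgment arrives. SocketHealthTracker, its State enum, onAck, onMiss and maxMisses are hypothetical names that are not part of the code above, and the sketch is generic over the socket type so that it does not depend on the ZeroMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the original code): counts consecutive
// missed acknowledgments per socket and reports when the threshold is hit.
final class SocketHealthTracker<S> {

  // Stands in for the SocketState values passed to releaseSocket above.
  enum State { LIVE, DEAD }

  private final int maxMisses; // the configurable "x" from the question
  private final ConcurrentHashMap<S, Integer> misses = new ConcurrentHashMap<>();

  SocketHealthTracker(int maxMisses) {
    if (maxMisses <= 0) {
      throw new IllegalArgumentException("maxMisses must be > 0");
    }
    this.maxMisses = maxMisses;
  }

  // An acknowledgment arrived on this socket: forget earlier misses.
  State onAck(S socket) {
    misses.remove(socket);
    return State.LIVE;
  }

  // A send on this socket timed out without an acknowledgment.
  // merge() is atomic on ConcurrentHashMap, so concurrent misses are all counted.
  State onMiss(S socket) {
    int count = misses.merge(socket, 1, Integer::sum);
    return count >= maxMisses ? State.DEAD : State.LIVE;
  }
}
```

Under these assumptions, handleAckReceived could call onAck(m.getSocket()) and release the socket as LIVE, while the timeout paths (the expired branch in handleRetries and a false result from waitForAck in send) could call onMiss and release the socket as DEAD only when DEAD is returned. Whether a socket that still has async retries in flight should be released at that point is a design decision this sketch does not settle.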
