samedi 18 février 2017

Send and retry data if data was not received?

I am working on a project where I need to do below things:

  • Send certain data on a particular socket to another system. I have to send a particular byte array on a given socket. Each byte array has a unique long address.
  • And then keep retrying to send same data by using either of the RetryStrategy implemented below.
  • Start a background poller thread which tells you whether the data you sent was received or not at the other system. If it was received, then we will remove it from the pending queue so that it doesn't get retried and if for whatever reason it was not received, then we will retry sending same data again with the RetryStrategy we used.

For example: If we have sent byteArrayA which has unique long address as addressA and if it was recived at the other system, then my poller thread will get this addressA back as an acknowledgement meaning it was received so now we can remove this address from the pending queue so that it doesn't get retried again.

I have two RetryStrategy implemeted ConstantBackoff and ExponentialBackoff. So I came up with below simulator which simulates the above flow.

public class Experimental {
  /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
  interface RetryStrategy {
    long getDelayMs(int retry);
  }

  public enum ConstantBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 1000L;
    }
  }

  public enum ExponentialBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 100 + (1L << retry);
    }
  }

  /** A container that sends messages with retries. */    
  private static class Sender {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(20);
    private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();

    /** Send the given (simulated) data with given address on the given socket. */
    void sendTo(long addr, byte[] data, int socket) {
      System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
    }

    /** The state of a message that's being retried. */
    private class Retrier implements Runnable {
      private final RetryStrategy retryStrategy;
      private final long addr;
      private final byte[] data;
      private final int socket;
      private int retry;
      private Future<?> future;

      Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
        this.retryStrategy = retryStrategy;
        this.addr = addr;
        this.data = data;
        this.socket = socket;
        this.retry = 0;
      }

      private synchronized void start() {
        if (future == null) {
          future = executorService.submit(this);
          pending.put(addr, this);
        }
      }

      private synchronized void cancel() {
        if (future != null) {
          future.cancel(true);
          future = null;
        }
      }

      private synchronized void reschedule() {
        if (future != null) {
          future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
        }
      }

      @Override
      synchronized public void run() {
        sendTo(addr, data, socket);
        reschedule();
      }
    }

   /** 
    * Get a (simulated) verified message address. Just picks a pending 
    * one. Returns zero if none left.
    */      
    long getVerifiedAddr() {
      System.err.println("Pending messages: " + pending.size());
      Iterator<Long> i = pending.keySet().iterator();
      long addr = i.hasNext() ? i.next() : 0;
      return addr;
    }

    /** A polling loop that cancels retries of (simulated) verified messages. */        
    class CancellationPoller implements Runnable {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
          long addr = getVerifiedAddr();
          if (addr == 0) {
            continue;
          }
          System.err.println("Verified message (to be cancelled) " + addr);
          Retrier retrier = pending.remove(addr);
          if (retrier != null) {
            retrier.cancel();
          }
        }
      }
    }

    private Sender initialize() {
      executorService.submit(new CancellationPoller());
      return this;
    }

    private void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
      new Retrier(retryStrategy, addr, data, socket).start();
    }
  }

  public static void main(String[] args) {
    Sender sender = new Sender().initialize();
    for (long i = 1; i <= 10; i++) {
      sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
    }
    for (long i = -1; i >= -10; i--) {
      sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
    }
  }
}

I wanted to see is there any race condition in the above code or any thread safety issues? Since getting stuff right in multithreaded is tough.

Let me know if there is any better or efficient way to do same thing.

Aucun commentaire:

Enregistrer un commentaire