Friday, December 9, 2016

How to run multiple Kafka consumers on the same box independently of each other?

I have two Kafka consumers, ConsumerA and ConsumerB. I want to run these two consumers independently of each other on the same machine; there is no relation between them at all. They consume different topics.

  • Each consumer should have its own Properties object.
  • Each consumer should have its own thread pool configuration, since each one may need to run multithreaded, independently of the other consumer.
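As a sketch of the first requirement, assuming the standard Kafka consumer config keys (`bootstrap.servers`, `group.id`, `client.id`, `key.deserializer`, `value.deserializer`), a small factory method can return a fresh `Properties` object for each consumer. The method name `consumerProps`, the broker address and the group/client ids below are hypothetical placeholders. Giving each consumer its own `group.id` keeps their committed offsets and group membership separate, so a rebalance in one group should not pause the other.

```java
import java.util.Properties;

public class PerConsumerProps {

    // Returns a brand-new Properties object on every call, so the two consumers
    // never share (or accidentally mutate) each other's configuration.
    static Properties consumerProps(String bootstrapServers, String groupId, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Separate group ids keep offsets and group membership apart.
        props.setProperty("group.id", groupId);
        // Distinct client ids make the consumers easy to tell apart in logs and metrics.
        props.setProperty("client.id", clientId);
        // Byte-array deserializers match KafkaConsumer<byte[], byte[]> in the question.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and names (hypothetical); substitute your own.
        Properties propsA = consumerProps("localhost:9092", "consumer-a-group", "consumerA");
        Properties propsB = consumerProps("localhost:9092", "consumer-b-group", "consumerB");
        System.out.println(propsA.getProperty("group.id"));
        System.out.println(propsB.getProperty("group.id"));
    }
}
```

Each call allocates a new object, so changing one consumer's settings later cannot leak into the other.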

Below is my design:

Consumer class (abstract):

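import java.util.Properties;

// Template-method base class: the final run() delegates to the
// subclass-specific run(consumerName, consumerProps).
public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    // Called from another thread to stop the polling loop.
    public abstract void shutdown();

    // Consumer-specific polling loop.
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}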

ConsumerA class:

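import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Config, TEST_URL, GenericRecordDomainDataDecoder and getTopicsBasisOnConsumerName()
// are project-specific and defined elsewhere.
public class ConsumerA extends Consumer {
    // volatile: assigned by the polling thread, read by the thread calling shutdown()
    private volatile KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        KafkaConsumer<byte[], byte[]> c = consumer;
        if (c != null) {
            c.wakeup(); // makes a blocked poll() throw WakeupException
        }
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        try {
            consumer.subscribe(getTopicsBasisOnConsumerName());

            Map<String, Object> config = new HashMap<>();
            config.put(Config.URLS, TEST_URL);
            GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                }
            }
        } catch (WakeupException e) {
            // expected when shutdown() is called; fall through to close()
        } finally {
            consumer.close(); // release sockets and leave the consumer group
        }
    }
}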

ConsumerB class:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler class:

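import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    // Note: the same Consumer instance is submitted poolSize times,
    // so all pool threads run one shared object.
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
  }

  // Stops the consumers immediately (this is called from @PreDestroy) instead of
  // registering a JVM shutdown hook that would only run when the process exits.
  public void shutdown() {
    for (Consumer consumer : consumers) {
      consumer.shutdown(); // wakes up any blocked poll()
    }
    executorServiceConsumer.shutdown();
    try {
      if (!executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
        executorServiceConsumer.shutdownNow(); // interrupt threads that did not finish in time
      }
    } catch (InterruptedException ex) {
      executorServiceConsumer.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}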
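The handler submits one `Consumer` object `poolSize` times, while `KafkaConsumer` is documented as not safe for multi-threaded access. One common alternative, sketched here under stated assumptions, is to pass a factory (`java.util.function.Supplier`) so that each pool thread gets its own instance. To keep the sketch runnable without a Kafka broker, `StoppableTask` and `DemoTask` are hypothetical stand-ins for the `Consumer` base class and `ConsumerA`; with the real classes the factory might look like `() -> new ConsumerA("consumerA", getConsumerPropsA())`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PerThreadHandlerSketch {

    // Hypothetical stand-in for the Consumer base class: a loop that runs on
    // one thread and can be told to stop from another thread.
    interface StoppableTask extends Runnable {
        void shutdown();
    }

    // Creates a fresh task from the factory for every pool thread, so no two
    // threads share one instance (or the client object it owns).
    static final class PerThreadHandler {
        private final ExecutorService pool;
        private final List<StoppableTask> tasks = new ArrayList<>();

        PerThreadHandler(Supplier<? extends StoppableTask> factory, int poolSize) {
            this.pool = Executors.newFixedThreadPool(poolSize);
            for (int i = 0; i < poolSize; i++) {
                StoppableTask task = factory.get(); // one instance per thread
                tasks.add(task);
                pool.submit(task);
            }
        }

        List<StoppableTask> tasks() {
            return new ArrayList<>(tasks);
        }

        // Signals every task, then waits; returns true if all threads finished in time.
        boolean shutdown(long timeoutMillis) throws InterruptedException {
            for (StoppableTask task : tasks) {
                task.shutdown();
            }
            pool.shutdown();
            boolean done = pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!done) {
                pool.shutdownNow(); // interrupt anything still running
            }
            return done;
        }
    }

    // Demo task that loops until shutdown() clears its flag.
    static final class DemoTask implements StoppableTask {
        private final AtomicBoolean running = new AtomicBoolean(true);

        @Override
        public void run() {
            while (running.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        @Override
        public void shutdown() {
            running.set(false);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadHandler handler = new PerThreadHandler(DemoTask::new, 3);
        System.out.println("instances: " + handler.tasks().size());
        System.out.println("stopped cleanly: " + handler.shutdown(1000));
    }
}
```

Because every thread owns its own instance, `shutdown()` reaches every loop. With a shared instance, the `consumer` field in `ConsumerA` only holds whichever `KafkaConsumer` was created last, so the other threads are never woken up.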

Below is the main class of one of my projects. When the server starts, this class is initialized automatically, and from there I start all my Kafka consumers, ConsumerA and ConsumerB. As soon as shutdown is called, I release all resources by calling shutdown on each of my Kafka consumers.

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
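  // Not final: these fields are assigned in init(), not in a constructor.
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @PostConstruct
  public void init() {
    // getConsumerPropsA()/getConsumerPropsB() build each consumer's own Properties (defined elsewhere).
    consumerHandlerA = new ConsumerHandler(new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler(new ConsumerB("consumerB", getConsumerPropsB()), 3);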
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}
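In `shutdown()`, an exception thrown by the first handler's `shutdown()` would skip the remaining handlers. A minimal sketch of a helper that stops every handler regardless, assuming a hypothetical `Stoppable` interface that `ConsumerHandler` could be made to implement; `StopAllSketch` and `stopAll` are illustrative names, not part of any framework. Stopping in reverse start order is a common convention rather than a requirement here.

```java
import java.util.ArrayList;
import java.util.List;

public class StopAllSketch {

    // Hypothetical minimal view of ConsumerHandler: anything that can be shut down.
    interface Stoppable {
        void shutdown() throws Exception;
    }

    // Shuts down every handler in reverse start order. A failure in one handler
    // does not prevent the others from being stopped; the first failure is
    // rethrown at the end with later ones attached as suppressed exceptions.
    static void stopAll(List<? extends Stoppable> handlers) throws Exception {
        Exception first = null;
        for (int i = handlers.size() - 1; i >= 0; i--) {
            try {
                handlers.get(i).shutdown();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> stopped = new ArrayList<>();
        List<Stoppable> handlers = new ArrayList<>();
        handlers.add(() -> stopped.add("A"));
        handlers.add(() -> { throw new IllegalStateException("B failed"); });
        handlers.add(() -> stopped.add("C"));
        try {
            stopAll(handlers);
        } catch (Exception e) {
            System.out.println("error: " + e.getMessage());
        }
        System.out.println("stopped: " + stopped);
    }
}
```

With that interface in place, the `@PreDestroy` method could call `stopAll(Arrays.asList(consumerHandlerA, consumerHandlerB))` wrapped in a try/catch that logs the failure.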

Is this the right design for running multiple Kafka consumers on the same box? Is there a better, more efficient way to solve this problem? In general I will run at most three or four Kafka consumers on the same box.
