Monday, November 4, 2019

How to structure a Confluent Kafka consumer in Go?

I am writing a Kafka consumer in Go using confluent-kafka-go.

I currently have two topics, topic1 and topic2, and the number of topics will certainly grow.

What are good design patterns to follow here to keep my Go code clean?

This is how I have implemented it so far:

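import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// topicHandler maps each topic name to the function that processes its messages.
var topicHandler map[string]func([]byte)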

func init() {
    topicHandler = make(map[string]func([]byte))
    topicHandler["topic1"] = handleTopic1
    topicHandler["topic2"] = handleTopic2
    createConsumers(topicHandler)
}

func createConsumers(topicHandlers map[string]func([]byte)) error {

    // ...
    // start a consumer subscribed to every topic that is a key in topicHandlers
    // ...

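    // ***** consume from the topic
    run := true
    for run {
        select {
        case sig := <-sigchan:
            // A bare break here would only leave the select, not the for loop.
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            switch e := ev.(type) {
            case *kafka.Message:
                // Guard against topics that have no registered handler.
                if handler, ok := topicHandlers[*e.TopicPartition.Topic]; ok {
                    handler(e.Value)
                }

            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
            }
        }
    }
    return nil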
}

func handleTopic1(msg []byte) {
    fmt.Printf("%v", string(msg))
}

func handleTopic2(msg []byte) {
    fmt.Printf("%v", string(msg))
}

So in the future, if anybody needs to write a consumer for topic3, they will have to do two things:

  1. Register the handler: topicHandler["topic3"] = handleTopic3
  2. Write a func handleTopic3(msg []byte) {} function to handle the incoming messages for topic3
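
To make the two steps above concrete, here is a minimal sketch of the same map-based dispatch pulled out of the consume loop. The Router type and its Handle, Topics, and Dispatch methods are hypothetical names I made up for illustration; they are not part of confluent-kafka-go. The handler signature is also changed (as an assumption) to return an error so the loop could decide what to do on failure.

```go
package main

import (
	"fmt"
	"os"
)

// HandlerFunc processes the payload of one message. Returning an error
// lets the caller decide whether to log, retry, or skip.
type HandlerFunc func(value []byte) error

// Router maps topic names to handlers. It is a hypothetical helper,
// not part of confluent-kafka-go.
type Router struct {
	handlers map[string]HandlerFunc
}

// NewRouter returns a Router with no handlers registered.
func NewRouter() *Router {
	return &Router{handlers: make(map[string]HandlerFunc)}
}

// Handle registers fn for topic (step 1 in the list above).
func (r *Router) Handle(topic string, fn HandlerFunc) {
	r.handlers[topic] = fn
}

// Topics returns the registered topic names in no particular order,
// for example to pass to the consumer's SubscribeTopics call.
func (r *Router) Topics() []string {
	topics := make([]string, 0, len(r.handlers))
	for t := range r.handlers {
		topics = append(topics, t)
	}
	return topics
}

// Dispatch routes a payload to its handler and reports unknown topics
// instead of calling a nil function.
func (r *Router) Dispatch(topic string, value []byte) error {
	fn, ok := r.handlers[topic]
	if !ok {
		return fmt.Errorf("no handler registered for topic %q", topic)
	}
	return fn(value)
}

// handleTopic3 is the new handler (step 2 in the list above).
func handleTopic3(value []byte) error {
	fmt.Printf("topic3: %s\n", value)
	return nil
}

func main() {
	r := NewRouter()
	r.Handle("topic3", handleTopic3)

	// In the real loop these arguments would come from a *kafka.Message.
	if err := r.Dispatch("topic3", []byte("hello")); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
	// A topic without a handler yields an error rather than a panic.
	if err := r.Dispatch("topic4", []byte("ignored")); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```

With something like this, the consume loop would only call Dispatch with the message's topic and value, and adding topic3 would still be the same two steps without touching the loop itself.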

Is this approach good to go, or is there anything you would suggest to improve it?

Any suggestions are very welcome. Thanks.
