The function below (BroadcastMessage) broadcasts messages to subscribers (PublisherHub.subscribers). BroadcastMessage establishes a connection to the source (the PCF firehose). In the case below, BroadcastMessage() establishes the connection to the PCF firehose using the API consumer.New() (https://github.com/cloudfoundry/noaa/blob/master/consumer/consumer.go#L78):
func BroadcastMessage(publisherHub *publisher.PublisherHub) {
	....
	cnsmr := consumer.New(dopplerAddress, &tls.Config{InsecureSkipVerify: true}, nil)
	....
	var (
		msgChan   <-chan *events.Envelope
		errorChan <-chan error
	)
	switch *filterType {
	case "logs":
		msgChan, errorChan = cnsmr.FilteredFirehose(firehoseSubscriptionId, authToken, consumer.LogMessages)
	case "metrics":
		msgChan, errorChan = cnsmr.FilteredFirehose(firehoseSubscriptionId, authToken, consumer.Metrics)
	default:
		msgChan, errorChan = cnsmr.Firehose(firehoseSubscriptionId, authToken)
	}
	go func() {
		for err := range errorChan {
			fmt.Fprintf(os.Stderr, "%v\n", err.Error())
		}
	}()
	for msg := range msgChan {
		publisherHub.Broadcast <- msg
	}
}
publisherHub broadcasts msg to all subscribers, where the type PublisherHub maintains the hub of subscribers:
type PublisherHub struct {
	subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
	Register    chan *subscriptionmediator.HandlerSubscription
	Unregister  chan *subscriptionmediator.HandlerSubscription
	Broadcast   chan *events.Envelope
}
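The Run() method of the hub is not shown in the question. As a rough illustration of how a hub with these three channels typically fans messages out, here is a minimal sketch. Everything in it is an assumption made for illustration: Envelope and HandlerSubscription are simplified stand-ins for events.Envelope and subscriptionmediator.HandlerSubscription, the Send field is hypothetical, and the real Run() may behave differently.

```go
package main

// Envelope is a simplified stand-in for events.Envelope (assumption).
type Envelope struct{ Payload string }

// HandlerSubscription is a simplified stand-in for
// subscriptionmediator.HandlerSubscription; the Send field is hypothetical.
type HandlerSubscription struct {
	Send chan *Envelope
}

// PublisherHub mirrors the field layout shown in the question.
type PublisherHub struct {
	subscribers map[*HandlerSubscription]struct{}
	Register    chan *HandlerSubscription
	Unregister  chan *HandlerSubscription
	Broadcast   chan *Envelope
}

// NewHub returns a hub with unbuffered control channels.
func NewHub() *PublisherHub {
	return &PublisherHub{
		subscribers: make(map[*HandlerSubscription]struct{}),
		Register:    make(chan *HandlerSubscription),
		Unregister:  make(chan *HandlerSubscription),
		Broadcast:   make(chan *Envelope),
	}
}

// Run owns the subscribers map, so no mutex is needed: every change to the
// map goes through one of the three channels and is handled in this loop.
func (h *PublisherHub) Run() {
	for {
		select {
		case s := <-h.Register:
			h.subscribers[s] = struct{}{}
		case s := <-h.Unregister:
			if _, ok := h.subscribers[s]; ok {
				delete(h.subscribers, s)
				close(s.Send)
			}
		case msg := <-h.Broadcast:
			for s := range h.subscribers {
				select {
				case s.Send <- msg:
				default:
					// Subscriber queue is full: skip this message for
					// that subscriber rather than block all the others.
				}
			}
		}
	}
}
```

With this shape, a pause in the firehose only means that nothing arrives on Broadcast for a while; the subscribers stay registered, so delivery can resume once the source is reconnected.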
main() starts BroadcastMessage() in a goroutine, and the process runs until an explicit shutdown through SIGTERM:
func main() {
	....
	publisherHub := publisher.NewHub()
	go publisherHub.Run()
	....
	go loadbroadcast.BroadcastMessage(publisherHub)
	....
}
But in production this connection (consumer.New()) is being lost due to loss of network connectivity, and all the subscribers are affected.
How can loss of connection be handled in BroadcastMessage so that subscribers keep receiving messages seamlessly, given that BroadcastMessage() is already in an infinite loop (for msg := range msgChan)?