Every writer goroutine has a corresponding reader goroutine.
A send on a buffered channel can complete before any reader has received the value, so it gives no guarantee of delivery; I have therefore used unbuffered channels.
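To illustrate the difference I am relying on, here is a minimal sketch. The function names sendBuffered and sendUnbuffered are made up for this illustration and are not part of my program:

```go
package main

import "fmt"

// sendBuffered sends on a buffered channel while no receiver exists.
// The send returns as soon as the value is stored in the buffer.
func sendBuffered() int {
	ch := make(chan int, 1)
	ch <- 42       // does not block: nobody has received the value
	return len(ch) // number of values still waiting in the buffer
}

// sendUnbuffered sends on an unbuffered channel. The send only returns
// once the receiving goroutine has taken the value.
func sendUnbuffered() int {
	ch := make(chan int)
	got := make(chan int, 1)
	go func() { got <- <-ch }() // receiver: forwards what it received
	ch <- 42                    // blocks until the goroutine above receives
	return <-got
}

func main() {
	fmt.Println("undelivered values in buffer:", sendBuffered())
	fmt.Println("value delivered to receiver:", sendUnbuffered())
}
```

In the buffered case the sender moves on while the value is still sitting in the buffer; in the unbuffered case the send and the receive happen together.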
To communicate, every writer registers its data channel in the hub below:
// WriterHub maintains the set of active writers.
type WriterHub struct {
	Writers    map[*WriterSubscription]struct{}
	Register   chan *WriterSubscription
	Unregister chan *WriterSubscription
}

func NewHub() *WriterHub {
	return &WriterHub{
		Writers:    map[*WriterSubscription]struct{}{},
		Register:   make(chan *WriterSubscription),
		Unregister: make(chan *WriterSubscription),
	}
}

type WriterSubscription struct {
	DataChannel      chan *data.Object
	CloseDataChannel chan bool
	WriterNumber     uint
}
A separate goroutine handles registration and unregistration:
func (hub *WriterHub) Run() { // runs on its own goroutine
	for {
		select {
		case Writer := <-hub.Register:
			hub.Writers[Writer] = struct{}{}
		case Writer := <-hub.Unregister:
			if _, ok := hub.Writers[Writer]; ok {
				delete(hub.Writers, Writer)
			}
		}
	}
}
Each writer registers with this hub:
func CreateWriter(hub *WriterHub, WriterCount uint, wg *sync.WaitGroup) { // invoked from main()
	// Subscribe to the writers hub
	Writer := &WriterSubscription{
		DataChannel:      make(chan *data.Object),
		CloseDataChannel: make(chan bool),
		WriterNumber:     WriterCount,
	}
	hub.Register <- Writer
	go func() {
		defer wg.Done()
		for {
			Writer.DataChannel <- data.CreateObject()
			// Wait for the reader to say whether to continue or stop
			stop := <-Writer.CloseDataChannel
			if stop {
				hub.Unregister <- Writer
				fmt.Println("Signal received")
				break
			}
		}
	}()
}
For simplicity, the reader goroutines are launched only after all writers have registered with the WriterHub.
The code below launches one reader per registered writer:
for Writer := range hub.Writers { // for each Writer
	// Launch a corresponding reader; the subscription is passed as an
	// argument so each goroutine works on its own writer.
	go func(w *WriterSubscription) {
		for {
			object := <-w.DataChannel // receive
			print(object)
			if someCondition(w.WriterNumber) {
				w.CloseDataChannel <- false // writer keeps going
			} else {
				w.CloseDataChannel <- true // writer should stop
				break
			}
		}
	}(Writer)
}
Invoking CreateWriter() twice from main() launches two writers (goroutines), and both register their channels successfully in the channel hub (WriterHub).
But a binary built with go install -race reports a data race between these two lines:
for Writer := range hub.Writers (in the reader-launching loop)
hub.Writers[Writer] = struct{}{} (in Run())
The reason is that WriterHub is read and written concurrently without synchronization, and as a result the second reader is not launched.
Access to WriterHub needs to be treated as a critical section.
Is there a concurrency pattern in Go for many-to-many communication over unbuffered channels that does not require a critical section?
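One direction that might fit, sketched here under assumptions rather than as a definitive answer: keep the writer set private to the goroutine that runs the hub loop, and let other goroutines ask for a copy of it over a channel instead of ranging over the map directly. In this sketch Sub, Hub, the snapshot channel, Snapshot, and runDemo are illustrative names, and int stands in for *data.Object.

```go
package main

import (
	"fmt"
	"sync"
)

// Sub stands in for WriterSubscription; int replaces *data.Object (assumption).
type Sub struct {
	Data chan int
	Stop chan bool
}

// Hub never exposes the writer set; only the Run goroutine touches it.
type Hub struct {
	register   chan *Sub
	unregister chan *Sub
	snapshot   chan chan []*Sub // hypothetical request channel for a copy of the set
}

func NewHub() *Hub {
	return &Hub{
		register:   make(chan *Sub),
		unregister: make(chan *Sub),
		snapshot:   make(chan chan []*Sub),
	}
}

// Run owns the map, so no other goroutine can race on it.
func (h *Hub) Run() {
	writers := map[*Sub]struct{}{}
	for {
		select {
		case w := <-h.register:
			writers[w] = struct{}{}
		case w := <-h.unregister:
			delete(writers, w)
		case reply := <-h.snapshot:
			list := make([]*Sub, 0, len(writers))
			for w := range writers {
				list = append(list, w)
			}
			reply <- list // hand out a copy, never the map itself
		}
	}
}

// Snapshot asks the hub goroutine for the currently registered writers.
func (h *Hub) Snapshot() []*Sub {
	reply := make(chan []*Sub)
	h.snapshot <- reply
	return <-reply
}

// runDemo starts n writers and one reader per writer; each reader stops its
// writer after perReader values. It returns the total number of values read.
func runDemo(n, perReader int) int {
	hub := NewHub()
	go hub.Run()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		w := &Sub{Data: make(chan int), Stop: make(chan bool)}
		hub.register <- w
		wg.Add(1)
		go func() { // writer
			defer wg.Done()
			for v := 0; ; v++ {
				w.Data <- v
				if <-w.Stop {
					hub.unregister <- w
					return
				}
			}
		}()
	}
	subs := hub.Snapshot()
	results := make(chan int)
	for _, w := range subs {
		go func(w *Sub) { // reader
			count := 0
			for stop := false; !stop; {
				<-w.Data
				count++
				stop = count == perReader
				w.Stop <- stop
			}
			results <- count
		}(w)
	}
	total := 0
	for range subs {
		total += <-results
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println("values read:", runDemo(2, 3))
}
```

Because every read and write of the map happens inside Run, the race detector should have nothing to report for it, and all channels stay unbuffered. The trade-off is that a snapshot is only a copy: a writer that registers after Snapshot returns would not get a reader unless the hub itself is made responsible for launching readers.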