Sunday, June 28, 2020

How to create many-to-many communication among goroutines without a critical section?

Every writer goroutine has a corresponding reader goroutine.

A buffered channel does not guarantee delivery (a send can complete before any reader has received the value), so I have used unbuffered channels.
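To make the delivery concern concrete, here is a minimal sketch. The helper trySend is a hypothetical name introduced only for this illustration; it is not part of the code in this question. It attempts a non-blocking send and shows that a buffered channel accepts a value with no reader present, while an unbuffered channel does not.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send
// and reports whether the send went through.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan int, 1)
	unbuffered := make(chan int)

	// The value is parked in the buffer even though nobody has read it.
	fmt.Println(trySend(buffered, 1)) // prints true

	// With no receiver ready, the send cannot complete.
	fmt.Println(trySend(unbuffered, 1)) // prints false
}
```

With an unbuffered channel, a completed send therefore implies that a receiver actually took the value, which is the property relied on here.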


For communication, every writer registers its data channel in the hub below:

// Writer hub - "Maintain the set of active Writers"
type WriterHub struct {
    Writers    map[*WriterSubscription]struct{}
    Register   chan *WriterSubscription
    Unregister chan *WriterSubscription
}

func NewHub() *WriterHub {
    return &WriterHub{
        Writers:    map[*WriterSubscription]struct{}{},
        Register:   make(chan *WriterSubscription),
        Unregister: make(chan *WriterSubscription),
    }
}

type WriterSubscription struct {
    DataChannel      chan *data.Object
    CloseDataChannel chan bool
    WriterNumber     uint
}

A separate goroutine handles registration and unregistration:

func (hub *WriterHub) Run() { // on a go-routine
    for {
        select {
        case Writer := <-hub.Register:
            hub.Writers[Writer] = struct{}{}

        case Writer := <-hub.Unregister:
            delete(hub.Writers, Writer) // no-op if the key is absent
        }
    }
}

Each writer registers with this hub:

func CreateWriter(hub *WriterHub, WriterCount uint, wg *sync.WaitGroup) { // invoked from main()

    // Subscribe to Writers hub
    Writer := &WriterSubscription{
        DataChannel:      make(chan *data.Object),
        CloseDataChannel: make(chan bool),
        WriterNumber:     WriterCount,
    }
    hub.Register <- Writer

    go func() {
        defer wg.Done()
        for {
            Writer.DataChannel <- data.CreateObject() // blocks until the reader receives
            stop := <-Writer.CloseDataChannel         // the reader says whether to stop
            if stop {
                hub.Unregister <- Writer
                fmt.Println("Signal received")
                break
            }
        }

    }()
}

For simplicity, reader goroutines are launched only after all writers have registered with WriterHub.

The code below launches the readers, one for each registered writer:

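for Writer := range hub.Writers { // for each Writer
    go func(Writer *WriterSubscription) { // launch a corresponding reader
        for {
            object := <-Writer.DataChannel // receive
            print(object)

            if someCondition(Writer.WriterNumber) {
                Writer.CloseDataChannel <- false // ask the writer to continue
            } else {
                Writer.CloseDataChannel <- true // ask the writer to stop
                break
            }
        }
    }(Writer) // pass the loop variable so each reader gets its own writer
}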

When CreateWriter() is invoked twice from main(), two writer goroutines are launched and successfully register their channels with the hub (WriterHub).


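But a build made with go install -race reports a data race between these two lines:

for Writer := range hub.Writers

and

hub.Writers[Writer] = struct{}{} (in Run())

The reason is that the reader-launching loop reads the hub.Writers map while Run() writes to it on another goroutine, with no synchronization between the two. As a result, the second reader is not launched.

So access to WriterHub needs to be treated as a critical section.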
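One direction that might avoid a lock, sketched here under assumptions: keep the map private to the Run() goroutine and let other goroutines ask it for a copy over a channel. The Snapshot field, the Writers() method, the runDemo function, and the int payload (standing in for *data.Object) are hypothetical names introduced for this sketch only; the stop-signal channel is left out to keep it short.

```go
package main

import (
	"fmt"
	"sort"
)

type WriterSubscription struct {
	DataChannel  chan int // stand-in for chan *data.Object
	WriterNumber uint
}

type WriterHub struct {
	writers    map[*WriterSubscription]struct{} // touched only inside Run
	Register   chan *WriterSubscription
	Unregister chan *WriterSubscription
	Snapshot   chan chan []*WriterSubscription // hypothetical request channel
}

func NewHub() *WriterHub {
	return &WriterHub{
		writers:    map[*WriterSubscription]struct{}{},
		Register:   make(chan *WriterSubscription),
		Unregister: make(chan *WriterSubscription),
		Snapshot:   make(chan chan []*WriterSubscription),
	}
}

// Run is the only goroutine that reads or writes hub.writers.
func (hub *WriterHub) Run() {
	for {
		select {
		case w := <-hub.Register:
			hub.writers[w] = struct{}{}
		case w := <-hub.Unregister:
			delete(hub.writers, w)
		case reply := <-hub.Snapshot:
			list := make([]*WriterSubscription, 0, len(hub.writers))
			for w := range hub.writers {
				list = append(list, w)
			}
			reply <- list // hand a copy to the caller
		}
	}
}

// Writers asks the hub goroutine for a copy of the current writer set.
func (hub *WriterHub) Writers() []*WriterSubscription {
	reply := make(chan []*WriterSubscription)
	hub.Snapshot <- reply
	return <-reply
}

// runDemo registers n writers, launches one reader per writer, and
// returns the received values in sorted order.
func runDemo(n uint) []int {
	hub := NewHub()
	go hub.Run()

	for i := uint(1); i <= n; i++ {
		w := &WriterSubscription{DataChannel: make(chan int), WriterNumber: i}
		hub.Register <- w
		go func() { w.DataChannel <- int(w.WriterNumber) * 10 }() // writer
	}

	results := make(chan int)
	writers := hub.Writers()
	for _, w := range writers {
		go func(w *WriterSubscription) { // reader for this writer
			results <- <-w.DataChannel
		}(w)
	}

	got := make([]int, 0, len(writers))
	for range writers {
		got = append(got, <-results)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(runDemo(2)) // prints [10 20]
}
```

Because every map access happens inside Run(), the loop that launches readers only ever sees a slice copy received over a channel, so the two lines reported above no longer touch shared state. The Run() goroutine in this sketch never exits; a real program would likely add a shutdown signal.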


Is there a concurrency pattern in Go for many-to-many communication over unbuffered channels, without a critical section?
