Sunday, June 28, 2020

Multiple readers and writers without a critical section

Every writer (a goroutine) has a corresponding reader (also a goroutine).

For communication, I have a channel hub, where every writer registers its data channel, as shown below:

// Writer hub - "Maintain the set of active Writers"
type WriterHub struct {
    Writers    map[*WriterSubscription]struct{}
    Register   chan *WriterSubscription
    Unregister chan *WriterSubscription
}

func NewHub() *WriterHub {
    return &WriterHub{
        Writers:    map[*WriterSubscription]struct{}{},
        Register:   make(chan *WriterSubscription),
        Unregister: make(chan *WriterSubscription),
    }
}

type WriterSubscription struct {
    DataChannel      chan *data.Object
    CloseDataChannel chan bool
    WriterNumber     uint
}

A separate goroutine handles registration and unregistration:

func (h *WriterHub) Run() { // runs on its own goroutine
    for {
        select {
        case Writer := <-h.Register:
            h.Writers[Writer] = struct{}{}

        case Writer := <-h.Unregister:
            // delete is a no-op when the key is absent, so no lookup is needed
            delete(h.Writers, Writer)
        }
    }
}

Each writer registers with this hub:

func CreateWriter(hub *WriterHub, WriterCount uint, wg *sync.WaitGroup) { // invoked from main()

    // Subscribe to Writers hub
    Writer := &WriterSubscription{
        DataChannel:      make(chan *data.Object),
        CloseDataChannel: make(chan bool),
        WriterNumber:     WriterCount,
    }
    hub.Register <- Writer

    go func() {
        defer wg.Done()
        for {
            Writer.DataChannel <- data.CreateObject()
            // Wait for the reader's verdict: true means stop producing.
            if stop := <-Writer.CloseDataChannel; stop {
                fmt.Println("Signal received")
                break
            }
        }

    }()
}

For simplicity, reader goroutines are launched only after all writers have registered.

One reader is launched for each registered writer.

The problem is that each reader needs to access its corresponding data channel, which makes WriterHub a critical section, as shown below:

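for Writer := range hub.Writers { // for each Writer (hub is the *WriterHub from NewHub)
    go func(w *WriterSubscription) { // launch a corresponding reader
        for {
            object := <-w.DataChannel // receive
            fmt.Println(object)

            if someCondition(w.WriterNumber) {
                w.CloseDataChannel <- false // keep going
            } else {
                w.CloseDataChannel <- true // tell the writer to stop
                break
            }
        }
    }(Writer) // pass the loop variable so each reader gets its own Writer
}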

The reader's only job is to print the data received from its writer.

What approach would let every writer/reader pair access its corresponding DataChannel without going through a shared critical section (WriterHub)?
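
One possible direction, offered only as a sketch: create each pair's channels in one place and hand the same pointer to both the writer and the reader goroutine, so neither side has to look anything up in a shared map. The names below (`pair`, `writer`, `reader`, `runPairs`, and the `limit` stop condition) are hypothetical stand-ins, and `chan int` replaces `chan *data.Object` so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// pair is a hypothetical stand-in for WriterSubscription: the two
// channels shared by exactly one writer and one reader.
type pair struct {
	data chan int  // stands in for chan *data.Object
	stop chan bool // same role as CloseDataChannel
}

// writer sends values until its reader answers true on the stop channel.
func writer(p *pair, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; ; i++ {
		p.data <- i
		if <-p.stop {
			return
		}
	}
}

// reader receives only from its own writer and records what it saw.
// limit (assumed >= 1) plays the part of someCondition in the question.
func reader(p *pair, limit int, out *[]int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		v := <-p.data
		*out = append(*out, v)
		done := len(*out) >= limit
		p.stop <- done // false: keep going, true: stop
		if done {
			return
		}
	}
}

// runPairs starts n writer/reader pairs and returns what each reader received.
// Each reader appends to its own slice element, so no lock is required.
func runPairs(n, limit int) [][]int {
	results := make([][]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		p := &pair{data: make(chan int), stop: make(chan bool)}
		wg.Add(2)
		go writer(p, &wg)
		go reader(p, limit, &results[i], &wg)
	}
	wg.Wait()
	return results
}

func main() {
	for i, got := range runPairs(3, 2) {
		fmt.Printf("reader %d received %v\n", i, got)
	}
}
```

With this wiring the only state shared between goroutines is the pair's own channels. A hub could still be kept for bookkeeping (for example, counting active writers), but under this assumption the readers would no longer depend on iterating its map.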
