Friday, October 2, 2020

How to communicate between multiple goroutines when one of them runs a for loop with a blocking function call inside

I'm writing a Go app which accepts a websocket connection, then starts:

  1. A listen goroutine, which reads client messages from the connection and, based on each message, sends a response for the client to updateClient over a channel.
  2. An updateClient goroutine, which writes to the connection.
  3. A processExternalData goroutine, which receives data from a message queue and sends it to updateClient over a channel so that updateClient can update the client with the data.

I'm using the gorilla library for websocket connections, and its read call is blocking. In addition, neither its write nor its read methods support concurrent calls, which is the main reason I have the updateClient goroutine: it is the only goroutine that calls the write method.
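The single-writer arrangement described above can be sketched with the standard library alone. This is a minimal illustration, not the code from my app: runWriter and fanIn are hypothetical names, and a bytes.Buffer stands in for the websocket connection because, like the connection, it must not be written to from several goroutines at once.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// runWriter is the only goroutine that touches w. It writes every message
// received on msgs and returns once msgs is closed and drained.
func runWriter(w io.Writer, msgs <-chan string) {
	for m := range msgs {
		fmt.Fprintln(w, m)
	}
}

// fanIn starts n producers that each send one message through the channel
// and returns the number of lines the single writer wrote.
func fanIn(n int) int {
	var buf bytes.Buffer // stand-in for the connection (assumption)
	msgs := make(chan string)
	writerDone := make(chan struct{})

	go func() {
		defer close(writerDone)
		runWriter(&buf, msgs)
	}()

	var producers sync.WaitGroup
	for i := 0; i < n; i++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			msgs <- fmt.Sprintf("update %d", id)
		}(i)
	}

	producers.Wait()
	close(msgs) // safe here: every sender has finished
	<-writerDone
	return bytes.Count(buf.Bytes(), []byte("\n"))
}

func main() {
	fmt.Println("lines written:", fanIn(5))
}
```

The channel is closed only after all senders are known to be done, which is exactly the guarantee that is hard to get once several goroutines can decide to shut the connection down.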

The problem arises when I need to close the connection, which can happen in at least two cases:

  1. The client closes the connection or an error occurs during a read.
  2. processExternalData finishes: there is no more data to send to the client, and the connection should be closed.

So updateClient needs to somehow notify listen to quit and, vice versa, listen needs to notify updateClient to quit. updateClient has a quit channel inside its select, but listen can't use select because its for loop already contains a blocking read call.
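One common way to stop a goroutine that is stuck in a blocking read is to close the connection from another goroutine, which makes the read return an error. The sketch below shows this with net.Pipe from the standard library; readUntilClosed and closeUnblocksRead are hypothetical names. I would expect gorilla's ReadMessage to behave the same way once the underlying connection is closed, but that is an assumption and is not demonstrated here.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readUntilClosed blocks in Read, like the listen loop, and returns the
// first error it sees.
func readUntilClosed(conn net.Conn) error {
	buf := make([]byte, 64)
	for {
		if _, err := conn.Read(buf); err != nil {
			return err
		}
	}
}

// closeUnblocksRead reports whether closing the connection from another
// goroutine made the pending Read return with an error.
func closeUnblocksRead() bool {
	server, client := net.Pipe()
	defer client.Close()

	errc := make(chan error, 1)
	go func() { errc <- readUntilClosed(server) }()

	// Give the reader a moment to block in Read. The outcome is the same
	// even if Close happens first.
	time.Sleep(10 * time.Millisecond)
	server.Close()

	select {
	case err := <-errc:
		return err != nil
	case <-time.After(time.Second):
		return false // the reader never woke up
	}
}

func main() {
	fmt.Println("read unblocked by Close:", closeUnblocksRead())
}
```

With this approach the reading loop needs no flag and no select: the writer side closes the connection when it is finished, and the reader treats the resulting error as its signal to exit.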

So I added an isJobFinished field to the connection type, which serves as the condition of the for loop in listen:

type WsConnection struct {
    connection    *websocket.Conn
    writeChan     chan messageWithCb
    quitChan      chan bool
    isJobFinished bool
    userID        string
}

func processExternalData() {
    // receive data from message queue
    // send it to client via writeChan
}

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        conn.quitChan <- true
    }()

    // keep the loop for communication with client
    for !conn.isJobFinished {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
            }
            if wsConn.isJobFinished {
                if msg.callback != nil {
                    msg.callback() // sends on `wsConn.quitChan` from a goroutine
                }
            }
        case <-wsConn.quitChan:
            // clean up
            wsConn.connection.Close()
            close(wsConn.writeChan)
            return
        }
    }
}

I'm wondering whether a better pattern exists in Go for such cases. Specifically, I'd like to have a quit channel inside listen as well, so that updateClient can notify it to quit instead of maintaining the isJobFinished field. In the current code I see no danger in leaving isJobFinished unprotected, because only one method writes to it, but if the logic gets more complicated, having to protect the field inside the for loop in listen will probably hurt performance.
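Even with a blocking read in the loop, the sends that listen performs on writeChan can still be guarded by a quit channel. The sketch below is one possible shape for that, under my own assumptions: trySend is a hypothetical helper, and done is a channel that is closed (not sent on) when the writer side has stopped.

```go
package main

import "fmt"

// trySend hands msg to the receiver on out unless done is closed first.
// It reports whether the message was delivered.
func trySend(out chan<- string, done <-chan struct{}, msg string) bool {
	select {
	case out <- msg:
		return true
	case <-done:
		return false
	}
}

func main() {
	out := make(chan string)
	done := make(chan struct{})

	// While a receiver is running, the send goes through.
	go func() { <-out }()
	fmt.Println(trySend(out, done, "hello"))

	// Once done is closed and nobody receives, the send is abandoned
	// instead of blocking forever.
	close(done)
	fmt.Println(trySend(out, done, "late"))
}
```

Used this way, writeChan never has to be closed by the receiver, so listen cannot panic by sending on a closed channel or hang on a send that nobody will read.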

Also, I can't close quitChan, because both listen and updateClient use it and neither has any way of knowing whether the other has already closed it.
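The double-close concern is usually handled by wrapping the close in sync.Once, so that any goroutine may request shutdown any number of times. A minimal sketch follows; the session type and its shutdown and isDone methods are hypothetical and are not part of my WsConnection.

```go
package main

import (
	"fmt"
	"sync"
)

// session carries a done channel that is closed exactly once.
type session struct {
	done      chan struct{}
	closeOnce sync.Once
}

func newSession() *session {
	return &session{done: make(chan struct{})}
}

// shutdown may be called from any goroutine, any number of times.
// Only the first call closes done; later calls are no-ops.
func (s *session) shutdown() {
	s.closeOnce.Do(func() { close(s.done) })
}

// isDone reports whether shutdown has been called, without blocking.
func (s *session) isDone() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

func main() {
	s := newSession()

	// Two goroutines, standing in for listen and updateClient, both
	// try to shut the session down.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.shutdown()
		}()
	}
	wg.Wait()

	fmt.Println("done:", s.isDone())
}
```

Because a closed channel is readable by every receiver, both goroutines can select on done, and neither needs to know which side triggered the shutdown.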
