I'm writing a Go app which accepts a websocket connection, then starts:
- listen goroutine, which listens on the connection for client messages and sends a response to the client, based on the received message, via a channel to updateClient.
- updateClient goroutine, which writes to the connection.
- processExternalData goroutine, which receives data from a message queue and sends it to updateClient via a channel so that updateClient can update the client with the data.
I'm using the gorilla library for websocket connections, and its read call is blocking. In addition, neither its write nor its read methods support concurrent calls, which is the main reason I have the updateClient goroutine: it is the only goroutine that calls the write method.
The problem arises when I need to close the connection, which can happen in at least 2 cases:
- The client closed the connection or an error occurred during read.
- processExternalData finished: there's no more data to update the client with, and the connection should be closed.
So updateClient needs to somehow notify listen to quit and, vice versa, listen needs to somehow notify updateClient to quit. updateClient has a quit channel inside its select, but listen can't use select because it already has a for loop with a blocking read call inside.
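One way around the blocking read, sketched here with the standard library only: closing a connection from another goroutine makes a blocked read return an error, so the reading loop can exit without a select. The sketch uses net.Pipe as a stand-in for the websocket connection. readLoop and closeUnblocksRead are hypothetical names, and it is an assumption, worth checking against the gorilla documentation, that ReadMessage behaves the same way when Close is called on the websocket connection.

```go
package main

import (
	"net"
	"time"
)

// readLoop stands in for listen: it blocks in Read until data arrives
// or the connection is closed, then reports why it stopped.
func readLoop(conn net.Conn, stopped chan<- error) {
	buf := make([]byte, 64)
	for {
		if _, err := conn.Read(buf); err != nil {
			stopped <- err // a closed connection surfaces here as an error
			return
		}
	}
}

// closeUnblocksRead starts a reader on an in-memory connection, closes the
// connection from the calling goroutine, and reports whether the reader
// exited with an error within one second.
func closeUnblocksRead() bool {
	server, client := net.Pipe()
	defer client.Close()

	stopped := make(chan error, 1)
	go readLoop(server, stopped)

	server.Close() // plays the role of updateClient deciding to shut down

	select {
	case err := <-stopped:
		return err != nil
	case <-time.After(time.Second):
		return false
	}
}
```

With this approach the reading side needs no flag and no quit channel of its own: the read error is the quit signal.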
So what I did was add an isJobFinished field to the connection type, which serves as the condition of the for loop:
type WsConnection struct {
    connection    *websocket.Conn
    writeChan     chan messageWithCb
    quitChan      chan bool
    isJobFinished bool
    userID        string
}

func processExternalData() {
    // receive data from message queue
    // send it to client via writeChan
}

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        conn.quitChan <- true
    }()
    // keep the loop for communication with client
    for !conn.isJobFinished {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
            }
            if wsConn.isJobFinished {
                if msg.callback != nil {
                    msg.callback() // sends on `wsConn.quitChan` from a goroutine
                }
            }
        case <-wsConn.quitChan:
            // clean up
            wsConn.connection.Close()
            close(wsConn.writeChan)
            return
        }
    }
}
I'm wondering if a better pattern exists in Go for such cases. Specifically, I'd like to have a quit channel inside listen as well, so that updateClient can notify it to quit instead of maintaining the isJobFinished field. Also, in this case I don't think leaving isJobFinished unprotected is dangerous, because only one method writes to it; but if the logic gets more complicated, having to protect the field inside the for loop in listen will probably hurt performance.
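On protecting the flag: under the Go memory model, a plain bool written in one goroutine and read in another without synchronization is a data race even when there is only one writer. A minimal sketch of one low-cost alternative is an atomic.Bool (Go 1.19 or later), assuming the flag is the only shared state. jobState and demoFlag are hypothetical names, not part of the code above. A flag is only checked between reads, so it does not by itself wake a goroutine blocked in ReadMessage.

```go
package main

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// jobState is a hypothetical replacement for the isJobFinished field.
type jobState struct {
	finished atomic.Bool
}

// demoFlag runs a loop that polls the flag in one goroutine, sets the flag
// from the calling goroutine, waits for the loop to exit, and returns the
// final value of the flag.
func demoFlag() bool {
	var s jobState
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for !s.finished.Load() { // race-free read of the flag
			runtime.Gosched() // stand-in for one iteration of real work
		}
	}()

	s.finished.Store(true) // race-free write from another goroutine
	wg.Wait()
	return s.finished.Load()
}
```

An atomic load is typically far cheaper than a network read, so checking it once per loop iteration is unlikely to be the bottleneck in a loop like listen.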
Also, I can't close quitChan, because both listen and updateClient use it and neither has a way to know whether the other has already closed it.
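The double-close concern can be sketched away with sync.Once: whichever goroutine calls Stop first closes the channel, and every later call does nothing. The shutdown type and stopFromBothSides function are hypothetical names introduced for illustration. This is one possible shape, not the only one. A context.Context with its cancel function, which is also safe to call more than once, would serve the same purpose.

```go
package main

import "sync"

// shutdown is a hypothetical helper: a done channel that any goroutine may
// close, any number of times, without panicking.
type shutdown struct {
	once sync.Once
	done chan struct{}
}

func newShutdown() *shutdown {
	return &shutdown{done: make(chan struct{})}
}

// Stop closes done exactly once; later calls are no-ops.
func (s *shutdown) Stop() {
	s.once.Do(func() { close(s.done) })
}

// Done returns a channel that is closed after the first Stop call.
func (s *shutdown) Done() <-chan struct{} {
	return s.done
}

// stopFromBothSides calls Stop from two goroutines, as listen and
// updateClient might, and reports whether done ended up closed.
func stopFromBothSides() bool {
	s := newShutdown()
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Stop()
		}()
	}
	wg.Wait()

	select {
	case <-s.Done():
		return true
	default:
		return false
	}
}
```

Because a closed channel is always ready to receive, any number of goroutines can wait on Done() in a select, which avoids having to send one value per listener.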