I'm writing a Go app which accepts a websocket connection and then starts three goroutines:
- listen, which listens on the connection for client messages and, based on the received message, sends a response for the client to updateClient via a channel.
- updateClient, which writes to the connection.
- processExternalData, which receives data from a message queue and sends it to updateClient via a channel so that updateClient can update the client with the data.
I'm using the gorilla library for websocket connections, and its read call is blocking. In addition, neither its write nor its read methods support concurrent calls, which is the main reason I have the updateClient goroutine: it is the only routine that calls the write method.
The problem arises when I need to close the connection, which can happen in at least 2 cases:
- The client closed the connection or an error occurred during read.
- processExternalData finished: there's no more data to update the client with and the connection should be closed.
So updateClient needs to somehow notify listen to quit and, vice versa, listen needs to somehow notify updateClient to quit. updateClient has a quit channel inside its select, but listen can't have a select because it already has a for loop with a blocking read call inside.
So what I did is add an isJobFinished field on the connection type, which is the condition for the for loop to keep running:
type WsConnection struct {
    connection    *websocket.Conn
    writeChan     chan messageWithCb
    quitChan      chan bool
    isJobFinished bool
    userID        string
}

func processExternalData() {
    // receive data from message queue
    // send it to client via writeChan
}

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        conn.quitChan <- true
    }()
    // keep the loop for communication with client
    for !conn.isJobFinished {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
            }
            if wsConn.isJobFinished {
                if msg.callback != nil {
                    msg.callback() // sends on `wsConn.quitChan` from a goroutine
                }
            }
        case <-wsConn.quitChan:
            // clean up
            wsConn.connection.Close()
            close(wsConn.writeChan)
            return
        }
    }
}
I'm wondering if a better pattern exists in Go for such cases. Specifically, I'd like to be able to have a quit channel inside listen as well, so updateClient can notify it to quit instead of maintaining the isJobFinished field. Also, in this case there's no danger in not protecting the isJobFinished field because only one method writes to it, but if the logic gets more complicated, then having to protect the field inside the for loop in listen will probably hurt performance.
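To make the "quit channel inside listen" idea concrete, here is a minimal sketch of one possible shape, not a definitive answer. It assumes the blocking read is moved into a small helper goroutine that forwards messages on a channel, so that listen itself is free to select. The names messageReader, readPump and fakeConn and the listen signature are hypothetical and exist only for illustration. Only the ReadMessage signature mirrors the gorilla method used above, and fakeConn stands in for a real connection so the sketch runs without a network.

```go
package main

import (
	"errors"
	"fmt"
)

// messageReader is a hypothetical stand-in for the single method of
// *websocket.Conn that this sketch needs.
type messageReader interface {
	ReadMessage() (messageType int, p []byte, err error)
}

// readPump does the blocking reads in its own goroutine and forwards each
// payload on the returned channel. The channel is closed when a read fails
// or when quit is closed while a payload is waiting to be delivered.
func readPump(r messageReader, quit <-chan struct{}) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for {
			_, p, err := r.ReadMessage()
			if err != nil {
				return
			}
			select {
			case out <- p:
			case <-quit:
				return
			}
		}
	}()
	return out
}

// listen can now select on both incoming messages and the quit channel.
// It returns the number of messages it handled.
func listen(r messageReader, quit <-chan struct{}) int {
	handled := 0
	msgs := readPump(r, quit)
	for {
		select {
		case p, ok := <-msgs:
			if !ok {
				return handled // read side failed or was closed
			}
			fmt.Printf("received %q\n", p)
			handled++
		case <-quit:
			return handled // the writer side asked us to stop
		}
	}
}

// fakeConn is a hypothetical in-memory reader used only for this demo.
// It returns an error once its queue is empty, like a closed connection.
type fakeConn struct{ queue [][]byte }

func (f *fakeConn) ReadMessage() (int, []byte, error) {
	if len(f.queue) == 0 {
		return 0, nil, errors.New("connection closed")
	}
	p := f.queue[0]
	f.queue = f.queue[1:]
	return 1, p, nil
}

func main() {
	conn := &fakeConn{queue: [][]byte{[]byte("hello"), []byte("bye")}}
	quit := make(chan struct{})
	fmt.Println("handled:", listen(conn, quit))
}
```

One caveat with this shape: the helper goroutine can still sit inside ReadMessage after listen has returned. With a real connection, whoever triggers the quit would also need to close the connection so that the pending read returns an error and the helper exits.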
Also, I can't close quitChan because both listen and updateClient use it, and neither has a way to know whether the other one has already closed it.
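The "who closes the channel" concern can be illustrated with a small sketch, assuming the close is wrapped in sync.Once so that either side may trigger shutdown without a double-close panic. The shutdown type and its Signal and Done methods are hypothetical names invented for this example and are not part of the code above. The two goroutines in main merely stand in for listen and updateClient.

```go
package main

import (
	"fmt"
	"sync"
)

// shutdown is a hypothetical helper: a done channel that any goroutine may
// signal, any number of times, without panicking on a double close.
type shutdown struct {
	once sync.Once
	done chan struct{}
}

func newShutdown() *shutdown {
	return &shutdown{done: make(chan struct{})}
}

// Signal closes the done channel the first time it is called and is a
// no-op on every later call.
func (s *shutdown) Signal() {
	s.once.Do(func() { close(s.done) })
}

// Done returns a channel that is closed once Signal has been called.
// Every receiver waiting on it is released at the same time.
func (s *shutdown) Done() <-chan struct{} {
	return s.done
}

func main() {
	s := newShutdown()
	var wg sync.WaitGroup

	// Two goroutines stand in for listen and updateClient.
	// Both try to signal shutdown; only the first call closes the channel.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Signal()
		}()
	}
	wg.Wait()

	<-s.Done() // does not block: the channel is already closed
	fmt.Println("shutdown signalled")
}
```

Because a closed channel releases every receiver, a `case <-s.Done():` branch in each select loop could take over the role of both quitChan and the isJobFinished flag. The cancel function returned by context.WithCancel in the standard library offers a similar guarantee, since it is safe to call more than once.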