Add eventDownstreamDisconnected

This should remove the need for protecting user.downstreamConns with a
mutex.
This commit is contained in:
Simon Ser 2020-03-27 17:55:03 +01:00
parent 36ab6ece09
commit c0f5850e5b
No known key found for this signature in database
GPG key ID: 0FDE7BE0E88F5E48
3 changed files with 18 additions and 25 deletions

View file

@ -310,10 +310,6 @@ func (dc *downstreamConn) Close() error {
return fmt.Errorf("downstream connection already closed") return fmt.Errorf("downstream connection already closed")
} }
if u := dc.user; u != nil {
u.removeDownstream(dc)
}
close(dc.closed) close(dc.closed)
return nil return nil
} }
@ -757,16 +753,9 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
seq := consumer.Close() seq := consumer.Close()
// TODO: need to take dc.network into account here
dc.user.lock.Lock()
lastDownstream := len(dc.user.downstreamConns) == 0
dc.user.lock.Unlock()
if lastDownstream {
net.lock.Lock() net.lock.Lock()
net.history[historyName] = seq net.history[historyName] = seq
net.lock.Unlock() net.lock.Unlock()
}
}() }()
} }

View file

@ -123,6 +123,7 @@ func (s *Server) Serve(ln net.Listener) error {
if err := dc.readMessages(dc.user.events); err != nil { if err := dc.readMessages(dc.user.events); err != nil {
dc.logger.Print(err) dc.logger.Print(err)
} }
dc.user.events <- eventDownstreamDisconnected{dc}
} }
dc.Close() dc.Close()

25
user.go
View file

@ -23,6 +23,10 @@ type eventDownstreamConnected struct {
dc *downstreamConn dc *downstreamConn
} }
type eventDownstreamDisconnected struct {
dc *downstreamConn
}
type network struct { type network struct {
Network Network
user *user user *user
@ -169,6 +173,16 @@ func (u *user) run() {
u.lock.Lock() u.lock.Lock()
u.downstreamConns = append(u.downstreamConns, dc) u.downstreamConns = append(u.downstreamConns, dc)
u.lock.Unlock() u.lock.Unlock()
case eventDownstreamDisconnected:
dc := e.dc
u.lock.Lock()
for i := range u.downstreamConns {
if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
break
}
}
u.lock.Unlock()
case eventDownstreamMessage: case eventDownstreamMessage:
msg, dc := e.msg, e.dc msg, dc := e.msg, e.dc
if dc.isClosed() { if dc.isClosed() {
@ -189,17 +203,6 @@ func (u *user) run() {
} }
} }
func (u *user) removeDownstream(dc *downstreamConn) {
u.lock.Lock()
for i := range u.downstreamConns {
if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
break
}
}
u.lock.Unlock()
}
func (u *user) createNetwork(net *Network) (*network, error) { func (u *user) createNetwork(net *Network) (*network, error) {
if net.ID != 0 { if net.ID != 0 {
panic("tried creating an already-existing network") panic("tried creating an already-existing network")