From c0f5850e5b66b37ea02b09ed05fd78aa836300ff Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Fri, 27 Mar 2020 17:55:03 +0100 Subject: [PATCH] Add eventDownstreamDisconnected This should remove the need for protecting user.downstreamConns with a mutex. --- downstream.go | 17 +++-------------- server.go | 1 + user.go | 25 ++++++++++++++----------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/downstream.go b/downstream.go index 3166b1a..8207c41 100644 --- a/downstream.go +++ b/downstream.go @@ -310,10 +310,6 @@ func (dc *downstreamConn) Close() error { return fmt.Errorf("downstream connection already closed") } - if u := dc.user; u != nil { - u.removeDownstream(dc) - } - close(dc.closed) return nil } @@ -757,16 +753,9 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { seq := consumer.Close() - // TODO: need to take dc.network into account here - dc.user.lock.Lock() - lastDownstream := len(dc.user.downstreamConns) == 0 - dc.user.lock.Unlock() - - if lastDownstream { - net.lock.Lock() - net.history[historyName] = seq - net.lock.Unlock() - } + net.lock.Lock() + net.history[historyName] = seq + net.lock.Unlock() }() } diff --git a/server.go b/server.go index 3666c53..5326b47 100644 --- a/server.go +++ b/server.go @@ -123,6 +123,7 @@ func (s *Server) Serve(ln net.Listener) error { if err := dc.readMessages(dc.user.events); err != nil { dc.logger.Print(err) } + dc.user.events <- eventDownstreamDisconnected{dc} } dc.Close() diff --git a/user.go b/user.go index 11f972c..384e719 100644 --- a/user.go +++ b/user.go @@ -23,6 +23,10 @@ type eventDownstreamConnected struct { dc *downstreamConn } +type eventDownstreamDisconnected struct { + dc *downstreamConn +} + type network struct { Network user *user @@ -169,6 +173,16 @@ func (u *user) run() { u.lock.Lock() u.downstreamConns = append(u.downstreamConns, dc) u.lock.Unlock() + case eventDownstreamDisconnected: + dc := e.dc + u.lock.Lock() + for i := range u.downstreamConns { + if u.downstreamConns[i] == dc { + u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) + break + } + } + u.lock.Unlock() case eventDownstreamMessage: msg, dc := e.msg, e.dc if dc.isClosed() { @@ -189,17 +203,6 @@ func (u *user) run() { } } -func (u *user) removeDownstream(dc *downstreamConn) { - u.lock.Lock() - for i := range u.downstreamConns { - if u.downstreamConns[i] == dc { - u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) - break - } - } - u.lock.Unlock() -} - func (u *user) createNetwork(net *Network) (*network, error) { if net.ID != 0 { panic("tried creating an already-existing network")