Add user.{add,remove}Downstream

This commit is contained in:
Simon Ser 2020-03-23 16:05:00 +01:00
parent c511a3c895
commit 015281ed35
No known key found for this signature in database
GPG key ID: 0FDE7BE0E88F5E48
2 changed files with 24 additions and 12 deletions

View file

@ -304,14 +304,7 @@ func (dc *downstreamConn) Close() error {
} }
if u := dc.user; u != nil { if u := dc.user; u != nil {
u.lock.Lock() u.removeDownstream(dc)
for i := range u.downstreamConns {
if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
break
}
}
u.lock.Unlock()
} }
close(dc.closed) close(dc.closed)
@ -660,10 +653,7 @@ func (dc *downstreamConn) register() error {
dc.username = dc.user.Username dc.username = dc.user.Username
dc.logger.Printf("registration complete for user %q", dc.username) dc.logger.Printf("registration complete for user %q", dc.username)
dc.user.lock.Lock() firstDownstream := dc.user.addDownstream(dc)
firstDownstream := len(dc.user.downstreamConns) == 0
dc.user.downstreamConns = append(dc.user.downstreamConns, dc)
dc.user.lock.Unlock()
dc.SendMessage(&irc.Message{ dc.SendMessage(&irc.Message{
Prefix: dc.srv.prefix(), Prefix: dc.srv.prefix(),
@ -707,6 +697,7 @@ func (dc *downstreamConn) register() error {
historyName := dc.rawUsername historyName := dc.rawUsername
// TODO: need to take dc.network into account here
var seqPtr *uint64 var seqPtr *uint64
if firstDownstream { if firstDownstream {
uc.network.lock.Lock() uc.network.lock.Lock()
@ -717,6 +708,7 @@ func (dc *downstreamConn) register() error {
} }
} }
// TODO: we need to create a consumer when adding networks on-the-fly
consumer, ch := uc.ring.NewConsumer(seqPtr) consumer, ch := uc.ring.NewConsumer(seqPtr)
go func() { go func() {
for { for {
@ -734,6 +726,7 @@ func (dc *downstreamConn) register() error {
seq := consumer.Close() seq := consumer.Close()
// TODO: need to take dc.network into account here
dc.user.lock.Lock() dc.user.lock.Lock()
lastDownstream := len(dc.user.downstreamConns) == 0 lastDownstream := len(dc.user.downstreamConns) == 0
dc.user.lock.Unlock() dc.user.lock.Unlock()

19
user.go
View file

@ -176,6 +176,25 @@ func (u *user) run() {
} }
} }
func (u *user) addDownstream(dc *downstreamConn) (first bool) {
u.lock.Lock()
first = len(dc.user.downstreamConns) == 0
u.downstreamConns = append(u.downstreamConns, dc)
u.lock.Unlock()
return first
}
func (u *user) removeDownstream(dc *downstreamConn) {
u.lock.Lock()
for i := range u.downstreamConns {
if u.downstreamConns[i] == dc {
u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
break
}
}
u.lock.Unlock()
}
func (u *user) createNetwork(net *Network) (*network, error) { func (u *user) createNetwork(net *Network) (*network, error) {
network := newNetwork(u, net) network := newNetwork(u, net)
err := u.srv.db.StoreNetwork(u.Username, &network.Network) err := u.srv.db.StoreNetwork(u.Username, &network.Network)