Consume ring buffer for networks added on-the-fly

This commit is contained in:
Simon Ser 2020-03-25 11:28:25 +01:00
parent 293a0e8e20
commit 21241c2009
No known key found for this signature in database
GPG key ID: 0FDE7BE0E88F5E48
2 changed files with 71 additions and 47 deletions

View file

@ -702,58 +702,70 @@ func (dc *downstreamConn) register() error {
})
dc.forEachNetwork(func(net *network) {
historyName := dc.rawUsername
// TODO: need to take dc.network into account here
var seqPtr *uint64
if firstDownstream {
net.lock.Lock()
seq, ok := net.history[historyName]
net.lock.Unlock()
if ok {
seqPtr = &seq
}
}
// TODO: we need to create a consumer when adding networks on-the-fly
consumer, ch := net.ring.NewConsumer(seqPtr)
go func() {
for {
var closed bool
select {
case <-ch:
uc := net.upstream()
if uc == nil {
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
break
}
dc.ringMessages <- ringMessage{consumer, uc}
case <-dc.closed:
closed = true
}
if closed {
break
}
}
seq := consumer.Close()
// TODO: need to take dc.network into account here
dc.user.lock.Lock()
lastDownstream := len(dc.user.downstreamConns) == 0
dc.user.lock.Unlock()
if lastDownstream {
net.lock.Lock()
net.history[historyName] = seq
net.lock.Unlock()
}
}()
// TODO: need to take dc.network into account when deciding whether or
// not to load history
dc.runNetwork(net, firstDownstream)
})
return nil
}
// runNetwork starts listening for messages coming from the network's ring
// buffer.
//
// It panics if the network is not suitable for the downstream connection.
func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
if dc.network != nil && net != dc.network {
panic("network not suitable for downstream connection")
}
historyName := dc.rawUsername
var seqPtr *uint64
if loadHistory {
net.lock.Lock()
seq, ok := net.history[historyName]
net.lock.Unlock()
if ok {
seqPtr = &seq
}
}
consumer, ch := net.ring.NewConsumer(seqPtr)
go func() {
for {
var closed bool
select {
case <-ch:
uc := net.upstream()
if uc == nil {
dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
break
}
dc.ringMessages <- ringMessage{consumer, uc}
case <-dc.closed:
closed = true
}
if closed {
break
}
}
seq := consumer.Close()
// TODO: need to take dc.network into account here
dc.user.lock.Lock()
lastDownstream := len(dc.user.downstreamConns) == 0
dc.user.lock.Unlock()
if lastDownstream {
net.lock.Lock()
net.history[historyName] = seq
net.lock.Unlock()
}
}()
}
func (dc *downstreamConn) runUntilRegistered() error {
for !dc.registered {
msg, err := dc.irc.ReadMessage()

12
user.go
View file

@ -198,14 +198,26 @@ func (u *user) removeDownstream(dc *downstreamConn) {
}
func (u *user) createNetwork(net *Network) (*network, error) {
if net.ID != 0 {
panic("tried creating an already-existing network")
}
network := newNetwork(u, net)
err := u.srv.db.StoreNetwork(u.Username, &network.Network)
if err != nil {
return nil, err
}
u.forEachDownstream(func(dc *downstreamConn) {
if dc.network == nil {
dc.runNetwork(network, false)
}
})
u.lock.Lock()
u.networks = append(u.networks, network)
u.lock.Unlock()
go network.run()
return network, nil
}