diff --git a/doc/architecture.md b/doc/architecture.md index 3484185..7eccccf 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -31,7 +31,3 @@ race condition or inconsistent state) and to rate-limit each user. The user dispatcher goroutine receives from the `user.events` channel. Upstream and downstream message handlers are called from this goroutine, thus they can safely access both upstream and downstream state. - -In addition to these goroutines, each downstream connection also has one -goroutine per network to handle new upstream messages coming from the ring -buffer. diff --git a/downstream.go b/downstream.go index 9cb9b39..2f0dc1c 100644 --- a/downstream.go +++ b/downstream.go @@ -233,6 +233,29 @@ func (dc *downstreamConn) SendMessage(msg *irc.Message) { dc.conn.SendMessage(msg) } +func (dc *downstreamConn) sendFromUpstream(msg *irc.Message, uc *upstreamConn) { + dc.lock.Lock() + _, ours := dc.ourMessages[msg] + delete(dc.ourMessages, msg) + dc.lock.Unlock() + if ours && !dc.getCap("echo-message") { + // The message comes from our connection, don't echo it + // back + return + } + + msg = msg.Copy() + switch msg.Command { + case "PRIVMSG", "NOTICE": + msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) + msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) + default: + panic(fmt.Sprintf("unexpected %q message", msg.Command)) + } + + dc.SendMessage(msg) +} + func (dc *downstreamConn) handleMessage(msg *irc.Message) error { switch msg.Command { case "QUIT": @@ -663,77 +686,45 @@ func (dc *downstreamConn) welcome() error { }) dc.forEachNetwork(func(net *network) { - dc.runNetwork(net, sendHistory) + var seqPtr *uint64 + if sendHistory { + seq, ok := net.history[dc.clientName] + if ok { + seqPtr = &seq + } + } + + consumer, _ := net.ring.NewConsumer(seqPtr) + + if _, ok := dc.ringConsumers[net]; ok { + panic("network has been added twice") + } + dc.ringConsumers[net] = consumer + + // TODO: this means all history is lost when trying to send it while the + // upstream is disconnected. We need to store history differently so that + // we don't need access to upstreamConn to forward it to a downstream + // client. + uc := net.upstream() + if uc == nil { + dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) + return + } + + for { + msg := consumer.Peek() + if msg == nil { + break + } + + dc.sendFromUpstream(msg, uc) + consumer.Consume() + } }) return nil } -// runNetwork starts listening for messages coming from the network's ring -// buffer. -// -// It panics if the network is not suitable for the downstream connection. -func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { - if dc.network != nil && net != dc.network { - panic("network not suitable for downstream connection") - } - - var seqPtr *uint64 - if loadHistory { - seq, ok := net.history[dc.clientName] - if ok { - seqPtr = &seq - } - } - - consumer, ch := net.ring.NewConsumer(seqPtr) - - if _, ok := dc.ringConsumers[net]; ok { - panic("network has been added twice") - } - dc.ringConsumers[net] = consumer - - go func() { - for range ch { - uc := net.upstream() - if uc == nil { - dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) - continue - } - - for { - msg := consumer.Peek() - if msg == nil { - break - } - - dc.lock.Lock() - _, ours := dc.ourMessages[msg] - delete(dc.ourMessages, msg) - dc.lock.Unlock() - if ours && !dc.getCap("echo-message") { - // The message comes from our connection, don't echo it - // back - consumer.Consume() - continue - } - - msg = msg.Copy() - switch msg.Command { - case "PRIVMSG", "NOTICE": - msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) - msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) - default: - panic(fmt.Sprintf("unexpected %q message", msg.Command)) - } - - dc.SendMessage(msg) - consumer.Consume() - } - } - }() -} - func (dc *downstreamConn) runUntilRegistered() error { for !dc.registered { msg, err := dc.ReadMessage() diff --git a/upstream.go b/upstream.go index 3bb6747..2e216ca 100644 --- a/upstream.go +++ b/upstream.go @@ -1365,6 +1365,10 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { func (uc *upstreamConn) produce(msg *irc.Message) { uc.network.ring.Produce(msg) + + uc.forEachDownstream(func(dc *downstreamConn) { + dc.sendFromUpstream(dc.ringConsumers[uc.network].Consume(), uc) + }) } func (uc *upstreamConn) updateAway() { diff --git a/user.go b/user.go index 746ed72..ee4d4f8 100644 --- a/user.go +++ b/user.go @@ -351,7 +351,8 @@ func (u *user) createNetwork(net *Network) (*network, error) { u.forEachDownstream(func(dc *downstreamConn) { if dc.network == nil { - dc.runNetwork(network, false) + consumer, _ := network.ring.NewConsumer(nil) + dc.ringConsumers[network] = consumer } }) @@ -375,6 +376,7 @@ func (u *user) deleteNetwork(id int64) error { if dc.network != nil && dc.network == net { dc.Close() } + delete(dc.ringConsumers, net) }) net.Stop()