Implement delivery receipts via PING messages

This patch implements basic message delivery receipts via PING and PONG.

When a PRIVMSG or NOTICE message is sent, a PING message with a token is
also sent. The history cursor isn't immediately advanced, instead the
bouncer will wait for a PONG message before doing so.

Self-messages trigger a PING for simplicity's sake. We can't immediately
advance the history cursor in this case, because a prior message might
still have an outstanding PING.

Future work may include optimizations such as removing the need to send
a PING after a self-message, or groupping multiple PING messages
together.

Closes: https://todo.sr.ht/~emersion/soju/11
This commit is contained in:
Simon Ser 2020-08-28 17:21:08 +02:00
parent e4d2ddb377
commit e797d90c59
No known key found for this signature in database
GPG key ID: 0FDE7BE0E88F5E48
2 changed files with 102 additions and 20 deletions

View file

@ -289,6 +289,73 @@ func (dc *downstreamConn) SendMessage(msg *irc.Message) {
dc.conn.SendMessage(msg) dc.conn.SendMessage(msg)
} }
// sendMessageWithID sends an outgoing message with the specified internal ID.
func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) {
dc.SendMessage(msg)
if id == "" || !dc.messageSupportsHistory(msg) {
return
}
dc.sendPing(id)
}
// advanceMessageWithID advances history to the specified message ID without
// sending a message. This is useful e.g. for self-messages when echo-message
// isn't enabled.
func (dc *downstreamConn) advanceMessageWithID(msg *irc.Message, id string) {
if id == "" || !dc.messageSupportsHistory(msg) {
return
}
dc.sendPing(id)
}
// ackMsgID acknowledges that a message has been received.
func (dc *downstreamConn) ackMsgID(id string) {
netName, entity, _, _, err := parseMsgID(id)
if err != nil {
dc.logger.Printf("failed to ACK message ID %q: %v", id, err)
return
}
network := dc.user.getNetwork(netName)
if network == nil {
return
}
history, ok := network.history[entity]
if !ok {
return
}
history.clients[dc.clientName] = id
}
func (dc *downstreamConn) sendPing(msgID string) {
token := "soju-msgid-" + base64.RawURLEncoding.EncodeToString([]byte(msgID))
dc.SendMessage(&irc.Message{
Command: "PING",
Params: []string{token},
})
}
func (dc *downstreamConn) handlePong(token string) {
if !strings.HasPrefix(token, "soju-msgid-") {
dc.logger.Printf("received unrecognized PONG token %q", token)
return
}
token = strings.TrimPrefix(token, "soju-msgid-")
b, err := base64.RawURLEncoding.DecodeString(token)
if err != nil {
dc.logger.Printf("received malformed PONG token: %v", err)
return
}
msgID := string(b)
dc.ackMsgID(msgID)
}
// marshalMessage re-formats a message coming from an upstream connection so // marshalMessage re-formats a message coming from an upstream connection so
// that it's suitable for being sent on this downstream connection. Only // that it's suitable for being sent on this downstream connection. Only
// messages that may appear in logs are supported, except MODE. // messages that may appear in logs are supported, except MODE.
@ -875,6 +942,20 @@ func (dc *downstreamConn) welcome() error {
return nil return nil
} }
// messageSupportsHistory checks whether the provided message can be sent as
// part of an history batch.
func (dc *downstreamConn) messageSupportsHistory(msg *irc.Message) bool {
// Don't replay all messages, because that would mess up client
// state. For instance we just sent the list of users, sending
// PART messages for one of these users would be incorrect.
// TODO: add support for draft/event-playback
switch msg.Command {
case "PRIVMSG", "NOTICE":
return true
}
return false
}
func (dc *downstreamConn) sendNetworkHistory(net *network) { func (dc *downstreamConn) sendNetworkHistory(net *network) {
if dc.caps["draft/chathistory"] || dc.user.msgStore == nil { if dc.caps["draft/chathistory"] || dc.user.msgStore == nil {
return return
@ -906,15 +987,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) {
} }
for _, msg := range history { for _, msg := range history {
// Don't replay all messages, because that would mess up client if !dc.messageSupportsHistory(msg) {
// state. For instance we just sent the list of users, sending
// PART messages for one of these users would be incorrect.
ignore := true
switch msg.Command {
case "PRIVMSG", "NOTICE":
ignore = false
}
if ignore {
continue continue
} }
@ -983,6 +1056,12 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
Params: []string{dc.srv.Hostname, source}, Params: []string{dc.srv.Hostname, source},
}) })
return nil return nil
case "PONG":
if len(msg.Params) == 0 {
return newNeedMoreParamsError(msg.Command)
}
token := msg.Params[len(msg.Params)-1]
dc.handlePong(token)
case "USER": case "USER":
return ircError{&irc.Message{ return ircError{&irc.Message{
Command: irc.ERR_ALREADYREGISTERED, Command: irc.ERR_ALREADYREGISTERED,

View file

@ -1607,9 +1607,13 @@ func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message
uc.SendMessage(msg) uc.SendMessage(msg)
} }
func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) { // appendLog appends a message to the log file.
//
// The internal message ID is returned. If the message isn't recorded in the
// log file, an empty string is returned.
func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) (msgID string) {
if uc.user.msgStore == nil { if uc.user.msgStore == nil {
return return ""
} }
detached := false detached := false
@ -1622,7 +1626,7 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
lastID, err := uc.user.msgStore.LastMsgID(uc.network, entity, time.Now()) lastID, err := uc.user.msgStore.LastMsgID(uc.network, entity, time.Now())
if err != nil { if err != nil {
uc.logger.Printf("failed to log message: failed to get last message ID: %v", err) uc.logger.Printf("failed to log message: failed to get last message ID: %v", err)
return return ""
} }
history = &networkHistory{ history = &networkHistory{
@ -1646,14 +1650,10 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
msgID, err := uc.user.msgStore.Append(uc.network, entity, msg) msgID, err := uc.user.msgStore.Append(uc.network, entity, msg)
if err != nil { if err != nil {
uc.logger.Printf("failed to log message: %v", err) uc.logger.Printf("failed to log message: %v", err)
return return ""
} }
if !detached && msgID != "" { return msgID
uc.forEachDownstream(func(dc *downstreamConn) {
history.clients[dc.clientName] = msgID
})
}
} }
// produce appends a message to the logs and forwards it to connected downstream // produce appends a message to the logs and forwards it to connected downstream
@ -1662,8 +1662,9 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
// If origin is not nil and origin doesn't support echo-message, the message is // If origin is not nil and origin doesn't support echo-message, the message is
// forwarded to all connections except origin. // forwarded to all connections except origin.
func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) { func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
var msgID string
if target != "" { if target != "" {
uc.appendLog(target, msg) msgID = uc.appendLog(target, msg)
} }
// Don't forward messages if it's a detached channel // Don't forward messages if it's a detached channel
@ -1673,7 +1674,9 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr
uc.forEachDownstream(func(dc *downstreamConn) { uc.forEachDownstream(func(dc *downstreamConn) {
if dc != origin || dc.caps["echo-message"] { if dc != origin || dc.caps["echo-message"] {
dc.SendMessage(dc.marshalMessage(msg, uc.network)) dc.sendMessageWithID(dc.marshalMessage(msg, uc.network), msgID)
} else {
dc.advanceMessageWithID(msg, msgID)
} }
}) })
} }