From 03f89723050d640468da3c4ff929f90a617d3de2 Mon Sep 17 00:00:00 2001 From: delthas Date: Sat, 9 Oct 2021 00:13:16 +0200 Subject: [PATCH] Add support for draft/event-playback --- downstream.go | 38 ++++++----- msgstore.go | 11 +++- msgstore_fs.go | 155 +++++++++++++++++++++++++++++++++++---------- msgstore_memory.go | 6 ++ 4 files changed, 160 insertions(+), 50 deletions(-) diff --git a/downstream.go b/downstream.go index 775dadb..8603eab 100644 --- a/downstream.go +++ b/downstream.go @@ -518,7 +518,7 @@ func (dc *downstreamConn) SendBatch(typ string, params []string, tags irc.Tags, func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) { dc.SendMessage(msg) - if id == "" || !dc.messageSupportsHistory(msg) { + if id == "" || !dc.messageSupportsBacklog(msg) { return } @@ -529,7 +529,7 @@ func (dc *downstreamConn) sendMessageWithID(msg *irc.Message, id string) { // sending a message. This is useful e.g. for self-messages when echo-message // isn't enabled. func (dc *downstreamConn) advanceMessageWithID(msg *irc.Message, id string) { - if id == "" || !dc.messageSupportsHistory(msg) { + if id == "" || !dc.messageSupportsBacklog(msg) { return } @@ -571,8 +571,13 @@ func (dc *downstreamConn) handlePong(token string) { // marshalMessage re-formats a message coming from an upstream connection so // that it's suitable for being sent on this downstream connection. Only -// messages that may appear in logs are supported, except MODE. +// messages that may appear in logs are supported, except MODE messages which +// may only appear in single-upstream mode. func (dc *downstreamConn) marshalMessage(msg *irc.Message, net *network) *irc.Message { + if dc.network != nil { + return msg + } + msg = msg.Copy() msg.Prefix = dc.marshalUserPrefix(net, msg.Prefix) @@ -983,6 +988,12 @@ func (dc *downstreamConn) updateSupportedCaps() { dc.unsetSupportedCap(cap) } } + + if dc.srv.LogPath != "" && dc.network != nil { + dc.setSupportedCap("draft/event-playback", "") + } else { + dc.unsetSupportedCap("draft/event-playback") + } } func (dc *downstreamConn) updateNick() { @@ -1297,13 +1308,12 @@ func (dc *downstreamConn) welcome() error { return nil } -// messageSupportsHistory checks whether the provided message can be sent as +// messageSupportsBacklog checks whether the provided message can be sent as // part of an history batch. -func (dc *downstreamConn) messageSupportsHistory(msg *irc.Message) bool { +func (dc *downstreamConn) messageSupportsBacklog(msg *irc.Message) bool { // Don't replay all messages, because that would mess up client // state. For instance we just sent the list of users, sending // PART messages for one of these users would be incorrect. - // TODO: add support for draft/event-playback switch msg.Command { case "PRIVMSG", "NOTICE": return true @@ -1328,10 +1338,6 @@ func (dc *downstreamConn) sendTargetBacklog(net *network, target, msgID string) dc.SendBatch("chathistory", []string{dc.marshalEntity(net, target)}, nil, func(batchRef irc.TagValue) { for _, msg := range history { - if !dc.messageSupportsHistory(msg) { - continue - } - if ch != nil && ch.Detached { if net.detachedMessageNeedsRelay(ch, msg) { dc.relayDetachedMessage(net, msg) @@ -2326,21 +2332,23 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error { }} } + eventPlayback := dc.caps["draft/event-playback"] + var history []*irc.Message switch subcommand { case "BEFORE": - history, err = store.LoadBeforeTime(network, entity, bounds[0], time.Time{}, limit) + history, err = store.LoadBeforeTime(network, entity, bounds[0], time.Time{}, limit, eventPlayback) case "AFTER": - history, err = store.LoadAfterTime(network, entity, bounds[0], time.Now(), limit) + history, err = store.LoadAfterTime(network, entity, bounds[0], time.Now(), limit, eventPlayback) case "BETWEEN": if bounds[0].Before(bounds[1]) { - history, err = store.LoadAfterTime(network, entity, bounds[0], bounds[1], limit) + history, err = store.LoadAfterTime(network, entity, bounds[0], bounds[1], limit, eventPlayback) } else { - history, err = store.LoadBeforeTime(network, entity, bounds[0], bounds[1], limit) + history, err = store.LoadBeforeTime(network, entity, bounds[0], bounds[1], limit, eventPlayback) } case "TARGETS": // TODO: support TARGETS in multi-upstream mode - targets, err := store.ListTargets(network, bounds[0], bounds[1], limit) + targets, err := store.ListTargets(network, bounds[0], bounds[1], limit, eventPlayback) if err != nil { dc.logger.Printf("failed fetching targets for chathistory: %v", err) return ircError{&irc.Message{ diff --git a/msgstore.go b/msgstore.go index 5bf4b2b..ff93f72 100644 --- a/msgstore.go +++ b/msgstore.go @@ -17,6 +17,8 @@ type messageStore interface { // date. The message ID returned may not refer to a valid message, but can be // used in history queries. LastMsgID(network *network, entity string, t time.Time) (string, error) + // LoadLatestID queries the latest non-event messages for the given network, + // entity and date, up to a count of limit messages, sorted from oldest to newest. LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) Append(network *network, entity string, msg *irc.Message) (id string, err error) } @@ -34,15 +36,18 @@ type chatHistoryMessageStore interface { // ListTargets lists channels and nicknames by time of the latest message. // It returns up to limit targets, starting from start and ending on end, // both excluded. end may be before or after start. - ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) + // If events is false, only PRIVMSG/NOTICE messages are considered. + ListTargets(network *network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) // LoadBeforeTime loads up to limit messages before start down to end. The // returned messages must be between and excluding the provided bounds. // end is before start. - LoadBeforeTime(network *network, entity string, start, end time.Time, limit int) ([]*irc.Message, error) + // If events is false, only PRIVMSG/NOTICE messages are considered. + LoadBeforeTime(network *network, entity string, start, end time.Time, limit int, events bool) ([]*irc.Message, error) // LoadBeforeTime loads up to limit messages after start up to end. The // returned messages must be between and excluding the provided bounds. // end is after start. - LoadAfterTime(network *network, entity string, start, end time.Time, limit int) ([]*irc.Message, error) + // If events is false, only PRIVMSG/NOTICE messages are considered. + LoadAfterTime(network *network, entity string, start, end time.Time, limit int, events bool) ([]*irc.Message, error) } type msgIDType uint diff --git a/msgstore_fs.go b/msgstore_fs.go index 37ba2a1..ee3ac57 100644 --- a/msgstore_fs.go +++ b/msgstore_fs.go @@ -249,7 +249,7 @@ func formatMessage(msg *irc.Message) string { } } -func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) { +func parseMessage(line, entity string, ref time.Time, events bool) (*irc.Message, time.Time, error) { var hour, minute, second int _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second) if err != nil { @@ -257,30 +257,121 @@ func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, } line = line[11:] - var cmd, sender, text string - if strings.HasPrefix(line, "<") { - cmd = "PRIVMSG" - parts := strings.SplitN(line[1:], "> ", 2) + var cmd string + var prefix *irc.Prefix + var params []string + if events && strings.HasPrefix(line, "*** ") { + parts := strings.SplitN(line[4:], " ", 2) if len(parts) != 2 { return nil, time.Time{}, nil } - sender, text = parts[0], parts[1] - } else if strings.HasPrefix(line, "-") { - cmd = "NOTICE" - parts := strings.SplitN(line[1:], "- ", 2) - if len(parts) != 2 { - return nil, time.Time{}, nil + switch parts[0] { + case "Joins:", "Parts:", "Quits:": + args := strings.SplitN(parts[1], " ", 3) + if len(args) < 2 { + return nil, time.Time{}, nil + } + nick := args[0] + mask := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")") + maskParts := strings.SplitN(mask, "@", 2) + if len(maskParts) != 2 { + return nil, time.Time{}, nil + } + prefix = &irc.Prefix{ + Name: nick, + User: maskParts[0], + Host: maskParts[1], + } + var reason string + if len(args) > 2 { + reason = strings.TrimSuffix(strings.TrimPrefix(args[2], "("), ")") + } + switch parts[0] { + case "Joins:": + cmd = "JOIN" + params = []string{entity} + case "Parts:": + cmd = "PART" + if reason != "" { + params = []string{entity, reason} + } else { + params = []string{entity} + } + case "Quits:": + cmd = "QUIT" + if reason != "" { + params = []string{reason} + } + } + default: + nick := parts[0] + rem := parts[1] + if r := strings.TrimPrefix(rem, "is now known as "); r != rem { + cmd = "NICK" + prefix = &irc.Prefix{ + Name: nick, + } + params = []string{r} + } else if r := strings.TrimPrefix(rem, "was kicked by "); r != rem { + args := strings.SplitN(r, " ", 2) + if len(args) != 2 { + return nil, time.Time{}, nil + } + cmd = "KICK" + prefix = &irc.Prefix{ + Name: args[0], + } + reason := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")") + params = []string{entity, nick} + if reason != "" { + params = append(params, reason) + } + } else if r := strings.TrimPrefix(rem, "changes topic to "); r != rem { + cmd = "TOPIC" + prefix = &irc.Prefix{ + Name: nick, + } + topic := strings.TrimSuffix(strings.TrimPrefix(r, "'"), "'") + params = []string{entity, topic} + } else if r := strings.TrimPrefix(rem, "sets mode: "); r != rem { + cmd = "MODE" + prefix = &irc.Prefix{ + Name: nick, + } + params = append([]string{entity}, strings.Split(r, " ")...) + } else { + return nil, time.Time{}, nil + } } - sender, text = parts[0], parts[1] - } else if strings.HasPrefix(line, "* ") { - cmd = "PRIVMSG" - parts := strings.SplitN(line[2:], " ", 2) - if len(parts) != 2 { - return nil, time.Time{}, nil - } - sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01" } else { - return nil, time.Time{}, nil + var sender, text string + if strings.HasPrefix(line, "<") { + cmd = "PRIVMSG" + parts := strings.SplitN(line[1:], "> ", 2) + if len(parts) != 2 { + return nil, time.Time{}, nil + } + sender, text = parts[0], parts[1] + } else if strings.HasPrefix(line, "-") { + cmd = "NOTICE" + parts := strings.SplitN(line[1:], "- ", 2) + if len(parts) != 2 { + return nil, time.Time{}, nil + } + sender, text = parts[0], parts[1] + } else if strings.HasPrefix(line, "* ") { + cmd = "PRIVMSG" + parts := strings.SplitN(line[2:], " ", 2) + if len(parts) != 2 { + return nil, time.Time{}, nil + } + sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01" + } else { + return nil, time.Time{}, nil + } + + prefix = &irc.Prefix{Name: sender} + params = []string{entity, text} } year, month, day := ref.Date() @@ -290,14 +381,14 @@ func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, Tags: map[string]irc.TagValue{ "time": irc.TagValue(t.UTC().Format(serverTimeLayout)), }, - Prefix: &irc.Prefix{Name: sender}, + Prefix: prefix, Command: cmd, - Params: []string{entity, text}, + Params: params, } return msg, t, nil } -func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, limit int, afterOffset int64) ([]*irc.Message, error) { +func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, events bool, limit int, afterOffset int64) ([]*irc.Message, error) { path := ms.logPath(network, entity, ref) f, err := os.Open(path) if err != nil { @@ -321,7 +412,7 @@ func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, r } for sc.Scan() { - msg, t, err := parseMessage(sc.Text(), entity, ref) + msg, t, err := parseMessage(sc.Text(), entity, ref, events) if err != nil { return nil, err } else if msg == nil || !t.After(end) { @@ -353,7 +444,7 @@ func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, r } } -func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, limit int) ([]*irc.Message, error) { +func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, events bool, limit int) ([]*irc.Message, error) { path := ms.logPath(network, entity, ref) f, err := os.Open(path) if err != nil { @@ -367,7 +458,7 @@ func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, re var history []*irc.Message sc := bufio.NewScanner(f) for sc.Scan() && len(history) < limit { - msg, t, err := parseMessage(sc.Text(), entity, ref) + msg, t, err := parseMessage(sc.Text(), entity, ref, events) if err != nil { return nil, err } else if msg == nil || !t.After(ref) { @@ -385,14 +476,14 @@ func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, re return history, nil } -func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) { +func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) { start = start.In(time.Local) end = end.In(time.Local) history := make([]*irc.Message, limit) remaining := limit tries := 0 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) { - buf, err := ms.parseMessagesBefore(network, entity, start, end, remaining, -1) + buf, err := ms.parseMessagesBefore(network, entity, start, end, events, remaining, -1) if err != nil { return nil, err } @@ -410,14 +501,14 @@ func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start return history[remaining:], nil } -func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) { +func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) { start = start.In(time.Local) end = end.In(time.Local) var history []*irc.Message remaining := limit tries := 0 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) { - buf, err := ms.parseMessagesAfter(network, entity, start, end, remaining) + buf, err := ms.parseMessagesAfter(network, entity, start, end, events, remaining) if err != nil { return nil, err } @@ -460,7 +551,7 @@ func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limi offset = afterOffset } - buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, remaining, offset) + buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, false, remaining, offset) if err != nil { return nil, err } @@ -478,7 +569,7 @@ func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limi return history[remaining:], nil } -func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) { +func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) { start = start.In(time.Local) end = end.In(time.Local) rootPath := filepath.Join(ms.root, escapeFilename(network.GetName())) diff --git a/msgstore_memory.go b/msgstore_memory.go index 63ebb74..7a16488 100644 --- a/msgstore_memory.go +++ b/msgstore_memory.go @@ -74,6 +74,12 @@ func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time. } func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) { + switch msg.Command { + case "PRIVMSG", "NOTICE": + default: + return "", nil + } + k := ringBufferKey{networkID: network.ID, entity: entity} rb, ok := ms.buffers[k] if !ok {