From cae248f672055159348b3ca40515b7627839fbea Mon Sep 17 00:00:00 2001 From: delthas Date: Fri, 29 Jan 2021 16:57:38 +0100 Subject: [PATCH] Add support for the wip soju.im/read capability and READ command READ lets downstream clients share information between each other about what messages have been read by other downstreams. Each target/entity has an optional corresponding read receipt, which is stored as a timestamp. - When a downstream sends: READ #chan timestamp=2020-01-01T01:23:45.000Z the read receipt for that target is set to that date - soju sends READ to downstreams: - on JOIN, if the client uses the soju.im/read capability - when the read receipt timestamp is set by any downstream The read receipt date is clamped by the previous receipt date and the current time. --- bridge.go | 23 ++++++++- db.go | 9 ++++ db_postgres.go | 58 ++++++++++++++++++++++ db_sqlite.go | 101 ++++++++++++++++++++++++++++++++++++++ doc/read.md | 128 +++++++++++++++++++++++++++++++++++++++++++++++++ downstream.go | 87 ++++++++++++++++++++++++++++++++- upstream.go | 4 +- user.go | 6 +-- 8 files changed, 408 insertions(+), 8 deletions(-) create mode 100644 doc/read.md diff --git a/bridge.go b/bridge.go index bd02969..dc8ef75 100644 --- a/bridge.go +++ b/bridge.go @@ -1,13 +1,15 @@ package soju import ( + "context" + "fmt" "strconv" "strings" "gopkg.in/irc.v3" ) -func forwardChannel(dc *downstreamConn, ch *upstreamChannel) { +func forwardChannel(ctx context.Context, dc *downstreamConn, ch *upstreamChannel) { if !ch.complete { panic("Tried to forward a partial channel") } @@ -16,6 +18,25 @@ func forwardChannel(dc *downstreamConn, ch *upstreamChannel) { if ch.Topic != "" { sendTopic(dc, ch) } + + if dc.caps["soju.im/read"] { + channelCM := ch.conn.network.casemap(ch.Name) + r, err := dc.srv.db.GetReadReceipt(ctx, ch.conn.network.ID, channelCM) + if err != nil { + dc.logger.Printf("failed to get the read receipt for %q: %v", ch.Name, err) + } else { + timestampStr := "*" + if r != nil { + timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout)) + } + dc.SendMessage(&irc.Message{ + Prefix: dc.prefix(), + Command: "READ", + Params: []string{dc.marshalEntity(ch.conn.network, ch.Name), timestampStr}, + }) + } + } + sendNames(dc, ch) } diff --git a/db.go b/db.go index 41dd42e..a227978 100644 --- a/db.go +++ b/db.go @@ -28,6 +28,9 @@ type Database interface { ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error + + GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) + StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error } type MetricsCollectorDatabase interface { @@ -180,3 +183,9 @@ type DeliveryReceipt struct { Client string InternalMsgID string } + +type ReadReceipt struct { + ID int64 + Target string // channel or nick + Timestamp time.Time +} diff --git a/db_postgres.go b/db_postgres.go index ce27eec..008fdef 100644 --- a/db_postgres.go +++ b/db_postgres.go @@ -77,6 +77,14 @@ CREATE TABLE "DeliveryReceipt" ( internal_msgid VARCHAR(255) NOT NULL, UNIQUE(network, target, client) ); + +CREATE TABLE "ReadReceipt" ( + id SERIAL PRIMARY KEY, + network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE, + target VARCHAR(255) NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + UNIQUE(network, target) +); ` var postgresMigrations = []string{ @@ -89,6 +97,15 @@ var postgresMigrations = []string{ TYPE sasl_mechanism USING sasl_mechanism::sasl_mechanism; `, + ` + CREATE TABLE "ReadReceipt" ( + id SERIAL PRIMARY KEY, + network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE, + target VARCHAR(255) NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + UNIQUE(network, target) + ); + `, } type PostgresDB struct { @@ -500,3 +517,44 @@ func (db *PostgresDB) StoreClientDeliveryReceipts(ctx context.Context, networkID return tx.Commit() } + +func (db *PostgresDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) { + ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) + defer cancel() + + receipt := &ReadReceipt{ + Target: name, + } + + row := db.db.QueryRowContext(ctx, + `SELECT id, timestamp FROM "ReadReceipt" WHERE network = $1 AND target = $2`, + networkID, name) + if err := row.Scan(&receipt.ID, &receipt.Timestamp); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + return receipt, nil +} + +func (db *PostgresDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error { + ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) + defer cancel() + + var err error + if receipt.ID != 0 { + _, err = db.db.ExecContext(ctx, ` + UPDATE "ReadReceipt" + SET timestamp = $1 + WHERE id = $2`, + receipt.Timestamp, receipt.ID) + } else { + err = db.db.QueryRowContext(ctx, ` + INSERT INTO "ReadReceipt" (network, target, timestamp) + VALUES ($1, $2, $3) + RETURNING id`, + networkID, receipt.Target, receipt.Timestamp).Scan(&receipt.ID) + } + return err +} diff --git a/db_sqlite.go b/db_sqlite.go index 2f4d99f..4929b58 100644 --- a/db_sqlite.go +++ b/db_sqlite.go @@ -70,6 +70,15 @@ CREATE TABLE DeliveryReceipt ( FOREIGN KEY(network) REFERENCES Network(id), UNIQUE(network, target, client) ); + +CREATE TABLE ReadReceipt ( + id INTEGER PRIMARY KEY, + network INTEGER NOT NULL, + target TEXT NOT NULL, + timestamp TEXT NOT NULL, + FOREIGN KEY(network) REFERENCES Network(id), + UNIQUE(network, target) +); ` var sqliteMigrations = []string{ @@ -170,6 +179,16 @@ var sqliteMigrations = []string{ DROP TABLE Network; ALTER TABLE NetworkNew RENAME TO Network; `, + ` + CREATE TABLE ReadReceipt ( + id INTEGER PRIMARY KEY, + network INTEGER NOT NULL, + target TEXT NOT NULL, + timestamp TEXT NOT NULL, + FOREIGN KEY(network) REFERENCES Network(id), + UNIQUE(network, target) + ); + `, } type SqliteDB struct { @@ -383,6 +402,17 @@ func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error { return err } + _, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt + WHERE id IN ( + SELECT ReadReceipt.id + FROM ReadReceipt + JOIN Network ON ReadReceipt.network = Network.id + WHERE Network.user = ? + )`, id) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, `DELETE FROM Channel WHERE id IN ( SELECT Channel.id @@ -545,6 +575,11 @@ func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error { return err } + _, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id) if err != nil { return err @@ -719,3 +754,69 @@ func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID i return tx.Commit() } + +func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) { + db.lock.RLock() + defer db.lock.RUnlock() + + ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) + defer cancel() + + receipt := &ReadReceipt{ + Target: name, + } + + row := db.db.QueryRowContext(ctx, ` + SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`, + sql.Named("network", networkID), + sql.Named("target", name), + ) + var timestamp string + if err := row.Scan(&receipt.ID, ×tamp); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + if t, err := time.Parse(serverTimeLayout, timestamp); err != nil { + return nil, err + } else { + receipt.Timestamp = t + } + return receipt, nil +} + +func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error { + db.lock.Lock() + defer db.lock.Unlock() + + ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) + defer cancel() + + args := []interface{}{ + sql.Named("id", receipt.ID), + sql.Named("timestamp", receipt.Timestamp.UTC().Format(serverTimeLayout)), + sql.Named("network", networkID), + sql.Named("target", receipt.Target), + } + + var err error + if receipt.ID != 0 { + _, err = db.db.ExecContext(ctx, ` + UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`, + args...) + } else { + var res sql.Result + res, err = db.db.ExecContext(ctx, ` + INSERT INTO + ReadReceipt(network, target, timestamp) + VALUES (:network, :target, :timestamp)`, + args...) + if err != nil { + return err + } + receipt.ID, err = res.LastInsertId() + } + + return err +} diff --git a/doc/read.md b/doc/read.md new file mode 100644 index 0000000..29df5d1 --- /dev/null +++ b/doc/read.md @@ -0,0 +1,128 @@ +# read + +This is a work-in-progress specification. + +## Description + +This document describes the format of the `read` extension. This enables several clients of the same user connected to a bouncer to tell each other about which messages have been read in each buffer (channel or query). + +These "read" receipts mean that the actual user has read the message, and is typically useful to clear highlight notifications on other clients. This specification is *not* about message delivery receipts at the client socket level. + +The server as mentioned in this document refers to the IRC bouncer the clients are connected to. No messages or capabilities introduced by this specification are exchanged with the actual upstream server the bouncer is connected to. + +## Implementation + +The `read` extension uses the `soju.im/read` capability and introduces a new command, `READ`. + +The `soju.im/read` capability MAY be negotiated, and affects which messages are sent by the server as specified below. + +### `READ` Command + +The `READ` command can be sent by both clients and servers. + +This command has the following general syntax: + + READ [] + +The `target` parameter specifies a single buffer (channel or nickname). + +The `timestamp` parameter, if specified, MUST be a literal `*`, or have the format `timestamp=YYYY-MM-DDThh:mm:ss.sssZ`, as in the [server-time](https://github.com/ircv3/ircv3-specifications/blob/master/extensions/server-time-3.2.md) extension. + +#### `READ` client set command + + READ + +When sent from a client, this `READ` command signals to the server that the last message read by the user, to the best knowledge of the client, has the specified timestamp. The timestamp MUST correspond to a previous message `time` tag. The timestamp MUST NOT be a literal `*`. + +The server MUST reply to a successful `READ` set command using a `READ` server command, or using an error message. + +#### `READ` client get command + + READ + +When sent from a client, this `READ` command requests the server about the timestamp of the last message read by the user. + +The server MUST reply to a successful `READ` get command using a `READ` server command, or using an error message. + +#### `READ` server command + +When sent from a server, the `READ` command signals to the client that the last message read by the user, to the best knowledge of the server, has the specified timestamp. In that case, the command has the following syntax: + + READ + +The `prefix` is the prefix of the client the message is sent to. + +If there is no known last message read timestamp, the `timestamp` parameter is a literal `*`. Otherwise, it is the formatted timestamp of the last read message. + +#### Command flows + +The server sends a `READ` command to a client in the following cases. + +If the `soju.im/read` capability is negotiated, after the server sends a server `JOIN` command to the client for a corresponding channel, the server MUST send a `READ` command for that channel. The command MUST be sent before the `RPL_ENDOFNAMES` reply for that channel following the `JOIN`. + +If the `soju.im/read` capability is negotiated, after the last read timestamp of a target changes, the server SHOULD send a `READ` command for that target to all the clients of the user. + +#### Read timestamp notes + +The last read timestamp of a target SHOULD only ever increase. If a client sends a `READ` command with a timestamp that is below or equal to the current known timestamp of the server, the server SHOULD reply with a `READ` command with the newer, previous value that was stored and ignore the client timestamp. + +#### Errors and Warnings + +Errors are returned using the standard replies syntax. + +If the server receives a `READ` command with missing parameters, the `NEED_MORE_PARAMS` error code MUST be returned. + + FAIL READ NEED_MORE_PARAMS :Missing parameters + +If the selectors were invalid, the `INVALID_PARAMS` error code SHOULD be returned. + + FAIL READ INVALID_PARAMS [invalid_parameters] :Invalid parameters + +If the read timestamp cannot be set or returned due to an error, the `INTERNAL_ERROR` error code SHOULD be returned. + + FAIL READ INTERNAL_ERROR the_given_target [extra_context] :The read timestamp could not be set + +### Examples + +Updating the read timestamp after the user receives and reads a message +~~~~ +[s] @2019-01-04T14:33:26.123Z :nick!ident@host PRIVMSG #channel :message +[c] READ #channel timestamp=2019-01-04T14:33:26.123Z +[s] :irc.host READ #channel timestamp=2019-01-04T14:33:26.123Z +~~~~ + +Getting the read timestamp automatically after joining a channel when the capability is negotiated +~~~~ +[s] :nick!ident@host JOIN #channel +[s] :irc.host READ #channel timestamp=2019-01-04T14:33:26.123Z +~~~~ + +Getting the read timestamp automatically for a channel without any set timestamp +~~~~ +[s] :nick!ident@host JOIN #channel +[s] :irc.host READ #channel * +~~~~ + +Asking the server about the read timestamp for a particular user +~~~~ +[c] READ target +[s] :irc.host READ target timestamp=2019-01-04T14:33:26.123Z +~~~~ + +## Use Cases + +Clients can know whether a user has already read newly received messages. For clients that display notifications about new messages or highlights, knowing when messages have been read can enable them to clear notifications for messages that were already read on another device. + +Clients never have to actively get the read timestamp because it is provided to them on join and as updated by the server, except for user targets where they have to request the initial read timestamp by sending a `READ` client get command. + +## Implementation Considerations + +Server implementations can typically store a per-target timestamp variable that stores the timestamp of the last read message. When it receives a new timestamp, it can clamp it between the last read timestamp and the current time, and broadcast the new value to all clients if it was changed. + +Client implementations can know when a user has read messages by using various techniques such as when the focus shifts to their window or activity, when the messages are scrolled, when the user is idle, etc. They should not assume that any message appended to the buffer is being read by the client right now, especially when the window does not have the focus or is not visible. It is indeed a best-effort value. + +Clients should typically only need to use the `READ` get client command to get the initial read timestamp of user buffers they open. They will automatically receive initial channels read timestamps and updates, as well as user target timestamp updates. + +## Security Considerations + +No last read timestamp is ever exchanged with the actual upstream server the bouncer is connected to, so there is no privacy risk that the server might leak or use this read data to infer when the user is online. diff --git a/downstream.go b/downstream.go index b5caa8c..e2191a6 100644 --- a/downstream.go +++ b/downstream.go @@ -225,6 +225,7 @@ var permanentDownstreamCaps = map[string]string{ "soju.im/bouncer-networks": "", "soju.im/bouncer-networks-notify": "", + "soju.im/read": "", } // needAllDownstreamCaps is the list of downstream capabilities that @@ -553,6 +554,9 @@ func (dc *downstreamConn) SendMessage(msg *irc.Message) { if msg.Command == "ACCOUNT" && !dc.caps["account-notify"] { return } + if msg.Command == "READ" && !dc.caps["soju.im/read"] { + return + } dc.srv.metrics.downstreamOutMessagesTotal.Inc() dc.conn.SendMessage(context.TODO(), msg) @@ -1469,7 +1473,7 @@ func (dc *downstreamConn) welcome(ctx context.Context) error { Params: []string{dc.marshalEntity(ch.conn.network, ch.Name)}, }) - forwardChannel(dc, ch) + forwardChannel(ctx, dc, ch) } }) @@ -1817,7 +1821,7 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc. if key != "" { ch.Key = key } - uc.network.attach(ch) + uc.network.attach(ctx, ch) } else { ch = &Channel{ Name: upstreamName, @@ -2750,6 +2754,85 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc. dc.SendMessage(dc.marshalMessage(msg, network)) } }) + case "READ": + var target, criteria string + if err := parseMessageParams(msg, &target); err != nil { + return ircError{&irc.Message{ + Command: "FAIL", + Params: []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"}, + }} + } + if len(msg.Params) > 1 { + criteria = msg.Params[1] + } + + uc, entity, err := dc.unmarshalEntity(target) + if err != nil { + return err + } + entityCM := uc.network.casemap(entity) + + r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM) + if err != nil { + dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err) + return ircError{&irc.Message{ + Command: "FAIL", + Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, + }} + } else if r == nil { + r = &ReadReceipt{ + Target: entityCM, + } + } + + broadcast := false + if len(criteria) > 0 { + // TODO: support msgid criteria + criteriaParts := strings.SplitN(criteria, "=", 2) + if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" { + return ircError{&irc.Message{ + Command: "FAIL", + Params: []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"}, + }} + } + + timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1]) + if err != nil { + return ircError{&irc.Message{ + Command: "FAIL", + Params: []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"}, + }} + } + now := time.Now() + if timestamp.After(now) { + timestamp = now + } + if r.Timestamp.Before(timestamp) { + r.Timestamp = timestamp + if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil { + dc.logger.Printf("failed to store receipt for %q: %v", entity, err) + return ircError{&irc.Message{ + Command: "FAIL", + Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, + }} + } + broadcast = true + } + } + + timestampStr := "*" + if !r.Timestamp.IsZero() { + timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout)) + } + uc.forEachDownstream(func(d *downstreamConn) { + if broadcast || dc.id == d.id { + d.SendMessage(&irc.Message{ + Prefix: d.prefix(), + Command: "READ", + Params: []string{d.marshalEntity(uc.network, entity), timestampStr}, + }) + } + }) case "BOUNCER": var subcommand string if err := parseMessageParams(msg, &subcommand); err != nil { diff --git a/upstream.go b/upstream.go index 81509d6..c6a4bb3 100644 --- a/upstream.go +++ b/upstream.go @@ -1337,7 +1337,7 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err c := uc.network.channels.Value(name) if c == nil || !c.Detached { uc.forEachDownstream(func(dc *downstreamConn) { - forwardChannel(dc, ch) + forwardChannel(ctx, dc, ch) }) } case irc.RPL_WHOREPLY: @@ -1768,7 +1768,7 @@ func (uc *upstreamConn) handleDetachedMessage(ctx context.Context, ch *Channel, }) } if ch.ReattachOn == FilterMessage || (ch.ReattachOn == FilterHighlight && uc.network.isHighlight(msg)) { - uc.network.attach(ch) + uc.network.attach(ctx, ch) if err := uc.srv.db.StoreChannel(ctx, uc.network.ID, ch); err != nil { uc.logger.Printf("failed to update channel %q: %v", ch.Name, err) } diff --git a/user.go b/user.go index fefe654..d31483b 100644 --- a/user.go +++ b/user.go @@ -303,7 +303,7 @@ func (net *network) detach(ch *Channel) { }) } -func (net *network) attach(ch *Channel) { +func (net *network) attach(ctx context.Context, ch *Channel) { if !ch.Detached { return } @@ -329,11 +329,11 @@ func (net *network) attach(ch *Channel) { }) if uch != nil { - forwardChannel(dc, uch) + forwardChannel(ctx, dc, uch) } if detachedMsgID != "" { - dc.sendTargetBacklog(context.TODO(), net, ch.Name, detachedMsgID) + dc.sendTargetBacklog(ctx, net, ch.Name, detachedMsgID) } }) }