Add support for the wip soju.im/read capability and READ command

READ lets downstream clients share information between each other about
what messages have been read by other downstreams.

Each target/entity has an optional corresponding read receipt, which is
stored as a timestamp.

- When a downstream sends:
  READ #chan timestamp=2020-01-01T01:23:45.000Z
  the read receipt for that target is set to that date
- soju sends READ to downstreams:
  - on JOIN, if the client uses the soju.im/read capability
  - when the read receipt timestamp is set by any downstream

The read receipt date is clamped by the previous receipt date and the
current time.
This commit is contained in:
delthas 2021-01-29 16:57:38 +01:00 committed by Simon Ser
parent 17cd3b3e98
commit cae248f672
8 changed files with 408 additions and 8 deletions

View file

@ -1,13 +1,15 @@
package soju package soju
import ( import (
"context"
"fmt"
"strconv" "strconv"
"strings" "strings"
"gopkg.in/irc.v3" "gopkg.in/irc.v3"
) )
func forwardChannel(dc *downstreamConn, ch *upstreamChannel) { func forwardChannel(ctx context.Context, dc *downstreamConn, ch *upstreamChannel) {
if !ch.complete { if !ch.complete {
panic("Tried to forward a partial channel") panic("Tried to forward a partial channel")
} }
@ -16,6 +18,25 @@ func forwardChannel(dc *downstreamConn, ch *upstreamChannel) {
if ch.Topic != "" { if ch.Topic != "" {
sendTopic(dc, ch) sendTopic(dc, ch)
} }
if dc.caps["soju.im/read"] {
channelCM := ch.conn.network.casemap(ch.Name)
r, err := dc.srv.db.GetReadReceipt(ctx, ch.conn.network.ID, channelCM)
if err != nil {
dc.logger.Printf("failed to get the read receipt for %q: %v", ch.Name, err)
} else {
timestampStr := "*"
if r != nil {
timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout))
}
dc.SendMessage(&irc.Message{
Prefix: dc.prefix(),
Command: "READ",
Params: []string{dc.marshalEntity(ch.conn.network, ch.Name), timestampStr},
})
}
}
sendNames(dc, ch) sendNames(dc, ch)
} }

9
db.go
View file

@ -28,6 +28,9 @@ type Database interface {
ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error)
StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error
GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error)
StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error
} }
type MetricsCollectorDatabase interface { type MetricsCollectorDatabase interface {
@ -180,3 +183,9 @@ type DeliveryReceipt struct {
Client string Client string
InternalMsgID string InternalMsgID string
} }
type ReadReceipt struct {
ID int64
Target string // channel or nick
Timestamp time.Time
}

View file

@ -77,6 +77,14 @@ CREATE TABLE "DeliveryReceipt" (
internal_msgid VARCHAR(255) NOT NULL, internal_msgid VARCHAR(255) NOT NULL,
UNIQUE(network, target, client) UNIQUE(network, target, client)
); );
CREATE TABLE "ReadReceipt" (
id SERIAL PRIMARY KEY,
network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE,
target VARCHAR(255) NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE(network, target)
);
` `
var postgresMigrations = []string{ var postgresMigrations = []string{
@ -89,6 +97,15 @@ var postgresMigrations = []string{
TYPE sasl_mechanism TYPE sasl_mechanism
USING sasl_mechanism::sasl_mechanism; USING sasl_mechanism::sasl_mechanism;
`, `,
`
CREATE TABLE "ReadReceipt" (
id SERIAL PRIMARY KEY,
network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE,
target VARCHAR(255) NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE(network, target)
);
`,
} }
type PostgresDB struct { type PostgresDB struct {
@ -500,3 +517,44 @@ func (db *PostgresDB) StoreClientDeliveryReceipts(ctx context.Context, networkID
return tx.Commit() return tx.Commit()
} }
func (db *PostgresDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
defer cancel()
receipt := &ReadReceipt{
Target: name,
}
row := db.db.QueryRowContext(ctx,
`SELECT id, timestamp FROM "ReadReceipt" WHERE network = $1 AND target = $2`,
networkID, name)
if err := row.Scan(&receipt.ID, &receipt.Timestamp); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return receipt, nil
}
func (db *PostgresDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
defer cancel()
var err error
if receipt.ID != 0 {
_, err = db.db.ExecContext(ctx, `
UPDATE "ReadReceipt"
SET timestamp = $1
WHERE id = $2`,
receipt.Timestamp, receipt.ID)
} else {
err = db.db.QueryRowContext(ctx, `
INSERT INTO "ReadReceipt" (network, target, timestamp)
VALUES ($1, $2, $3)
RETURNING id`,
networkID, receipt.Target, receipt.Timestamp).Scan(&receipt.ID)
}
return err
}

View file

@ -70,6 +70,15 @@ CREATE TABLE DeliveryReceipt (
FOREIGN KEY(network) REFERENCES Network(id), FOREIGN KEY(network) REFERENCES Network(id),
UNIQUE(network, target, client) UNIQUE(network, target, client)
); );
CREATE TABLE ReadReceipt (
id INTEGER PRIMARY KEY,
network INTEGER NOT NULL,
target TEXT NOT NULL,
timestamp TEXT NOT NULL,
FOREIGN KEY(network) REFERENCES Network(id),
UNIQUE(network, target)
);
` `
var sqliteMigrations = []string{ var sqliteMigrations = []string{
@ -170,6 +179,16 @@ var sqliteMigrations = []string{
DROP TABLE Network; DROP TABLE Network;
ALTER TABLE NetworkNew RENAME TO Network; ALTER TABLE NetworkNew RENAME TO Network;
`, `,
`
CREATE TABLE ReadReceipt (
id INTEGER PRIMARY KEY,
network INTEGER NOT NULL,
target TEXT NOT NULL,
timestamp TEXT NOT NULL,
FOREIGN KEY(network) REFERENCES Network(id),
UNIQUE(network, target)
);
`,
} }
type SqliteDB struct { type SqliteDB struct {
@ -383,6 +402,17 @@ func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error {
return err return err
} }
_, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt
WHERE id IN (
SELECT ReadReceipt.id
FROM ReadReceipt
JOIN Network ON ReadReceipt.network = Network.id
WHERE Network.user = ?
)`, id)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, `DELETE FROM Channel _, err = tx.ExecContext(ctx, `DELETE FROM Channel
WHERE id IN ( WHERE id IN (
SELECT Channel.id SELECT Channel.id
@ -545,6 +575,11 @@ func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error {
return err return err
} }
_, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id) _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
if err != nil { if err != nil {
return err return err
@ -719,3 +754,69 @@ func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID i
return tx.Commit() return tx.Commit()
} }
func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
db.lock.RLock()
defer db.lock.RUnlock()
ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
defer cancel()
receipt := &ReadReceipt{
Target: name,
}
row := db.db.QueryRowContext(ctx, `
SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`,
sql.Named("network", networkID),
sql.Named("target", name),
)
var timestamp string
if err := row.Scan(&receipt.ID, &timestamp); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
if t, err := time.Parse(serverTimeLayout, timestamp); err != nil {
return nil, err
} else {
receipt.Timestamp = t
}
return receipt, nil
}
func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
db.lock.Lock()
defer db.lock.Unlock()
ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
defer cancel()
args := []interface{}{
sql.Named("id", receipt.ID),
sql.Named("timestamp", receipt.Timestamp.UTC().Format(serverTimeLayout)),
sql.Named("network", networkID),
sql.Named("target", receipt.Target),
}
var err error
if receipt.ID != 0 {
_, err = db.db.ExecContext(ctx, `
UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`,
args...)
} else {
var res sql.Result
res, err = db.db.ExecContext(ctx, `
INSERT INTO
ReadReceipt(network, target, timestamp)
VALUES (:network, :target, :timestamp)`,
args...)
if err != nil {
return err
}
receipt.ID, err = res.LastInsertId()
}
return err
}

128
doc/read.md Normal file
View file

@ -0,0 +1,128 @@
# read
This is a work-in-progress specification.
## Description
This document describes the format of the `read` extension. This enables several clients of the same user connected to a bouncer to tell each other about which messages have been read in each buffer (channel or query).
These "read" receipts mean that the actual user has read the message, and is typically useful to clear highlight notifications on other clients. This specification is *not* about message delivery receipts at the client socket level.
The server as mentioned in this document refers to the IRC bouncer the clients are connected to. No messages or capabilities introduced by this specification are exchanged with the actual upstream server the bouncer is connected to.
## Implementation
The `read` extension uses the `soju.im/read` capability and introduces a new command, `READ`.
The `soju.im/read` capability MAY be negotiated, and affects which messages are sent by the server as specified below.
### `READ` Command
The `READ` command can be sent by both clients and servers.
This command has the following general syntax:
READ <target> [<timestamp>]
The `target` parameter specifies a single buffer (channel or nickname).
The `timestamp` parameter, if specified, MUST be a literal `*`, or have the format `timestamp=YYYY-MM-DDThh:mm:ss.sssZ`, as in the [server-time](https://github.com/ircv3/ircv3-specifications/blob/master/extensions/server-time-3.2.md) extension.
#### `READ` client set command
READ <target> <timestamp>
When sent from a client, this `READ` command signals to the server that the last message read by the user, to the best knowledge of the client, has the specified timestamp. The timestamp MUST correspond to a previous message `time` tag. The timestamp MUST NOT be a literal `*`.
The server MUST reply to a successful `READ` set command using a `READ` server command, or using an error message.
#### `READ` client get command
READ <target>
When sent from a client, this `READ` command requests the server about the timestamp of the last message read by the user.
The server MUST reply to a successful `READ` get command using a `READ` server command, or using an error message.
#### `READ` server command
When sent from a server, the `READ` command signals to the client that the last message read by the user, to the best knowledge of the server, has the specified timestamp. In that case, the command has the following syntax:
<prefix> READ <target> <timestamp>
The `prefix` is the prefix of the client the message is sent to.
If there is no known last message read timestamp, the `timestamp` parameter is a literal `*`. Otherwise, it is the formatted timestamp of the last read message.
#### Command flows
The server sends a `READ` command to a client in the following cases.
If the `soju.im/read` capability is negotiated, after the server sends a server `JOIN` command to the client for a corresponding channel, the server MUST send a `READ` command for that channel. The command MUST be sent before the `RPL_ENDOFNAMES` reply for that channel following the `JOIN`.
If the `soju.im/read` capability is negotiated, after the last read timestamp of a target changes, the server SHOULD send a `READ` command for that target to all the clients of the user.
#### Read timestamp notes
The last read timestamp of a target SHOULD only ever increase. If a client sends a `READ` command with a timestamp that is below or equal to the current known timestamp of the server, the server SHOULD reply with a `READ` command with the newer, previous value that was stored and ignore the client timestamp.
#### Errors and Warnings
Errors are returned using the standard replies syntax.
If the server receives a `READ` command with missing parameters, the `NEED_MORE_PARAMS` error code MUST be returned.
FAIL READ NEED_MORE_PARAMS :Missing parameters
If the selectors were invalid, the `INVALID_PARAMS` error code SHOULD be returned.
FAIL READ INVALID_PARAMS [invalid_parameters] :Invalid parameters
If the read timestamp cannot be set or returned due to an error, the `INTERNAL_ERROR` error code SHOULD be returned.
FAIL READ INTERNAL_ERROR the_given_target [extra_context] :The read timestamp could not be set
### Examples
Updating the read timestamp after the user receives and reads a message
~~~~
[s] @2019-01-04T14:33:26.123Z :nick!ident@host PRIVMSG #channel :message
[c] READ #channel timestamp=2019-01-04T14:33:26.123Z
[s] :irc.host READ #channel timestamp=2019-01-04T14:33:26.123Z
~~~~
Getting the read timestamp automatically after joining a channel when the capability is negotiated
~~~~
[s] :nick!ident@host JOIN #channel
[s] :irc.host READ #channel timestamp=2019-01-04T14:33:26.123Z
~~~~
Getting the read timestamp automatically for a channel without any set timestamp
~~~~
[s] :nick!ident@host JOIN #channel
[s] :irc.host READ #channel *
~~~~
Asking the server about the read timestamp for a particular user
~~~~
[c] READ target
[s] :irc.host READ target timestamp=2019-01-04T14:33:26.123Z
~~~~
## Use Cases
Clients can know whether a user has already read newly received messages. For clients that display notifications about new messages or highlights, knowing when messages have been read can enable them to clear notifications for messages that were already read on another device.
Clients never have to actively get the read timestamp because it is provided to them on join and as updated by the server, except for user targets where they have to request the initial read timestamp by sending a `READ` client get command.
## Implementation Considerations
Server implementations can typically store a per-target timestamp variable that stores the timestamp of the last read message. When it receives a new timestamp, it can clamp it between the last read timestamp and the current time, and broadcast the new value to all clients if it was changed.
Client implementations can know when a user has read messages by using various techniques such as when the focus shifts to their window or activity, when the messages are scrolled, when the user is idle, etc. They should not assume that any message appended to the buffer is being read by the client right now, especially when the window does not have the focus or is not visible. It is indeed a best-effort value.
Clients should typically only need to use the `READ` get client command to get the initial read timestamp of user buffers they open. They will automatically receive initial channels read timestamps and updates, as well as user target timestamp updates.
## Security Considerations
No last read timestamp is ever exchanged with the actual upstream server the bouncer is connected to, so there is no privacy risk that the server might leak or use this read data to infer when the user is online.

View file

@ -225,6 +225,7 @@ var permanentDownstreamCaps = map[string]string{
"soju.im/bouncer-networks": "", "soju.im/bouncer-networks": "",
"soju.im/bouncer-networks-notify": "", "soju.im/bouncer-networks-notify": "",
"soju.im/read": "",
} }
// needAllDownstreamCaps is the list of downstream capabilities that // needAllDownstreamCaps is the list of downstream capabilities that
@ -553,6 +554,9 @@ func (dc *downstreamConn) SendMessage(msg *irc.Message) {
if msg.Command == "ACCOUNT" && !dc.caps["account-notify"] { if msg.Command == "ACCOUNT" && !dc.caps["account-notify"] {
return return
} }
if msg.Command == "READ" && !dc.caps["soju.im/read"] {
return
}
dc.srv.metrics.downstreamOutMessagesTotal.Inc() dc.srv.metrics.downstreamOutMessagesTotal.Inc()
dc.conn.SendMessage(context.TODO(), msg) dc.conn.SendMessage(context.TODO(), msg)
@ -1469,7 +1473,7 @@ func (dc *downstreamConn) welcome(ctx context.Context) error {
Params: []string{dc.marshalEntity(ch.conn.network, ch.Name)}, Params: []string{dc.marshalEntity(ch.conn.network, ch.Name)},
}) })
forwardChannel(dc, ch) forwardChannel(ctx, dc, ch)
} }
}) })
@ -1817,7 +1821,7 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc.
if key != "" { if key != "" {
ch.Key = key ch.Key = key
} }
uc.network.attach(ch) uc.network.attach(ctx, ch)
} else { } else {
ch = &Channel{ ch = &Channel{
Name: upstreamName, Name: upstreamName,
@ -2750,6 +2754,85 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc.
dc.SendMessage(dc.marshalMessage(msg, network)) dc.SendMessage(dc.marshalMessage(msg, network))
} }
}) })
case "READ":
var target, criteria string
if err := parseMessageParams(msg, &target); err != nil {
return ircError{&irc.Message{
Command: "FAIL",
Params: []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"},
}}
}
if len(msg.Params) > 1 {
criteria = msg.Params[1]
}
uc, entity, err := dc.unmarshalEntity(target)
if err != nil {
return err
}
entityCM := uc.network.casemap(entity)
r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM)
if err != nil {
dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err)
return ircError{&irc.Message{
Command: "FAIL",
Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
}}
} else if r == nil {
r = &ReadReceipt{
Target: entityCM,
}
}
broadcast := false
if len(criteria) > 0 {
// TODO: support msgid criteria
criteriaParts := strings.SplitN(criteria, "=", 2)
if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" {
return ircError{&irc.Message{
Command: "FAIL",
Params: []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"},
}}
}
timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1])
if err != nil {
return ircError{&irc.Message{
Command: "FAIL",
Params: []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"},
}}
}
now := time.Now()
if timestamp.After(now) {
timestamp = now
}
if r.Timestamp.Before(timestamp) {
r.Timestamp = timestamp
if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil {
dc.logger.Printf("failed to store receipt for %q: %v", entity, err)
return ircError{&irc.Message{
Command: "FAIL",
Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
}}
}
broadcast = true
}
}
timestampStr := "*"
if !r.Timestamp.IsZero() {
timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout))
}
uc.forEachDownstream(func(d *downstreamConn) {
if broadcast || dc.id == d.id {
d.SendMessage(&irc.Message{
Prefix: d.prefix(),
Command: "READ",
Params: []string{d.marshalEntity(uc.network, entity), timestampStr},
})
}
})
case "BOUNCER": case "BOUNCER":
var subcommand string var subcommand string
if err := parseMessageParams(msg, &subcommand); err != nil { if err := parseMessageParams(msg, &subcommand); err != nil {

View file

@ -1337,7 +1337,7 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
c := uc.network.channels.Value(name) c := uc.network.channels.Value(name)
if c == nil || !c.Detached { if c == nil || !c.Detached {
uc.forEachDownstream(func(dc *downstreamConn) { uc.forEachDownstream(func(dc *downstreamConn) {
forwardChannel(dc, ch) forwardChannel(ctx, dc, ch)
}) })
} }
case irc.RPL_WHOREPLY: case irc.RPL_WHOREPLY:
@ -1768,7 +1768,7 @@ func (uc *upstreamConn) handleDetachedMessage(ctx context.Context, ch *Channel,
}) })
} }
if ch.ReattachOn == FilterMessage || (ch.ReattachOn == FilterHighlight && uc.network.isHighlight(msg)) { if ch.ReattachOn == FilterMessage || (ch.ReattachOn == FilterHighlight && uc.network.isHighlight(msg)) {
uc.network.attach(ch) uc.network.attach(ctx, ch)
if err := uc.srv.db.StoreChannel(ctx, uc.network.ID, ch); err != nil { if err := uc.srv.db.StoreChannel(ctx, uc.network.ID, ch); err != nil {
uc.logger.Printf("failed to update channel %q: %v", ch.Name, err) uc.logger.Printf("failed to update channel %q: %v", ch.Name, err)
} }

View file

@ -303,7 +303,7 @@ func (net *network) detach(ch *Channel) {
}) })
} }
func (net *network) attach(ch *Channel) { func (net *network) attach(ctx context.Context, ch *Channel) {
if !ch.Detached { if !ch.Detached {
return return
} }
@ -329,11 +329,11 @@ func (net *network) attach(ch *Channel) {
}) })
if uch != nil { if uch != nil {
forwardChannel(dc, uch) forwardChannel(ctx, dc, uch)
} }
if detachedMsgID != "" { if detachedMsgID != "" {
dc.sendTargetBacklog(context.TODO(), net, ch.Name, detachedMsgID) dc.sendTargetBacklog(ctx, net, ch.Name, detachedMsgID)
} }
}) })
} }