diff --git a/msgstore_memory.go b/msgstore_memory.go new file mode 100644 index 0000000..7c79bab --- /dev/null +++ b/msgstore_memory.go @@ -0,0 +1,144 @@ +package soju + +import ( + "fmt" + "strconv" + "time" + + "gopkg.in/irc.v3" +) + +const messageRingBufferCap = 4096 + +func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) { + netID, entity, extra, err := parseMsgID(s) + if err != nil { + return 0, "", 0, err + } + seq, err = strconv.ParseUint(extra, 10, 64) + if err != nil { + return 0, "", 0, fmt.Errorf("failed to parse message ID %q: %v", s, err) + } + return netID, entity, seq, nil +} + +func formatMemoryMsgID(netID int64, entity string, seq uint64) string { + extra := strconv.FormatUint(seq, 10) + return formatMsgID(netID, entity, extra) +} + +type ringBufferKey struct { + networkID int64 + entity string +} + +type memoryMessageStore struct { + buffers map[ringBufferKey]*messageRingBuffer +} + +func newMemoryMessageStore() *memoryMessageStore { + return &memoryMessageStore{ + buffers: make(map[ringBufferKey]*messageRingBuffer), + } +} + +func (ms *memoryMessageStore) Close() error { + ms.buffers = nil + return nil +} + +func (ms *memoryMessageStore) get(network *network, entity string) *messageRingBuffer { + k := ringBufferKey{networkID: network.ID, entity: entity} + if rb, ok := ms.buffers[k]; ok { + return rb + } + rb := newMessageRingBuffer(messageRingBufferCap) + ms.buffers[k] = rb + return rb +} + +func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) { + var seq uint64 + k := ringBufferKey{networkID: network.ID, entity: entity} + if rb, ok := ms.buffers[k]; ok { + seq = rb.cur + } + return formatMemoryMsgID(network.ID, entity, seq), nil +} + +func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) { + k := ringBufferKey{networkID: network.ID, entity: entity} + rb, ok := ms.buffers[k] + if !ok { + rb = newMessageRingBuffer(messageRingBufferCap) + ms.buffers[k] = rb + } + + seq := rb.Append(msg) + return formatMemoryMsgID(network.ID, entity, seq), nil +} + +func (ms *memoryMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) { + _, _, seq, err := parseMemoryMsgID(id) + if err != nil { + return nil, err + } + + k := ringBufferKey{networkID: network.ID, entity: entity} + rb, ok := ms.buffers[k] + if !ok { + return nil, nil + } + + return rb.LoadLatestSeq(seq, limit) +} + +type messageRingBuffer struct { + buf []*irc.Message + cur uint64 +} + +func newMessageRingBuffer(capacity int) *messageRingBuffer { + return &messageRingBuffer{ + buf: make([]*irc.Message, capacity), + cur: 1, + } +} + +func (rb *messageRingBuffer) cap() uint64 { + return uint64(len(rb.buf)) +} + +func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 { + seq := rb.cur + i := int(seq % rb.cap()) + rb.buf[i] = msg + rb.cur++ + return seq +} + +func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) { + if seq > rb.cur { + return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur) + } else if seq == rb.cur { + return nil, nil + } + + // The query excludes the message with the sequence number seq + diff := rb.cur - seq - 1 + if diff > rb.cap() { + // We dropped diff - cap entries + diff = rb.cap() + } + if int(diff) > limit { + diff = uint64(limit) + } + + l := make([]*irc.Message, int(diff)) + for i := 0; i < int(diff); i++ { + j := int((rb.cur - diff + uint64(i)) % rb.cap()) + l[i] = rb.buf[j] + } + + return l, nil +} diff --git a/user.go b/user.go index f71f21f..2864910 100644 --- a/user.go +++ b/user.go @@ -279,6 +279,8 @@ func newUser(srv *Server, record *User) *user { var msgStore messageStore if srv.LogPath != "" { msgStore = newFSMessageStore(srv.LogPath, record.Username) + } else { + msgStore = newMemoryMessageStore() } return &user{