Skip to content

Commit

Permalink
add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
sancar committed Jun 7, 2024
1 parent 368f067 commit ab408dd
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
30 changes: 30 additions & 0 deletions cmd_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,36 @@ func TestStreamDelete(t *testing.T) {
),
),
)

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Array(
proto.Array(
proto.String("planets"),
proto.Array(
proto.Array(
proto.String("0-4"),
proto.Strings("name", "Jupiter"),
),
),
),
),
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(1),
proto.String("last-delivered-id"), proto.String("0-4"),
proto.String("entries-read"), proto.Int(4),
proto.String("lag"), proto.Int(0),
),
),
)

}

// Test XACK
Expand Down
24 changes: 24 additions & 0 deletions integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,30 @@ func TestStreamGroup(t *testing.T) {
})
})

t.Run("XREADGROUP XDEL XINFO lag", func(t *testing.T) {
testRaw(t, func(c *client) {
// TODO ALL INFO needs to be uncommented, even DoLoosely is not enough
// for some of them becase Nil is returned instead of Int
c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM")
c.Do("XADD", "planets", "0-1", "name", "Mercury")
c.Do("XADD", "planets", "0-2", "name", "Venus")
c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">")
//c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 2, entries-read = 2, lag = 0
c.Do("XADD", "planets", "0-3", "name", "Earth")
c.Do("XADD", "planets", "0-4", "name", "Jupiter")
//c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 4, entries-read = 2, lag = 2

c.Do("XDEL", "planets", "0-1")
//c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2
c.Do("XDEL", "planets", "0-2")
//c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2
c.Do("XDEL", "planets", "0-3")
//c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = nil
c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">")
//c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 4, lag = 0
})
})

t.Run("XACK", func(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM")
Expand Down
2 changes: 1 addition & 1 deletion integration/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *client) Do(cmd string, args ...string) {
// c.t.Logf("real:%q mini:%q", string(resReal), string(resMini))

if resReal != resMini {
c.t.Errorf("real: %q mini: %q", string(resReal), string(resMini))
c.t.Errorf("\nreal: %q \nmini: %q", string(resReal), string(resMini))
return
}

Expand Down
5 changes: 3 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *streamKey) createGroup(group, id string) error {
var entriesReadValid = true
if id == "$" {
id = s.lastIDUnlocked()
entriesRead = len(s.entries)
entriesRead = s.entriesAdded
} else if id == "0" || id == "0-0" || id == s.firstIDUnlocked() {
entriesRead = 0
} else {
Expand Down Expand Up @@ -565,6 +565,7 @@ func (g *streamGroup) updateEntriesRead(msgs []StreamEntry) {
g.entriesRead += len(msgs)
} else if g.lastID == g.stream.lastIDUnlocked() {
// reset entries read as we catch up to the last ID
g.entriesRead = len(g.stream.entries)
g.entriesRead = g.stream.entriesAdded
g.entriesReadValid = true
}
}

0 comments on commit ab408dd

Please sign in to comment.