Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions rueidiscompat/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6126,6 +6126,12 @@ func testAdapter(resp3 bool) {
Expect(err).NotTo(HaveOccurred())
res.RadixTreeKeys = 0
res.RadixTreeNodes = 0
res.IDMPDuration = 0
res.IDMPMaxSize = 0
res.PIDsTracked = 0
res.IIDsTracked = 0
res.IIDsAdded = 0
res.IIDsDuplicates = 0

if resp3 {
Expect(res).To(Equal(XInfoStream{
Expand Down Expand Up @@ -6173,6 +6179,12 @@ func testAdapter(resp3 bool) {
Expect(err).NotTo(HaveOccurred())
res.RadixTreeKeys = 0
res.RadixTreeNodes = 0
res.IDMPDuration = 0
res.IDMPMaxSize = 0
res.PIDsTracked = 0
res.IIDsTracked = 0
res.IIDsAdded = 0
res.IIDsDuplicates = 0

if resp3 {
Expect(res).To(Equal(XInfoStream{
Expand Down Expand Up @@ -6206,6 +6218,12 @@ func testAdapter(resp3 bool) {
Expect(err).NotTo(HaveOccurred())
res.RadixTreeKeys = 0
res.RadixTreeNodes = 0
res.IDMPDuration = 0
res.IDMPMaxSize = 0
res.PIDsTracked = 0
res.IIDsTracked = 0
res.IIDsAdded = 0
res.IIDsDuplicates = 0

// Verify DeliveryTime
now := time.Now()
Expand Down Expand Up @@ -12106,6 +12124,56 @@ func testAdapterRedis86() {
Expect(err).NotTo(HaveOccurred())
Expect(status).To(Equal("OK"))
})

It("should XINFO STREAM with IDMP fields", func() {
streamName := "xinfo-idmp-stream"

_, err := adapter.XAdd(ctx, XAddArgs{
Stream: streamName,
Values: map[string]any{"field1": "value1"},
}).Result()
Expect(err).NotTo(HaveOccurred())

_, err = adapter.XAdd(ctx, XAddArgs{
Stream: streamName,
Values: map[string]any{"field2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())

res, err := adapter.XInfoStream(ctx, streamName).Result()
Expect(err).NotTo(HaveOccurred())

Expect(res.Length).To(Equal(int64(2)))
Expect(res.EntriesAdded).To(Equal(int64(2)))
Expect(res.IDMPDuration).To(Equal(int64(100)))
Expect(res.IDMPMaxSize).To(Equal(int64(100)))
Expect(res.PIDsTracked).To(BeNumerically(">=", 0))
Expect(res.IIDsTracked).To(BeNumerically(">=", 0))
Expect(res.IIDsAdded).To(BeNumerically(">=", 0))
Expect(res.IIDsDuplicates).To(BeNumerically(">=", 0))
})

It("should XINFO STREAM FULL with IDMP fields", func() {
streamName := "xinfo-full-idmp-stream"

_, err := adapter.XAdd(ctx, XAddArgs{
Stream: streamName,
Values: map[string]any{"field1": "value1"},
}).Result()
Expect(err).NotTo(HaveOccurred())

res, err := adapter.XInfoStreamFull(ctx, streamName, 0).Result()
Expect(err).NotTo(HaveOccurred())

Expect(res.Length).To(Equal(int64(1)))
Expect(res.EntriesAdded).To(Equal(int64(1)))
Expect(res.IDMPDuration).To(Equal(int64(100)))
Expect(res.IDMPMaxSize).To(Equal(int64(100)))
Expect(res.PIDsTracked).To(BeNumerically(">=", 0))
Expect(res.IIDsTracked).To(BeNumerically(">=", 0))
Expect(res.IIDsAdded).To(BeNumerically(">=", 0))
Expect(res.IIDsDuplicates).To(BeNumerically(">=", 0))
})
})
}

Expand Down
60 changes: 54 additions & 6 deletions rueidiscompat/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,12 @@ type XInfoStream struct {
RadixTreeNodes int64
Groups int64
EntriesAdded int64
IDMPDuration int64
IDMPMaxSize int64
PIDsTracked int64
IIDsTracked int64
IIDsAdded int64
IIDsDuplicates int64
}
type XInfoStreamCmd struct {
baseCmd[XInfoStream]
Expand All @@ -1393,6 +1399,27 @@ func (cmd *XInfoStreamCmd) from(res rueidis.RedisResult) {
if v, ok := kv["groups"]; ok {
val.Groups, _ = v.AsInt64()
}
if v, ok := kv["entries-added"]; ok {
val.EntriesAdded, _ = v.AsInt64()
}
if v, ok := kv["idmp-duration"]; ok {
val.IDMPDuration, _ = v.AsInt64()
}
if v, ok := kv["idmp-maxsize"]; ok {
val.IDMPMaxSize, _ = v.AsInt64()
}
if v, ok := kv["pids-tracked"]; ok {
val.PIDsTracked, _ = v.AsInt64()
}
if v, ok := kv["iids-tracked"]; ok {
val.IIDsTracked, _ = v.AsInt64()
}
if v, ok := kv["iids-added"]; ok {
val.IIDsAdded, _ = v.AsInt64()
}
if v, ok := kv["iids-duplicates"]; ok {
val.IIDsDuplicates, _ = v.AsInt64()
}
if v, ok := kv["last-generated-id"]; ok {
val.LastGeneratedID, _ = v.ToString()
}
Expand All @@ -1402,9 +1429,6 @@ func (cmd *XInfoStreamCmd) from(res rueidis.RedisResult) {
if v, ok := kv["recorded-first-entry-id"]; ok {
val.RecordedFirstEntryID, _ = v.ToString()
}
if v, ok := kv["entries-added"]; ok {
val.EntriesAdded, _ = v.AsInt64()
}
if v, ok := kv["first-entry"]; ok {
if r, err := v.AsXRangeEntry(); err == nil {
val.FirstEntry = newXMessage(r)
Expand Down Expand Up @@ -1464,6 +1488,12 @@ type XInfoStreamFull struct {
RadixTreeKeys int64
RadixTreeNodes int64
EntriesAdded int64
IDMPDuration int64
IDMPMaxSize int64
PIDsTracked int64
IIDsTracked int64
IIDsAdded int64
IIDsDuplicates int64
}

type XInfoStreamFullCmd struct {
Expand All @@ -1486,12 +1516,30 @@ func (cmd *XInfoStreamFullCmd) from(res rueidis.RedisResult) {
if v, ok := kv["radix-tree-nodes"]; ok {
val.RadixTreeNodes, _ = v.AsInt64()
}
if v, ok := kv["last-generated-id"]; ok {
val.LastGeneratedID, _ = v.ToString()
}
if v, ok := kv["entries-added"]; ok {
val.EntriesAdded, _ = v.AsInt64()
}
if v, ok := kv["idmp-duration"]; ok {
val.IDMPDuration, _ = v.AsInt64()
}
if v, ok := kv["idmp-maxsize"]; ok {
val.IDMPMaxSize, _ = v.AsInt64()
}
if v, ok := kv["pids-tracked"]; ok {
val.PIDsTracked, _ = v.AsInt64()
}
if v, ok := kv["iids-tracked"]; ok {
val.IIDsTracked, _ = v.AsInt64()
}
if v, ok := kv["iids-added"]; ok {
val.IIDsAdded, _ = v.AsInt64()
}
if v, ok := kv["iids-duplicates"]; ok {
val.IIDsDuplicates, _ = v.AsInt64()
}
if v, ok := kv["last-generated-id"]; ok {
val.LastGeneratedID, _ = v.ToString()
}
if v, ok := kv["max-deleted-entry-id"]; ok {
val.MaxDeletedEntryID, _ = v.ToString()
}
Expand Down
Loading