Skip to content

Commit 24b1208

Browse files
authored
feat(gcpspanner): Implement Notification Channel Delivery Attempts (#1988)
This commit introduces the Notification Channel Delivery Attempts feature in the GCP Spanner library. It includes the ability to create and list delivery attempts associated with notification channels. For now, each channel will only keep track of the last 10 delivery attempts (we will adjust this number in the future). This is similar to GitHub webhooks delivery attempts. - **Database Schema (New Migration File):** - **`NotificationChannelDeliveryAttempts` table:** Logs individual delivery attempts with a composite primary key `(ID, ChannelID)`. - **Go Client-Side Implementation (`lib/gcpspanner`):** - **`notification_channel_delivery_attempt.go`:** Implemented `Create` and `List` functionality for delivery attempts, including logic to automatically prune old attempts. - **Testing (`lib/gcpspanner`):** - Added comprehensive integration tests for all new functionality.
1 parent a04e66f commit 24b1208

3 files changed

Lines changed: 454 additions & 0 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-- Copyright 2025 Google LLC
2+
--
3+
-- Licensed under the Apache License, Version 2.0 (the "License");
4+
-- you may not use this file except in compliance with the License.
5+
-- You may obtain a copy of the License at
6+
--
7+
-- http://www.apache.org/licenses/LICENSE-2.0
8+
--
9+
-- Unless required by applicable law or agreed to in writing, software
10+
-- distributed under the License is distributed on an "AS IS" BASIS,
11+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
-- See the License for the specific language governing permissions and
13+
-- limitations under the License.
14+
15+
-- NotificationChannelDeliveryAttempts stores a log of delivery attempts for a channel.
16+
CREATE TABLE IF NOT EXISTS NotificationChannelDeliveryAttempts (
17+
ID STRING(36) NOT NULL DEFAULT (GENERATE_UUID()),
18+
ChannelID STRING(36) NOT NULL,
19+
AttemptTimestamp TIMESTAMP NOT NULL,
20+
Status STRING(MAX) NOT NULL, -- SUCCESS or FAILURE
21+
Details JSON,
22+
CONSTRAINT FK_NotificationChannelDeliveryAttempt_NotificationChannel FOREIGN KEY (ChannelID) REFERENCES NotificationChannels(ID) ON DELETE CASCADE
23+
) PRIMARY KEY (ID, ChannelID);
24+
25+
-- Index to get the latest attempts for a channel
26+
CREATE INDEX IX_NotificationChannelDeliveryAttempt_AttemptTimestamp
27+
ON NotificationChannelDeliveryAttempts (ChannelID, AttemptTimestamp DESC);
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gcpspanner
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"time"
21+
22+
"cloud.google.com/go/spanner"
23+
)
24+
25+
const notificationChannelDeliveryAttemptTable = "NotificationChannelDeliveryAttempts"
26+
const maxDeliveryAttemptsToKeep = 10
27+
28+
// NotificationChannelDeliveryAttempt represents a row in the NotificationChannelDeliveryAttempt table.
29+
type NotificationChannelDeliveryAttempt struct {
30+
ID string `spanner:"ID"`
31+
ChannelID string `spanner:"ChannelID"`
32+
AttemptTimestamp time.Time `spanner:"AttemptTimestamp"`
33+
Status NotificationChannelDeliveryAttemptStatus `spanner:"Status"`
34+
Details spanner.NullJSON `spanner:"Details"`
35+
}
36+
37+
type NotificationChannelDeliveryAttemptStatus string
38+
39+
const (
40+
// DeliveryAttemptStatusSuccess indicates that the delivery attempt was successful.
41+
DeliveryAttemptStatusSuccess NotificationChannelDeliveryAttemptStatus = "SUCCESS"
42+
// DeliveryAttemptStatusFailure indicates that the delivery attempt failed.
43+
DeliveryAttemptStatusFailure NotificationChannelDeliveryAttemptStatus = "FAILURE"
44+
)
45+
46+
// CreateNotificationChannelDeliveryAttemptRequest is the request to create a delivery attempt.
47+
type CreateNotificationChannelDeliveryAttemptRequest struct {
48+
ChannelID string
49+
AttemptTimestamp time.Time
50+
Status NotificationChannelDeliveryAttemptStatus
51+
Details spanner.NullJSON
52+
}
53+
54+
// ListNotificationChannelDeliveryAttemptsRequest is the request struct for listing delivery attempts.
55+
type ListNotificationChannelDeliveryAttemptsRequest struct {
56+
ChannelID string
57+
PageSize int
58+
PageToken *string
59+
}
60+
61+
// GetPageSize returns the page size for the request.
62+
func (r ListNotificationChannelDeliveryAttemptsRequest) GetPageSize() int {
63+
return r.PageSize
64+
}
65+
66+
type notificationChannelDeliveryAttemptMapper struct{}
67+
68+
func (m notificationChannelDeliveryAttemptMapper) Table() string {
69+
return notificationChannelDeliveryAttemptTable
70+
}
71+
72+
func (m notificationChannelDeliveryAttemptMapper) NewEntity(
73+
id string,
74+
req CreateNotificationChannelDeliveryAttemptRequest) (NotificationChannelDeliveryAttempt, error) {
75+
return NotificationChannelDeliveryAttempt{
76+
ID: id,
77+
ChannelID: req.ChannelID,
78+
AttemptTimestamp: req.AttemptTimestamp,
79+
Status: req.Status,
80+
Details: req.Details,
81+
}, nil
82+
}
83+
84+
type deliveryAttemptKey struct {
85+
ID string
86+
ChannelID string
87+
}
88+
89+
func (m notificationChannelDeliveryAttemptMapper) SelectOne(key deliveryAttemptKey) spanner.Statement {
90+
stmt := spanner.NewStatement(`
91+
SELECT ID, ChannelID, AttemptTimestamp, Status, Details
92+
FROM NotificationChannelDeliveryAttempts
93+
WHERE ID = @id AND ChannelID = @channelID`)
94+
stmt.Params = map[string]interface{}{
95+
"id": key.ID,
96+
"channelID": key.ChannelID,
97+
}
98+
99+
return stmt
100+
}
101+
102+
func (m notificationChannelDeliveryAttemptMapper) SelectList(
103+
req ListNotificationChannelDeliveryAttemptsRequest) spanner.Statement {
104+
var pageFilter string
105+
params := map[string]interface{}{
106+
"channelID": req.ChannelID,
107+
"pageSize": req.PageSize,
108+
}
109+
if req.PageToken != nil {
110+
cursor, err := decodeCursor[notificationChannelDeliveryAttemptCursor](*req.PageToken)
111+
if err == nil {
112+
params["lastID"] = cursor.LastID
113+
pageFilter = " AND ID > @lastID"
114+
}
115+
}
116+
stmt := spanner.NewStatement(fmt.Sprintf(`
117+
SELECT ID, ChannelID, AttemptTimestamp, Status, Details
118+
FROM NotificationChannelDeliveryAttempts
119+
WHERE ChannelID = @channelID %s
120+
ORDER BY AttemptTimestamp, ID
121+
LIMIT @pageSize`, pageFilter))
122+
stmt.Params = params
123+
124+
return stmt
125+
}
126+
127+
type notificationChannelDeliveryAttemptCursor struct {
128+
LastID string `json:"last_id"`
129+
}
130+
131+
// EncodePageToken returns the ID of the delivery attempt as a page token.
132+
func (m notificationChannelDeliveryAttemptMapper) EncodePageToken(item NotificationChannelDeliveryAttempt) string {
133+
return encodeCursor(notificationChannelDeliveryAttemptCursor{
134+
LastID: item.ID,
135+
})
136+
}
137+
138+
// CreateNotificationChannelDeliveryAttempt creates a new delivery attempt log, prunes old ones, and returns its ID.
139+
func (c *Client) CreateNotificationChannelDeliveryAttempt(
140+
ctx context.Context, req CreateNotificationChannelDeliveryAttemptRequest) (*string, error) {
141+
var newID *string
142+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
143+
// 1. Create the new attempt
144+
id, err := newEntityCreator[notificationChannelDeliveryAttemptMapper](c).createWithTransaction(ctx, txn, req)
145+
if err != nil {
146+
return err
147+
}
148+
newID = id
149+
150+
// 2. Count existing attempts for the channel. Note: This count does not include the new attempt just buffered.
151+
countStmt := spanner.NewStatement(`
152+
SELECT COUNT(*)
153+
FROM NotificationChannelDeliveryAttempts
154+
WHERE ChannelID = @channelID`)
155+
countStmt.Params["channelID"] = req.ChannelID
156+
var count int64
157+
err = txn.Query(ctx, countStmt).Do(func(r *spanner.Row) error {
158+
return r.Column(0, &count)
159+
})
160+
if err != nil {
161+
return err
162+
}
163+
164+
// 3. If the pre-insert count is at the limit, fetch the oldest attempts to delete.
165+
if count >= maxDeliveryAttemptsToKeep {
166+
// We need to delete enough to make room for the one we are adding.
167+
deleteCount := count - maxDeliveryAttemptsToKeep + 1
168+
deleteStmt := spanner.NewStatement(`
169+
SELECT ID
170+
FROM NotificationChannelDeliveryAttempts
171+
WHERE ChannelID = @channelID
172+
ORDER BY AttemptTimestamp ASC
173+
LIMIT @deleteCount`)
174+
deleteStmt.Params["channelID"] = req.ChannelID
175+
deleteStmt.Params["deleteCount"] = deleteCount
176+
177+
var mutations []*spanner.Mutation
178+
err := txn.Query(ctx, deleteStmt).Do(func(r *spanner.Row) error {
179+
var attemptID string
180+
if err := r.Column(0, &attemptID); err != nil {
181+
return err
182+
}
183+
mutations = append(mutations,
184+
spanner.Delete(notificationChannelDeliveryAttemptTable,
185+
spanner.Key{attemptID, req.ChannelID}))
186+
187+
return nil
188+
})
189+
if err != nil {
190+
return err
191+
}
192+
193+
// 4. Buffer delete mutations
194+
if len(mutations) > 0 {
195+
return txn.BufferWrite(mutations)
196+
}
197+
}
198+
199+
return nil
200+
})
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
return newID, nil
206+
}
207+
208+
// GetNotificationChannelDeliveryAttempt retrieves a single delivery attempt.
209+
func (c *Client) GetNotificationChannelDeliveryAttempt(
210+
ctx context.Context, attemptID string, channelID string) (*NotificationChannelDeliveryAttempt, error) {
211+
key := deliveryAttemptKey{ID: attemptID, ChannelID: channelID}
212+
213+
return newEntityReader[notificationChannelDeliveryAttemptMapper,
214+
NotificationChannelDeliveryAttempt, deliveryAttemptKey](c).readRowByKey(ctx, key)
215+
}
216+
217+
// ListNotificationChannelDeliveryAttempts lists all delivery attempts for a channel.
218+
func (c *Client) ListNotificationChannelDeliveryAttempts(
219+
ctx context.Context,
220+
req ListNotificationChannelDeliveryAttemptsRequest,
221+
) ([]NotificationChannelDeliveryAttempt, *string, error) {
222+
return newEntityLister[notificationChannelDeliveryAttemptMapper](c).list(ctx, req)
223+
}

0 commit comments

Comments
 (0)