Skip to content

Commit 7d6aab7

Browse files
authored
feat(gcpspanner): Implement Saved Search Subscriptions (#1989)
This commit builds upon the Notification Channels feature to introduce Saved Search Subscriptions, allowing users to be notified of changes to a saved search. - **Database Schema (New Migration File):** - **`SavedSearchSubscriptions` table:** Created to associate a `UserID`, `SavedSearchID`, and `ChannelID`. - **Go Client-Side Implementation (`lib/gcpspanner`):** - **`saved_search_subscription.go`:** Implemented full CRUD and `List` functionality for saved search subscriptions, including ownership checks. - **Testing (`lib/gcpspanner`):** - Added comprehensive integration tests covering all `SavedSearchSubscription` operations.
1 parent 24b1208 commit 7d6aab7

3 files changed

Lines changed: 670 additions & 0 deletions

File tree

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Copyright 2025 Google LLC
2+
--
3+
-- Licensed under the Apache License, Version 2.0 (the "License");
4+
-- you may not use this file except in compliance with the License.
5+
-- You may obtain a copy of the License at
6+
--
7+
-- http://www.apache.org/licenses/LICENSE-2.0
8+
--
9+
-- Unless required by applicable law or agreed to in writing, software
10+
-- distributed under the License is distributed on an "AS IS" BASIS,
11+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
-- See the License for the specific language governing permissions and
13+
-- limitations under the License.
14+
15+
-- SavedSearchSubscriptions links a User to a Saved Search via a Channel.
16+
CREATE TABLE IF NOT EXISTS SavedSearchSubscriptions (
17+
ID STRING(36) NOT NULL DEFAULT (GENERATE_UUID()),
18+
ChannelID STRING(36) NOT NULL,
19+
SavedSearchID STRING(36) NOT NULL,
20+
Triggers ARRAY<STRING(MAX)>,
21+
Frequency STRING(MAX),
22+
CreatedAt TIMESTAMP NOT NULL OPTIONS(allow_commit_timestamp = true),
23+
UpdatedAt TIMESTAMP NOT NULL OPTIONS(allow_commit_timestamp = true),
24+
CONSTRAINT FK_SavedSearchSubscription_NotificationChannel FOREIGN KEY (ChannelID) REFERENCES NotificationChannels (ID) ON DELETE CASCADE,
25+
CONSTRAINT FK_SavedSearchSubscription_SavedSearches FOREIGN KEY (SavedSearchID) REFERENCES SavedSearches (ID) ON DELETE CASCADE
26+
) PRIMARY KEY (ID);
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gcpspanner
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"log/slog"
22+
"time"
23+
24+
"cloud.google.com/go/spanner"
25+
"google.golang.org/api/iterator"
26+
)
27+
28+
const savedSearchSubscriptionTable = "SavedSearchSubscriptions"
29+
30+
// SavedSearchSubscription represents a row in the SavedSearchSubscription table.
31+
type SavedSearchSubscription struct {
32+
ID string `spanner:"ID"`
33+
ChannelID string `spanner:"ChannelID"`
34+
SavedSearchID string `spanner:"SavedSearchID"`
35+
Triggers []string `spanner:"Triggers"`
36+
Frequency string `spanner:"Frequency"`
37+
CreatedAt time.Time `spanner:"CreatedAt"`
38+
UpdatedAt time.Time `spanner:"UpdatedAt"`
39+
}
40+
41+
// CreateSavedSearchSubscriptionRequest is the request to create a subscription.
42+
type CreateSavedSearchSubscriptionRequest struct {
43+
UserID string
44+
ChannelID string
45+
SavedSearchID string
46+
Triggers []string
47+
Frequency string
48+
}
49+
50+
// UpdateSavedSearchSubscriptionRequest is a request to update a saved search subscription.
51+
type UpdateSavedSearchSubscriptionRequest struct {
52+
ID string
53+
UserID string
54+
Triggers OptionallySet[[]string]
55+
Frequency OptionallySet[string]
56+
}
57+
58+
// ListSavedSearchSubscriptionsRequest is a request to list saved search subscriptions.
59+
type ListSavedSearchSubscriptionsRequest struct {
60+
UserID string
61+
PageSize int
62+
PageToken *string
63+
}
64+
65+
// GetPageToken returns the page token for the request.
66+
func (r ListSavedSearchSubscriptionsRequest) GetPageToken() *string {
67+
return r.PageToken
68+
}
69+
70+
// GetPageSize returns the page size for the request.
71+
func (r ListSavedSearchSubscriptionsRequest) GetPageSize() int {
72+
return r.PageSize
73+
}
74+
75+
type baseSavedSearchSubscriptionMapper struct{}
76+
77+
func (m baseSavedSearchSubscriptionMapper) Table() string {
78+
return savedSearchSubscriptionTable
79+
}
80+
81+
// savedSearchSubscriptionMapper implements the necessary interfaces for the generic helpers.
82+
type savedSearchSubscriptionMapper struct {
83+
baseSavedSearchSubscriptionMapper
84+
}
85+
86+
func (m savedSearchSubscriptionMapper) GetKeyFromExternal(
87+
in UpdateSavedSearchSubscriptionRequest) string {
88+
return in.ID
89+
}
90+
91+
func (m savedSearchSubscriptionMapper) SelectOne(key string) spanner.Statement {
92+
stmt := spanner.NewStatement(fmt.Sprintf(`
93+
SELECT
94+
ID, ChannelID, SavedSearchID, Triggers, Frequency, CreatedAt, UpdatedAt
95+
FROM %s
96+
WHERE ID = @id
97+
LIMIT 1`, m.Table()))
98+
parameters := map[string]interface{}{
99+
"id": key,
100+
}
101+
stmt.Params = parameters
102+
103+
return stmt
104+
}
105+
106+
func (m savedSearchSubscriptionMapper) SelectList(req ListSavedSearchSubscriptionsRequest) spanner.Statement {
107+
// Join with NotificationChannels to filter by UserID.
108+
var pageFilter string
109+
params := map[string]interface{}{
110+
"userID": req.UserID,
111+
"pageSize": req.PageSize,
112+
}
113+
if req.PageToken != nil {
114+
cursor, err := decodeCursor[savedSearchSubscriptionCursor](*req.PageToken)
115+
if err == nil {
116+
params["lastID"] = cursor.LastID
117+
pageFilter = " AND sc.ID > @lastID"
118+
}
119+
}
120+
query := fmt.Sprintf(`SELECT
121+
sc.ID, sc.ChannelID, sc.SavedSearchID, sc.Triggers, sc.Frequency, sc.CreatedAt, sc.UpdatedAt
122+
FROM SavedSearchSubscriptions sc
123+
JOIN NotificationChannels nc ON sc.ChannelID = nc.ID
124+
WHERE nc.UserID = @userID %s
125+
ORDER BY sc.UpdatedAt, sc.ID LIMIT @pageSize`, pageFilter)
126+
127+
stmt := spanner.NewStatement(query)
128+
stmt.Params = params
129+
130+
return stmt
131+
}
132+
133+
func (m savedSearchSubscriptionMapper) Merge(
134+
req UpdateSavedSearchSubscriptionRequest, existing SavedSearchSubscription) SavedSearchSubscription {
135+
if req.Triggers.IsSet {
136+
existing.Triggers = req.Triggers.Value
137+
}
138+
if req.Frequency.IsSet {
139+
existing.Frequency = req.Frequency.Value
140+
}
141+
142+
return existing
143+
}
144+
145+
type savedSearchSubscriptionCursor struct {
146+
LastID string `json:"last_id"`
147+
}
148+
149+
// EncodePageToken returns the ID of the subscription as a page token.
150+
func (m savedSearchSubscriptionMapper) EncodePageToken(item SavedSearchSubscription) string {
151+
return encodeCursor(savedSearchSubscriptionCursor{
152+
LastID: item.ID,
153+
})
154+
}
155+
156+
func (m savedSearchSubscriptionMapper) NewEntity(
157+
id string,
158+
req CreateSavedSearchSubscriptionRequest) (SavedSearchSubscription, error) {
159+
return SavedSearchSubscription{
160+
ID: id,
161+
ChannelID: req.ChannelID,
162+
SavedSearchID: req.SavedSearchID,
163+
Triggers: req.Triggers,
164+
Frequency: req.Frequency,
165+
CreatedAt: time.Time{},
166+
UpdatedAt: time.Time{},
167+
}, nil
168+
}
169+
170+
func (c *Client) checkNotificationChannelOwnershipBySubscriptionID(
171+
ctx context.Context, subscriptionID string, userID string, txn *spanner.ReadWriteTransaction,
172+
) error {
173+
stmt := spanner.Statement{
174+
// Join the SavedSearchSubscriptions and NotificationChannels tables to verify ownership.
175+
SQL: `SELECT
176+
sc.ID
177+
FROM SavedSearchSubscriptions sc
178+
JOIN NotificationChannels nc ON sc.ChannelID = nc.ID
179+
WHERE sc.ID = @subscriptionID AND nc.UserID = @userID
180+
LIMIT 1`,
181+
Params: map[string]interface{}{
182+
"subscriptionID": subscriptionID,
183+
"userID": userID,
184+
},
185+
}
186+
187+
iter := txn.Query(ctx, stmt)
188+
defer iter.Stop()
189+
190+
_, err := iter.Next()
191+
if err != nil {
192+
// No row found. User does not have a role.
193+
if errors.Is(err, iterator.Done) {
194+
return errors.Join(ErrMissingRequiredRole, err)
195+
}
196+
slog.ErrorContext(ctx, "failed to query user role", "error", err)
197+
198+
return errors.Join(ErrInternalQueryFailure, err)
199+
}
200+
201+
return nil
202+
}
203+
204+
// CreateSavedSearchSubscription creates a new saved search subscription.
205+
func (c *Client) CreateSavedSearchSubscription(
206+
ctx context.Context,
207+
req CreateSavedSearchSubscriptionRequest,
208+
) (*string, error) {
209+
var id *string
210+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
211+
err := c.checkNotificationChannelOwnership(ctx, req.ChannelID, req.UserID, txn)
212+
if err != nil {
213+
return err
214+
}
215+
newID, err := newEntityCreator[savedSearchSubscriptionMapper](c).createWithTransaction(ctx, txn, req)
216+
if err != nil {
217+
return err
218+
}
219+
id = newID
220+
221+
return nil
222+
})
223+
if err != nil {
224+
return nil, err
225+
}
226+
227+
return id, nil
228+
}
229+
230+
// GetSavedSearchSubscription retrieves a subscription if it belongs to the specified user.
231+
func (c *Client) GetSavedSearchSubscription(
232+
ctx context.Context, subscriptionID string, userID string) (*SavedSearchSubscription, error) {
233+
var ret *SavedSearchSubscription
234+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
235+
err := c.checkNotificationChannelOwnershipBySubscriptionID(ctx, subscriptionID, userID, txn)
236+
if err != nil {
237+
return err
238+
}
239+
sub, err := newEntityReader[savedSearchSubscriptionMapper,
240+
SavedSearchSubscription, string](c).readRowByKeyWithTransaction(ctx, subscriptionID, txn)
241+
if err != nil {
242+
return err
243+
}
244+
ret = sub
245+
246+
return nil
247+
})
248+
249+
return ret, err
250+
}
251+
252+
// UpdateSavedSearchSubscription updates a subscription if it belongs to the specified user.
253+
func (c *Client) UpdateSavedSearchSubscription(
254+
ctx context.Context, req UpdateSavedSearchSubscriptionRequest) error {
255+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
256+
err := c.checkNotificationChannelOwnershipBySubscriptionID(ctx, req.ID, req.UserID, txn)
257+
if err != nil {
258+
return err
259+
}
260+
261+
return newEntityWriter[savedSearchSubscriptionMapper](c).updateWithTransaction(ctx, txn, req)
262+
})
263+
264+
return err
265+
}
266+
267+
// removeUserSavedSearchMapper implements removableEntityMapper.
268+
type removeSavedSearchSubscriptionMapper struct {
269+
baseSavedSearchSubscriptionMapper
270+
}
271+
272+
func (m removeSavedSearchSubscriptionMapper) DeleteKey(key string) spanner.Key {
273+
return spanner.Key{key}
274+
}
275+
func (m removeSavedSearchSubscriptionMapper) GetKeyFromExternal(in string) string { return in }
276+
277+
func (m removeSavedSearchSubscriptionMapper) SelectOne(key string) spanner.Statement {
278+
return savedSearchSubscriptionMapper{baseSavedSearchSubscriptionMapper: baseSavedSearchSubscriptionMapper{}}.
279+
SelectOne(key)
280+
}
281+
282+
// DeleteSavedSearchSubscription deletes a subscription if it belongs to the specified user.
283+
func (c *Client) DeleteSavedSearchSubscription(
284+
ctx context.Context, subscriptionID string, userID string) error {
285+
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
286+
err := c.checkNotificationChannelOwnershipBySubscriptionID(ctx, subscriptionID, userID, txn)
287+
if err != nil {
288+
return err
289+
}
290+
291+
return newEntityRemover[removeSavedSearchSubscriptionMapper, string](c).
292+
removeWithTransaction(ctx, txn, subscriptionID)
293+
})
294+
295+
return err
296+
}
297+
298+
// ListSavedSearchSubscriptions retrieves a list of subscriptions for a user with pagination.
299+
func (c *Client) ListSavedSearchSubscriptions(
300+
ctx context.Context, req ListSavedSearchSubscriptionsRequest) ([]SavedSearchSubscription, *string, error) {
301+
return newEntityLister[savedSearchSubscriptionMapper](c).list(ctx, req)
302+
}

0 commit comments

Comments
 (0)