diff --git a/Makefile b/Makefile index b91bee146ab..7f64c232d57 100644 --- a/Makefile +++ b/Makefile @@ -340,6 +340,10 @@ unit-test-go: $(GOTESTSUM) unit-test-go-coverpkg: $(GOTESTSUM) $(GOTESTSUM) --format pkgname --junitfile report.xml -- -coverpkg=./... -coverprofile=cover_coverpkg.out ./... +.PHONY: test-go-race +test-go-race: + go test -count=50 -race ./... + .PHONY: fmt fmt: $(GOLANGCI_LINT) ## Format Go source files using golangci-lint formatters (gci, gofumpt) $(GOLANGCI_LINT) fmt diff --git a/pkg/database/cosmosdb/openshiftcluster_ext.go b/pkg/database/cosmosdb/openshiftcluster_ext.go deleted file mode 100644 index 9d80800e118..00000000000 --- a/pkg/database/cosmosdb/openshiftcluster_ext.go +++ /dev/null @@ -1,17 +0,0 @@ -package cosmosdb - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -// AllIteratorsConsumed returns whether all fake changefeeds have consumed their -// full contents -func (c *FakeOpenShiftClusterDocumentClient) AllIteratorsConsumed() bool { - c.lock.Lock() - defer c.lock.Unlock() - for _, i := range c.changeFeedIterators { - if !i.done { - return false - } - } - return true -} diff --git a/pkg/database/cosmosdb/subscriptions_ext.go b/pkg/database/cosmosdb/subscriptions_ext.go deleted file mode 100644 index c73bdba9b97..00000000000 --- a/pkg/database/cosmosdb/subscriptions_ext.go +++ /dev/null @@ -1,17 +0,0 @@ -package cosmosdb - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -// AllIteratorsConsumed returns whether all fake changefeeds have consumed their -// full contents -func (c *FakeSubscriptionDocumentClient) AllIteratorsConsumed() bool { - c.lock.Lock() - defer c.lock.Unlock() - for _, i := range c.changeFeedIterators { - if !i.done { - return false - } - } - return true -} diff --git a/pkg/monitor/test_helpers.go b/pkg/monitor/test_helpers.go index 40aebfab899..0156b2e311f 100644 --- a/pkg/monitor/test_helpers.go +++ b/pkg/monitor/test_helpers.go @@ -32,26 +32,24 @@ var fakeClusterVisitMonitoringAttempts = map[string]*int{} // TestEnvironment contains all the test setup components type TestEnvironment struct { - OpenShiftClusterDB database.OpenShiftClusters - SubscriptionsDB database.Subscriptions - MonitorsDB database.Monitors - OpenShiftClusterClient *cosmosdb.FakeOpenShiftClusterDocumentClient - SubscriptionsClient *cosmosdb.FakeSubscriptionDocumentClient - FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient - Controller *gomock.Controller - TestLogger *logrus.Entry - Dialer *mock_proxy.MockDialer - MockEnv *mock_env.MockInterface - NoopMetricsEmitter noop.Noop - NoopClusterMetrics noop.Noop - DBGroup monitorDBs + OpenShiftClusterDB database.OpenShiftClusters + SubscriptionsDB database.Subscriptions + MonitorsDB database.Monitors + FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient + Controller *gomock.Controller + TestLogger *logrus.Entry + Dialer *mock_proxy.MockDialer + MockEnv *mock_env.MockInterface + NoopMetricsEmitter noop.Noop + NoopClusterMetrics noop.Noop + DBGroup monitorDBs } // SetupTestEnvironment creates a common test environment for monitor tests func SetupTestEnvironment(t *testing.T) *TestEnvironment { // Create databases - openShiftClusterDB, openShiftClusterClient := testdatabase.NewFakeOpenShiftClusters() - subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions() + openShiftClusterDB, _ := testdatabase.NewFakeOpenShiftClusters() + subscriptionsDB, _ := testdatabase.NewFakeSubscriptions() monitorsDB, fakeMonitorsDBClient := testdatabase.NewFakeMonitors() // Create mocks @@ -85,19 +83,17 @@ func SetupTestEnvironment(t *testing.T) *TestEnvironment { f.Create() return &TestEnvironment{ - OpenShiftClusterDB: openShiftClusterDB, - SubscriptionsDB: subscriptionsDB, - MonitorsDB: monitorsDB, - OpenShiftClusterClient: openShiftClusterClient, - SubscriptionsClient: subscriptionsClient, - FakeMonitorsDBClient: fakeMonitorsDBClient, - Controller: ctrl, - TestLogger: testlogger, - Dialer: dialer, - MockEnv: mockEnv, - NoopMetricsEmitter: noopMetricsEmitter, - NoopClusterMetrics: noopClusterMetricsEmitter, - DBGroup: dbs, + OpenShiftClusterDB: openShiftClusterDB, + SubscriptionsDB: subscriptionsDB, + MonitorsDB: monitorsDB, + FakeMonitorsDBClient: fakeMonitorsDBClient, + Controller: ctrl, + TestLogger: testlogger, + Dialer: dialer, + MockEnv: mockEnv, + NoopMetricsEmitter: noopMetricsEmitter, + NoopClusterMetrics: noopClusterMetricsEmitter, + DBGroup: dbs, } } diff --git a/pkg/monitor/worker_test.go b/pkg/monitor/worker_test.go index 4b1e179e918..540cb0b079f 100644 --- a/pkg/monitor/worker_test.go +++ b/pkg/monitor/worker_test.go @@ -138,13 +138,44 @@ func TestChangefeedOperations(t *testing.T) { } } - // Wait for changefeeds to be consumed - assert.Eventually(t, env.OpenShiftClusterClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond) - assert.Eventually(t, env.SubscriptionsClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond) + // Require two GetLastProcessed() advancements; one sweep may fire OnAllPendingProcessed() before the mutation is visible. + beforeCluster, _ := mon.lastClusterChangefeed.Load().(time.Time) + beforeSubs, _ := mon.subs.GetLastProcessed() + + seenClusterAdvance := false + assert.Eventually(t, func() bool { + ts, ok := mon.lastClusterChangefeed.Load().(time.Time) + if !ok || !ts.After(beforeCluster) { + return false + } + if !seenClusterAdvance { + seenClusterAdvance = true + beforeCluster = ts + return false + } + return true + }, time.Second, 10*time.Millisecond) + + seenSubsAdvance := false + assert.Eventually(t, func() bool { + last, ok := mon.subs.GetLastProcessed() + if !ok || !last.After(beforeSubs) { + return false + } + if !seenSubsAdvance { + seenSubsAdvance = true + beforeSubs = last + return false + } + return true + }, time.Second, 10*time.Millisecond) // Validate expected results - if len(mon.docs) != op.expectDocs { - t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, len(mon.docs)) + mon.mu.RLock() + nDocs := len(mon.docs) + mon.mu.RUnlock() + if nDocs != op.expectDocs { + t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, nDocs) } if mon.subs.GetCacheSize() != op.expectSubs { t.Errorf("%s: expected %d subscriptions in cache, got %d", op.name, op.expectSubs, mon.subs.GetCacheSize()) diff --git a/pkg/util/changefeed/subscriptioncache_test.go b/pkg/util/changefeed/subscriptioncache_test.go index 4e35cdd884c..45d3979717f 100644 --- a/pkg/util/changefeed/subscriptioncache_test.go +++ b/pkg/util/changefeed/subscriptioncache_test.go @@ -70,7 +70,7 @@ func TestSubscriptionChangefeed(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { startedTime := time.Now().UnixNano() - subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions() + subscriptionsDB, _ := testdatabase.NewFakeSubscriptions() _, log := testlog.LogForTesting(t) // need to register the changefeed before making documents @@ -147,7 +147,6 @@ func TestSubscriptionChangefeed(t *testing.T) { go RunChangefeed(t.Context(), log, subscriptionChangefeed, 100*time.Microsecond, 1, cache, stop) cache.WaitForInitialPopulation() - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) // Create some after initially populated _, err := subscriptionsDB.Create(t.Context(), &api.SubscriptionDocument{ @@ -185,7 +184,21 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + // Require two GetLastProcessed() advancements; one sweep may fire OnAllPendingProcessed() before the mutation is visible. + seen := false + before, _ := cache.GetLastProcessed() + assert.Eventually(t, func() bool { + last, ok := cache.GetLastProcessed() + if !ok || !last.After(before) { + return false + } + if !seen { + seen = true + before = last + return false + } + return true + }, time.Second, 1*time.Millisecond) // Switch a registered to suspended old2, err := subscriptionsDB.Get(t.Context(), "8c90b62a-3783-4ea6-a8c8-cbaee4667ffd") @@ -201,7 +214,20 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + seen = false + before, _ = cache.GetLastProcessed() + assert.Eventually(t, func() bool { + last, ok := cache.GetLastProcessed() + if !ok || !last.After(before) { + return false + } + if !seen { + seen = true + before = last + return false + } + return true + }, time.Second, 1*time.Millisecond) // Switch a registered to deleted old3, err := subscriptionsDB.Get(t.Context(), "4e07b0f5-c789-4817-9079-94012b04e1c9") @@ -217,7 +243,20 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + seen = false + before, _ = cache.GetLastProcessed() + assert.Eventually(t, func() bool { + last, ok := cache.GetLastProcessed() + if !ok || !last.After(before) { + return false + } + if !seen { + seen = true + before = last + return false + } + return true + }, time.Second, 1*time.Millisecond) // Validate the expected cache contents assert.Equal(t, tC.expected, maps.Collect(cache.subs.All()))