Skip to content

Commit c001dbd

Browse files
committed
add tests, unit and func
1 parent 988ae02 commit c001dbd

File tree

10 files changed

+291
-25
lines changed

10 files changed

+291
-25
lines changed

api/matchingservice/v1/request_response.pb.go

Lines changed: 8 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ require (
6060
go.opentelemetry.io/otel/sdk v1.34.0
6161
go.opentelemetry.io/otel/sdk/metric v1.34.0
6262
go.opentelemetry.io/otel/trace v1.34.0
63-
go.temporal.io/api v1.51.0
63+
go.temporal.io/api v1.51.1-0.20250725001547-f146f460d5c9
6464
go.temporal.io/sdk v1.34.0
6565
go.uber.org/automaxprocs v1.6.0
6666
go.uber.org/fx v1.23.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
399399
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
400400
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
401401
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
402-
go.temporal.io/api v1.51.0 h1:9+e14GrIa7nWoWoudqj/PSwm33yYjV+u8TAR9If7s/g=
403-
go.temporal.io/api v1.51.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
402+
go.temporal.io/api v1.51.1-0.20250725001547-f146f460d5c9 h1:TC5floyNPVF18aZHx10KnjDE1YRAq3sssDMAY7DEq9s=
403+
go.temporal.io/api v1.51.1-0.20250725001547-f146f460d5c9/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
404404
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
405405
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
406406
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=

proto/internal/temporal/server/api/matchingservice/v1/request_response.proto

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,5 @@ message DescribeWorkerRequest {
616616
}
617617

618618
message DescribeWorkerResponse {
619-
temporal.api.worker.v1.WorkerInfo workers_info = 1;
619+
temporal.api.worker.v1.WorkerInfo worker_info = 1;
620620
}
621-

service/frontend/workflow_handler.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6218,10 +6218,27 @@ func (wh *WorkflowHandler) UpdateWorkerConfig(_ context.Context, request *workfl
62186218
return nil, serviceerror.NewUnimplemented("UpdateWorkerConfig command is not enabled.")
62196219
}
62206220

6221-
func (wh *WorkflowHandler) DescribeWorker(_ context.Context, request *workflowservice.DescribeWorkerRequest,
6221+
func (wh *WorkflowHandler) DescribeWorker(ctx context.Context, request *workflowservice.DescribeWorkerRequest,
62226222
) (*workflowservice.DescribeWorkerResponse, error) {
62236223
if !wh.config.ListWorkersEnabled(request.GetNamespace()) {
62246224
return nil, serviceerror.NewUnimplemented("DescribeWorker command is not enabled.")
62256225
}
6226-
return nil, serviceerror.NewUnimplemented("DescribeWorker command is not enabled.")
6226+
namespaceName := namespace.Name(request.GetNamespace())
6227+
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName)
6228+
if err != nil {
6229+
return nil, err
6230+
}
6231+
6232+
resp, err := wh.matchingClient.DescribeWorker(ctx, &matchingservice.DescribeWorkerRequest{
6233+
NamespaceId: namespaceID.String(),
6234+
Request: request,
6235+
})
6236+
6237+
if err != nil {
6238+
return nil, err
6239+
}
6240+
6241+
return &workflowservice.DescribeWorkerResponse{
6242+
WorkerInfo: resp.GetWorkerInfo(),
6243+
}, nil
62276244
}

service/matching/handler.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,3 +601,19 @@ func (h *Handler) UpdateTaskQueueConfig(
601601
) (*matchingservice.UpdateTaskQueueConfigResponse, error) {
602602
return h.engine.UpdateTaskQueueConfig(ctx, request)
603603
}
604+
605+
func (h *Handler) DescribeWorker(
606+
_ context.Context, request *matchingservice.DescribeWorkerRequest,
607+
) (*matchingservice.DescribeWorkerResponse, error) {
608+
nsID := namespace.ID(request.GetNamespaceId())
609+
hb, err := h.workersRegistry.DescribeWorker(
610+
nsID, request.Request.GetWorkerInstanceKey())
611+
if err != nil {
612+
return nil, err
613+
}
614+
return &matchingservice.DescribeWorkerResponse{
615+
WorkerInfo: &workerpb.WorkerInfo{
616+
WorkerHeartbeat: hb,
617+
},
618+
}, nil
619+
}

service/matching/workers/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ type (
99
Registry interface {
1010
RecordWorkerHeartbeats(nsID namespace.ID, workerHeartbeat []*workerpb.WorkerHeartbeat)
1111
ListWorkers(nsID namespace.ID, queue string, nextPageToken []byte) ([]*workerpb.WorkerHeartbeat, error)
12+
DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error)
1213
}
1314
)

service/matching/workers/registry_impl.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync/atomic"
88
"time"
99

10+
"go.temporal.io/api/serviceerror"
1011
workerpb "go.temporal.io/api/worker/v1"
1112
"go.temporal.io/server/common/namespace"
1213
"go.uber.org/fx"
@@ -116,6 +117,23 @@ func (b *bucket) filterWorkers(
116117
return out
117118
}
118119

120+
func (b *bucket) getWorkerHeartbeat(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error) {
121+
b.mu.Lock()
122+
defer b.mu.Unlock()
123+
124+
mp, ok := b.namespaces[nsID]
125+
if !ok {
126+
return nil, serviceerror.NewNotFoundf("namespace not found: %s", nsID.String())
127+
}
128+
129+
e, exists := mp[workerInstanceKey]
130+
if !exists {
131+
return nil, serviceerror.NewNotFoundf("worker not found: %s", workerInstanceKey)
132+
}
133+
134+
return e.hb, nil
135+
}
136+
119137
// evictByTTL removes entries older than expireBefore from this bucket.
120138
// Returns the number of entries removed.
121139
func (b *bucket) evictByTTL(expireBefore time.Time) int {
@@ -224,7 +242,6 @@ func (m *registryImpl) filterWorkers(
224242
return nil
225243
}
226244
return b.filterWorkers(nsID, predicate)
227-
228245
}
229246

230247
// evictLoop periodically triggers TTL and capacity-based eviction.
@@ -289,17 +306,26 @@ func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, workerHeartbeat
289306
}
290307

291308
func (m *registryImpl) ListWorkers(nsID namespace.ID, query string, _ []byte) ([]*workerpb.WorkerHeartbeat, error) {
292-
predicate := func(_ *workerpb.WorkerHeartbeat) bool { return true }
293-
if query != "" {
294-
queryEngine, err := newWorkerQueryEngine(nsID.String(), query)
295-
if err != nil {
296-
return nil, err
297-
}
309+
if query == "" {
310+
return m.filterWorkers(nsID, func(_ *workerpb.WorkerHeartbeat) bool { return true }), nil
311+
}
298312

299-
predicate = func(heartbeat *workerpb.WorkerHeartbeat) bool {
300-
result, err := queryEngine.EvaluateWorker(heartbeat)
301-
return err == nil && result
302-
}
313+
queryEngine, err := newWorkerQueryEngine(nsID.String(), query)
314+
if err != nil {
315+
return nil, err
316+
}
317+
318+
predicate := func(heartbeat *workerpb.WorkerHeartbeat) bool {
319+
result, err := queryEngine.EvaluateWorker(heartbeat)
320+
return err == nil && result
303321
}
304322
return m.filterWorkers(nsID, predicate), nil
305323
}
324+
325+
func (m *registryImpl) DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error) {
326+
b := m.getBucket(nsID)
327+
if b == nil {
328+
return nil, serviceerror.NewNotFoundf("namespace not found: %s", nsID.String())
329+
}
330+
return b.getWorkerHeartbeat(nsID, workerInstanceKey)
331+
}

service/matching/workers/registry_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,95 @@ func TestRegistryImpl_ListWorkers(t *testing.T) {
194194
})
195195
}
196196
}
197+
198+
func TestRegistryImpl_DescribeWorker(t *testing.T) {
199+
tests := []struct {
200+
name string
201+
setup func(*registryImpl)
202+
nsID namespace.ID
203+
workerInstanceKey string
204+
expectError bool
205+
}{
206+
{
207+
name: "list workers from non-existent namespace",
208+
setup: func(r *registryImpl) {},
209+
nsID: "non-existent",
210+
workerInstanceKey: "worker",
211+
expectError: true,
212+
},
213+
{
214+
name: "list workers from empty namespace",
215+
setup: func(r *registryImpl) {
216+
},
217+
nsID: "empty-ns",
218+
workerInstanceKey: "worker",
219+
expectError: true,
220+
},
221+
{
222+
name: "list empty worker",
223+
setup: func(r *registryImpl) {
224+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{
225+
WorkerInstanceKey: "worker1",
226+
}})
227+
},
228+
nsID: "namespace1",
229+
workerInstanceKey: "",
230+
expectError: true,
231+
},
232+
{
233+
name: "list single worker, doesn't exist",
234+
setup: func(r *registryImpl) {
235+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{
236+
WorkerInstanceKey: "worker1",
237+
}})
238+
},
239+
nsID: "namespace1",
240+
workerInstanceKey: "worker2",
241+
expectError: true,
242+
},
243+
{
244+
name: "list single worker",
245+
setup: func(r *registryImpl) {
246+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{
247+
WorkerInstanceKey: "worker1",
248+
}})
249+
},
250+
nsID: "namespace1",
251+
workerInstanceKey: "worker1",
252+
},
253+
{
254+
name: "list workers from specific namespace only",
255+
setup: func(r *registryImpl) {
256+
// Setup namespace1
257+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{
258+
WorkerInstanceKey: "worker1",
259+
}})
260+
// Setup namespace2
261+
r.upsertHeartbeats("namespace2", []*workerpb.WorkerHeartbeat{{
262+
WorkerInstanceKey: "worker2",
263+
}})
264+
},
265+
nsID: "namespace2",
266+
workerInstanceKey: "worker2",
267+
},
268+
}
269+
270+
for _, tt := range tests {
271+
t.Run(tt.name, func(t *testing.T) {
272+
r := newRegistryImpl(
273+
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries, defaultEvictionInterval,
274+
)
275+
tt.setup(r)
276+
277+
result, err := r.DescribeWorker(tt.nsID, tt.workerInstanceKey)
278+
if tt.expectError {
279+
assert.Error(t, err, "expected an error for non-existent namespace")
280+
assert.Nil(t, result, "result should be nil when an error occurs")
281+
return
282+
}
283+
assert.NoError(t, err, "unexpected error when listing workers")
284+
assert.NotNil(t, result, "result should not be nil when worker exists")
285+
assert.Equal(t, tt.workerInstanceKey, result.WorkerInstanceKey)
286+
})
287+
}
288+
}

0 commit comments

Comments
 (0)