Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions cmd/gc/build_desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1758,8 +1758,11 @@ func sessionBeadConfigAgent(cfgAgent *config.Agent, qualifiedName string) *confi
return &instanceAgent
}

func claimPoolSlot(cfgAgent *config.Agent, sessionBead beads.Bead, used map[int]bool) int {
if slot := existingPoolSlot(cfgAgent, sessionBead); slot > 0 && !used[slot] {
func claimPoolSlotWithConfig(cfg *config.City, cfgAgent *config.Agent, sessionBead beads.Bead, used map[int]bool) int {
if slot := existingPoolSlotWithConfig(cfg, cfgAgent, sessionBead); slot > 0 {
if used[slot] {
return 0
}
used[slot] = true
return slot
}
Expand Down Expand Up @@ -1856,6 +1859,16 @@ func inBoundsPoolSlot(cfgAgent *config.Agent, slot int) bool {
return true
}

func usablePoolIdentitySlot(cfgAgent *config.Agent, slot int) bool {
if slot <= 0 {
return false
}
if !poolSlotHasConfiguredBound(cfgAgent) {
return true
}
return inBoundsPoolSlot(cfgAgent, slot)
}

func existingPoolSlotWithConfig(cfg *config.City, cfgAgent *config.Agent, sessionBead beads.Bead) int {
if cfgAgent == nil {
return 0
Expand All @@ -1869,20 +1882,20 @@ func existingPoolSlotWithConfig(cfg *config.City, cfgAgent *config.Agent, sessio
}
if sessionBead.Metadata["pool_slot"] != "" {
if slot, err := strconv.Atoi(strings.TrimSpace(sessionBead.Metadata["pool_slot"])); err == nil && slot > 0 {
if agentSlot > 0 && agentSlot == aliasSlot && agentSlot != slot {
if agentSlot > 0 && agentSlot != slot && usablePoolIdentitySlot(cfgAgent, agentSlot) {
return agentSlot
}
if !storedTemplateMatches && agentSlot == 0 && aliasSlot == 0 {
return 0
}
if !inBoundsPoolSlot(cfgAgent, slot) {
if agentSlot > 0 {
if usablePoolIdentitySlot(cfgAgent, agentSlot) {
return agentSlot
}
if aliasSlot > 0 {
if usablePoolIdentitySlot(cfgAgent, aliasSlot) {
return aliasSlot
}
if sessionNameSlot > 0 {
if usablePoolIdentitySlot(cfgAgent, sessionNameSlot) {
return sessionNameSlot
}
if poolSlotHasConfiguredBound(cfgAgent) {
Expand All @@ -1893,13 +1906,13 @@ func existingPoolSlotWithConfig(cfg *config.City, cfgAgent *config.Agent, sessio
}
}
if poolSlotHasConfiguredBound(cfgAgent) {
if agentSlot > 0 && !inBoundsPoolSlot(cfgAgent, agentSlot) {
if !usablePoolIdentitySlot(cfgAgent, agentSlot) {
agentSlot = 0
}
if aliasSlot > 0 && !inBoundsPoolSlot(cfgAgent, aliasSlot) {
if !usablePoolIdentitySlot(cfgAgent, aliasSlot) {
aliasSlot = 0
}
if sessionNameSlot > 0 && !inBoundsPoolSlot(cfgAgent, sessionNameSlot) {
if !usablePoolIdentitySlot(cfgAgent, sessionNameSlot) {
sessionNameSlot = 0
}
}
Expand Down Expand Up @@ -1943,7 +1956,10 @@ func selectOrCreatePoolSessionBead(
}
// Resume tier: reuse the session that has in-progress work assigned.
if preferred != nil && preferred.ID != "" && !used[preferred.ID] && !isFailedCreateSessionBead(*preferred) {
slot := claimPoolSlot(cfgAgent, *preferred, usedSlots)
slot := claimPoolSlotWithConfig(bp.city, cfgAgent, *preferred, usedSlots)
if slot == 0 {
return beads.Bead{}, 0, fmt.Errorf("pool session %s concrete slot already claimed", preferred.ID)
}
return *preferred, slot, nil
}
// Reuse an existing active/creating session bead. Skip drained, closed,
Expand Down Expand Up @@ -1978,11 +1994,14 @@ func selectOrCreatePoolSessionBead(
continue
}
if desiredName := strings.TrimSpace(bead.Metadata["session_name"]); desiredName != "" {
slot := claimPoolSlot(cfgAgent, bead, usedSlots)
slot := claimPoolSlotWithConfig(bp.city, cfgAgent, bead, usedSlots)
if slot == 0 {
continue
}
return bead, slot, nil
}
}
slot := claimPoolSlot(cfgAgent, beads.Bead{}, usedSlots)
slot := claimPoolSlotWithConfig(bp.city, cfgAgent, beads.Bead{}, usedSlots)
_, qualifiedInstance := poolInstanceIdentity(cfgAgent, slot, bp.stderr)
bead, err := createPoolSessionBeadWithGuardedAlias(bp, template, qualifiedInstance, slot)
if err != nil {
Expand Down
109 changes: 104 additions & 5 deletions cmd/gc/build_desired_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4550,6 +4550,26 @@ func TestBuildDesiredState_DoesNotPreserveOutOfBoundsBoundedPoolSlotWithoutIdent
}
}

func TestBuildDesiredState_PrefersInBoundsPoolSlotOverOutOfBoundsAgentName(t *testing.T) {
cfg := &config.City{
Agents: []config.Agent{
{Name: "worker", Dir: "frontend", MaxActiveSessions: intPtr(5)},
},
}
cfgAgent := &cfg.Agents[0]
bead := beads.Bead{
Metadata: map[string]string{
"template": "frontend/worker",
"pool_slot": "2",
"agent_name": "frontend/worker-99",
},
}

if slot := existingPoolSlotWithConfig(cfg, cfgAgent, bead); slot != 2 {
t.Fatalf("existingPoolSlotWithConfig(in-bounds pool_slot, out-of-bounds agent_name) = %d, want 2", slot)
}
}

func TestBuildDesiredState_DoesNotRecoverOutOfBoundsAliasOnlyBoundedPoolSlot(t *testing.T) {
cityPath := t.TempDir()
store := beads.NewMemStore()
Expand Down Expand Up @@ -4578,7 +4598,7 @@ func TestBuildDesiredState_DoesNotRecoverOutOfBoundsAliasOnlyBoundedPoolSlot(t *
}
}

func TestClaimPoolSlot_PreservesStampedOutOfBoundsLiveIdentity(t *testing.T) {
func TestExistingPoolSlot_PreservesStampedOutOfBoundsLiveIdentity(t *testing.T) {
cfgAgent := &config.Agent{Name: "worker", Dir: "frontend", MaxActiveSessions: intPtr(5)}
bead := beads.Bead{
Metadata: map[string]string{
Expand All @@ -4591,10 +4611,6 @@ func TestClaimPoolSlot_PreservesStampedOutOfBoundsLiveIdentity(t *testing.T) {
if slot := existingPoolSlot(cfgAgent, bead); slot != 7 {
t.Fatalf("existingPoolSlot(stamped live slot) = %d, want 7", slot)
}
used := map[int]bool{}
if slot := claimPoolSlot(cfgAgent, bead, used); slot != 7 {
t.Fatalf("claimPoolSlot(stamped live slot) = %d, want 7", slot)
}
}

func TestBuildDesiredState_DoesNotCreateDuplicatePoolBeadForDiscoveredSession(t *testing.T) {
Expand Down Expand Up @@ -4965,6 +4981,89 @@ func TestSelectOrCreatePoolSessionBead_SkipsDrained(t *testing.T) {
}
}

func TestSelectOrCreatePoolSessionBead_PrefersConcreteAgentSlotOverStalePoolMetadata(t *testing.T) {
store := beads.NewMemStore()
poisoned, err := store.Create(beads.Bead{
Title: "frontend/worker",
Type: sessionBeadType,
Labels: []string{sessionBeadLabel},
Metadata: map[string]string{
"template": "frontend/worker",
"agent_name": "frontend/worker-3",
"alias": "backend/worker-4",
"pool_slot": "4",
"session_name": "s-poisoned",
"pool_managed": "true",
"session_origin": "ephemeral",
"state": "asleep",
},
})
if err != nil {
t.Fatal(err)
}
cfg := &config.City{Agents: []config.Agent{
{Dir: "frontend", Name: "worker", MinActiveSessions: intPtr(0), MaxActiveSessions: intPtr(10)},
{Dir: "backend", Name: "worker", MinActiveSessions: intPtr(0), MaxActiveSessions: intPtr(10)},
}}
cfgAgent := &cfg.Agents[0]
bp := &agentBuildParams{
city: cfg,
beadStore: store,
sessionBeads: newSessionBeadSnapshot([]beads.Bead{poisoned}),
agents: cfg.Agents,
}

result, slot, err := selectOrCreatePoolSessionBead(bp, cfgAgent, "frontend/worker", &poisoned, map[string]bool{}, map[int]bool{})
if err != nil {
t.Fatalf("selectOrCreatePoolSessionBead: %v", err)
}
if result.ID != poisoned.ID {
t.Fatalf("selected bead %q, want poisoned preferred bead %q", result.ID, poisoned.ID)
}
if slot != 3 {
t.Fatalf("slot = %d, want concrete agent_name slot 3 over stale pool_slot/alias", slot)
}
}

func TestSelectOrCreatePoolSessionBead_DoesNotRetagDuplicateConcreteSlot(t *testing.T) {
store := beads.NewMemStore()
duplicate, err := store.Create(beads.Bead{
Title: "kimi",
Type: sessionBeadType,
Labels: []string{sessionBeadLabel},
Metadata: map[string]string{
"template": "kimi",
"agent_name": "kimi-9",
"alias": "kimi-15",
"pool_slot": "9",
"session_name": "workflows__kimi-mc-duplicate",
"pool_managed": "true",
"session_origin": "ephemeral",
"state": "creating",
},
})
if err != nil {
t.Fatal(err)
}
cfg := &config.City{Agents: []config.Agent{
{Name: "kimi", MinActiveSessions: intPtr(0), MaxActiveSessions: intPtr(20)},
}}
bp := &agentBuildParams{
city: cfg,
beadStore: store,
sessionBeads: newSessionBeadSnapshot([]beads.Bead{duplicate}),
agents: cfg.Agents,
}

_, _, err = selectOrCreatePoolSessionBead(bp, &cfg.Agents[0], "kimi", &duplicate, map[string]bool{}, map[int]bool{9: true})
if err == nil {
t.Fatal("selectOrCreatePoolSessionBead returned nil error, want duplicate slot rejection")
}
if !strings.Contains(err.Error(), "concrete slot already claimed") {
t.Fatalf("error = %v, want concrete slot already claimed", err)
}
}

func TestSelectOrCreatePoolSessionBead_DoesNotReserveFreshSlotOnCreateError(t *testing.T) {
store := &failingPoolSessionNameStore{MemStore: beads.NewMemStore()}
snapshot := &sessionBeadSnapshot{}
Expand Down
8 changes: 7 additions & 1 deletion cmd/gc/pool_desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func computePoolDesiredStates(
if sb.Status == "closed" {
continue
}
template := strings.TrimSpace(sb.Metadata["template"])
template := strings.TrimSpace(normalizedSessionTemplate(sb, cfg))
if template != "" {
sessionBeadTemplate[sb.ID] = template
}
Expand Down Expand Up @@ -137,6 +137,12 @@ func computePoolDesiredStates(
routedTo = cfg.Agents[0].QualifiedName()
}
}
if sessionBeadID != "" {
sessionTemplate := strings.TrimSpace(sessionBeadTemplate[sessionBeadID])
if sessionTemplate != "" && routedTo != "" && routedTo != sessionTemplate {
continue
}
}
if routedTo != template {
continue
}
Expand Down
66 changes: 66 additions & 0 deletions cmd/gc/pool_desired_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,72 @@ func TestComputePoolDesiredStates_InFlightResumeBeadsDoNotConsumeNewDemand(t *te
}
}

func TestComputePoolDesiredStates_DoesNotResumeSessionAcrossExplicitRouteMismatch(t *testing.T) {
cfg := &config.City{
Agents: []config.Agent{
poolAgent("codex-max", "", intPtr(10), 0),
poolAgent("codex-min", "", intPtr(10), 0),
},
}
session := beads.Bead{
ID: "mc-codex-max",
Status: "open",
Type: sessionBeadType,
Labels: []string{sessionBeadLabel, "template:codex-max"},
Metadata: map[string]string{
"template": "codex-max",
"session_name": "workflows__codex-max-mc-codex-max",
"state": "asleep",
},
}
work := []beads.Bead{
workBead("w-mismatched-route", "codex-min", "workflows__codex-max-mc-codex-max", "in_progress", 5),
}

result := ComputePoolDesiredStates(cfg, work, []beads.Bead{session}, nil)

for _, state := range result {
for _, req := range state.Requests {
if req.SessionBeadID == session.ID {
t.Fatalf("mismatched routed work produced resume request under %q: %+v", state.Template, req)
}
}
}
}

func TestComputePoolDesiredStates_DoesNotResumeLegacySessionAcrossExplicitRouteMismatch(t *testing.T) {
cfg := &config.City{
Agents: []config.Agent{
poolAgent("codex-max", "", intPtr(10), 0),
poolAgent("codex-min", "", intPtr(10), 0),
},
}
session := beads.Bead{
ID: "mc-codex-max",
Status: "open",
Type: sessionBeadType,
Labels: []string{sessionBeadLabel},
Metadata: map[string]string{
"agent_name": "codex-max-1",
"session_name": "workflows__codex-max-mc-codex-max",
"state": "asleep",
},
}
work := []beads.Bead{
workBead("w-mismatched-route", "codex-min", "workflows__codex-max-mc-codex-max", "in_progress", 5),
}

result := ComputePoolDesiredStates(cfg, work, []beads.Bead{session}, nil)

for _, state := range result {
for _, req := range state.Requests {
if req.SessionBeadID == session.ID {
t.Fatalf("legacy mismatched routed work produced resume request under %q: %+v", state.Template, req)
}
}
}
}

func TestComputePoolDesiredStates_InFlightPredicateBranches(t *testing.T) {
cfg := &config.City{
Agents: []config.Agent{poolAgent("claude", "", intPtr(10), 0)},
Expand Down
3 changes: 2 additions & 1 deletion cmd/gc/session_beads.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ func syncSessionBeadsWithSnapshotAndRigStores(
}
if err != nil {
recordAliasConflict()
if needsManagedPoolAliasValidation {
if needsManagedPoolAliasValidation ||
(isManagedPool && strings.TrimSpace(b.Metadata["alias"]) != "" && strings.TrimSpace(b.Metadata["alias"]) != managedAlias) {
queueMeta("alias", "")
}
fmt.Fprintf(stderr, "session beads: alias %q for %s unavailable: %v\n", managedAlias, agentName, err) //nolint:errcheck
Expand Down
Loading
Loading