diff --git a/cmd/gc/adoption_barrier.go b/cmd/gc/adoption_barrier.go index cff85d39b..cc7608b91 100644 --- a/cmd/gc/adoption_barrier.go +++ b/cmd/gc/adoption_barrier.go @@ -51,6 +51,7 @@ var poolSlotPattern = regexp.MustCompile(`-(\d+)$`) // Returns the adoption result and whether the barrier passed (all running // sessions have beads). func runAdoptionBarrier( + cityPath string, store beads.Store, sp runtime.Provider, cfg *config.City, @@ -204,12 +205,30 @@ func runAdoptionBarrier( continue } - _, createErr := store.Create(beads.Bead{ - Title: detail.AgentName, - Type: sessionBeadType, - Labels: []string{sessionBeadLabel, "agent:" + detail.AgentName}, - Metadata: meta, + alreadyHadBead := false + createErr := sessionpkg.WithCitySessionIdentifierLocks(cityPath, []string{sessionName, detail.AgentName}, func() error { + hasBead, err := openSessionBeadExists(store, sessionName) + if err != nil { + return err + } + if hasBead { + alreadyHadBead = true + return nil + } + _, err = store.Create(beads.Bead{ + Title: detail.AgentName, + Type: sessionBeadType, + Labels: []string{sessionBeadLabel, "agent:" + detail.AgentName}, + Metadata: meta, + }) + return err }) + if alreadyHadBead { + result.AlreadyHadBead++ + detail.HasBead = true + result.Details = append(result.Details, detail) + continue + } if createErr != nil { fmt.Fprintf(stderr, "adoption barrier: creating bead for %s: %v\n", sessionName, createErr) //nolint:errcheck result.Skipped++ @@ -224,6 +243,26 @@ func runAdoptionBarrier( return result, passed } +func openSessionBeadExists(store beads.Store, sessionName string) (bool, error) { + existing, err := store.List(beads.ListQuery{ + Label: sessionBeadLabel, + Metadata: map[string]string{"session_name": sessionName}, + Live: true, + }) + if err != nil { + return false, fmt.Errorf("listing session beads for %q: %w", sessionName, err) + } + for _, b := range existing { + if b.Status == "closed" { + continue + } + if sessionpkg.IsSessionBeadOrRepairable(b) { + return true, nil + } + } + return false, nil +} + // resolvePoolBase attempts to match a pool instance session name back to its // base template agent. It strips the numeric suffix (e.g., "worker-3" -> "worker") // and checks whether the resulting base name corresponds to a configured agent. diff --git a/cmd/gc/adoption_barrier_test.go b/cmd/gc/adoption_barrier_test.go index 1a8f07f70..163b6f266 100644 --- a/cmd/gc/adoption_barrier_test.go +++ b/cmd/gc/adoption_barrier_test.go @@ -9,6 +9,7 @@ import ( "github.com/gastownhall/gascity/internal/clock" "github.com/gastownhall/gascity/internal/config" "github.com/gastownhall/gascity/internal/runtime" + "github.com/gastownhall/gascity/internal/session" ) // fakeAdoptionProvider implements runtime.Provider for adoption barrier tests. @@ -19,6 +20,42 @@ type fakeAdoptionProvider struct { listErr error } +type adoptionLockProbeStore struct { + beads.Store + + targetSessionName string + listed chan struct{} + createAttempted chan struct{} + allowCreate <-chan struct{} +} + +func (s *adoptionLockProbeStore) List(query beads.ListQuery) ([]beads.Bead, error) { + result, err := s.Store.List(query) + if query.Label == sessionBeadLabel { + select { + case s.listed <- struct{}{}: + default: + } + } + return result, err +} + +func (s *adoptionLockProbeStore) Create(b beads.Bead) (beads.Bead, error) { + if b.Type == sessionBeadType && b.Metadata["session_name"] == s.targetSessionName { + select { + case s.createAttempted <- struct{}{}: + default: + } + <-s.allowCreate + } + return s.Store.Create(b) +} + +type adoptionBarrierOutcome struct { + result adoptionResult + passed bool +} + func (f *fakeAdoptionProvider) ListRunning(_ string) ([]string, error) { return f.running, f.listErr } @@ -51,7 +88,7 @@ func TestAdoptionBarrier_NoRunning(t *testing.T) { cfg := &config.City{} var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Error("barrier should pass with no running sessions") } @@ -69,7 +106,7 @@ func TestAdoptionBarrier_PartialListUsesVisibleSessionsButFailsBarrier(t *testin cfg := &config.City{Agents: []config.Agent{{Name: "worker"}}} var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if passed { t.Fatal("barrier should fail closed on partial session listing") } @@ -93,7 +130,7 @@ func TestAdoptionBarrier_AdoptsRunning(t *testing.T) { var stderr bytes.Buffer clk := &clock.Fake{Time: time.Date(2026, 3, 8, 12, 0, 0, 0, time.UTC)} - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clk, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clk, &stderr, false) if !passed { t.Errorf("barrier should pass, stderr: %s", stderr.String()) } @@ -155,7 +192,7 @@ func TestAdoptionBarrier_SkipsExistingBead(t *testing.T) { } var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Error("barrier should pass") } @@ -190,7 +227,7 @@ func TestAdoptionBarrier_ClosedBeadDoesNotBlock(t *testing.T) { cfg := &config.City{Agents: []config.Agent{{Name: "mayor", MaxActiveSessions: intPtr(1)}}} var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Error("barrier should pass") } @@ -206,13 +243,13 @@ func TestAdoptionBarrier_Rerunnable(t *testing.T) { var stderr bytes.Buffer // First run: adopts. - r1, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + r1, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if r1.Adopted != 1 { t.Fatalf("first run Adopted = %d, want 1", r1.Adopted) } // Second run: dedup prevents duplicates. - r2, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + r2, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Error("second run: barrier should pass") } @@ -224,6 +261,89 @@ func TestAdoptionBarrier_Rerunnable(t *testing.T) { } } +func TestAdoptionBarrier_SerializesCreateWithSessionIdentifierLock(t *testing.T) { + const sessionName = "worker-3" + const agentName = "worker-3" + cityPath := t.TempDir() + baseStore := beads.NewMemStore() + allowCreate := make(chan struct{}) + store := &adoptionLockProbeStore{ + Store: baseStore, + targetSessionName: sessionName, + listed: make(chan struct{}, 1), + createAttempted: make(chan struct{}, 1), + allowCreate: allowCreate, + } + sp := &fakeAdoptionProvider{running: []string{sessionName}} + cfg := &config.City{Agents: []config.Agent{{Name: "worker", MinActiveSessions: intPtr(1), MaxActiveSessions: intPtr(5)}}} + var stderr bytes.Buffer + done := make(chan adoptionBarrierOutcome, 1) + + err := session.WithCitySessionAliasLock(cityPath, agentName, func() error { + go func() { + result, passed := runAdoptionBarrier(cityPath, store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + done <- adoptionBarrierOutcome{result: result, passed: passed} + }() + + select { + case <-store.listed: + case <-time.After(time.Second): + t.Fatal("adoption barrier did not list existing session beads") + } + + select { + case <-store.createAttempted: + close(allowCreate) + outcome := <-done + t.Fatalf("adoption barrier created while session_name lock was held; outcome=%+v stderr=%q", outcome, stderr.String()) + case <-time.After(100 * time.Millisecond): + } + + _, createErr := baseStore.Create(beads.Bead{ + Title: agentName, + Type: sessionBeadType, + Labels: []string{sessionBeadLabel, "agent:" + agentName}, + Metadata: map[string]string{ + "agent_name": agentName, + "session_name": sessionName, + "state": "active", + }, + }) + return createErr + }) + if err != nil { + t.Fatalf("holding session identifier lock: %v", err) + } + close(allowCreate) + + var outcome adoptionBarrierOutcome + select { + case outcome = <-done: + case <-time.After(time.Second): + t.Fatal("adoption barrier did not finish after session_name lock released") + } + if !outcome.passed { + t.Fatalf("barrier should pass, stderr: %s", stderr.String()) + } + if outcome.result.Adopted != 0 { + t.Fatalf("Adopted = %d, want 0 after locked peer created the bead", outcome.result.Adopted) + } + if outcome.result.AlreadyHadBead != 1 { + t.Fatalf("AlreadyHadBead = %d, want 1", outcome.result.AlreadyHadBead) + } + + beadList, err := baseStore.ListByLabel(sessionBeadLabel, 0) + if err != nil { + t.Fatalf("listing session beads: %v", err) + } + if len(beadList) != 1 { + t.Fatalf("session bead count = %d, want 1", len(beadList)) + } + if got := beadList[0].Metadata["session_name"]; got != sessionName { + t.Fatalf("session_name = %q, want %q", got, sessionName) + } +} + func TestAdoptionBarrier_DryRun(t *testing.T) { store := beads.NewMemStore() sp := &fakeAdoptionProvider{running: []string{"test-city-mayor", "test-city-worker"}} @@ -235,7 +355,7 @@ func TestAdoptionBarrier_DryRun(t *testing.T) { } var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true) if !passed { t.Error("dry run barrier should pass") } @@ -267,7 +387,7 @@ func TestAdoptionBarrier_SkipsDeadSessions(t *testing.T) { } var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Fatalf("barrier should pass, stderr: %s", stderr.String()) } @@ -291,7 +411,7 @@ func TestAdoptionBarrier_NilStore(t *testing.T) { cfg := &config.City{} var stderr bytes.Buffer - _, passed := runAdoptionBarrier(nil, sp, cfg, "test-city", clock.Real{}, &stderr, false) + _, passed := runAdoptionBarrier("", nil, sp, cfg, "test-city", clock.Real{}, &stderr, false) if passed { t.Error("nil store: barrier should not pass") } @@ -309,7 +429,7 @@ func TestAdoptionBarrier_PoolSlotDetection(t *testing.T) { } var stderr bytes.Buffer - result, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true) + result, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true) // Pool instance "worker-3" should resolve to config agent "worker" // via resolvePoolBase, with pool slot 3. AgentName should be the // expanded instance name "worker-3" (matching syncSessionBeads). @@ -336,7 +456,7 @@ func TestAdoptionBarrier_PoolOutOfBounds(t *testing.T) { } var stderr bytes.Buffer - result, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true) + result, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true) found := false for _, d := range result.Details { if d.SessionName == "worker-7" && d.PoolSlot == 7 && d.OutOfBounds { @@ -378,7 +498,7 @@ func TestAdoptionBarrier_SingletonWithNumericSuffix(t *testing.T) { } var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Errorf("barrier should pass, stderr: %s", stderr.String()) } @@ -401,7 +521,7 @@ func TestAdoptionBarrier_UnknownSession(t *testing.T) { cfg := &config.City{} // no agents configured var stderr bytes.Buffer - result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false) + result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false) if !passed { t.Error("barrier should pass (adopt permissively)") } diff --git a/cmd/gc/city_runtime.go b/cmd/gc/city_runtime.go index c3078c4ed..993af6a2b 100644 --- a/cmd/gc/city_runtime.go +++ b/cmd/gc/city_runtime.go @@ -395,7 +395,7 @@ func (cr *CityRuntime) run(ctx context.Context) { cr.onStatus("adopting_sessions") } if cr.cityBeadStore() != nil { - result, passed := runAdoptionBarrier(cr.cityBeadStore(), cr.sp, cr.cfg, cr.cityName, clock.Real{}, cr.stderr, false) + result, passed := runAdoptionBarrier(cr.cityPath, cr.cityBeadStore(), cr.sp, cr.cfg, cr.cityName, clock.Real{}, cr.stderr, false) if result.Adopted > 0 { fmt.Fprintf(cr.stdout, "Adopted %d running session(s) into bead store.\n", result.Adopted) //nolint:errcheck } diff --git a/cmd/gc/cmd_start.go b/cmd/gc/cmd_start.go index dcaccef0e..9aba7aaee 100644 --- a/cmd/gc/cmd_start.go +++ b/cmd/gc/cmd_start.go @@ -621,7 +621,7 @@ func doStartStandalone(args []string, controllerMode bool, stdout, stderr io.Wri oneShotStore = store // Run adoption barrier before sync. - result, passed := runAdoptionBarrier(store, sp, cfg, cityName, clock.Real{}, stderr, false) + result, passed := runAdoptionBarrier(cityPath, store, sp, cfg, cityName, clock.Real{}, stderr, false) if result.Adopted > 0 { fmt.Fprintf(stdout, "Adopted %d running session(s) into bead store.\n", result.Adopted) //nolint:errcheck }