Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions cmd/gc/adoption_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var poolSlotPattern = regexp.MustCompile(`-(\d+)$`)
// Returns the adoption result and whether the barrier passed (all running
// sessions have beads).
func runAdoptionBarrier(
cityPath string,
store beads.Store,
sp runtime.Provider,
cfg *config.City,
Expand Down Expand Up @@ -204,12 +205,30 @@ func runAdoptionBarrier(
continue
}

_, createErr := store.Create(beads.Bead{
Title: detail.AgentName,
Type: sessionBeadType,
Labels: []string{sessionBeadLabel, "agent:" + detail.AgentName},
Metadata: meta,
alreadyHadBead := false
createErr := sessionpkg.WithCitySessionIdentifierLocks(cityPath, []string{sessionName, detail.AgentName}, func() error {
hasBead, err := openSessionBeadExists(store, sessionName)
if err != nil {
return err
}
if hasBead {
alreadyHadBead = true
return nil
}
_, err = store.Create(beads.Bead{
Title: detail.AgentName,
Type: sessionBeadType,
Labels: []string{sessionBeadLabel, "agent:" + detail.AgentName},
Metadata: meta,
})
return err
})
if alreadyHadBead {
result.AlreadyHadBead++
detail.HasBead = true
result.Details = append(result.Details, detail)
continue
}
if createErr != nil {
fmt.Fprintf(stderr, "adoption barrier: creating bead for %s: %v\n", sessionName, createErr) //nolint:errcheck
result.Skipped++
Expand All @@ -224,6 +243,26 @@ func runAdoptionBarrier(
return result, passed
}

// openSessionBeadExists reports whether store already holds an open session
// bead for sessionName.
//
// It lists beads carrying sessionBeadLabel whose "session_name" metadata equals
// sessionName, ignores any whose Status is "closed", and returns true as soon
// as one of the remaining beads is accepted by
// sessionpkg.IsSessionBeadOrRepairable. A listing failure is returned wrapped
// with the session name for context.
func openSessionBeadExists(store beads.Store, sessionName string) (bool, error) {
	query := beads.ListQuery{
		Label:    sessionBeadLabel,
		Metadata: map[string]string{"session_name": sessionName},
		// NOTE(review): Live presumably asks the store for an uncached read —
		// confirm against beads.ListQuery.
		Live: true,
	}
	candidates, listErr := store.List(query)
	if listErr != nil {
		return false, fmt.Errorf("listing session beads for %q: %w", sessionName, listErr)
	}
	for _, candidate := range candidates {
		// Closed beads never count, even if they would otherwise qualify.
		if candidate.Status != "closed" && sessionpkg.IsSessionBeadOrRepairable(candidate) {
			return true, nil
		}
	}
	return false, nil
}

// resolvePoolBase attempts to match a pool instance session name back to its
// base template agent. It strips the numeric suffix (e.g., "worker-3" -> "worker")
// and checks whether the resulting base name corresponds to a configured agent.
Expand Down
148 changes: 134 additions & 14 deletions cmd/gc/adoption_barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gastownhall/gascity/internal/clock"
"github.com/gastownhall/gascity/internal/config"
"github.com/gastownhall/gascity/internal/runtime"
"github.com/gastownhall/gascity/internal/session"
)

// fakeAdoptionProvider implements runtime.Provider for adoption barrier tests.
Expand All @@ -19,6 +20,42 @@ type fakeAdoptionProvider struct {
listErr error
}

// adoptionLockProbeStore wraps a beads.Store so a test can observe when the
// code under test lists session beads and when it tries to create one, and can
// hold that create back until the test allows it.
type adoptionLockProbeStore struct {
	// Store is the wrapped store; every call is ultimately delegated to it.
	beads.Store

	// targetSessionName selects which session bead creations are intercepted.
	targetSessionName string

	// listed gets a non-blocking signal on each List call filtered by
	// sessionBeadLabel; createAttempted gets a non-blocking signal when Create
	// is called for a session bead whose session_name is targetSessionName.
	listed, createAttempted chan struct{}

	// allowCreate gates intercepted Create calls: they wait until a receive
	// from this channel succeeds (for example, once it is closed).
	allowCreate <-chan struct{}
}

// List forwards the query to the wrapped store and returns its result
// unchanged. When the query filters on sessionBeadLabel it also posts a
// best-effort signal on s.listed; the send is dropped rather than blocking
// when nobody can take it.
func (s *adoptionLockProbeStore) List(query beads.ListQuery) ([]beads.Bead, error) {
	found, listErr := s.Store.List(query)
	if query.Label != sessionBeadLabel {
		return found, listErr
	}
	select {
	case s.listed <- struct{}{}:
	default:
		// Signal already pending or no receiver; do not block the caller.
	}
	return found, listErr
}

// Create delegates to the wrapped store. For a session bead whose
// "session_name" metadata matches s.targetSessionName it first posts a
// best-effort signal on s.createAttempted and then waits on s.allowCreate
// before delegating; all other beads pass straight through.
func (s *adoptionLockProbeStore) Create(b beads.Bead) (beads.Bead, error) {
	intercepted := b.Type == sessionBeadType && b.Metadata["session_name"] == s.targetSessionName
	if !intercepted {
		return s.Store.Create(b)
	}
	select {
	case s.createAttempted <- struct{}{}:
	default:
		// Signal already pending or no receiver; do not block on the notification.
	}
	// Hold the create until the test releases it.
	<-s.allowCreate
	return s.Store.Create(b)
}

// adoptionBarrierOutcome bundles the two return values of runAdoptionBarrier
// so a goroutine can hand both back over a single channel.
type adoptionBarrierOutcome struct {
// result is the adoption result returned by runAdoptionBarrier.
result adoptionResult
// passed is runAdoptionBarrier's second return value (whether the barrier passed).
passed bool
}

// ListRunning returns the fake's preconfigured running list and listErr
// unchanged; the string argument is ignored.
func (f *fakeAdoptionProvider) ListRunning(_ string) ([]string, error) {
return f.running, f.listErr
}
Expand Down Expand Up @@ -51,7 +88,7 @@ func TestAdoptionBarrier_NoRunning(t *testing.T) {
cfg := &config.City{}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Error("barrier should pass with no running sessions")
}
Expand All @@ -69,7 +106,7 @@ func TestAdoptionBarrier_PartialListUsesVisibleSessionsButFailsBarrier(t *testin
cfg := &config.City{Agents: []config.Agent{{Name: "worker"}}}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if passed {
t.Fatal("barrier should fail closed on partial session listing")
}
Expand All @@ -93,7 +130,7 @@ func TestAdoptionBarrier_AdoptsRunning(t *testing.T) {
var stderr bytes.Buffer
clk := &clock.Fake{Time: time.Date(2026, 3, 8, 12, 0, 0, 0, time.UTC)}

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clk, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clk, &stderr, false)
if !passed {
t.Errorf("barrier should pass, stderr: %s", stderr.String())
}
Expand Down Expand Up @@ -155,7 +192,7 @@ func TestAdoptionBarrier_SkipsExistingBead(t *testing.T) {
}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Error("barrier should pass")
}
Expand Down Expand Up @@ -190,7 +227,7 @@ func TestAdoptionBarrier_ClosedBeadDoesNotBlock(t *testing.T) {
cfg := &config.City{Agents: []config.Agent{{Name: "mayor", MaxActiveSessions: intPtr(1)}}}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Error("barrier should pass")
}
Expand All @@ -206,13 +243,13 @@ func TestAdoptionBarrier_Rerunnable(t *testing.T) {
var stderr bytes.Buffer

// First run: adopts.
r1, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
r1, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if r1.Adopted != 1 {
t.Fatalf("first run Adopted = %d, want 1", r1.Adopted)
}

// Second run: dedup prevents duplicates.
r2, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
r2, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Error("second run: barrier should pass")
}
Expand All @@ -224,6 +261,89 @@ func TestAdoptionBarrier_Rerunnable(t *testing.T) {
}
}

// TestAdoptionBarrier_SerializesCreateWithSessionIdentifierLock checks that
// runAdoptionBarrier does not create a session bead while another party holds
// the city session alias lock for the same identifier, and that once the lock
// is released it notices the bead the lock holder created instead of adding a
// duplicate.
//
// Flow: the test takes the alias lock, starts the barrier in a goroutine,
// waits for the barrier's first session-bead listing, gives it a short window
// in which it must NOT attempt a create, then creates the bead itself and
// releases the lock. The barrier is then expected to report AlreadyHadBead=1,
// Adopted=0, and the store must contain exactly one session bead.
func TestAdoptionBarrier_SerializesCreateWithSessionIdentifierLock(t *testing.T) {
	const sessionName = "worker-3"
	const agentName = "worker-3"
	cityPath := t.TempDir()
	baseStore := beads.NewMemStore()
	allowCreate := make(chan struct{})

	// releaseCreate unblocks the probe store's Create at most once. It is
	// deferred so that every exit path — including t.Fatal, which unwinds via
	// runtime.Goexit and still runs deferred calls — lets the background
	// goroutine leave Create instead of blocking on allowCreate forever.
	// Only the test goroutine calls it, so the plain bool needs no locking.
	released := false
	releaseCreate := func() {
		if released {
			return
		}
		released = true
		close(allowCreate)
	}
	defer releaseCreate()

	store := &adoptionLockProbeStore{
		Store:             baseStore,
		targetSessionName: sessionName,
		listed:            make(chan struct{}, 1),
		createAttempted:   make(chan struct{}, 1),
		allowCreate:       allowCreate,
	}
	sp := &fakeAdoptionProvider{running: []string{sessionName}}
	cfg := &config.City{Agents: []config.Agent{{Name: "worker", MinActiveSessions: intPtr(1), MaxActiveSessions: intPtr(5)}}}
	var stderr bytes.Buffer
	// Buffered so the goroutine can always deliver its outcome and exit, even
	// if the test has already failed and stopped receiving.
	done := make(chan adoptionBarrierOutcome, 1)

	err := session.WithCitySessionAliasLock(cityPath, agentName, func() error {
		go func() {
			result, passed := runAdoptionBarrier(cityPath, store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
			done <- adoptionBarrierOutcome{result: result, passed: passed}
		}()

		// The barrier must get as far as listing existing session beads.
		select {
		case <-store.listed:
		case <-time.After(time.Second):
			t.Fatal("adoption barrier did not list existing session beads")
		}

		// While the lock is held, the barrier must not reach Create. Seeing a
		// create attempt inside this window is the failure being tested for.
		select {
		case <-store.createAttempted:
			releaseCreate()
			outcome := <-done
			t.Fatalf("adoption barrier created while session_name lock was held; outcome=%+v stderr=%q", outcome, stderr.String())
		case <-time.After(100 * time.Millisecond):
		}

		// Act as the competing lock holder: create the bead directly in the
		// underlying store, bypassing the probe.
		_, createErr := baseStore.Create(beads.Bead{
			Title:  agentName,
			Type:   sessionBeadType,
			Labels: []string{sessionBeadLabel, "agent:" + agentName},
			Metadata: map[string]string{
				"agent_name":   agentName,
				"session_name": sessionName,
				"state":        "active",
			},
		})
		return createErr
	})
	if err != nil {
		t.Fatalf("holding session identifier lock: %v", err)
	}
	releaseCreate()

	var outcome adoptionBarrierOutcome
	select {
	case outcome = <-done:
	case <-time.After(time.Second):
		t.Fatal("adoption barrier did not finish after session_name lock released")
	}
	if !outcome.passed {
		t.Fatalf("barrier should pass, stderr: %s", stderr.String())
	}
	if outcome.result.Adopted != 0 {
		t.Fatalf("Adopted = %d, want 0 after locked peer created the bead", outcome.result.Adopted)
	}
	if outcome.result.AlreadyHadBead != 1 {
		t.Fatalf("AlreadyHadBead = %d, want 1", outcome.result.AlreadyHadBead)
	}

	beadList, err := baseStore.ListByLabel(sessionBeadLabel, 0)
	if err != nil {
		t.Fatalf("listing session beads: %v", err)
	}
	if len(beadList) != 1 {
		t.Fatalf("session bead count = %d, want 1", len(beadList))
	}
	if got := beadList[0].Metadata["session_name"]; got != sessionName {
		t.Fatalf("session_name = %q, want %q", got, sessionName)
	}
}

func TestAdoptionBarrier_DryRun(t *testing.T) {
store := beads.NewMemStore()
sp := &fakeAdoptionProvider{running: []string{"test-city-mayor", "test-city-worker"}}
Expand All @@ -235,7 +355,7 @@ func TestAdoptionBarrier_DryRun(t *testing.T) {
}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
if !passed {
t.Error("dry run barrier should pass")
}
Expand Down Expand Up @@ -267,7 +387,7 @@ func TestAdoptionBarrier_SkipsDeadSessions(t *testing.T) {
}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Fatalf("barrier should pass, stderr: %s", stderr.String())
}
Expand All @@ -291,7 +411,7 @@ func TestAdoptionBarrier_NilStore(t *testing.T) {
cfg := &config.City{}
var stderr bytes.Buffer

_, passed := runAdoptionBarrier(nil, sp, cfg, "test-city", clock.Real{}, &stderr, false)
_, passed := runAdoptionBarrier("", nil, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if passed {
t.Error("nil store: barrier should not pass")
}
Expand All @@ -309,7 +429,7 @@ func TestAdoptionBarrier_PoolSlotDetection(t *testing.T) {
}
var stderr bytes.Buffer

result, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
result, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
// Pool instance "worker-3" should resolve to config agent "worker"
// via resolvePoolBase, with pool slot 3. AgentName should be the
// expanded instance name "worker-3" (matching syncSessionBeads).
Expand All @@ -336,7 +456,7 @@ func TestAdoptionBarrier_PoolOutOfBounds(t *testing.T) {
}
var stderr bytes.Buffer

result, _ := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
result, _ := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, true)
found := false
for _, d := range result.Details {
if d.SessionName == "worker-7" && d.PoolSlot == 7 && d.OutOfBounds {
Expand Down Expand Up @@ -378,7 +498,7 @@ func TestAdoptionBarrier_SingletonWithNumericSuffix(t *testing.T) {
}
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Errorf("barrier should pass, stderr: %s", stderr.String())
}
Expand All @@ -401,7 +521,7 @@ func TestAdoptionBarrier_UnknownSession(t *testing.T) {
cfg := &config.City{} // no agents configured
var stderr bytes.Buffer

result, passed := runAdoptionBarrier(store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
result, passed := runAdoptionBarrier("", store, sp, cfg, "test-city", clock.Real{}, &stderr, false)
if !passed {
t.Error("barrier should pass (adopt permissively)")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/gc/city_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (cr *CityRuntime) run(ctx context.Context) {
cr.onStatus("adopting_sessions")
}
if cr.cityBeadStore() != nil {
result, passed := runAdoptionBarrier(cr.cityBeadStore(), cr.sp, cr.cfg, cr.cityName, clock.Real{}, cr.stderr, false)
result, passed := runAdoptionBarrier(cr.cityPath, cr.cityBeadStore(), cr.sp, cr.cfg, cr.cityName, clock.Real{}, cr.stderr, false)
if result.Adopted > 0 {
fmt.Fprintf(cr.stdout, "Adopted %d running session(s) into bead store.\n", result.Adopted) //nolint:errcheck
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/gc/cmd_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func doStartStandalone(args []string, controllerMode bool, stdout, stderr io.Wri
oneShotStore = store

// Run adoption barrier before sync.
result, passed := runAdoptionBarrier(store, sp, cfg, cityName, clock.Real{}, stderr, false)
result, passed := runAdoptionBarrier(cityPath, store, sp, cfg, cityName, clock.Real{}, stderr, false)
if result.Adopted > 0 {
fmt.Fprintf(stdout, "Adopted %d running session(s) into bead store.\n", result.Adopted) //nolint:errcheck
}
Expand Down
Loading