diff --git a/.gitignore b/.gitignore index 62dc1ff..673a5dd 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,8 @@ sing-box-reference .agents .devcontainer -start-instance.sh \ No newline at end of file +start-instance.sh +data/ +start.bat +resin.exe +.gitignore diff --git a/cmd/resin/main.go b/cmd/resin/main.go index 4d3d6ec..d81dfe7 100644 --- a/cmd/resin/main.go +++ b/cmd/resin/main.go @@ -310,6 +310,14 @@ func newTopologyRuntime( // No NotifyNodeDirty here — AddNodeFromSub already notifies all platforms. probeMgr.TriggerImmediateEgressProbe(hash) }) + pool.SetOnNodeUpdated(func(hash node.Hash) { + if onNodeRemoved != nil { + onNodeRemoved(hash) + } + outboundMgr.EnsureNodeOutbound(hash) + probeMgr.TriggerImmediateEgressProbe(hash) + probeMgr.TriggerImmediateLatencyProbe(hash) + }) pool.SetOnNodeRemoved(func(hash node.Hash, entry *node.NodeEntry) { markNodeRemovedDirty(engine, hash, entry) outboundMgr.RemoveNodeOutbound(entry) @@ -380,6 +388,7 @@ func bootstrapTopology( sub.SetFetchConfig(ms.URL, ms.UpdateIntervalNs) sub.SetSourceType(ms.SourceType) sub.SetContent(ms.Content) + sub.SetUpstreamSubscriptionID(ms.UpstreamSubscriptionID) sub.SetEphemeralNodeEvictDelayNs(ms.EphemeralNodeEvictDelayNs) sub.CreatedAtNs = ms.CreatedAtNs sub.UpdatedAtNs = ms.UpdatedAtNs diff --git a/internal/api/subscription_local_contract_test.go b/internal/api/subscription_local_contract_test.go index 29fd87e..07a506d 100644 --- a/internal/api/subscription_local_contract_test.go +++ b/internal/api/subscription_local_contract_test.go @@ -53,3 +53,81 @@ func TestAPIContract_SubscriptionSourceTypeReadOnlyOnPatch(t *testing.T) { } assertErrorCode(t, rec, "INVALID_ARGUMENT") } + +func TestAPIContract_SubscriptionUpstreamCreateAndPatchValidation(t *testing.T) { + srv, _, _ := newControlPlaneTestServer(t) + + createARec := doJSONRequest(t, srv, http.MethodPost, "/api/v1/subscriptions", map[string]any{ + "name": "sub-a", + "url": "https://example.com/a", + }, true) + if createARec.Code != http.StatusCreated { + t.Fatalf("create sub-a status: got %d, want %d, body=%s", createARec.Code, http.StatusCreated, createARec.Body.String()) + } + createABody := decodeJSONMap(t, createARec) + subAID, _ := createABody["id"].(string) + if subAID == "" { + t.Fatalf("create sub-a missing id: body=%s", createARec.Body.String()) + } + + createBRec := doJSONRequest(t, srv, http.MethodPost, "/api/v1/subscriptions", map[string]any{ + "name": "sub-b", + "url": "https://example.com/b", + "upstream_subscription_id": subAID, + }, true) + if createBRec.Code != http.StatusCreated { + t.Fatalf("create sub-b with upstream status: got %d, want %d, body=%s", createBRec.Code, http.StatusCreated, createBRec.Body.String()) + } + createBBody := decodeJSONMap(t, createBRec) + subBID, _ := createBBody["id"].(string) + if subBID == "" { + t.Fatalf("create sub-b missing id: body=%s", createBRec.Body.String()) + } + if got, _ := createBBody["upstream_subscription_id"].(string); got != subAID { + t.Fatalf("create sub-b upstream_subscription_id: got %q, want %q", got, subAID) + } + + getBRec := doJSONRequest(t, srv, http.MethodGet, "/api/v1/subscriptions/"+subBID, nil, true) + if getBRec.Code != http.StatusOK { + t.Fatalf("get sub-b status: got %d, want %d, body=%s", getBRec.Code, http.StatusOK, getBRec.Body.String()) + } + getBBody := decodeJSONMap(t, getBRec) + if got, _ := getBBody["upstream_subscription_id"].(string); got != subAID { + t.Fatalf("get sub-b upstream_subscription_id: got %q, want %q", got, subAID) + } + + selfRec := doJSONRequest(t, srv, http.MethodPatch, "/api/v1/subscriptions/"+subBID, map[string]any{ + "upstream_subscription_id": subBID, + }, true) + if selfRec.Code != http.StatusBadRequest { + t.Fatalf("patch self upstream status: got %d, want %d, body=%s", selfRec.Code, http.StatusBadRequest, selfRec.Body.String()) + } + assertErrorCode(t, selfRec, "INVALID_ARGUMENT") + + missingRec := doJSONRequest(t, srv, http.MethodPatch, "/api/v1/subscriptions/"+subBID, map[string]any{ + "upstream_subscription_id": "11111111-1111-1111-1111-111111111111", + }, true) + if missingRec.Code != http.StatusBadRequest { + t.Fatalf("patch missing upstream status: got %d, want %d, body=%s", missingRec.Code, http.StatusBadRequest, missingRec.Body.String()) + } + assertErrorCode(t, missingRec, "INVALID_ARGUMENT") + + cycleRec := doJSONRequest(t, srv, http.MethodPatch, "/api/v1/subscriptions/"+subAID, map[string]any{ + "upstream_subscription_id": subBID, + }, true) + if cycleRec.Code != http.StatusBadRequest { + t.Fatalf("patch cycle upstream status: got %d, want %d, body=%s", cycleRec.Code, http.StatusBadRequest, cycleRec.Body.String()) + } + assertErrorCode(t, cycleRec, "INVALID_ARGUMENT") + + clearRec := doJSONRequest(t, srv, http.MethodPatch, "/api/v1/subscriptions/"+subBID, map[string]any{ + "upstream_subscription_id": "", + }, true) + if clearRec.Code != http.StatusOK { + t.Fatalf("patch clear upstream status: got %d, want %d, body=%s", clearRec.Code, http.StatusOK, clearRec.Body.String()) + } + clearBody := decodeJSONMap(t, clearRec) + if got, _ := clearBody["upstream_subscription_id"].(string); got != "" { + t.Fatalf("clear upstream_subscription_id: got %q, want empty", got) + } +} diff --git a/internal/model/models.go b/internal/model/models.go index 9e6a205..7898623 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -28,6 +28,7 @@ type Subscription struct { Enabled bool `json:"enabled"` Ephemeral bool `json:"ephemeral"` EphemeralNodeEvictDelayNs int64 `json:"ephemeral_node_evict_delay_ns"` + UpstreamSubscriptionID string `json:"upstream_subscription_id"` CreatedAtNs int64 `json:"created_at_ns"` UpdatedAtNs int64 `json:"updated_at_ns"` } diff --git a/internal/outbound/builder.go b/internal/outbound/builder.go index 5bef9dc..2bfef16 100644 --- a/internal/outbound/builder.go +++ b/internal/outbound/builder.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + stdlog "log" "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/adapter/endpoint" @@ -32,9 +33,10 @@ type OutboundBuilder interface { // It holds a fully-wired context with DNS services so that domain-based // outbound servers can be resolved. type SingboxBuilder struct { - registry *sbOutbound.Registry - ctx context.Context - logFactory log.Factory + outboundManager *sbOutbound.Manager + ctx context.Context + logFactory log.Factory + dnsTransportManager *dns.TransportManager dnsRouter *dns.Router } @@ -102,10 +104,8 @@ func NewSingboxBuilder() (*SingboxBuilder, error) { return nil, fmt.Errorf("singbox builder: start DNS router: %w", err) } - registry := service.FromContext[adapter.OutboundRegistry](ctx).(*sbOutbound.Registry) - return &SingboxBuilder{ - registry: registry, + outboundManager: outboundMgr, ctx: ctx, logFactory: logFactory, dnsTransportManager: dnsTransportMgr, @@ -123,20 +123,26 @@ func (b *SingboxBuilder) Build(rawOptions json.RawMessage) (adapter.Outbound, er if err := sJson.UnmarshalContext(b.ctx, rawOptions, &outboundConfig); err != nil { return nil, fmt.Errorf("parse outbound options: %w", err) } + fmt.Printf("[outbound] raw=%s\n", string(rawOptions)) + stdlog.Printf("[outbound] raw=%s", string(rawOptions)) - // 2. Create the outbound instance via the registry. + // 2. Create the outbound instance via manager.Create so detour dependencies + // can be resolved against a shared outbound manager registry. logger := b.logFactory.NewLogger("outbound/" + outboundConfig.Type) - ob, err := b.registry.CreateOutbound( + if err := b.outboundManager.Create( b.ctx, nil, // router — not needed for simple dialing logger, outboundConfig.Tag, outboundConfig.Type, outboundConfig.Options, - ) - if err != nil { + ); err != nil { return nil, fmt.Errorf("create outbound [%s]: %w", outboundConfig.Type, err) } + ob, ok := b.outboundManager.Outbound(outboundConfig.Tag) + if !ok { + return nil, fmt.Errorf("create outbound [%s]: created outbound not found by tag %s", outboundConfig.Type, outboundConfig.Tag) + } // 3. Run lifecycle start stages. On failure, close and return error. for _, stage := range adapter.ListStartStages { diff --git a/internal/outbound/manager.go b/internal/outbound/manager.go index a9e24f8..ab5e75c 100644 --- a/internal/outbound/manager.go +++ b/internal/outbound/manager.go @@ -2,8 +2,11 @@ package outbound import ( "context" + "encoding/json" "errors" + "fmt" "io" + "log" "time" "github.com/Resinat/Resin/internal/netutil" @@ -45,6 +48,10 @@ func (m *OutboundManager) isLiveEntry(hash node.Hash, entry *node.NodeEntry) boo // Uses CompareAndSwap(nil, &wrapped) to guarantee only one goroutine's build // result is stored. Losers discard their result (stage 6 adds io.Closer release). func (m *OutboundManager) EnsureNodeOutbound(hash node.Hash) { + m.ensureNodeOutbound(hash, map[node.Hash]struct{}{}) +} + +func (m *OutboundManager) ensureNodeOutbound(hash node.Hash, visiting map[node.Hash]struct{}) { entry, ok := m.pool.GetEntry(hash) if !ok { return @@ -53,12 +60,37 @@ func (m *OutboundManager) EnsureNodeOutbound(hash node.Hash) { if entry.Outbound.Load() != nil { return } + if _, exists := visiting[hash]; exists { + entry.SetLastError("outbound build: detour cycle detected") + return + } + visiting[hash] = struct{}{} + defer delete(visiting, hash) + + if detourTag, ok := parseOutboundDetourTag(entry.RawOptions); ok { + log.Printf("[outbound] ensure node=%s detour=%s", hash.Hex(), detourTag) + detourHash, found := m.findHashByOutboundTag(detourTag) + if !found { + log.Printf("[outbound] detour target missing node=%s detour=%s", hash.Hex(), detourTag) + entry.SetLastError("outbound build: detour target not found: " + detourTag) + return + } + m.ensureNodeOutbound(detourHash, visiting) + detourEntry, ok := m.pool.GetEntry(detourHash) + if !ok || detourEntry.Outbound.Load() == nil { + log.Printf("[outbound] detour target not ready node=%s detour=%s detour_hash=%s", hash.Hex(), detourTag, detourHash.Hex()) + entry.SetLastError("outbound build: detour target not ready: " + detourTag) + return + } + log.Printf("[outbound] detour target ready node=%s detour=%s detour_hash=%s", hash.Hex(), detourTag, detourHash.Hex()) + } ob, err := m.builder.Build(entry.RawOptions) if err != nil { entry.SetLastError("outbound build: " + err.Error()) return } + entry.SetLastError("") // Build can race with node deletion/replacement. If this entry is no longer // the pool's live value for the hash, discard the build result. @@ -132,3 +164,56 @@ func (m *OutboundManager) FetchWithUserAgent( UserAgent: userAgent, }) } + +func parseOutboundDetourTag(raw json.RawMessage) (string, bool) { + var outbound map[string]any + if err := json.Unmarshal(raw, &outbound); err != nil { + return "", false + } + rawDetour, ok := outbound["detour"] + if !ok { + return "", false + } + detour, ok := rawDetour.(string) + if !ok || detour == "" { + return "", false + } + return detour, true +} + +func parseOutboundTag(raw json.RawMessage) (string, error) { + var outbound map[string]any + if err := json.Unmarshal(raw, &outbound); err != nil { + return "", fmt.Errorf("parse outbound: %w", err) + } + rawTag, ok := outbound["tag"] + if !ok { + return "", errors.New("tag missing") + } + tag, ok := rawTag.(string) + if !ok || tag == "" { + return "", errors.New("tag invalid") + } + return tag, nil +} + +func (m *OutboundManager) findHashByOutboundTag(tag string) (node.Hash, bool) { + found := node.Zero + matched := false + m.pool.RangeNodes(func(h node.Hash, entry *node.NodeEntry) bool { + if entry == nil { + return true + } + outboundTag, err := parseOutboundTag(entry.RawOptions) + if err != nil { + return true + } + if outboundTag == tag { + found = h + matched = true + return false + } + return true + }) + return found, matched +} diff --git a/internal/probe/manager.go b/internal/probe/manager.go index d3c9a64..b08a682 100644 --- a/internal/probe/manager.go +++ b/internal/probe/manager.go @@ -740,6 +740,7 @@ func (m *ProbeManager) probeEgress(hash node.Hash, entry *node.NodeEntry) { } if entry.Outbound.Load() == nil { + log.Printf("[probe] skip egress for %s: outbound not ready", hash.Hex()) return } @@ -767,6 +768,7 @@ func (m *ProbeManager) probeLatency(hash node.Hash, entry *node.NodeEntry, testU } if entry.Outbound.Load() == nil { + log.Printf("[probe] skip latency for %s: outbound not ready", hash.Hex()) return } diff --git a/internal/proxy/route_outbound.go b/internal/proxy/route_outbound.go index aead11c..1eb1d05 100644 --- a/internal/proxy/route_outbound.go +++ b/internal/proxy/route_outbound.go @@ -1,6 +1,9 @@ package proxy import ( + "encoding/json" + "log" + "github.com/Resinat/Resin/internal/outbound" "github.com/Resinat/Resin/internal/routing" "github.com/sagernet/sing-box/adapter" @@ -29,9 +32,17 @@ func resolveRoutedOutbound( } obPtr := entry.Outbound.Load() if obPtr == nil { + log.Printf("[proxy] route node=%s outbound=nil", result.NodeHash.Hex()) return routedOutbound{}, ErrNoAvailableNodes } + var raw map[string]any + if err := json.Unmarshal(entry.RawOptions, &raw); err == nil { + tag, _ := raw["tag"].(string) + detour, _ := raw["detour"].(string) + log.Printf("[proxy] route node=%s tag=%s detour=%s", result.NodeHash.Hex(), tag, detour) + } + return routedOutbound{ Route: result, Outbound: *obPtr, diff --git a/internal/service/control_plane_subscription.go b/internal/service/control_plane_subscription.go index d13d794..166dfa0 100644 --- a/internal/service/control_plane_subscription.go +++ b/internal/service/control_plane_subscription.go @@ -37,6 +37,7 @@ type SubscriptionResponse struct { LastChecked string `json:"last_checked,omitempty"` LastUpdated string `json:"last_updated,omitempty"` LastError string `json:"last_error,omitempty"` + UpstreamSubscriptionID string `json:"upstream_subscription_id"` } func (s *ControlPlaneService) subToResponse(sub *subscription.Subscription) SubscriptionResponse { @@ -83,6 +84,7 @@ func (s *ControlPlaneService) subToResponse(sub *subscription.Subscription) Subs resp.LastUpdated = time.Unix(0, lu).UTC().Format(time.RFC3339Nano) } resp.LastError = sub.GetLastError() + resp.UpstreamSubscriptionID = sub.UpstreamSubscriptionID() return resp } @@ -122,6 +124,7 @@ type CreateSubscriptionRequest struct { Enabled *bool `json:"enabled"` Ephemeral *bool `json:"ephemeral"` EphemeralNodeEvictDelay *string `json:"ephemeral_node_evict_delay"` + UpstreamSubscriptionID *string `json:"upstream_subscription_id"` } const minSubscriptionUpdateInterval = 30 * time.Second @@ -213,6 +216,14 @@ func (s *ControlPlaneService) CreateSubscription(req CreateSubscriptionRequest) id := uuid.New().String() now := time.Now().UnixNano() + upstreamSubscriptionID := "" + if req.UpstreamSubscriptionID != nil { + upstreamSubscriptionID = strings.TrimSpace(*req.UpstreamSubscriptionID) + } + if verr := s.validateSubscriptionUpstream(id, upstreamSubscriptionID); verr != nil { + return nil, verr + } + ms := model.Subscription{ ID: id, Name: name, @@ -223,6 +234,7 @@ func (s *ControlPlaneService) CreateSubscription(req CreateSubscriptionRequest) Enabled: enabled, Ephemeral: ephemeral, EphemeralNodeEvictDelayNs: int64(ephemeralNodeEvictDelay), + UpstreamSubscriptionID: upstreamSubscriptionID, CreatedAtNs: now, UpdatedAtNs: now, } @@ -234,6 +246,7 @@ func (s *ControlPlaneService) CreateSubscription(req CreateSubscriptionRequest) sub.SetFetchConfig(subURL, int64(updateInterval)) sub.SetSourceType(sourceType) sub.SetContent(content) + sub.SetUpstreamSubscriptionID(upstreamSubscriptionID) sub.SetEphemeralNodeEvictDelayNs(int64(ephemeralNodeEvictDelay)) sub.CreatedAtNs = now sub.UpdatedAtNs = now @@ -267,6 +280,7 @@ func (s *ControlPlaneService) UpdateSubscription(id string, patchJSON json.RawMe enabledChanged := false urlChanged := false contentChanged := false + upstreamChanged := false sourceType := sub.SourceType() newName := sub.Name() @@ -348,6 +362,20 @@ func (s *ControlPlaneService) UpdateSubscription(id string, patchJSON json.RawMe newEphemeralNodeEvictDelay = int64(d) } + newUpstreamSubscriptionID := sub.UpstreamSubscriptionID() + if upstreamID, ok, err := patch.optionalString("upstream_subscription_id"); err != nil { + return nil, err + } else if ok { + nextUpstream := strings.TrimSpace(upstreamID) + if nextUpstream != newUpstreamSubscriptionID { + upstreamChanged = true + } + newUpstreamSubscriptionID = nextUpstream + } + if verr := s.validateSubscriptionUpstream(id, newUpstreamSubscriptionID); verr != nil { + return nil, verr + } + now := time.Now().UnixNano() ms := model.Subscription{ ID: id, @@ -359,6 +387,7 @@ func (s *ControlPlaneService) UpdateSubscription(id string, patchJSON json.RawMe Enabled: newEnabled, Ephemeral: newEphemeral, EphemeralNodeEvictDelayNs: newEphemeralNodeEvictDelay, + UpstreamSubscriptionID: newUpstreamSubscriptionID, CreatedAtNs: sub.CreatedAtNs, UpdatedAtNs: now, } @@ -369,6 +398,7 @@ func (s *ControlPlaneService) UpdateSubscription(id string, patchJSON json.RawMe // Apply side-effects via scheduler. sub.SetFetchConfig(newURL, newInterval) sub.SetContent(newContent) + sub.SetUpstreamSubscriptionID(newUpstreamSubscriptionID) sub.SetEphemeral(newEphemeral) sub.SetEphemeralNodeEvictDelayNs(newEphemeralNodeEvictDelay) sub.UpdatedAtNs = now @@ -379,7 +409,7 @@ func (s *ControlPlaneService) UpdateSubscription(id string, patchJSON json.RawMe if enabledChanged { s.Scheduler.SetSubscriptionEnabled(sub, newEnabled) } - if urlChanged || contentChanged { + if urlChanged || contentChanged || upstreamChanged { go s.Scheduler.UpdateSubscription(sub) } @@ -504,3 +534,33 @@ func shouldCleanupSubscriptionNode(entry *node.NodeEntry) bool { } return entry.IsCircuitOpen() || (!entry.HasOutbound() && entry.GetLastError() != "") } + +func (s *ControlPlaneService) validateSubscriptionUpstream(currentID, upstreamID string) *ServiceError { + if upstreamID == "" { + return nil + } + if upstreamID == currentID { + return invalidArg("upstream_subscription_id: cannot reference itself") + } + if s.SubMgr.Lookup(upstreamID) == nil { + return invalidArg("upstream_subscription_id: subscription not found") + } + + visited := map[string]struct{}{} + nextID := upstreamID + for nextID != "" { + if nextID == currentID { + return invalidArg("upstream_subscription_id: cycle detected") + } + if _, ok := visited[nextID]; ok { + return invalidArg("upstream_subscription_id: cycle detected") + } + visited[nextID] = struct{}{} + nextSub := s.SubMgr.Lookup(nextID) + if nextSub == nil { + return invalidArg("upstream_subscription_id: subscription not found") + } + nextID = nextSub.UpstreamSubscriptionID() + } + return nil +} diff --git a/internal/service/control_plane_system.go b/internal/service/control_plane_system.go index 127bf3a..dfe286d 100644 --- a/internal/service/control_plane_system.go +++ b/internal/service/control_plane_system.go @@ -108,6 +108,7 @@ var subscriptionPatchAllowedFields = map[string]bool{ "enabled": true, "ephemeral": true, "ephemeral_node_evict_delay": true, + "upstream_subscription_id": true, } func parseRuntimeConfigPatch(patchJSON json.RawMessage, out *config.RuntimeConfig) *ServiceError { diff --git a/internal/state/migrate.go b/internal/state/migrate.go index 55cc953..2ed0792 100644 --- a/internal/state/migrate.go +++ b/internal/state/migrate.go @@ -23,6 +23,7 @@ const ( stateVersionAddEmptyAccountBehavior = 2 stateVersionAddFixedAccountHeader = 3 stateVersionNormalizeMissAction = 4 + stateVersionAddSubscriptionUpstream = 5 stateLegacyBaselineVersion = stateVersionAddFixedAccountHeader ) diff --git a/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.down.sql b/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.down.sql new file mode 100644 index 0000000..0f31be0 --- /dev/null +++ b/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.down.sql @@ -0,0 +1 @@ +-- Irreversible in SQLite without table rebuild. diff --git a/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.up.sql b/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.up.sql new file mode 100644 index 0000000..c5747d2 --- /dev/null +++ b/internal/state/migrations/state/000005_subscriptions_add_upstream_subscription_id.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE subscriptions +ADD COLUMN upstream_subscription_id TEXT NOT NULL DEFAULT ''; diff --git a/internal/state/repo_state.go b/internal/state/repo_state.go index f684df9..41a23b5 100644 --- a/internal/state/repo_state.go +++ b/internal/state/repo_state.go @@ -300,8 +300,8 @@ func (r *StateRepo) UpsertSubscription(s model.Subscription) error { _, err := r.db.Exec(` INSERT INTO subscriptions (id, name, source_type, url, content, update_interval_ns, enabled, - ephemeral, ephemeral_node_evict_delay_ns, created_at_ns, updated_at_ns) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ephemeral, ephemeral_node_evict_delay_ns, upstream_subscription_id, created_at_ns, updated_at_ns) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = excluded.name, source_type = excluded.source_type, @@ -311,9 +311,10 @@ func (r *StateRepo) UpsertSubscription(s model.Subscription) error { enabled = excluded.enabled, ephemeral = excluded.ephemeral, ephemeral_node_evict_delay_ns = excluded.ephemeral_node_evict_delay_ns, + upstream_subscription_id = excluded.upstream_subscription_id, updated_at_ns = excluded.updated_at_ns `, s.ID, s.Name, s.SourceType, s.URL, s.Content, s.UpdateIntervalNs, s.Enabled, - s.Ephemeral, s.EphemeralNodeEvictDelayNs, s.CreatedAtNs, s.UpdatedAtNs) + s.Ephemeral, s.EphemeralNodeEvictDelayNs, s.UpstreamSubscriptionID, s.CreatedAtNs, s.UpdatedAtNs) return err } @@ -336,7 +337,7 @@ func (r *StateRepo) DeleteSubscription(id string) error { // ListSubscriptions returns all subscriptions. func (r *StateRepo) ListSubscriptions() ([]model.Subscription, error) { rows, err := r.db.Query(`SELECT id, name, source_type, url, content, update_interval_ns, enabled, - ephemeral, ephemeral_node_evict_delay_ns, created_at_ns, updated_at_ns FROM subscriptions`) + ephemeral, ephemeral_node_evict_delay_ns, upstream_subscription_id, created_at_ns, updated_at_ns FROM subscriptions`) if err != nil { return nil, err } @@ -346,7 +347,7 @@ func (r *StateRepo) ListSubscriptions() ([]model.Subscription, error) { for rows.Next() { var s model.Subscription if err := rows.Scan(&s.ID, &s.Name, &s.SourceType, &s.URL, &s.Content, &s.UpdateIntervalNs, &s.Enabled, - &s.Ephemeral, &s.EphemeralNodeEvictDelayNs, &s.CreatedAtNs, &s.UpdatedAtNs); err != nil { + &s.Ephemeral, &s.EphemeralNodeEvictDelayNs, &s.UpstreamSubscriptionID, &s.CreatedAtNs, &s.UpdatedAtNs); err != nil { return nil, err } if s.SourceType == "" { diff --git a/internal/state/repo_state_test.go b/internal/state/repo_state_test.go index 8cabe8b..9aa2c54 100644 --- a/internal/state/repo_state_test.go +++ b/internal/state/repo_state_test.go @@ -89,6 +89,24 @@ func TestMigrateStateDB_LegacyBaselineAdvancesToLatest(t *testing.T) { if err != nil { t.Fatalf("create legacy latest-like platforms table: %v", err) } + _, err = db.Exec(` + CREATE TABLE subscriptions ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + source_type TEXT NOT NULL DEFAULT 'remote', + url TEXT NOT NULL, + content TEXT NOT NULL DEFAULT '', + update_interval_ns INTEGER NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + ephemeral INTEGER NOT NULL DEFAULT 0, + ephemeral_node_evict_delay_ns INTEGER NOT NULL, + created_at_ns INTEGER NOT NULL, + updated_at_ns INTEGER NOT NULL + ) + `) + if err != nil { + t.Fatalf("create legacy subscriptions table: %v", err) + } if err := MigrateStateDB(db); err != nil { t.Fatalf("MigrateStateDB: %v", err) @@ -133,6 +151,24 @@ func TestMigrateStateDB_NormalizesLegacyRandomMissAction(t *testing.T) { if err != nil { t.Fatalf("create legacy latest-like platforms table: %v", err) } + _, err = db.Exec(` + CREATE TABLE subscriptions ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + source_type TEXT NOT NULL DEFAULT 'remote', + url TEXT NOT NULL, + content TEXT NOT NULL DEFAULT '', + update_interval_ns INTEGER NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + ephemeral INTEGER NOT NULL DEFAULT 0, + ephemeral_node_evict_delay_ns INTEGER NOT NULL, + created_at_ns INTEGER NOT NULL, + updated_at_ns INTEGER NOT NULL + ) + `) + if err != nil { + t.Fatalf("create legacy subscriptions table: %v", err) + } _, err = db.Exec(` INSERT INTO platforms ( id, name, sticky_ttl_ns, regex_filters_json, region_filters_json, @@ -435,9 +471,16 @@ func TestStateRepo_Subscriptions_CRUD(t *testing.T) { now := time.Now().UnixNano() s := model.Subscription{ - ID: "sub-1", Name: "MySub", URL: "https://example.com/sub", - UpdateIntervalNs: int64(30 * time.Second), Enabled: true, - Ephemeral: false, EphemeralNodeEvictDelayNs: int64(72 * time.Hour), CreatedAtNs: now, UpdatedAtNs: now, + ID: "sub-1", + Name: "MySub", + URL: "https://example.com/sub", + UpdateIntervalNs: int64(30 * time.Second), + Enabled: true, + Ephemeral: false, + EphemeralNodeEvictDelayNs: int64(72 * time.Hour), + UpstreamSubscriptionID: "upstream-sub", + CreatedAtNs: now, + UpdatedAtNs: now, } if err := repo.UpsertSubscription(s); err != nil { t.Fatal(err) diff --git a/internal/subscription/subscription.go b/internal/subscription/subscription.go index 5628659..e39c8e2 100644 --- a/internal/subscription/subscription.go +++ b/internal/subscription/subscription.go @@ -2,6 +2,7 @@ package subscription import ( + "strings" "sync" "sync/atomic" "time" @@ -113,6 +114,7 @@ type Subscription struct { url string sourceType string content string + upstreamSubscriptionID string // updateIntervalNs is the configured subscription refresh interval. updateIntervalNs int64 name string @@ -137,7 +139,7 @@ type Subscription struct { managedNodes atomic.Pointer[ManagedNodes] // configVersion is incremented whenever refresh-input-related config changes - // (URL/source/content/update-interval). Scheduler uses it for stale-guard. + // (URL/source/content/update-interval/upstream-subscription). Scheduler uses it for stale-guard. configVersion atomic.Int64 } @@ -194,6 +196,13 @@ func (s *Subscription) Content() string { return s.content } +// UpstreamSubscriptionID returns the configured upstream subscription ID. +func (s *Subscription) UpstreamSubscriptionID() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.upstreamSubscriptionID +} + // ConfigVersion returns the scheduler input config version. func (s *Subscription) ConfigVersion() int64 { return s.configVersion.Load() @@ -239,6 +248,17 @@ func (s *Subscription) SetContent(content string) { s.mu.Unlock() } +// SetUpstreamSubscriptionID updates upstream subscription ID (thread-safe). +func (s *Subscription) SetUpstreamSubscriptionID(id string) { + id = strings.TrimSpace(id) + s.mu.Lock() + if s.upstreamSubscriptionID != id { + s.upstreamSubscriptionID = id + s.configVersion.Add(1) + } + s.mu.Unlock() +} + // Name returns the subscription name (thread-safe). func (s *Subscription) Name() string { s.mu.RLock() diff --git a/internal/topology/pool.go b/internal/topology/pool.go index d2bb154..39787b1 100644 --- a/internal/topology/pool.go +++ b/internal/topology/pool.go @@ -4,8 +4,10 @@ package topology import ( + "bytes" "encoding/json" "errors" + "io" "net/netip" "runtime" "strings" @@ -39,6 +41,7 @@ type GlobalNodePool struct { // Persistence callbacks (optional, nil in tests without persistence). onNodeAdded func(hash node.Hash) // called after a new node is created onNodeRemoved func(hash node.Hash, entry *node.NodeEntry) // called after a node is deleted from pool + onNodeUpdated func(hash node.Hash) // called after an existing node raw options are replaced onSubNodeChanged func(subID string, hash node.Hash, added bool) // Health callbacks (optional). @@ -58,6 +61,7 @@ type PoolConfig struct { GeoLookup platform.GeoLookupFunc OnNodeAdded func(hash node.Hash) OnNodeRemoved func(hash node.Hash, entry *node.NodeEntry) + OnNodeUpdated func(hash node.Hash) OnSubNodeChanged func(subID string, hash node.Hash, added bool) OnNodeDynamicChanged func(hash node.Hash) OnNodeLatencyChanged func(hash node.Hash, domain string) @@ -87,6 +91,7 @@ func NewGlobalNodePool(cfg PoolConfig) *GlobalNodePool { geoLookup: cfg.GeoLookup, onNodeAdded: cfg.OnNodeAdded, onNodeRemoved: cfg.OnNodeRemoved, + onNodeUpdated: cfg.OnNodeUpdated, onSubNodeChanged: cfg.OnSubNodeChanged, onNodeDynamicChanged: cfg.OnNodeDynamicChanged, onNodeLatencyChanged: cfg.OnNodeLatencyChanged, @@ -105,6 +110,7 @@ func NewGlobalNodePool(cfg PoolConfig) *GlobalNodePool { // After mutation, notifies all platforms to re-evaluate the node. func (p *GlobalNodePool) AddNodeFromSub(hash node.Hash, rawOpts json.RawMessage, subID string) { isNew := false + rawChanged := false p.nodes.Compute(hash, func(entry *node.NodeEntry, loaded bool) (*node.NodeEntry, xsync.ComputeOp) { if !loaded { createdAt := time.Now() @@ -112,6 +118,15 @@ func (p *GlobalNodePool) AddNodeFromSub(hash node.Hash, rawOpts json.RawMessage, // New subscription nodes start as circuit-open and must be proven healthy by probes. entry.CircuitOpenSince.Store(createdAt.UnixNano()) isNew = true + } else if !bytes.Equal(entry.RawOptions, rawOpts) { + oldOutbound := entry.Outbound.Swap(nil) + if oldOutbound != nil { + if c, ok := (*oldOutbound).(io.Closer); ok { + _ = c.Close() + } + } + entry.RawOptions = append(json.RawMessage(nil), rawOpts...) + rawChanged = true } entry.AddSubscriptionID(subID) return entry, xsync.UpdateOp @@ -120,6 +135,9 @@ func (p *GlobalNodePool) AddNodeFromSub(hash node.Hash, rawOpts json.RawMessage, if isNew && p.onNodeAdded != nil { p.onNodeAdded(hash) } + if rawChanged && p.onNodeUpdated != nil { + p.onNodeUpdated(hash) + } if p.onSubNodeChanged != nil { p.onSubNodeChanged(subID, hash, true) } @@ -499,6 +517,12 @@ func (p *GlobalNodePool) SetOnNodeAdded(fn func(hash node.Hash)) { p.onNodeAdded = fn } +// SetOnNodeUpdated sets the callback fired when an existing node's raw options are replaced. +// Must be called before any background workers are started. +func (p *GlobalNodePool) SetOnNodeUpdated(fn func(hash node.Hash)) { + p.onNodeUpdated = fn +} + // SetOnNodeRemoved sets the callback fired when a node is removed from the pool. // Must be called before any background workers are started. func (p *GlobalNodePool) SetOnNodeRemoved(fn func(hash node.Hash, entry *node.NodeEntry)) { diff --git a/internal/topology/scheduler_test.go b/internal/topology/scheduler_test.go index 8e3b0cb..0c5cd5c 100644 --- a/internal/topology/scheduler_test.go +++ b/internal/topology/scheduler_test.go @@ -212,6 +212,46 @@ func TestScheduler_UpdateSubscription_LocalSubscription_SuccessWithoutFetcher(t } } +func TestScheduler_UpdateSubscription_ChainInjectsDetour(t *testing.T) { + subMgr := NewSubscriptionManager() + upstream := subscription.NewSubscription("upstream", "Upstream", "http://example.com/up", true, false) + downstream := subscription.NewSubscription("downstream", "Downstream", "http://example.com/down", true, false) + downstream.SetUpstreamSubscriptionID(upstream.ID) + subMgr.Register(upstream) + subMgr.Register(downstream) + + pool := newTestPool(subMgr) + + upstreamRaw := json.RawMessage(`{"type":"shadowsocks","tag":"upstream-node","server":"1.1.1.1","server_port":443}`) + upstreamHash := node.HashFromRawOptions(upstreamRaw) + upstreamManaged := subscription.NewManagedNodes() + upstreamManaged.StoreNode(upstreamHash, subscription.ManagedNode{Tags: []string{"up"}}) + upstream.SwapManagedNodes(upstreamManaged) + pool.AddNodeFromSub(upstreamHash, upstreamRaw, upstream.ID) + + downstreamBody := makeSubscriptionJSON( + `{"type":"vmess","tag":"down-node","server":"2.2.2.2","server_port":443}`, + ) + sched := newTestScheduler(subMgr, pool, makeMockFetcher(downstreamBody, nil)) + sched.UpdateSubscription(downstream) + + downstreamHash := node.HashFromRawOptions([]byte(`{"type":"vmess","tag":"down-node","server":"2.2.2.2","server_port":443}`)) + entry, ok := pool.GetEntry(downstreamHash) + if !ok { + t.Fatal("expected downstream node in pool") + } + + var outbound map[string]any + if err := json.Unmarshal(entry.RawOptions, &outbound); err != nil { + t.Fatalf("decode downstream outbound: %v", err) + } + detour, _ := outbound["detour"].(string) + wantDetour := chainedOutboundTagPrefix + upstreamHash.Hex() + if detour != wantDetour { + t.Fatalf("downstream detour: got %q, want %q", detour, wantDetour) + } +} + func TestScheduler_UpdateSubscription_LocalSubscription_ParseFailure(t *testing.T) { subMgr := NewSubscriptionManager() sub := subscription.NewSubscription("s1", "LocalSub", "", true, false) diff --git a/internal/topology/subscription_scheduler.go b/internal/topology/subscription_scheduler.go index 97b78a1..c98b62e 100644 --- a/internal/topology/subscription_scheduler.go +++ b/internal/topology/subscription_scheduler.go @@ -2,8 +2,11 @@ package topology import ( "context" + "encoding/json" + "fmt" "log" "runtime" + "sort" "sync" "time" @@ -13,7 +16,10 @@ import ( "github.com/Resinat/Resin/internal/subscription" ) -const schedulerLookahead = 15 * time.Second +const ( + schedulerLookahead = 15 * time.Second + chainedOutboundTagPrefix = "resin-node-" +) // SubscriptionScheduler manages periodic subscription updates. type SubscriptionScheduler struct { @@ -220,15 +226,28 @@ func (s *SubscriptionScheduler) UpdateSubscription(sub *subscription.Subscriptio } // 3. Build new managed nodes map (lock-free, pure computation). + upstreamDetourTag, err := s.resolveUpstreamDetourTag(sub.UpstreamSubscriptionID()) + if err != nil { + s.handleUpdateFailure(sub, attemptStartedNs, attemptConfigVersion, "chain", err) + return + } + log.Printf("[scheduler] subscription=%s upstream=%s detour=%s", sub.ID, sub.UpstreamSubscriptionID(), upstreamDetourTag) + newManagedNodes := subscription.NewManagedNodes() rawByHash := make(map[node.Hash][]byte) for _, p := range parsed { - h := node.HashFromRawOptions(p.RawOptions) - existing, _ := newManagedNodes.LoadNode(h) + rawOptions, hash, err := rewriteParsedOutboundRaw(p.RawOptions, upstreamDetourTag) + if err != nil { + s.handleUpdateFailure(sub, attemptStartedNs, attemptConfigVersion, "rewrite", err) + return + } + log.Printf("[scheduler] subscription=%s node=%s tag=%s detour=%s", sub.ID, hash.Hex(), p.Tag, upstreamDetourTag) + + existing, _ := newManagedNodes.LoadNode(hash) existing.Tags = append(existing.Tags, p.Tag) - newManagedNodes.StoreNode(h, existing) - if _, ok := rawByHash[h]; !ok { - rawByHash[h] = p.RawOptions + newManagedNodes.StoreNode(hash, existing) + if _, ok := rawByHash[hash]; !ok { + rawByHash[hash] = rawOptions } } @@ -241,7 +260,7 @@ func (s *SubscriptionScheduler) UpdateSubscription(sub *subscription.Subscriptio } // Stale success guard: if a newer successful update has already landed, // discard this older attempt to avoid rolling state backward. - if sub.LastUpdatedNs.Load() > attemptStartedNs { + if sub.LastUpdatedNs.Load() >= attemptStartedNs { return } @@ -294,6 +313,7 @@ func (s *SubscriptionScheduler) UpdateSubscription(sub *subscription.Subscriptio if s.onSubUpdated != nil { s.onSubUpdated(sub) } + s.refreshDownstreamSubscriptions(sub.ID) } // handleUpdateFailure applies a fetch/parse failure to subscription state. @@ -312,7 +332,7 @@ func (s *SubscriptionScheduler) handleUpdateFailure( if sub.ConfigVersion() != attemptConfigVersion { return } - if sub.LastUpdatedNs.Load() > attemptStartedNs { + if sub.LastUpdatedNs.Load() >= attemptStartedNs { return } now := time.Now().UnixNano() @@ -400,3 +420,77 @@ func (s *SubscriptionScheduler) RenameSubscription(sub *subscription.Subscriptio func (s *SubscriptionScheduler) fetchViaDownloader(url string) ([]byte, error) { return s.downloader.Download(s.downloadCtx, url) } + +func (s *SubscriptionScheduler) resolveUpstreamDetourTag(upstreamSubID string) (string, error) { + if upstreamSubID == "" { + return "", nil + } + upstreamSub := s.subManager.Lookup(upstreamSubID) + if upstreamSub == nil { + return "", fmt.Errorf("upstream subscription %s not found", upstreamSubID) + } + + hashes := make([]node.Hash, 0) + upstreamSub.ManagedNodes().RangeNodes(func(h node.Hash, managed subscription.ManagedNode) bool { + if managed.Evicted { + return true + } + if _, ok := s.pool.GetEntry(h); !ok { + return true + } + hashes = append(hashes, h) + return true + }) + if len(hashes) == 0 { + return "", fmt.Errorf("upstream subscription %s has no active nodes in pool", upstreamSubID) + } + sort.Slice(hashes, func(i, j int) bool { + return hashes[i].Hex() < hashes[j].Hex() + }) + return chainedOutboundTagPrefix + hashes[0].Hex(), nil +} + +func (s *SubscriptionScheduler) refreshDownstreamSubscriptions(upstreamSubID string) { + if upstreamSubID == "" { + return + } + queue := []string{upstreamSubID} + visited := map[string]struct{}{upstreamSubID: {}} + for len(queue) > 0 { + current := queue[0] + queue = queue[1:] + s.subManager.Range(func(_ string, sub *subscription.Subscription) bool { + if sub.UpstreamSubscriptionID() != current { + return true + } + if _, ok := visited[sub.ID]; ok { + return true + } + visited[sub.ID] = struct{}{} + if sub.Enabled() { + go s.UpdateSubscription(sub) + } + queue = append(queue, sub.ID) + return true + }) + } +} + +func rewriteParsedOutboundRaw(raw []byte, detourTag string) ([]byte, node.Hash, error) { + var outbound map[string]any + if err := json.Unmarshal(raw, &outbound); err != nil { + return nil, node.Zero, fmt.Errorf("decode outbound: %w", err) + } + hash := node.HashFromRawOptions(raw) + outbound["tag"] = chainedOutboundTagPrefix + hash.Hex() + if detourTag != "" { + outbound["detour"] = detourTag + } else { + delete(outbound, "detour") + } + rewritten, err := json.Marshal(outbound) + if err != nil { + return nil, node.Zero, fmt.Errorf("encode outbound: %w", err) + } + return rewritten, hash, nil +} diff --git a/webui/src/features/subscriptions/SubscriptionPage.tsx b/webui/src/features/subscriptions/SubscriptionPage.tsx index 9e77568..c10f587 100644 --- a/webui/src/features/subscriptions/SubscriptionPage.tsx +++ b/webui/src/features/subscriptions/SubscriptionPage.tsx @@ -45,6 +45,7 @@ const subscriptionCreateSchema = z.object({ content: z.string(), update_interval: z.string().trim().min(1, "更新间隔不能为空"), ephemeral_node_evict_delay: z.string().trim().min(1, "临时节点驱逐延迟不能为空"), + upstream_subscription_id: z.string(), enabled: z.boolean(), ephemeral: z.boolean(), }).superRefine((value, ctx) => { @@ -91,6 +92,7 @@ function subscriptionToEditForm(subscription: Subscription): SubscriptionEditFor content: subscription.content ?? "", update_interval: subscription.update_interval, ephemeral_node_evict_delay: subscription.ephemeral_node_evict_delay, + upstream_subscription_id: subscription.upstream_subscription_id || "", enabled: subscription.enabled, ephemeral: subscription.ephemeral, }; @@ -170,6 +172,12 @@ export function SubscriptionPage() { return subscriptions.find((item) => item.id === selectedSubscriptionId) ?? null; }, [selectedSubscriptionId, subscriptions]); + const upstreamOptions = useMemo(() => { + return subscriptions + .map((item) => ({ id: item.id, name: item.name })) + .sort((a, b) => a.name.localeCompare(b.name)); + }, [subscriptions]); + const drawerVisible = drawerOpen && Boolean(selectedSubscription); const createForm = useForm({ @@ -181,6 +189,7 @@ export function SubscriptionPage() { content: "", update_interval: "12h", ephemeral_node_evict_delay: "72h", + upstream_subscription_id: "", enabled: true, ephemeral: false, }, @@ -198,6 +207,7 @@ export function SubscriptionPage() { content: "", update_interval: "12h", ephemeral_node_evict_delay: "72h", + upstream_subscription_id: "", enabled: true, ephemeral: false, }, @@ -252,6 +262,7 @@ export function SubscriptionPage() { content: "", update_interval: LOCAL_SOURCE_UPDATE_INTERVAL, ephemeral_node_evict_delay: "72h", + upstream_subscription_id: "", enabled: true, ephemeral: false, }); @@ -272,6 +283,7 @@ export function SubscriptionPage() { name: formData.name.trim(), update_interval: normalizeSubmitUpdateInterval(formData.source_type, formData.update_interval), ephemeral_node_evict_delay: formData.ephemeral_node_evict_delay.trim(), + upstream_subscription_id: formData.upstream_subscription_id.trim(), enabled: formData.enabled, ephemeral: formData.ephemeral, ...(formData.source_type === "remote" @@ -372,6 +384,7 @@ export function SubscriptionPage() { source_type: values.source_type, update_interval: normalizeSubmitUpdateInterval(values.source_type, values.update_interval), ephemeral_node_evict_delay: values.ephemeral_node_evict_delay.trim(), + upstream_subscription_id: values.upstream_subscription_id.trim(), enabled: values.enabled, ephemeral: values.ephemeral, ...(values.source_type === "remote" @@ -808,6 +821,20 @@ export function SubscriptionPage() { ) : null} +
+ + +
+
+
+ + +
+