diff --git a/backend/admin/service.go b/backend/admin/service.go index 8134930cd7..42ecbbad01 100644 --- a/backend/admin/service.go +++ b/backend/admin/service.go @@ -48,7 +48,7 @@ type Config struct { type Service struct { env *EnvironmentManager - timelineClient *timelineclient.Client + timelineClient *timelineclient.RealClient schemaClient ftlv1connect.SchemaServiceClient source *schemaeventsource.EventSource storage *oci.ArtefactService @@ -86,7 +86,7 @@ func NewAdminService( source *schemaeventsource.EventSource, storage *oci.ArtefactService, routes *routing.VerbCallRouter, - timelineClient *timelineclient.Client, + timelineClient *timelineclient.RealClient, waitFor []string, ) *Service { return &Service{ diff --git a/backend/console/console.go b/backend/console/console.go index 93262aa078..e31c9b655a 100644 --- a/backend/console/console.go +++ b/backend/console/console.go @@ -41,7 +41,7 @@ type Config struct { type Service struct { schemaEventSource *schemaeventsource.EventSource - timelineClient *timelineclient.Client + timelineClient *timelineclient.RealClient adminClient admin.EnvironmentClient callClient routing.CallClient buildEngineClient buildenginepbconnect.BuildEngineServiceClient @@ -55,7 +55,7 @@ var _ consolepbconnect.ConsoleServiceHandler = (*Service)(nil) func New( eventSource *schemaeventsource.EventSource, - timelineClient *timelineclient.Client, + timelineClient *timelineclient.RealClient, adminClient admin.EnvironmentClient, client routing.CallClient, buildEngineClient buildenginepbconnect.BuildEngineServiceClient, diff --git a/backend/cron/service.go b/backend/cron/service.go index 788d10e56e..d05d669435 100644 --- a/backend/cron/service.go +++ b/backend/cron/service.go @@ -60,7 +60,7 @@ type Config struct { } // Start the cron service. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error { +func Start(ctx context.Context, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient timelineclient.Publisher) error { logger := log.FromContext(ctx).Scope("cron") ctx = log.ContextWithLogger(ctx, logger) // Map of cron jobs for each module. @@ -153,7 +153,7 @@ func Start(ctx context.Context, config Config, eventSource *schemaeventsource.Ev return nil } -func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struct{}, CronState, CronEvent], client routing.CallClient, job *cronJob, timelineClient *timelineclient.Client) error { +func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struct{}, CronState, CronEvent], client routing.CallClient, job *cronJob, timelineClient timelineclient.Publisher) error { logger := log.FromContext(ctx).Scope("cron").Module(job.module) logger.Debugf("Executing cron job %s", job) @@ -249,7 +249,7 @@ func scheduleNext(cronQueue []*cronJob) (time.Duration, bool) { return next, true } -func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change schema.Notification, timelineClient *timelineclient.Client) error { +func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change schema.Notification, timelineClient timelineclient.Publisher) error { logger := log.FromContext(ctx).Scope("cron") // Track jobs before the update to detect changes @@ -297,7 +297,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change return nil } -func publishNewOrChangedJobs(ctx context.Context, newJobs []*cronJob, oldJobs map[string]*cronJob, timelineClient *timelineclient.Client) { +func publishNewOrChangedJobs(ctx context.Context, newJobs []*cronJob, oldJobs map[string]*cronJob, timelineClient timelineclient.Publisher) { for _, job := range newJobs { oldJob, exists := oldJobs[job.Key()] // Publish event if job is new or if the schedule has changed diff --git a/backend/ingress/service.go b/backend/ingress/service.go index 077c1ece73..317ccf7acf 100644 --- a/backend/ingress/service.go +++ b/backend/ingress/service.go @@ -40,13 +40,13 @@ type service struct { // Complete schema synchronised from the database. view *atomic.Value[materialisedView] client routing.CallClient - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher routeTable *routing.RouteTable urlPrefix string } // Start the HTTP ingress service. Blocks until the context is cancelled. -func Start(ctx context.Context, bind *url.URL, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error { +func Start(ctx context.Context, bind *url.URL, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient timelineclient.Publisher) error { logger := log.FromContext(ctx).Scope("http-ingress") ctx = log.ContextWithLogger(ctx, logger) svc := &service{ diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index 1f392f5269..2341254785 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -81,7 +81,7 @@ func Start( ctx context.Context, registry *ProvisionerRegistry, schemaClient schemaconnect.SchemaServiceClient, - timelineClient *timeline.Client, + timelineClient timeline.Publisher, ) error { timelineLogSink := timeline.NewLogSink(timelineClient, log.Debug) go timelineLogSink.RunLogLoop(ctx) diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 4bc9031207..b63c1ec2be 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -38,7 +38,7 @@ type Service struct { deploymentContextProvider deploymentcontext.DeploymentContextProvider controllerLeaseService ftlleaseconnect.LeaseServiceClient moduleVerbService *xsync.MapOf[string, moduleVerbService] - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher queryService *query.Service localModuleName string schema *schema.Module @@ -49,7 +49,7 @@ type Service struct { func New(controllerModuleService deploymentcontext.DeploymentContextProvider, leaseClient ftlleaseconnect.LeaseServiceClient, - timelineClient *timelineclient.Client, + timelineClient timelineclient.Publisher, queryService *query.Service, schema *schema.Module, localDeployment key.Deployment, diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index a467e60cc7..24a7316d2a 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -73,13 +73,13 @@ type consumer struct { deadLetterPublisher optional.Option[*publisher] verbClient VerbClient - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher claimedPartitionsChan chan partitionEvent } func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment key.Deployment, - deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timelineclient.Client) (*consumer, error) { + deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient timelineclient.Publisher) (*consumer, error) { if verb.Runtime == nil { return nil, errors.Errorf("subscription %s has no runtime", verb.Name) } diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index d20ae9d03a..58e1ae5fca 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -27,10 +27,10 @@ type publisher struct { topic *schema.Topic producer sarama.SyncProducer - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher } -func newPublisher(module string, t *schema.Topic, deployment key.Deployment, timelineClient *timelineclient.Client) (*publisher, error) { +func newPublisher(module string, t *schema.Topic, deployment key.Deployment, timelineClient timelineclient.Publisher) (*publisher, error) { if t.Runtime == nil { return nil, errors.Errorf("topic %s has no runtime", t.Name) } diff --git a/backend/runner/pubsub/pubsub.go b/backend/runner/pubsub/pubsub.go index aaa8a82f9a..a22d55fd8d 100644 --- a/backend/runner/pubsub/pubsub.go +++ b/backend/runner/pubsub/pubsub.go @@ -30,7 +30,7 @@ type VerbClient interface { var _ pubsubpbconnect.PublishServiceHandler = (*Service)(nil) var _ pubsubpbconnect.PubSubAdminServiceHandler = (*Service)(nil) -func New(module *schema.Module, deployment key.Deployment, verbClient VerbClient, timelineClient *timelineclient.Client) (*Service, error) { +func New(module *schema.Module, deployment key.Deployment, verbClient VerbClient, timelineClient timelineclient.Publisher) (*Service, error) { publishers := map[string]*publisher{} for t := range sl.FilterVariants[*schema.Topic](module.Decls) { publisher, err := newPublisher(module.Name, t, deployment, timelineClient) diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 485693804b..be9fa6f006 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -204,7 +204,7 @@ type Service struct { deploymentProvider oci.DeploymentArtefactProvider deploymentContextProvider deploymentcontext.DeploymentContextProvider schema *schema.Module - timelineClient *timeline.Client + timelineClient timeline.Publisher timelineLogSink *timeline.LogSink // Failed to register with the Controller registrationFailure atomic.Value[optional.Option[error]] diff --git a/backend/schemaservice/schemaservice.go b/backend/schemaservice/schemaservice.go index f9702cbc78..2f27a5226e 100644 --- a/backend/schemaservice/schemaservice.go +++ b/backend/schemaservice/schemaservice.go @@ -36,7 +36,7 @@ type Config struct { type Service struct { State *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper] Config Config - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher receiverClient optional.Option[ftlv1connect.SchemaMirrorServiceClient] // TODO: rename as mirror devMode bool creationLock sync.Mutex @@ -65,7 +65,7 @@ func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.Ge var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil) -func NewLocalService(ctx context.Context, config Config, timelineClient *timelineclient.Client, devMode bool) *Service { +func NewLocalService(ctx context.Context, config Config, timelineClient timelineclient.Publisher, devMode bool) *Service { s := &Service{ State: statemachine.NewSingleQueryHandle(statemachine.NewLocalHandle(newStateMachine(ctx, "")), struct{}{}), Config: config, @@ -85,7 +85,7 @@ func (s *Service) StartServices(context.Context) ([]rpc.Option, error) { func New( ctx context.Context, config Config, - timelineClient *timelineclient.Client, + timelineClient timelineclient.Publisher, receiverClient optional.Option[ftlv1connect.SchemaMirrorServiceClient], realm string, devMode bool, diff --git a/backend/timeline/integration_test.go b/backend/timeline/integration_test.go index c244420a12..2f33cf9715 100644 --- a/backend/timeline/integration_test.go +++ b/backend/timeline/integration_test.go @@ -40,9 +40,6 @@ func TestTimeline(t *testing.T) { in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}), in.SubTests( - in.SubTest{Name: "Limit", Action: in.VerifyTimeline(1, []*timelinepb.TimelineQuery_Filter{}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) { - assert.Equal(t, 1, len(events)) - })}, in.SubTest{Name: "IngressEvent", Action: in.VerifyTimeline(1000, []*timelinepb.TimelineQuery_Filter{ { Filter: &timelinepb.TimelineQuery_Filter_EventTypes{ diff --git a/cmd/ftl/app.go b/cmd/ftl/app.go index 32bf1728b4..69beff497c 100644 --- a/cmd/ftl/app.go +++ b/cmd/ftl/app.go @@ -260,7 +260,7 @@ func makeBindContext(logger *log.Logger, cancel context.CancelCauseFunc, csm *cu kctx.Bind(csm) kctx.Bind(&cli.SharedCLI) - err = kctx.BindToProvider(func() (*timelineclient.Client, error) { + err = kctx.BindToProvider(func() (*timelineclient.RealClient, error) { return timelineclient.NewClient(ctx, cli.TimelineConfig), nil }) kctx.FatalIfErrorf(err) diff --git a/cmd/ftl/cmd_replay.go b/cmd/ftl/cmd_replay.go index 67652d1c0a..5050f4aa06 100644 --- a/cmd/ftl/cmd_replay.go +++ b/cmd/ftl/cmd_replay.go @@ -30,7 +30,7 @@ func (c *replayCmd) Run( ctx context.Context, verbClient ftlv1connect.VerbServiceClient, eventSource *schemaeventsource.EventSource, - timelineClient *timelineclient.Client, + timelineClient *timelineclient.RealClient, ) error { // Wait timeout is for both pings to complete, not each ping individually startTime := time.Now() diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index da48e9acc5..69410c6c69 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -33,7 +33,7 @@ type CallClient interface { type VerbCallRouter struct { routingTable *RouteTable moduleClients *xsync.MapOf[string, optional.Option[ftlv1connect.VerbServiceClient]] - timelineClient *timelineclient.Client + timelineClient timelineclient.Publisher } func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { @@ -89,7 +89,7 @@ func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.Ca return resp, nil } -func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelineClient *timelineclient.Client) *VerbCallRouter { +func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelineClient timelineclient.Publisher) *VerbCallRouter { svc := &VerbCallRouter{ routingTable: routeTable, moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](), @@ -106,7 +106,7 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelin }() return svc } -func NewVerbRouter(ctx context.Context, changes *schemaeventsource.EventSource, timelineClient *timelineclient.Client) *VerbCallRouter { +func NewVerbRouter(ctx context.Context, changes *schemaeventsource.EventSource, timelineClient timelineclient.Publisher) *VerbCallRouter { return NewVerbRouterFromTable(ctx, New(ctx, changes), timelineClient) } diff --git a/internal/timelineclient/client.go b/internal/timelineclient/client.go index af2a352c03..96507f1924 100644 --- a/internal/timelineclient/client.go +++ b/internal/timelineclient/client.go @@ -26,7 +26,11 @@ type Config struct { TimelineEndpoint *url.URL `help:"Timeline endpoint (discard:// to disable)." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8892"` } -type Client struct { +type Publisher interface { + Publish(ctx context.Context, event Event) +} + +type RealClient struct { timelinepbconnect.TimelineServiceClient entries chan *timelinepb.CreateEventsRequest_EventEntry @@ -34,7 +38,9 @@ type Client struct { lastFailedError atomic.Value[time.Time] } -func (c *Client) Ping(context.Context, *connect.Request[v1.PingRequest]) (*connect.Response[v1.PingResponse], error) { +var _ Publisher = &RealClient{} + +func (c *RealClient) Ping(context.Context, *connect.Request[v1.PingRequest]) (*connect.Response[v1.PingResponse], error) { return connect.NewResponse(&v1.PingResponse{}), nil } @@ -46,12 +52,12 @@ var NullConfig = Config{ // // If endpoint is discard:// the client will not create an RPC client or send any RPC requests, and all events // will be immediately discarded. -func NewClient(ctx context.Context, config Config) *Client { +func NewClient(ctx context.Context, config Config) *RealClient { var c timelinepbconnect.TimelineServiceClient if config.TimelineEndpoint.Scheme != "discard" { c = rpc.Dial(timelinepbconnect.NewTimelineServiceClient, config.TimelineEndpoint.String(), log.Error) } - client := &Client{ + client := &RealClient{ TimelineServiceClient: c, entries: make(chan *timelinepb.CreateEventsRequest_EventEntry, 1000), } @@ -70,7 +76,7 @@ type Event interface { } // Publish asynchronously enqueues an event for publication to the timeline. -func (c *Client) Publish(ctx context.Context, event Event) { +func (c *RealClient) Publish(ctx context.Context, event Event) { entry, err := event.ToEntry() entry.Timestamp = timestamppb.New(time.Now()) if err != nil { @@ -87,7 +93,7 @@ func (c *Client) Publish(ctx context.Context, event Event) { } } -func (c *Client) noopEvents(ctx context.Context) { +func (c *RealClient) noopEvents(ctx context.Context) { for { select { case <-c.entries: @@ -97,7 +103,7 @@ func (c *Client) noopEvents(ctx context.Context) { } } -func (c *Client) processEvents(ctx context.Context) { +func (c *RealClient) processEvents(ctx context.Context) { lastFlush := time.Now() buffer := make([]*timelinepb.CreateEventsRequest_EventEntry, 0, maxBatchSize) for { @@ -125,7 +131,7 @@ func (c *Client) processEvents(ctx context.Context) { } // Flush all events in the buffer to the timeline service in a single call. -func (c *Client) flushEvents(ctx context.Context, entries []*timelinepb.CreateEventsRequest_EventEntry) { +func (c *RealClient) flushEvents(ctx context.Context, entries []*timelinepb.CreateEventsRequest_EventEntry) { logger := log.FromContext(ctx).Scope("timeline") _, err := c.CreateEvents(ctx, connect.NewRequest(&timelinepb.CreateEventsRequest{ Entries: entries, diff --git a/internal/timelineclient/log_sink.go b/internal/timelineclient/log_sink.go index 3442057e2f..a65a51430b 100644 --- a/internal/timelineclient/log_sink.go +++ b/internal/timelineclient/log_sink.go @@ -15,14 +15,14 @@ import ( // // It needs to be run in a separate goroutine after creation by calling RunLogLoop. type LogSink struct { - client *Client + client Publisher logQueue chan log.Entry level log.Level } var _ log.Sink = (*LogSink)(nil) -func NewLogSink(client *Client, level log.Level) *LogSink { +func NewLogSink(client Publisher, level log.Level) *LogSink { return &LogSink{ client: client, logQueue: make(chan log.Entry, 10000),