Skip to content
This repository was archived by the owner on Aug 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Config struct {

type Service struct {
env *EnvironmentManager
timelineClient *timelineclient.Client
timelineClient *timelineclient.RealClient
schemaClient ftlv1connect.SchemaServiceClient
source *schemaeventsource.EventSource
storage *oci.ArtefactService
Expand Down Expand Up @@ -86,7 +86,7 @@ func NewAdminService(
source *schemaeventsource.EventSource,
storage *oci.ArtefactService,
routes *routing.VerbCallRouter,
timelineClient *timelineclient.Client,
timelineClient *timelineclient.RealClient,
waitFor []string,
) *Service {
return &Service{
Expand Down
4 changes: 2 additions & 2 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Config struct {

type Service struct {
schemaEventSource *schemaeventsource.EventSource
timelineClient *timelineclient.Client
timelineClient *timelineclient.RealClient
adminClient admin.EnvironmentClient
callClient routing.CallClient
buildEngineClient buildenginepbconnect.BuildEngineServiceClient
Expand All @@ -55,7 +55,7 @@ var _ consolepbconnect.ConsoleServiceHandler = (*Service)(nil)

func New(
eventSource *schemaeventsource.EventSource,
timelineClient *timelineclient.Client,
timelineClient *timelineclient.RealClient,
adminClient admin.EnvironmentClient,
client routing.CallClient,
buildEngineClient buildenginepbconnect.BuildEngineServiceClient,
Expand Down
8 changes: 4 additions & 4 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Config struct {
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
func Start(ctx context.Context, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient timelineclient.Publisher) error {
logger := log.FromContext(ctx).Scope("cron")
ctx = log.ContextWithLogger(ctx, logger)
// Map of cron jobs for each module.
Expand Down Expand Up @@ -153,7 +153,7 @@ func Start(ctx context.Context, config Config, eventSource *schemaeventsource.Ev
return nil
}

func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struct{}, CronState, CronEvent], client routing.CallClient, job *cronJob, timelineClient *timelineclient.Client) error {
func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struct{}, CronState, CronEvent], client routing.CallClient, job *cronJob, timelineClient timelineclient.Publisher) error {
logger := log.FromContext(ctx).Scope("cron").Module(job.module)
logger.Debugf("Executing cron job %s", job)

Expand Down Expand Up @@ -249,7 +249,7 @@ func scheduleNext(cronQueue []*cronJob) (time.Duration, bool) {
return next, true
}

func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change schema.Notification, timelineClient *timelineclient.Client) error {
func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change schema.Notification, timelineClient timelineclient.Publisher) error {
logger := log.FromContext(ctx).Scope("cron")

// Track jobs before the update to detect changes
Expand Down Expand Up @@ -297,7 +297,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]*cronJob, change
return nil
}

func publishNewOrChangedJobs(ctx context.Context, newJobs []*cronJob, oldJobs map[string]*cronJob, timelineClient *timelineclient.Client) {
func publishNewOrChangedJobs(ctx context.Context, newJobs []*cronJob, oldJobs map[string]*cronJob, timelineClient timelineclient.Publisher) {
for _, job := range newJobs {
oldJob, exists := oldJobs[job.Key()]
// Publish event if job is new or if the schedule has changed
Expand Down
4 changes: 2 additions & 2 deletions backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type service struct {
// Complete schema synchronised from the database.
view *atomic.Value[materialisedView]
client routing.CallClient
timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher
routeTable *routing.RouteTable
urlPrefix string
}

// Start the HTTP ingress service. Blocks until the context is cancelled.
func Start(ctx context.Context, bind *url.URL, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
func Start(ctx context.Context, bind *url.URL, config Config, eventSource *schemaeventsource.EventSource, client routing.CallClient, timelineClient timelineclient.Publisher) error {
logger := log.FromContext(ctx).Scope("http-ingress")
ctx = log.ContextWithLogger(ctx, logger)
svc := &service{
Expand Down
2 changes: 1 addition & 1 deletion backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func Start(
ctx context.Context,
registry *ProvisionerRegistry,
schemaClient schemaconnect.SchemaServiceClient,
timelineClient *timeline.Client,
timelineClient timeline.Publisher,
) error {
timelineLogSink := timeline.NewLogSink(timelineClient, log.Debug)
go timelineLogSink.RunLogLoop(ctx)
Expand Down
4 changes: 2 additions & 2 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Service struct {
deploymentContextProvider deploymentcontext.DeploymentContextProvider
controllerLeaseService ftlleaseconnect.LeaseServiceClient
moduleVerbService *xsync.MapOf[string, moduleVerbService]
timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher
queryService *query.Service
localModuleName string
schema *schema.Module
Expand All @@ -49,7 +49,7 @@ type Service struct {

func New(controllerModuleService deploymentcontext.DeploymentContextProvider,
leaseClient ftlleaseconnect.LeaseServiceClient,
timelineClient *timelineclient.Client,
timelineClient timelineclient.Publisher,
queryService *query.Service,
schema *schema.Module,
localDeployment key.Deployment,
Expand Down
4 changes: 2 additions & 2 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ type consumer struct {
deadLetterPublisher optional.Option[*publisher]

verbClient VerbClient
timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher

claimedPartitionsChan chan partitionEvent
}

func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment key.Deployment,
deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timelineclient.Client) (*consumer, error) {
deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient timelineclient.Publisher) (*consumer, error) {
if verb.Runtime == nil {
return nil, errors.Errorf("subscription %s has no runtime", verb.Name)
}
Expand Down
4 changes: 2 additions & 2 deletions backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type publisher struct {
topic *schema.Topic
producer sarama.SyncProducer

timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher
}

func newPublisher(module string, t *schema.Topic, deployment key.Deployment, timelineClient *timelineclient.Client) (*publisher, error) {
func newPublisher(module string, t *schema.Topic, deployment key.Deployment, timelineClient timelineclient.Publisher) (*publisher, error) {
if t.Runtime == nil {
return nil, errors.Errorf("topic %s has no runtime", t.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/runner/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type VerbClient interface {
var _ pubsubpbconnect.PublishServiceHandler = (*Service)(nil)
var _ pubsubpbconnect.PubSubAdminServiceHandler = (*Service)(nil)

func New(module *schema.Module, deployment key.Deployment, verbClient VerbClient, timelineClient *timelineclient.Client) (*Service, error) {
func New(module *schema.Module, deployment key.Deployment, verbClient VerbClient, timelineClient timelineclient.Publisher) (*Service, error) {
publishers := map[string]*publisher{}
for t := range sl.FilterVariants[*schema.Topic](module.Decls) {
publisher, err := newPublisher(module.Name, t, deployment, timelineClient)
Expand Down
2 changes: 1 addition & 1 deletion backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ type Service struct {
deploymentProvider oci.DeploymentArtefactProvider
deploymentContextProvider deploymentcontext.DeploymentContextProvider
schema *schema.Module
timelineClient *timeline.Client
timelineClient timeline.Publisher
timelineLogSink *timeline.LogSink
// Failed to register with the Controller
registrationFailure atomic.Value[optional.Option[error]]
Expand Down
6 changes: 3 additions & 3 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Config struct {
type Service struct {
State *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper]
Config Config
timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher
receiverClient optional.Option[ftlv1connect.SchemaMirrorServiceClient] // TODO: rename as mirror
devMode bool
creationLock sync.Mutex
Expand Down Expand Up @@ -65,7 +65,7 @@ func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.Ge

var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil)

func NewLocalService(ctx context.Context, config Config, timelineClient *timelineclient.Client, devMode bool) *Service {
func NewLocalService(ctx context.Context, config Config, timelineClient timelineclient.Publisher, devMode bool) *Service {
s := &Service{
State: statemachine.NewSingleQueryHandle(statemachine.NewLocalHandle(newStateMachine(ctx, "")), struct{}{}),
Config: config,
Expand All @@ -85,7 +85,7 @@ func (s *Service) StartServices(context.Context) ([]rpc.Option, error) {
func New(
ctx context.Context,
config Config,
timelineClient *timelineclient.Client,
timelineClient timelineclient.Publisher,
receiverClient optional.Option[ftlv1connect.SchemaMirrorServiceClient],
realm string,
devMode bool,
Expand Down
3 changes: 0 additions & 3 deletions backend/timeline/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func TestTimeline(t *testing.T) {
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

in.SubTests(
in.SubTest{Name: "Limit", Action: in.VerifyTimeline(1, []*timelinepb.TimelineQuery_Filter{}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
})},
in.SubTest{Name: "IngressEvent", Action: in.VerifyTimeline(1000, []*timelinepb.TimelineQuery_Filter{
{
Filter: &timelinepb.TimelineQuery_Filter_EventTypes{
Expand Down
2 changes: 1 addition & 1 deletion cmd/ftl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func makeBindContext(logger *log.Logger, cancel context.CancelCauseFunc, csm *cu
kctx.Bind(csm)
kctx.Bind(&cli.SharedCLI)

err = kctx.BindToProvider(func() (*timelineclient.Client, error) {
err = kctx.BindToProvider(func() (*timelineclient.RealClient, error) {
return timelineclient.NewClient(ctx, cli.TimelineConfig), nil
})
kctx.FatalIfErrorf(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ftl/cmd_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *replayCmd) Run(
ctx context.Context,
verbClient ftlv1connect.VerbServiceClient,
eventSource *schemaeventsource.EventSource,
timelineClient *timelineclient.Client,
timelineClient *timelineclient.RealClient,
) error {
// Wait timeout is for both pings to complete, not each ping individually
startTime := time.Now()
Expand Down
6 changes: 3 additions & 3 deletions internal/routing/verb_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type CallClient interface {
type VerbCallRouter struct {
routingTable *RouteTable
moduleClients *xsync.MapOf[string, optional.Option[ftlv1connect.VerbServiceClient]]
timelineClient *timelineclient.Client
timelineClient timelineclient.Publisher
}

func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.Ca
return resp, nil
}

func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelineClient *timelineclient.Client) *VerbCallRouter {
func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelineClient timelineclient.Publisher) *VerbCallRouter {
svc := &VerbCallRouter{
routingTable: routeTable,
moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](),
Expand All @@ -106,7 +106,7 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelin
}()
return svc
}
func NewVerbRouter(ctx context.Context, changes *schemaeventsource.EventSource, timelineClient *timelineclient.Client) *VerbCallRouter {
func NewVerbRouter(ctx context.Context, changes *schemaeventsource.EventSource, timelineClient timelineclient.Publisher) *VerbCallRouter {
return NewVerbRouterFromTable(ctx, New(ctx, changes), timelineClient)
}

Expand Down
22 changes: 14 additions & 8 deletions internal/timelineclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ type Config struct {
TimelineEndpoint *url.URL `help:"Timeline endpoint (discard:// to disable)." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8892"`
}

type Client struct {
type Publisher interface {
Publish(ctx context.Context, event Event)
}

type RealClient struct {
timelinepbconnect.TimelineServiceClient

entries chan *timelinepb.CreateEventsRequest_EventEntry
lastDroppedError atomic.Value[time.Time]
lastFailedError atomic.Value[time.Time]
}

func (c *Client) Ping(context.Context, *connect.Request[v1.PingRequest]) (*connect.Response[v1.PingResponse], error) {
var _ Publisher = &RealClient{}

func (c *RealClient) Ping(context.Context, *connect.Request[v1.PingRequest]) (*connect.Response[v1.PingResponse], error) {
return connect.NewResponse(&v1.PingResponse{}), nil
}

Expand All @@ -46,12 +52,12 @@ var NullConfig = Config{
//
// If endpoint is discard:// the client will not create an RPC client or send any RPC requests, and all events
// will be immediately discarded.
func NewClient(ctx context.Context, config Config) *Client {
func NewClient(ctx context.Context, config Config) *RealClient {
var c timelinepbconnect.TimelineServiceClient
if config.TimelineEndpoint.Scheme != "discard" {
c = rpc.Dial(timelinepbconnect.NewTimelineServiceClient, config.TimelineEndpoint.String(), log.Error)
}
client := &Client{
client := &RealClient{
TimelineServiceClient: c,
entries: make(chan *timelinepb.CreateEventsRequest_EventEntry, 1000),
}
Expand All @@ -70,7 +76,7 @@ type Event interface {
}

// Publish asynchronously enqueues an event for publication to the timeline.
func (c *Client) Publish(ctx context.Context, event Event) {
func (c *RealClient) Publish(ctx context.Context, event Event) {
entry, err := event.ToEntry()
entry.Timestamp = timestamppb.New(time.Now())
if err != nil {
Expand All @@ -87,7 +93,7 @@ func (c *Client) Publish(ctx context.Context, event Event) {
}
}

func (c *Client) noopEvents(ctx context.Context) {
func (c *RealClient) noopEvents(ctx context.Context) {
for {
select {
case <-c.entries:
Expand All @@ -97,7 +103,7 @@ func (c *Client) noopEvents(ctx context.Context) {
}
}

func (c *Client) processEvents(ctx context.Context) {
func (c *RealClient) processEvents(ctx context.Context) {
lastFlush := time.Now()
buffer := make([]*timelinepb.CreateEventsRequest_EventEntry, 0, maxBatchSize)
for {
Expand Down Expand Up @@ -125,7 +131,7 @@ func (c *Client) processEvents(ctx context.Context) {
}

// Flush all events in the buffer to the timeline service in a single call.
func (c *Client) flushEvents(ctx context.Context, entries []*timelinepb.CreateEventsRequest_EventEntry) {
func (c *RealClient) flushEvents(ctx context.Context, entries []*timelinepb.CreateEventsRequest_EventEntry) {
logger := log.FromContext(ctx).Scope("timeline")
_, err := c.CreateEvents(ctx, connect.NewRequest(&timelinepb.CreateEventsRequest{
Entries: entries,
Expand Down
4 changes: 2 additions & 2 deletions internal/timelineclient/log_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
//
// It needs to be run in a separate goroutine after creation by calling RunLogLoop.
type LogSink struct {
client *Client
client Publisher
logQueue chan log.Entry
level log.Level
}

var _ log.Sink = (*LogSink)(nil)

func NewLogSink(client *Client, level log.Level) *LogSink {
func NewLogSink(client Publisher, level log.Level) *LogSink {
return &LogSink{
client: client,
logQueue: make(chan log.Entry, 10000),
Expand Down
Loading