Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ SQL_GEN_OUT_ILL_DB = ill_db/ill_db_gen.go ill_db/ill_models_gen.go ill_db/ill_qu
SQL_GEN_OUT_EVENT = events/event_db_gen.go events/event_models_gen.go events/event_query.sql_gen.go
SQL_GEN_OUT_PR = patron_request/db/pr_db_gen.go patron_request/db/pr_models_gen.go patron_request/db/pr_query.sql_gen.go
SQL_GEN_OUT_PS = pullslip/db/ps_db_gen.go pullslip/db/ps_models_gen.go pullslip/db/ps_query.sql_gen.go
SQL_GEN_OUT = $(SQL_GEN_OUT_ILL_DB) $(SQL_GEN_OUT_EVENT) $(SQL_GEN_OUT_PR) $(SQL_GEN_OUT_PS)
SQL_GEN_OUT_SCHED = scheduler/db/sched_db_gen.go scheduler/db/sched_models_gen.go scheduler/db/sched_query.sql_gen.go
SQL_GEN_OUT = $(SQL_GEN_OUT_ILL_DB) $(SQL_GEN_OUT_EVENT) $(SQL_GEN_OUT_PR) $(SQL_GEN_OUT_PS) $(SQL_GEN_OUT_SCHED)
SQL_GEN_IN = sqlc/*.sql

# OpenAPI
Expand Down
1 change: 1 addition & 0 deletions broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Configuration is provided via environment variables:
| | the `{tenant}` token is replaced by the `X-Okapi-Tenant` header value | |
| `SUPPLIER_PATRON_PATTERN` | Pattern used to create patron ID when receiving Request on supplier side | `%v_user` |
| `LANGUAGE` | Language parameter used for ts_vector search in DB | `english` |
| `SCHEDULER_RETRY_DELAY` | Delay for rescheduling failed scheduled task | `5m` |

# Build

Expand Down
20 changes: 20 additions & 0 deletions broker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
psapi "github.com/indexdata/crosslink/broker/pullslip/api"
ps_db "github.com/indexdata/crosslink/broker/pullslip/db"
psoapi "github.com/indexdata/crosslink/broker/pullslip/oapi"
sched_db "github.com/indexdata/crosslink/broker/scheduler/db"
sched_service "github.com/indexdata/crosslink/broker/scheduler/service"
"github.com/indexdata/crosslink/broker/tenant"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -196,6 +198,12 @@ func Init(ctx context.Context) (Context, error) {
if err != nil {
return Context{}, err
}

skdRepo := sched_db.CreateSkdRepo(pool)
if err = StartScheduler(ctx, skdRepo, eventBus); err != nil {
return Context{}, err
}

return Context{
EventBus: eventBus,
IllRepo: illRepo,
Expand Down Expand Up @@ -357,6 +365,18 @@ func StartEventBus(ctx context.Context, eventBus events.EventBus) error {
return nil
}

// StartScheduler creates the scheduler service, begins listening on
// sched_db.SchedulerChannel, and launches the scheduling loop in a background goroutine.
func StartScheduler(ctx context.Context, skdRepo sched_db.SchedRepo, eventBus events.EventBus) error {
extCtx := common.CreateExtCtxWithArgs(ctx, nil)
svc := sched_service.NewSchedulerService(skdRepo, eventBus, ConnectionString)
if err := svc.Listen(extCtx); err != nil {
Comment thread
JanisSaldabols marked this conversation as resolved.
return fmt.Errorf("starting scheduler listener failed: %w", err)
}
go svc.Run(extCtx)
return nil
}

func HandleHealthz(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
Expand Down
1 change: 1 addition & 0 deletions broker/events/eventmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type EventDomain string
const (
EventDomainPatronRequest EventDomain = "PATRON_REQUEST"
EventDomainIllTransaction EventDomain = "ILL_TRANSACTION"
EventDomainScheduler EventDomain = "SCHEDULER"
)

type EventName string
Expand Down
1 change: 1 addition & 0 deletions broker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/jackc/pgx/v5 v5.9.2
github.com/lib/pq v1.12.3
github.com/oapi-codegen/runtime v1.4.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.42.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0
Expand Down
2 changes: 2 additions & 0 deletions broker/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/riza-io/grpc-go v0.2.0 h1:2HxQKFVE7VuYstcJ8zqpN84VnAoJ4dCL6YFhJewNcHQ=
github.com/riza-io/grpc-go v0.2.0/go.mod h1:2bDvR9KkKC3KhtlSHfR3dAXjUMT86kg4UfWFyVGWqi8=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
2 changes: 2 additions & 0 deletions broker/migrations/038_add_scheduler.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_scheduled_task_run_at;
DROP TABLE IF EXISTS scheduled_task;
14 changes: 14 additions & 0 deletions broker/migrations/038_add_scheduler.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE scheduled_task
(
id TEXT PRIMARY KEY,
event_name TEXT NOT NULL,
cron_expr TEXT NOT NULL,
payload JSONB,
run_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ,
FOREIGN KEY (event_name) REFERENCES event_config (event_name)
);

CREATE INDEX idx_scheduled_task_run_at ON scheduled_task (run_at) WHERE status = 'pending' AND run_at IS NOT NULL;
9 changes: 9 additions & 0 deletions broker/scheduler/db/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sched_db

type ScheduledTaskStatus string

const (
ScheduledTaskStatusPending ScheduledTaskStatus = "pending"
ScheduledTaskStatusRunning ScheduledTaskStatus = "running"
ScheduledTaskStatusStopped ScheduledTaskStatus = "stopped"
)
86 changes: 86 additions & 0 deletions broker/scheduler/db/repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sched_db

import (
"time"

"github.com/indexdata/crosslink/broker/common"
"github.com/indexdata/crosslink/broker/repo"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)

const SchedulerChannel = "crosslink_sched_channel"

type SchedRepo interface {
repo.Transactional[SchedRepo]
SaveScheduledTask(ctx common.ExtendedContext, params SaveScheduledTaskParams) (ScheduledTask, error)
ClaimNextScheduledTask(ctx common.ExtendedContext) (ScheduledTask, error)
GetNextRunAt(ctx common.ExtendedContext) (pgtype.Timestamptz, error)
GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfter time.Duration) ([]ScheduledTask, error)
}

type PgSchedRepo struct {
repo.PgBaseRepo[SchedRepo]
queries Queries
}

// WithTxFunc delegates transaction handling to PgBaseRepo.
func (r *PgSchedRepo) WithTxFunc(ctx common.ExtendedContext, fn func(SchedRepo) error) error {
return r.PgBaseRepo.WithTxFunc(ctx, r, fn)
}

// CreateWithPgBaseRepo creates a derived repo bound to the provided tx-aware base.
func (r *PgSchedRepo) CreateWithPgBaseRepo(base *repo.PgBaseRepo[SchedRepo]) SchedRepo {
derived := new(PgSchedRepo)
derived.PgBaseRepo = *base
return derived
}

// CreateSkdRepo creates a new SchedRepo backed by the given connection pool.
func CreateSkdRepo(dbPool *pgxpool.Pool) SchedRepo {
r := new(PgSchedRepo)
r.Pool = dbPool
return r
}

Comment on lines +39 to +45
func (r *PgSchedRepo) SaveScheduledTask(ctx common.ExtendedContext, params SaveScheduledTaskParams) (ScheduledTask, error) {
row, err := r.queries.SaveScheduledTask(ctx, r.GetConnOrTx(), params)
if err == nil {
r.notify(ctx)
}
return row.ScheduledTask, err
}

func (r *PgSchedRepo) ClaimNextScheduledTask(ctx common.ExtendedContext) (ScheduledTask, error) {
row, err := r.queries.ClaimNextScheduledTask(ctx, r.GetConnOrTx())
return row.ScheduledTask, err
}

func (r *PgSchedRepo) GetNextRunAt(ctx common.ExtendedContext) (pgtype.Timestamptz, error) {
return r.queries.GetNextRunAt(ctx, r.GetConnOrTx())
}

// GetStuckRunningTasks returns tasks that have been in 'running' state for
// longer than stuckAfter, indicating they were claimed but never completed.
func (r *PgSchedRepo) GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfter time.Duration) ([]ScheduledTask, error) {
rows, err := r.queries.GetStuckRunningTasks(ctx, r.GetConnOrTx(), pgDuration(stuckAfter))
if err != nil {
return nil, err
}
tasks := make([]ScheduledTask, 0, len(rows))
for _, row := range rows {
tasks = append(tasks, row.ScheduledTask)
}
return tasks, nil
}

func pgDuration(d time.Duration) pgtype.Interval {
return pgtype.Interval{Microseconds: d.Microseconds(), Valid: true}
}

func (r *PgSchedRepo) notify(ctx common.ExtendedContext) {
_, err := r.GetConnOrTx().Exec(ctx, "NOTIFY "+SchedulerChannel)
if err != nil {
ctx.Logger().Error("failed to notify scheduler channel", "channel", SchedulerChannel, "error", err)
}
}
Loading
Loading