Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ SQL_GEN_OUT_ILL_DB = ill_db/ill_db_gen.go ill_db/ill_models_gen.go ill_db/ill_qu
SQL_GEN_OUT_EVENT = events/event_db_gen.go events/event_models_gen.go events/event_query.sql_gen.go
SQL_GEN_OUT_PR = patron_request/db/pr_db_gen.go patron_request/db/pr_models_gen.go patron_request/db/pr_query.sql_gen.go
SQL_GEN_OUT_PS = pullslip/db/ps_db_gen.go pullslip/db/ps_models_gen.go pullslip/db/ps_query.sql_gen.go
SQL_GEN_OUT = $(SQL_GEN_OUT_ILL_DB) $(SQL_GEN_OUT_EVENT) $(SQL_GEN_OUT_PR) $(SQL_GEN_OUT_PS)
SQL_GEN_OUT_SCHED = scheduler/db/sched_db_gen.go scheduler/db/sched_models_gen.go scheduler/db/sched_query.sql_gen.go
SQL_GEN_OUT = $(SQL_GEN_OUT_ILL_DB) $(SQL_GEN_OUT_EVENT) $(SQL_GEN_OUT_PR) $(SQL_GEN_OUT_PS) $(SQL_GEN_OUT_SCHED)
SQL_GEN_IN = sqlc/*.sql

# OpenAPI
Expand Down
1 change: 1 addition & 0 deletions broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Configuration is provided via environment variables:
| | the `{tenant}` token is replaced by the `X-Okapi-Tenant` header value | |
| `SUPPLIER_PATRON_PATTERN` | Pattern used to create patron ID when receiving Request on supplier side | `%v_user` |
| `LANGUAGE` | Language parameter used for ts_vector search in DB | `english` |
| `SCHEDULER_RETRY_DELAY` | Delay for rescheduling failed scheduled task | `5m` |

# Build

Expand Down
20 changes: 20 additions & 0 deletions broker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
psapi "github.com/indexdata/crosslink/broker/pullslip/api"
ps_db "github.com/indexdata/crosslink/broker/pullslip/db"
psoapi "github.com/indexdata/crosslink/broker/pullslip/oapi"
sched_db "github.com/indexdata/crosslink/broker/scheduler/db"
sched_service "github.com/indexdata/crosslink/broker/scheduler/service"
"github.com/indexdata/crosslink/broker/tenant"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -196,6 +198,12 @@ func Init(ctx context.Context) (Context, error) {
if err != nil {
return Context{}, err
}

skdRepo := sched_db.CreateSkdRepo(pool)
if err = StartScheduler(ctx, skdRepo, eventBus); err != nil {
return Context{}, err
}

return Context{
EventBus: eventBus,
IllRepo: illRepo,
Expand Down Expand Up @@ -357,6 +365,18 @@ func StartEventBus(ctx context.Context, eventBus events.EventBus) error {
return nil
}

// StartScheduler creates the scheduler service, begins listening on
// scheduler_channel, and launches the scheduling loop in a background goroutine.
func StartScheduler(ctx context.Context, skdRepo sched_db.SchedRepo, eventBus events.EventBus) error {
extCtx := common.CreateExtCtxWithArgs(ctx, nil)
svc := sched_service.NewSchedulerService(skdRepo, eventBus, ConnectionString)
if err := svc.Listen(extCtx); err != nil {
Comment thread
JanisSaldabols marked this conversation as resolved.
return fmt.Errorf("starting scheduler listener failed: %w", err)
}
go svc.Run(extCtx)
return nil
}

func HandleHealthz(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
Expand Down
1 change: 1 addition & 0 deletions broker/events/eventmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type EventDomain string
const (
EventDomainPatronRequest EventDomain = "PATRON_REQUEST"
EventDomainIllTransaction EventDomain = "ILL_TRANSACTION"
EventDomainScheduler EventDomain = "SCHEDULER"
)

type EventName string
Expand Down
1 change: 1 addition & 0 deletions broker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/jackc/pgx/v5 v5.9.2
github.com/lib/pq v1.12.3
github.com/oapi-codegen/runtime v1.4.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.42.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0
Expand Down
2 changes: 2 additions & 0 deletions broker/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/riza-io/grpc-go v0.2.0 h1:2HxQKFVE7VuYstcJ8zqpN84VnAoJ4dCL6YFhJewNcHQ=
github.com/riza-io/grpc-go v0.2.0/go.mod h1:2bDvR9KkKC3KhtlSHfR3dAXjUMT86kg4UfWFyVGWqi8=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
2 changes: 2 additions & 0 deletions broker/migrations/038_add_scheduler.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_scheduled_task_run_at;
DROP TABLE IF EXISTS scheduled_task;
14 changes: 14 additions & 0 deletions broker/migrations/038_add_scheduler.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE scheduled_task
(
id TEXT PRIMARY KEY,
event_name TEXT NOT NULL,
cron_expr TEXT NOT NULL,
payload JSONB,
run_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ,
FOREIGN KEY (event_name) REFERENCES event_config (event_name)
);

CREATE INDEX idx_scheduled_task_run_at ON scheduled_task (run_at) WHERE status = 'pending' AND run_at IS NOT NULL;
9 changes: 9 additions & 0 deletions broker/scheduler/db/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sched_db

type ScheduledTaskStatus string

const (
ScheduledTaskStatusPending ScheduledTaskStatus = "pending"
ScheduledTaskStatusRunning ScheduledTaskStatus = "running"
ScheduledTaskStatusStopped ScheduledTaskStatus = "stopped"
)
86 changes: 86 additions & 0 deletions broker/scheduler/db/repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sched_db

import (
"time"

"github.com/indexdata/crosslink/broker/common"
"github.com/indexdata/crosslink/broker/repo"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)

const SchedulerChannel = "crosslink_sched_channel"

type SchedRepo interface {
repo.Transactional[SchedRepo]
SaveScheduledTask(ctx common.ExtendedContext, params SaveScheduledTaskParams) (ScheduledTask, error)
ClaimNextScheduledTask(ctx common.ExtendedContext) (ScheduledTask, error)
GetNextRunAt(ctx common.ExtendedContext) (pgtype.Timestamptz, error)
GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfter time.Duration) ([]ScheduledTask, error)
}

type PgSchedRepo struct {
repo.PgBaseRepo[SchedRepo]
queries Queries
}

// WithTxFunc delegates transaction handling to PgBaseRepo.
func (r *PgSchedRepo) WithTxFunc(ctx common.ExtendedContext, fn func(SchedRepo) error) error {
return r.PgBaseRepo.WithTxFunc(ctx, r, fn)
}

// CreateWithPgBaseRepo creates a derived repo bound to the provided tx-aware base.
func (r *PgSchedRepo) CreateWithPgBaseRepo(base *repo.PgBaseRepo[SchedRepo]) SchedRepo {
derived := new(PgSchedRepo)
derived.PgBaseRepo = *base
return derived
}

// CreateSkdRepo creates a new SchedRepo backed by the given connection pool.
func CreateSkdRepo(dbPool *pgxpool.Pool) SchedRepo {
r := new(PgSchedRepo)
r.Pool = dbPool
return r
}

func (r *PgSchedRepo) SaveScheduledTask(ctx common.ExtendedContext, params SaveScheduledTaskParams) (ScheduledTask, error) {
row, err := r.queries.SaveScheduledTask(ctx, r.GetConnOrTx(), params)
if err == nil {
r.notify(ctx)
}
return row.ScheduledTask, err
}

func (r *PgSchedRepo) ClaimNextScheduledTask(ctx common.ExtendedContext) (ScheduledTask, error) {
row, err := r.queries.ClaimNextScheduledTask(ctx, r.GetConnOrTx())
return row.ScheduledTask, err
}

func (r *PgSchedRepo) GetNextRunAt(ctx common.ExtendedContext) (pgtype.Timestamptz, error) {
return r.queries.GetNextRunAt(ctx, r.GetConnOrTx())
}

// GetStuckRunningTasks returns tasks that have been in 'running' state for
// longer than stuckAfter, indicating they were claimed but never completed.
func (r *PgSchedRepo) GetStuckRunningTasks(ctx common.ExtendedContext, stuckAfter time.Duration) ([]ScheduledTask, error) {
rows, err := r.queries.GetStuckRunningTasks(ctx, r.GetConnOrTx(), pgDuration(stuckAfter))
if err != nil {
return nil, err
}
tasks := make([]ScheduledTask, 0, len(rows))
for _, row := range rows {
tasks = append(tasks, row.ScheduledTask)
}
return tasks, nil
}

func pgDuration(d time.Duration) pgtype.Interval {
return pgtype.Interval{Microseconds: d.Microseconds(), Valid: true}
}

func (r *PgSchedRepo) notify(ctx common.ExtendedContext) {
_, err := r.GetConnOrTx().Exec(ctx, "NOTIFY "+SchedulerChannel)
if err != nil {
ctx.Logger().Error("failed to notify scheduler channel", "channel", SchedulerChannel, "error", err)
}
}
Loading
Loading