Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ jobs:
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

integration:
name: Integration Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
- name: Run integration tests
run: make test-integration

security:
name: Security Scan
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions cmd/skillbox-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/devs-group/skillbox/internal/api"
"github.com/devs-group/skillbox/internal/artifacts"
"github.com/devs-group/skillbox/internal/backfill"
"github.com/devs-group/skillbox/internal/config"
"github.com/devs-group/skillbox/internal/registry"
"github.com/devs-group/skillbox/internal/runner"
Expand Down Expand Up @@ -60,6 +61,9 @@ func main() {
os.Exit(1)
}

// Repoint legacy 0.0.0 skills to 1.0.0 and backfill the active-version pointer.
backfill.SkillVersions(context.Background(), db, reg, slog.Default())

// Initialize artifact collector (MinIO)
collector, err := artifacts.New(cfg.S3Endpoint, cfg.S3AccessKey, cfg.S3SecretKey, cfg.S3BucketExecs, cfg.S3UseSSL)
if err != nil {
Expand Down
94 changes: 94 additions & 0 deletions internal/backfill/skills.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Package backfill holds idempotent boot-time data migrations that need both DB and S3 access.
package backfill

import (
"context"
"log/slog"

"github.com/devs-group/skillbox/internal/store"
)

// LegacyVersion is the placeholder version assigned to skills imported before the version system.
const LegacyVersion = "0.0.0"

// TargetVersion is the semver legacy skills are repointed to.
const TargetVersion = "1.0.0"

// artifactMover is the subset of registry the backfill needs.
type artifactMover interface {
CopyVersion(ctx context.Context, tenantID, skillName, from, to string) error
Delete(ctx context.Context, tenantID, skillName, version string) error
}

// skillStore is the subset of store the backfill needs.
type skillStore interface {
ListSkillsAtVersion(ctx context.Context, version string) ([]store.SkillRef, error)
SkillVersionExists(ctx context.Context, tenantID, name, version string) (bool, error)
RenameSkillVersion(ctx context.Context, tenantID, name, from, to string) error
BackfillActiveVersions(ctx context.Context) (int64, error)
}

// Result summarizes one backfill pass.
type Result struct {
VersionsRenamed int
ActiveBackfilled int64
Skipped int
Failed int
}

// SkillVersions repoints legacy 0.0.0 skills to 1.0.0 (S3 copy + DB rename + old-key delete),
// then backfills the is_active pointer. Idempotent and failure-tolerant: per-skill errors are
// logged and the pass continues, so a later boot retries what's left.
func SkillVersions(ctx context.Context, st skillStore, mv artifactMover, logger *slog.Logger) Result {
if logger == nil {
logger = slog.Default()
}
var res Result

legacy, err := st.ListSkillsAtVersion(ctx, LegacyVersion)
if err != nil {
logger.Error("skill backfill: failed to list legacy skills", "err", err)
}
for _, ref := range legacy {
// A 1.0.0 already present means we can't repoint without colliding; leave the legacy row as-is.
exists, err := st.SkillVersionExists(ctx, ref.TenantID, ref.Name, TargetVersion)
if err != nil {
logger.Error("skill backfill: version-exists check failed", "tenant", ref.TenantID, "name", ref.Name, "err", err)
res.Failed++
continue
}
if exists {
logger.Warn("skill backfill: target version already exists, skipping", "tenant", ref.TenantID, "name", ref.Name, "target", TargetVersion)
res.Skipped++
continue
}
// Copy S3 first; if the DB rename then fails the old key is untouched, so a retry is safe.
if err := mv.CopyVersion(ctx, ref.TenantID, ref.Name, LegacyVersion, TargetVersion); err != nil {
logger.Error("skill backfill: s3 copy failed", "tenant", ref.TenantID, "name", ref.Name, "err", err)
res.Failed++
continue
}
if err := st.RenameSkillVersion(ctx, ref.TenantID, ref.Name, LegacyVersion, TargetVersion); err != nil {
logger.Error("skill backfill: db rename failed, leaving legacy key in place", "tenant", ref.TenantID, "name", ref.Name, "err", err)
res.Failed++
continue
}
if err := mv.Delete(ctx, ref.TenantID, ref.Name, LegacyVersion); err != nil {
logger.Warn("skill backfill: legacy s3 delete failed, version migrated but old object remains", "tenant", ref.TenantID, "name", ref.Name, "err", err)
}
res.VersionsRenamed++
}

n, err := st.BackfillActiveVersions(ctx)
if err != nil {
logger.Error("skill backfill: active-version backfill failed", "err", err)
}
res.ActiveBackfilled = n

logger.Info("skill backfill: complete",
"versions_renamed", res.VersionsRenamed,
"active_backfilled", res.ActiveBackfilled,
"skipped", res.Skipped,
"failed", res.Failed)
return res
}
130 changes: 130 additions & 0 deletions internal/backfill/skills_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//go:build integration

package backfill

import (
"context"
"os"
"testing"

"github.com/google/uuid"

"github.com/devs-group/skillbox/internal/store"
)

func newStore(t *testing.T) *store.Store {
t.Helper()
dsn := os.Getenv("SKILLBOX_TEST_DB_DSN")
if dsn == "" {
if os.Getenv("REQUIRE_TEST_DB") != "" {
t.Fatal("SKILLBOX_TEST_DB_DSN is required when REQUIRE_TEST_DB is set")
}
t.Skip("SKILLBOX_TEST_DB_DSN not set; skipping integration test")
}
s, err := store.New(dsn)
if err != nil {
if os.Getenv("REQUIRE_TEST_DB") != "" {
t.Fatalf("connect test db: %v", err)
}
t.Skipf("connect test db: %v", err)
}
return s
}

type fakeMover struct {
copied map[string]bool
deleted map[string]bool
}

func newFakeMover() *fakeMover {
return &fakeMover{copied: map[string]bool{}, deleted: map[string]bool{}}
}

func (f *fakeMover) CopyVersion(_ context.Context, tenantID, name, from, to string) error {
f.copied[tenantID+"/"+name+"/"+from+"->"+to] = true
return nil
}

func (f *fakeMover) Delete(_ context.Context, tenantID, name, version string) error {
f.deleted[tenantID+"/"+name+"/"+version] = true
return nil
}

func TestSkillVersions_BackfillRenamesAndActivates(t *testing.T) {
st := newStore(t)
defer st.Close() //nolint:errcheck
ctx := context.Background()
db := st.DB()

tenant := "t-" + uuid.NewString()

// Legacy skill awaiting migration, plus an approval pinned to 0.0.0.
if _, err := db.ExecContext(ctx, `INSERT INTO sandbox.skills (tenant_id, name, version, status, is_active) VALUES ($1,'docx','0.0.0','available',false)`, tenant); err != nil {
t.Fatal(err)
}
var userID string
if err := db.QueryRowContext(ctx, `INSERT INTO sandbox.users (kratos_identity_id, tenant_id, email) VALUES ($1,$2,$3) RETURNING id`, uuid.NewString(), tenant, tenant+"@test.local").Scan(&userID); err != nil {
t.Fatal(err)
}
if _, err := db.ExecContext(ctx, `INSERT INTO sandbox.approval_requests (tenant_id, user_id, skill_name, skill_version, status) VALUES ($1,$2,'docx','0.0.0','pending')`, tenant, userID); err != nil {
t.Fatal(err)
}

// Collision case: both 0.0.0 and 1.0.0 already exist; legacy must be left untouched.
collide := "c-" + uuid.NewString()
if _, err := db.ExecContext(ctx, `INSERT INTO sandbox.skills (tenant_id, name, version, status, is_active) VALUES ($1,'pdf','0.0.0','available',false),($1,'pdf','1.0.0','available',false)`, collide); err != nil {
t.Fatal(err)
}

mv := newFakeMover()
res := SkillVersions(ctx, st, mv, nil)

if res.VersionsRenamed != 1 {
t.Errorf("VersionsRenamed = %d, want 1", res.VersionsRenamed)
}
if res.Skipped != 1 {
t.Errorf("Skipped = %d, want 1", res.Skipped)
}
if res.Failed != 0 {
t.Errorf("Failed = %d, want 0", res.Failed)
}

// Legacy skill repointed to 1.0.0 and made active.
var version string
var active bool
if err := db.QueryRowContext(ctx, `SELECT version, is_active FROM sandbox.skills WHERE tenant_id=$1 AND name='docx'`, tenant).Scan(&version, &active); err != nil {
t.Fatal(err)
}
if version != "1.0.0" || !active {
t.Errorf("docx version=%q active=%v, want 1.0.0/true", version, active)
}

// Approval row repointed.
var apprVersion string
if err := db.QueryRowContext(ctx, `SELECT skill_version FROM sandbox.approval_requests WHERE tenant_id=$1 AND skill_name='docx'`, tenant).Scan(&apprVersion); err != nil {
t.Fatal(err)
}
if apprVersion != "1.0.0" {
t.Errorf("approval version=%q, want 1.0.0", apprVersion)
}

// S3 copy + delete happened for the renamed skill only.
if !mv.copied[tenant+"/docx/0.0.0->1.0.0"] {
t.Error("expected S3 copy for docx")
}
if !mv.deleted[tenant+"/docx/0.0.0"] {
t.Error("expected S3 delete of legacy docx")
}
if mv.copied[collide+"/pdf/0.0.0->1.0.0"] {
t.Error("collision skill must not be copied")
}

// Collision legacy row still 0.0.0; the pre-existing 1.0.0 became active.
var collideLegacy bool
if err := db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM sandbox.skills WHERE tenant_id=$1 AND name='pdf' AND version='0.0.0')`, collide).Scan(&collideLegacy); err != nil {
t.Fatal(err)
}
if !collideLegacy {
t.Error("collision legacy 0.0.0 row should remain")
}
}
17 changes: 17 additions & 0 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ func (r *Registry) Promote(ctx context.Context, tenantID, skillName, version str
return nil
}

// CopyVersion server-side copies a promoted skill archive to a new version. The source is left in place.
func (r *Registry) CopyVersion(ctx context.Context, tenantID, skillName, from, to string) error {
src := objectPath(tenantID, skillName, from)
dst := objectPath(tenantID, skillName, to)
_, err := r.client.CopyObject(ctx, minio.CopyDestOptions{
Bucket: r.bucket,
Object: dst,
}, minio.CopySrcOptions{
Bucket: r.bucket,
Object: src,
})
if err != nil {
return fmt.Errorf("copy skill version (%s → %s): %w", src, dst, err)
}
return nil
}

// Quarantine deletes a blocked skill's archive from S3; the DB row is retained for display.
func (r *Registry) Quarantine(ctx context.Context, tenantID, skillName, version string) error {
return r.client.RemoveObject(ctx, r.bucket, pendingPath(tenantID, skillName, version), minio.RemoveObjectOptions{})
Expand Down
97 changes: 97 additions & 0 deletions internal/store/skills_backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package store

import (
"context"
"fmt"
)

// SkillRef identifies a skill by tenant and name.
type SkillRef struct {
TenantID string
Name string
}

// ListSkillsAtVersion returns every (tenant, name) that has a row pinned to the given version.
func (s *Store) ListSkillsAtVersion(ctx context.Context, version string) ([]SkillRef, error) {
rows, err := s.conn().QueryContext(ctx, `
SELECT tenant_id, name FROM sandbox.skills WHERE version = $1
`, version)
if err != nil {
return nil, fmt.Errorf("list skills at version: %w", err)
}
defer rows.Close() //nolint:errcheck

var refs []SkillRef
for rows.Next() {
var r SkillRef
if err := rows.Scan(&r.TenantID, &r.Name); err != nil {
return nil, fmt.Errorf("scan skill ref: %w", err)
}
refs = append(refs, r)
}
return refs, rows.Err()
}

// SkillVersionExists reports whether a specific (tenant, name, version) row exists.
func (s *Store) SkillVersionExists(ctx context.Context, tenantID, name, version string) (bool, error) {
var exists bool
err := s.conn().QueryRowContext(ctx, `
SELECT EXISTS(SELECT 1 FROM sandbox.skills WHERE tenant_id = $1 AND name = $2 AND version = $3)
`, tenantID, name, version).Scan(&exists)
if err != nil {
return false, fmt.Errorf("skill version exists: %w", err)
}
return exists, nil
}

// RenameSkillVersion repoints a skill's version in place across the skills table and any
// approval rows that pinned the old version. Runs in one transaction.
func (s *Store) RenameSkillVersion(ctx context.Context, tenantID, name, from, to string) error {
return s.RunInTx(ctx, func(tx *Store) error {
if _, err := tx.conn().ExecContext(ctx, `
UPDATE sandbox.skills SET version = $4
WHERE tenant_id = $1 AND name = $2 AND version = $3
`, tenantID, name, from, to); err != nil {
return fmt.Errorf("rename skill version: %w", err)
}
if _, err := tx.conn().ExecContext(ctx, `
UPDATE sandbox.approval_requests SET skill_version = $4
WHERE tenant_id = $1 AND skill_name = $2 AND skill_version = $3
`, tenantID, name, from, to); err != nil {
return fmt.Errorf("rename approval_requests version: %w", err)
}
if _, err := tx.conn().ExecContext(ctx, `
UPDATE sandbox.tenant_approved_skills SET skill_version = $4
WHERE tenant_id = $1 AND skill_name = $2 AND skill_version = $3
`, tenantID, name, from, to); err != nil {
return fmt.Errorf("rename tenant_approved_skills version: %w", err)
}
return nil
})
}

// BackfillActiveVersions sets is_active on one version per (tenant, name) that has none,
// mirroring ResolveActiveVersion: newest available, else newest by uploaded_at. Idempotent.
func (s *Store) BackfillActiveVersions(ctx context.Context) (int64, error) {
res, err := s.conn().ExecContext(ctx, `
WITH ranked AS (
SELECT tenant_id, name, version,
ROW_NUMBER() OVER (
PARTITION BY tenant_id, name
ORDER BY (status = 'available') DESC, uploaded_at DESC
) AS rn
FROM sandbox.skills s
WHERE NOT EXISTS (
SELECT 1 FROM sandbox.skills a
WHERE a.tenant_id = s.tenant_id AND a.name = s.name AND a.is_active
)
)
UPDATE sandbox.skills t SET is_active = true
FROM ranked r
WHERE t.tenant_id = r.tenant_id AND t.name = r.name AND t.version = r.version AND r.rn = 1
`)
if err != nil {
return 0, fmt.Errorf("backfill active versions: %w", err)
}
return res.RowsAffected()
}
Loading