diff --git a/apps/console/src/pages/iam/organizations/settings/AuditLogSettingsPage.tsx b/apps/console/src/pages/iam/organizations/settings/AuditLogSettingsPage.tsx index 586093869..8074468da 100644 --- a/apps/console/src/pages/iam/organizations/settings/AuditLogSettingsPage.tsx +++ b/apps/console/src/pages/iam/organizations/settings/AuditLogSettingsPage.tsx @@ -3,7 +3,13 @@ import { useTranslate } from "@probo/i18n"; import { Badge, Button, + Dialog, + DialogContent, + DialogFooter, + Field, + IconArrowDown, IconChevronDown, + Input, Spinner, Table, Tbody, @@ -11,15 +17,20 @@ import { Th, Thead, Tr, + useDialogRef, + useToast, } from "@probo/ui"; +import { useState } from "react"; import { graphql, type PreloadedQuery, useFragment, + useMutation, usePaginationFragment, usePreloadedQuery, } from "react-relay"; +import type { AuditLogSettingsPageExportMutation } from "#/__generated__/iam/AuditLogSettingsPageExportMutation.graphql"; import type { AuditLogSettingsPageFragment$key } from "#/__generated__/iam/AuditLogSettingsPageFragment.graphql"; import type { AuditLogSettingsPageQuery } from "#/__generated__/iam/AuditLogSettingsPageQuery.graphql"; import type { AuditLogSettingsPageRefetchQuery } from "#/__generated__/iam/AuditLogSettingsPageRefetchQuery.graphql"; @@ -30,6 +41,8 @@ export const auditLogSettingsPageQuery = graphql` organization: node(id: $organizationId) @required(action: THROW) { __typename ... on Organization { + id + canExportAuditLog: permission(action: "iam:audit-log:export") ...AuditLogSettingsPageFragment } } @@ -75,6 +88,16 @@ const auditLogEntryRowFragment = graphql` } `; +const exportMutation = graphql` + mutation AuditLogSettingsPageExportMutation( + $input: RequestAuditLogExportInput! 
+ ) { + requestAuditLogExport(input: $input) { + logExportId + } + } +`; + function ActorTypeBadge({ type }: { type: string }) { switch (type) { case "USER": @@ -156,6 +179,112 @@ function AuditLogEntryRow({ ); } +function ExportAuditLogDialog({ + organizationId, +}: { + organizationId: string; +}) { + const { __ } = useTranslate(); + const { toast } = useToast(); + const dialogRef = useDialogRef(); + const [fromDate, setFromDate] = useState(""); + const [toDate, setToDate] = useState(""); + const [commitExport, isExporting] = useMutation(exportMutation); + + const handleExport = () => { + if (!fromDate || !toDate) return; + + commitExport({ + variables: { + input: { + organizationId, + fromTime: new Date(`${fromDate}T00:00:00Z`).toISOString(), + toTime: new Date(Date.parse(`${toDate}T00:00:00Z`) + 24 * 60 * 60 * 1000).toISOString(), + }, + }, + onCompleted: (_response, errors) => { + if (errors) { + toast({ + title: __("Error"), + description: __("Failed to request audit log export."), + variant: "error", + }); + return; + } + toast({ + title: __("Success"), + description: __("Export started. You will receive an email with a download link when it is ready."), + variant: "success", + }); + dialogRef.current?.close(); + setFromDate(""); + setToDate(""); + }, + onError: () => { + toast({ + title: __("Error"), + description: __("Failed to request audit log export."), + variant: "error", + }); + }, + }); + }; + + return ( + <> + + + +

+ {__("Select a date range to export audit log entries as JSONL. You will receive an email with a download link.")} +

+ + setFromDate(e.target.value)} + required + /> + + + setToDate(e.target.value)} + required + /> + +
+ + + +
+ + ); +} + export function AuditLogSettingsPage(props: { queryRef: PreloadedQuery; }) { @@ -180,13 +309,18 @@ export function AuditLogSettingsPage(props: { return (
-
-

{__("Audit Log")}

-

- {__( - "A record of all actions performed in your organization. Entries are immutable and cannot be modified or deleted.", - )} -

+
+
+

{__("Audit Log")}

+

+ {__( + "A record of all actions performed in your organization. Entries are immutable and cannot be modified or deleted.", + )} +

+
+ {organization.canExportAuditLog && ( + + )}
{entries.length === 0 diff --git a/e2e/console/audit_log_test.go b/e2e/console/audit_log_test.go index 71c5d71fc..ad34a0789 100644 --- a/e2e/console/audit_log_test.go +++ b/e2e/console/audit_log_test.go @@ -247,6 +247,74 @@ func TestAuditLog_RBAC(t *testing.T) { }) } +func TestAuditLog_Export(t *testing.T) { + t.Parallel() + owner := testutil.NewClient(t, testutil.RoleOwner) + + const mutation = ` + mutation($input: RequestAuditLogExportInput!) { + requestAuditLogExport(input: $input) { + logExportId + } + } + ` + + t.Run("owner can request export", func(t *testing.T) { + t.Parallel() + + var result struct { + RequestAuditLogExport struct { + LogExportID string `json:"logExportId"` + } `json:"requestAuditLogExport"` + } + + err := owner.Execute(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": owner.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": "2026-03-24T00:00:00Z", + }, + }, &result) + require.NoError(t, err) + assert.NotEmpty(t, result.RequestAuditLogExport.LogExportID) + }) + + t.Run("admin can request export", func(t *testing.T) { + t.Parallel() + admin := testutil.NewClientInOrg(t, testutil.RoleAdmin, owner) + + var result struct { + RequestAuditLogExport struct { + LogExportID string `json:"logExportId"` + } `json:"requestAuditLogExport"` + } + + err := admin.Execute(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": admin.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": "2026-03-24T00:00:00Z", + }, + }, &result) + require.NoError(t, err) + assert.NotEmpty(t, result.RequestAuditLogExport.LogExportID) + }) + + t.Run("viewer cannot request export", func(t *testing.T) { + t.Parallel() + viewer := testutil.NewClientInOrg(t, testutil.RoleViewer, owner) + + _, err := viewer.Do(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": viewer.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": 
"2026-03-24T00:00:00Z", + }, + }) + testutil.RequireForbiddenError(t, err, "viewer cannot request audit log export") + }) +} + func TestAuditLog_TenantIsolation(t *testing.T) { t.Parallel() diff --git a/e2e/console/scim_event_test.go b/e2e/console/scim_event_test.go new file mode 100644 index 000000000..bc52421b0 --- /dev/null +++ b/e2e/console/scim_event_test.go @@ -0,0 +1,91 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +package console_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.probo.inc/probo/e2e/internal/testutil" +) + +func TestSCIMEvent_Export(t *testing.T) { + t.Parallel() + owner := testutil.NewClient(t, testutil.RoleOwner) + + const mutation = ` + mutation($input: RequestSCIMEventExportInput!) 
{ + requestSCIMEventExport(input: $input) { + logExportId + } + } + ` + + t.Run("owner can request export", func(t *testing.T) { + t.Parallel() + + var result struct { + RequestSCIMEventExport struct { + LogExportID string `json:"logExportId"` + } `json:"requestSCIMEventExport"` + } + + err := owner.Execute(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": owner.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": "2026-03-24T00:00:00Z", + }, + }, &result) + require.NoError(t, err) + assert.NotEmpty(t, result.RequestSCIMEventExport.LogExportID) + }) + + t.Run("admin can request export", func(t *testing.T) { + t.Parallel() + admin := testutil.NewClientInOrg(t, testutil.RoleAdmin, owner) + + var result struct { + RequestSCIMEventExport struct { + LogExportID string `json:"logExportId"` + } `json:"requestSCIMEventExport"` + } + + err := admin.Execute(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": admin.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": "2026-03-24T00:00:00Z", + }, + }, &result) + require.NoError(t, err) + assert.NotEmpty(t, result.RequestSCIMEventExport.LogExportID) + }) + + t.Run("viewer cannot request export", func(t *testing.T) { + t.Parallel() + viewer := testutil.NewClientInOrg(t, testutil.RoleViewer, owner) + + _, err := viewer.Do(mutation, map[string]any{ + "input": map[string]any{ + "organizationId": viewer.GetOrganizationID().String(), + "fromTime": "2026-01-01T00:00:00Z", + "toTime": "2026-03-24T00:00:00Z", + }, + }) + testutil.RequireForbiddenError(t, err, "viewer cannot request SCIM event export") + }) +} diff --git a/packages/emails/emails.go b/packages/emails/emails.go index 0d10da726..831bc2b1c 100644 --- a/packages/emails/emails.go +++ b/packages/emails/emails.go @@ -222,6 +222,7 @@ const ( subjectMailingListSubscription = "%s – Confirm Your Compliance Updates Subscription" subjectMailingListUnsubscription = "%s – You've been 
unsubscribed" subjectMailingListUpdates = "%s – %s" + subjectLogExport = "Your log export is ready" ) var ( @@ -251,6 +252,8 @@ var ( mailingListUnsubscriptionTextTemplate = texttemplate.Must(texttemplate.ParseFS(Templates, "dist/mailing-list-unsubscription.txt.tmpl")) mailingListUpdatesHTMLTemplate = htmltemplate.Must(htmltemplate.ParseFS(Templates, "dist/mailing-list-updates.html.tmpl")) mailingListUpdatesTextTemplate = texttemplate.Must(texttemplate.ParseFS(Templates, "dist/mailing-list-updates.txt.tmpl")) + logExportHTMLTemplate = htmltemplate.Must(htmltemplate.ParseFS(Templates, "dist/log-export.html.tmpl")) + logExportTextTemplate = texttemplate.Must(texttemplate.ParseFS(Templates, "dist/log-export.txt.tmpl")) ) func (p *Presenter) getCommonVariables(ctx context.Context) (*CommonVariables, error) { @@ -414,6 +417,24 @@ func (p *Presenter) RenderFrameworkExport(ctx context.Context, downloadUrl strin return subjectFrameworkExport, textBody, htmlBody, err } +func (p *Presenter) RenderLogExport(ctx context.Context, downloadUrl string) (subject string, textBody string, htmlBody *string, err error) { + vars, err := p.getCommonVariables(ctx) + if err != nil { + return "", "", nil, fmt.Errorf("cannot get common variables: %w", err) + } + + data := struct { + *CommonVariables + DownloadUrl string + }{ + CommonVariables: vars, + DownloadUrl: downloadUrl, + } + + textBody, htmlBody, err = renderEmail(logExportTextTemplate, logExportHTMLTemplate, data) + return subjectLogExport, textBody, htmlBody, err +} + func (p *Presenter) RenderTrustCenterAccess(ctx context.Context, organizationName string) (subject string, textBody string, htmlBody *string, err error) { vars, err := p.getCommonVariables(ctx) if err != nil { diff --git a/packages/emails/scripts/build.ts b/packages/emails/scripts/build.ts index 5d491a503..934131d8b 100644 --- a/packages/emails/scripts/build.ts +++ b/packages/emails/scripts/build.ts @@ -16,6 +16,7 @@ import TrustCenterDocumentAccessRejected from 
"../src/TrustCenterDocumentAccessR import ElectronicSignatureCertificate from "../src/ElectronicSignatureCertificate"; import MailingListSubscription from "../src/MailingListSubscription"; import MailingListUnsubscription from "../src/MailingListUnsubscription"; +import LogExport from "../src/LogExport"; import MagicLink from "../src/MagicLink"; const __filename = fileURLToPath(import.meta.url); @@ -79,6 +80,10 @@ const templates: TemplateConfig[] = [ name: "mailing-list-updates", render: () => MailingListUpdates(), }, + { + name: "log-export", + render: () => LogExport(), + }, ]; async function build() { diff --git a/packages/emails/src/LogExport.tsx b/packages/emails/src/LogExport.tsx new file mode 100644 index 000000000..f1e789dff --- /dev/null +++ b/packages/emails/src/LogExport.tsx @@ -0,0 +1,25 @@ +import { Button, Section, Text } from '@react-email/components'; +import * as React from 'react'; +import EmailLayout, { bodyText, button, buttonContainer, footerText } from './components/EmailLayout'; + +export const LogExport = () => { + return ( + + + Your log export has been completed successfully. Click the button below to download it: + + +
+ +
+ + + This link will expire in 24 hours. + +
+ ); +}; + +export default LogExport; diff --git a/packages/emails/templates/log-export.txt b/packages/emails/templates/log-export.txt new file mode 100644 index 000000000..d11a830ef --- /dev/null +++ b/packages/emails/templates/log-export.txt @@ -0,0 +1,12 @@ +Probo + +Hi {{.RecipientFullName}}, + +Your log export has been completed successfully. Click the link below to download it: + +{{.DownloadUrl}} + +This link will expire in 24 hours. + +{{.SenderCompanyHeadquarterAddress}} +Powered By Probo diff --git a/pkg/cmd/auditlog/audit_log.go b/pkg/cmd/auditlog/audit_log.go index 888b231ab..6f99c0efa 100644 --- a/pkg/cmd/auditlog/audit_log.go +++ b/pkg/cmd/auditlog/audit_log.go @@ -16,6 +16,7 @@ package auditlog import ( "github.com/spf13/cobra" + "go.probo.inc/probo/pkg/cmd/auditlog/export" "go.probo.inc/probo/pkg/cmd/auditlog/list" "go.probo.inc/probo/pkg/cmd/auditlog/view" "go.probo.inc/probo/pkg/cmd/cmdutil" @@ -27,6 +28,7 @@ func NewCmdAuditLog(f *cmdutil.Factory) *cobra.Command { Short: "Manage audit log entries", } + cmd.AddCommand(export.NewCmdExport(f)) cmd.AddCommand(list.NewCmdList(f)) cmd.AddCommand(view.NewCmdView(f)) diff --git a/pkg/cmd/auditlog/export/export.go b/pkg/cmd/auditlog/export/export.go new file mode 100644 index 000000000..7d5cb2688 --- /dev/null +++ b/pkg/cmd/auditlog/export/export.go @@ -0,0 +1,119 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +package export + +import ( + "encoding/json" + "fmt" + + "github.com/spf13/cobra" + "go.probo.inc/probo/pkg/cli/api" + "go.probo.inc/probo/pkg/cmd/cmdutil" +) + +const exportMutation = ` +mutation($input: RequestAuditLogExportInput!) { + requestAuditLogExport(input: $input) { + logExportId + } +} +` + +func NewCmdExport(f *cmdutil.Factory) *cobra.Command { + var ( + flagOrg string + flagFrom string + flagTo string + ) + + cmd := &cobra.Command{ + Use: "export", + Short: "Export audit log entries", + Example: ` prb audit-log export --org --from 2026-01-01T00:00:00Z --to 2026-02-01T00:00:00Z + prb audit-log export --from 2026-03-01T00:00:00Z --to 2026-03-24T00:00:00Z`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := f.Config() + if err != nil { + return err + } + + host, hc, err := cfg.DefaultHost() + if err != nil { + return err + } + + client := api.NewClient( + host, + hc.Token, + "/api/console/v1/graphql", + cfg.HTTPTimeoutDuration(), + ) + + if flagOrg == "" { + flagOrg = hc.Organization + } + + if flagOrg == "" { + return fmt.Errorf("organization is required; pass --org or set a default with 'prb auth login'") + } + + if flagFrom == "" { + return fmt.Errorf("--from is required (RFC3339 timestamp)") + } + + if flagTo == "" { + return fmt.Errorf("--to is required (RFC3339 timestamp)") + } + + variables := map[string]any{ + "input": map[string]any{ + "organizationId": flagOrg, + "fromTime": flagFrom, + "toTime": flagTo, + }, + } + + data, err := client.Do(exportMutation, variables) + if err != nil { + return err + } + + var resp struct { + RequestAuditLogExport struct { + LogExportID 
string `json:"logExportId"` + } `json:"requestAuditLogExport"` + } + + if err := json.Unmarshal(data, &resp); err != nil { + return err + } + + _, _ = fmt.Fprintf( + f.IOStreams.Out, + "Audit log export requested %s\nYou will receive an email with a download link when the export is ready.\n", + resp.RequestAuditLogExport.LogExportID, + ) + + return nil + }, + } + + cmd.Flags().StringVar(&flagOrg, "org", "", "Organization ID") + cmd.Flags().StringVar(&flagFrom, "from", "", "Start time in RFC3339 format (e.g. 2026-01-01T00:00:00Z)") + cmd.Flags().StringVar(&flagTo, "to", "", "End time in RFC3339 format (e.g. 2026-02-01T00:00:00Z)") + + return cmd +} diff --git a/pkg/cmd/root/root.go b/pkg/cmd/root/root.go index 557385d11..4887cf1df 100644 --- a/pkg/cmd/root/root.go +++ b/pkg/cmd/root/root.go @@ -29,6 +29,7 @@ import ( "go.probo.inc/probo/pkg/cmd/framework" "go.probo.inc/probo/pkg/cmd/org" "go.probo.inc/probo/pkg/cmd/risk" + "go.probo.inc/probo/pkg/cmd/scimevent" "go.probo.inc/probo/pkg/cmd/soa" "go.probo.inc/probo/pkg/cmd/user" "go.probo.inc/probo/pkg/cmd/version" @@ -77,6 +78,7 @@ func NewCmdRoot(f *cmdutil.Factory) *cobra.Command { cmd.AddCommand(framework.NewCmdFramework(f)) cmd.AddCommand(org.NewCmdOrg(f)) cmd.AddCommand(risk.NewCmdRisk(f)) + cmd.AddCommand(scimevent.NewCmdSCIMEvent(f)) cmd.AddCommand(soa.NewCmdSoa(f)) cmd.AddCommand(user.NewCmdUser(f)) cmd.AddCommand(version.NewCmdVersion(f)) diff --git a/pkg/cmd/scimevent/export/export.go b/pkg/cmd/scimevent/export/export.go new file mode 100644 index 000000000..0853b32b8 --- /dev/null +++ b/pkg/cmd/scimevent/export/export.go @@ -0,0 +1,119 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. 
+// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +package export + +import ( + "encoding/json" + "fmt" + + "github.com/spf13/cobra" + "go.probo.inc/probo/pkg/cli/api" + "go.probo.inc/probo/pkg/cmd/cmdutil" +) + +const exportMutation = ` +mutation($input: RequestSCIMEventExportInput!) { + requestSCIMEventExport(input: $input) { + logExportId + } +} +` + +func NewCmdExport(f *cmdutil.Factory) *cobra.Command { + var ( + flagOrg string + flagFrom string + flagTo string + ) + + cmd := &cobra.Command{ + Use: "export", + Short: "Export SCIM events", + Example: ` prb scim-event export --org --from 2026-01-01T00:00:00Z --to 2026-02-01T00:00:00Z + prb scim-event export --from 2026-03-01T00:00:00Z --to 2026-03-24T00:00:00Z`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := f.Config() + if err != nil { + return err + } + + host, hc, err := cfg.DefaultHost() + if err != nil { + return err + } + + client := api.NewClient( + host, + hc.Token, + "/api/console/v1/graphql", + cfg.HTTPTimeoutDuration(), + ) + + if flagOrg == "" { + flagOrg = hc.Organization + } + + if flagOrg == "" { + return fmt.Errorf("organization is required; pass --org or set a default with 'prb auth login'") + } + + if flagFrom == "" { + return fmt.Errorf("--from is required (RFC3339 timestamp)") + } + + if flagTo == "" { + return fmt.Errorf("--to is required (RFC3339 timestamp)") + } + + variables := map[string]any{ + "input": map[string]any{ + "organizationId": flagOrg, + "fromTime": flagFrom, + "toTime": 
flagTo, + }, + } + + data, err := client.Do(exportMutation, variables) + if err != nil { + return err + } + + var resp struct { + RequestSCIMEventExport struct { + LogExportID string `json:"logExportId"` + } `json:"requestSCIMEventExport"` + } + + if err := json.Unmarshal(data, &resp); err != nil { + return err + } + + _, _ = fmt.Fprintf( + f.IOStreams.Out, + "SCIM event export requested %s\nYou will receive an email with a download link when the export is ready.\n", + resp.RequestSCIMEventExport.LogExportID, + ) + + return nil + }, + } + + cmd.Flags().StringVar(&flagOrg, "org", "", "Organization ID") + cmd.Flags().StringVar(&flagFrom, "from", "", "Start time in RFC3339 format (e.g. 2026-01-01T00:00:00Z)") + cmd.Flags().StringVar(&flagTo, "to", "", "End time in RFC3339 format (e.g. 2026-02-01T00:00:00Z)") + + return cmd +} diff --git a/pkg/cmd/scimevent/scim_event.go b/pkg/cmd/scimevent/scim_event.go new file mode 100644 index 000000000..1b79da1c3 --- /dev/null +++ b/pkg/cmd/scimevent/scim_event.go @@ -0,0 +1,32 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. 
+ +package scimevent + +import ( + "github.com/spf13/cobra" + "go.probo.inc/probo/pkg/cmd/cmdutil" + "go.probo.inc/probo/pkg/cmd/scimevent/export" +) + +func NewCmdSCIMEvent(f *cmdutil.Factory) *cobra.Command { + cmd := &cobra.Command{ + Use: "scim-event ", + Short: "Manage SCIM events", + } + + cmd.AddCommand(export.NewCmdExport(f)) + + return cmd +} diff --git a/pkg/coredata/audit_log_entry.go b/pkg/coredata/audit_log_entry.go index 3277151cd..47b61196f 100644 --- a/pkg/coredata/audit_log_entry.go +++ b/pkg/coredata/audit_log_entry.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "iter" "maps" "time" @@ -214,6 +215,81 @@ WHERE return nil } +func AuditLogEntriesByOrganizationIDAndTimeRange( + ctx context.Context, + conn pg.Conn, + scope Scoper, + organizationID gid.GID, + fromTime time.Time, + toTime time.Time, +) iter.Seq2[*AuditLogEntry, error] { + q := ` +DECLARE audit_log_export_cursor CURSOR FOR +SELECT + id, + organization_id, + actor_id, + actor_type, + action, + resource_type, + resource_id, + metadata, + created_at +FROM + audit_log_entries +WHERE + %s + AND organization_id = @organization_id + AND created_at >= @from_time + AND created_at < @to_time +ORDER BY + created_at ASC +` + q = fmt.Sprintf(q, scope.SQLFragment()) + + args := pgx.StrictNamedArgs{ + "organization_id": organizationID, + "from_time": fromTime, + "to_time": toTime, + } + maps.Copy(args, scope.SQLArguments()) + + return func(yield func(*AuditLogEntry, error) bool) { + if _, err := conn.Exec(ctx, q, args); err != nil { + yield(nil, fmt.Errorf("cannot declare audit log cursor: %w", err)) + return + } + + defer func() { + _, _ = conn.Exec(ctx, "CLOSE audit_log_export_cursor") + }() + + for { + rows, err := conn.Query(ctx, "FETCH 500 FROM audit_log_export_cursor") + if err != nil { + yield(nil, fmt.Errorf("cannot fetch audit log entries: %w", err)) + return + } + + entries, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[AuditLogEntry]) + if err != nil { + yield(nil, 
fmt.Errorf("cannot collect audit log entries: %w", err)) + return + } + + if len(entries) == 0 { + return + } + + for _, entry := range entries { + if !yield(entry, nil) { + return + } + } + } + } +} + func (es *AuditLogEntries) CountByOrganizationID( ctx context.Context, conn pg.Conn, diff --git a/pkg/coredata/entity_type_reg.go b/pkg/coredata/entity_type_reg.go index 1fcefb412..3657d8b7c 100644 --- a/pkg/coredata/entity_type_reg.go +++ b/pkg/coredata/entity_type_reg.go @@ -92,6 +92,7 @@ const ( MailingListUpdateEntityType uint16 = 66 FindingEntityType uint16 = 67 AuditLogEntryEntityType uint16 = 68 + LogExportEntityType uint16 = 69 ) func NewEntityFromID(id gid.GID) (any, bool) { @@ -226,6 +227,8 @@ func NewEntityFromID(id gid.GID) (any, bool) { return &MailingListUpdate{ID: id}, true case AuditLogEntryEntityType: return &AuditLogEntry{ID: id}, true + case LogExportEntityType: + return &LogExport{ID: id}, true default: return nil, false } diff --git a/pkg/coredata/log_export.go b/pkg/coredata/log_export.go new file mode 100644 index 000000000..0bd36f569 --- /dev/null +++ b/pkg/coredata/log_export.go @@ -0,0 +1,258 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. 
+ +package coredata + +import ( + "context" + "errors" + "fmt" + "maps" + "time" + + "github.com/jackc/pgx/v5" + "go.gearno.de/kit/pg" + "go.probo.inc/probo/pkg/gid" + "go.probo.inc/probo/pkg/mail" +) + +type ( + LogExport struct { + ID gid.GID `db:"id"` + OrganizationID gid.GID `db:"organization_id"` + Type LogExportType `db:"type"` + Status LogExportStatus `db:"status"` + FromTime time.Time `db:"from_time"` + ToTime time.Time `db:"to_time"` + FileID *gid.GID `db:"file_id"` + Error *string `db:"error"` + RecipientEmail mail.Addr `db:"recipient_email"` + RecipientName string `db:"recipient_name"` + CreatedAt time.Time `db:"created_at"` + StartedAt *time.Time `db:"started_at"` + CompletedAt *time.Time `db:"completed_at"` + } + + LogExports []*LogExport +) + +func (le *LogExport) AuthorizationAttributes(ctx context.Context, conn pg.Conn) (map[string]string, error) { + q := `SELECT organization_id FROM log_exports WHERE id = $1 LIMIT 1;` + + var organizationID gid.GID + if err := conn.QueryRow(ctx, q, le.ID).Scan(&organizationID); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, ErrResourceNotFound + } + return nil, fmt.Errorf("cannot query log export authorization attributes: %w", err) + } + + return map[string]string{"organization_id": organizationID.String()}, nil +} + +func (le *LogExport) Insert( + ctx context.Context, + conn pg.Conn, + scope Scoper, +) error { + q := ` +INSERT INTO log_exports ( + id, + tenant_id, + organization_id, + type, + status, + from_time, + to_time, + recipient_email, + recipient_name, + created_at +) VALUES ( + @id, + @tenant_id, + @organization_id, + @type, + @status, + @from_time, + @to_time, + @recipient_email, + @recipient_name, + @created_at +)` + args := pgx.StrictNamedArgs{ + "id": le.ID, + "tenant_id": scope.GetTenantID(), + "organization_id": le.OrganizationID, + "type": le.Type, + "status": le.Status, + "from_time": le.FromTime, + "to_time": le.ToTime, + "recipient_email": le.RecipientEmail, + "recipient_name": 
le.RecipientName, + "created_at": le.CreatedAt, + } + _, err := conn.Exec(ctx, q, args) + return err +} + +func (le *LogExport) Update( + ctx context.Context, + conn pg.Conn, + scope Scoper, +) error { + q := ` +UPDATE + log_exports +SET + status = @status, + error = @error, + file_id = @file_id, + started_at = @started_at, + completed_at = @completed_at +WHERE + %s + AND id = @id +` + q = fmt.Sprintf(q, scope.SQLFragment()) + args := pgx.StrictNamedArgs{ + "status": le.Status, + "error": le.Error, + "file_id": le.FileID, + "started_at": le.StartedAt, + "completed_at": le.CompletedAt, + "id": le.ID, + } + maps.Copy(args, scope.SQLArguments()) + _, err := conn.Exec(ctx, q, args) + return err +} + +func (le *LogExport) LoadByID( + ctx context.Context, + conn pg.Conn, + scope Scoper, + id gid.GID, +) error { + q := ` +SELECT + id, + organization_id, + type, + status, + from_time, + to_time, + file_id, + error, + recipient_email, + recipient_name, + created_at, + started_at, + completed_at +FROM + log_exports +WHERE + %s + AND id = @id +` + q = fmt.Sprintf(q, scope.SQLFragment()) + args := pgx.StrictNamedArgs{"id": id} + maps.Copy(args, scope.SQLArguments()) + + rows, err := conn.Query(ctx, q, args) + if err != nil { + return err + } + + le2, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[LogExport]) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return ErrResourceNotFound + } + return fmt.Errorf("cannot collect log export: %w", err) + } + + *le = le2 + return nil +} + +func (le *LogExport) LoadNextPendingForUpdateSkipLocked( + ctx context.Context, + conn pg.Conn, +) error { + q := ` +SELECT + id, + organization_id, + type, + status, + from_time, + to_time, + file_id, + error, + recipient_email, + recipient_name, + created_at, + started_at, + completed_at +FROM + log_exports +WHERE + status = @status +ORDER BY + created_at ASC +LIMIT 1 +FOR UPDATE SKIP LOCKED +` + args := pgx.StrictNamedArgs{ + "status": LogExportStatusPending, + } + rows, err := 
conn.Query(ctx, q, args) + if err != nil { + return err + } + + le2, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[LogExport]) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return ErrResourceNotFound + } + return fmt.Errorf("cannot collect log export: %w", err) + } + + *le = le2 + return nil +} + +func ResetStaleLogExports( + ctx context.Context, + conn pg.Conn, + staleAfter time.Duration, +) error { + q := ` +UPDATE log_exports +SET + status = @pending_status, + started_at = NULL +WHERE + status = @processing_status + AND started_at < @stale_threshold +` + args := pgx.StrictNamedArgs{ + "pending_status": LogExportStatusPending, + "processing_status": LogExportStatusProcessing, + "stale_threshold": time.Now().Add(-staleAfter), + } + _, err := conn.Exec(ctx, q, args) + return err +} diff --git a/pkg/coredata/log_export_status.go b/pkg/coredata/log_export_status.go new file mode 100644 index 000000000..9151097eb --- /dev/null +++ b/pkg/coredata/log_export_status.go @@ -0,0 +1,65 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. 
+ +package coredata + +import ( + "database/sql/driver" + "fmt" +) + +type ( + LogExportStatus string +) + +const ( + LogExportStatusPending LogExportStatus = "PENDING" + LogExportStatusProcessing LogExportStatus = "PROCESSING" + LogExportStatusCompleted LogExportStatus = "COMPLETED" + LogExportStatusFailed LogExportStatus = "FAILED" +) + +func (s LogExportStatus) String() string { + return string(s) +} + +func (s *LogExportStatus) Scan(value any) error { + var str string + switch v := value.(type) { + case string: + str = v + case []byte: + str = string(v) + default: + return fmt.Errorf("unsupported type for LogExportStatus: %T", value) + } + + switch str { + case LogExportStatusPending.String(): + *s = LogExportStatusPending + case LogExportStatusProcessing.String(): + *s = LogExportStatusProcessing + case LogExportStatusCompleted.String(): + *s = LogExportStatusCompleted + case LogExportStatusFailed.String(): + *s = LogExportStatusFailed + default: + return fmt.Errorf("invalid LogExportStatus value: %q", str) + } + return nil +} + +func (s LogExportStatus) Value() (driver.Value, error) { + return s.String(), nil +} diff --git a/pkg/coredata/log_export_type.go b/pkg/coredata/log_export_type.go new file mode 100644 index 000000000..4bba4f717 --- /dev/null +++ b/pkg/coredata/log_export_type.go @@ -0,0 +1,59 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +package coredata + +import ( + "database/sql/driver" + "fmt" +) + +type ( + LogExportType string +) + +const ( + LogExportTypeAuditLog LogExportType = "AUDIT_LOG" + LogExportTypeSCIMEvent LogExportType = "SCIM_EVENT" +) + +func (t LogExportType) String() string { + return string(t) +} + +func (t *LogExportType) Scan(value any) error { + var s string + switch v := value.(type) { + case string: + s = v + case []byte: + s = string(v) + default: + return fmt.Errorf("unsupported type for LogExportType: %T", value) + } + + switch s { + case LogExportTypeAuditLog.String(): + *t = LogExportTypeAuditLog + case LogExportTypeSCIMEvent.String(): + *t = LogExportTypeSCIMEvent + default: + return fmt.Errorf("invalid LogExportType value: %q", s) + } + return nil +} + +func (t LogExportType) Value() (driver.Value, error) { + return t.String(), nil +} diff --git a/pkg/coredata/migrations/20260325T120000Z.sql b/pkg/coredata/migrations/20260325T120000Z.sql new file mode 100644 index 000000000..5585274a5 --- /dev/null +++ b/pkg/coredata/migrations/20260325T120000Z.sql @@ -0,0 +1,19 @@ +CREATE TABLE log_exports ( + id TEXT NOT NULL PRIMARY KEY, + tenant_id TEXT NOT NULL, + organization_id TEXT NOT NULL, + type TEXT NOT NULL, + status TEXT NOT NULL, + from_time TIMESTAMPTZ NOT NULL, + to_time TIMESTAMPTZ NOT NULL, + file_id TEXT, + error TEXT, + recipient_email TEXT NOT NULL, + recipient_name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ +); + +CREATE INDEX idx_log_exports_pending ON log_exports (status, created_at) + WHERE status = 'PENDING'; diff --git a/pkg/coredata/scim_event.go 
b/pkg/coredata/scim_event.go index 6c0a8fd18..70ceb42ec 100644 --- a/pkg/coredata/scim_event.go +++ b/pkg/coredata/scim_event.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "iter" "maps" "net" "time" @@ -183,6 +184,84 @@ INSERT INTO iam_scim_events ( return nil } +func SCIMEventsByOrganizationIDAndTimeRange( + ctx context.Context, + conn pg.Conn, + scope Scoper, + organizationID gid.GID, + fromTime time.Time, + toTime time.Time, +) iter.Seq2[*SCIMEvent, error] { + q := ` +DECLARE scim_event_export_cursor CURSOR FOR +SELECT + id, + organization_id, + scim_configuration_id, + method, + path, + request_body, + response_body, + status_code, + error_message, + user_name, + ip_address, + created_at +FROM + iam_scim_events +WHERE + %s + AND organization_id = @organization_id + AND created_at >= @from_time + AND created_at < @to_time +ORDER BY + created_at ASC +` + q = fmt.Sprintf(q, scope.SQLFragment()) + + args := pgx.StrictNamedArgs{ + "organization_id": organizationID, + "from_time": fromTime, + "to_time": toTime, + } + maps.Copy(args, scope.SQLArguments()) + + return func(yield func(*SCIMEvent, error) bool) { + if _, err := conn.Exec(ctx, q, args); err != nil { + yield(nil, fmt.Errorf("cannot declare scim event cursor: %w", err)) + return + } + + defer func() { + _, _ = conn.Exec(ctx, "CLOSE scim_event_export_cursor") + }() + + for { + rows, err := conn.Query(ctx, "FETCH 500 FROM scim_event_export_cursor") + if err != nil { + yield(nil, fmt.Errorf("cannot fetch scim events: %w", err)) + return + } + + events, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[SCIMEvent]) + if err != nil { + yield(nil, fmt.Errorf("cannot collect scim events: %w", err)) + return + } + + if len(events) == 0 { + return + } + + for _, event := range events { + if !yield(event, nil) { + return + } + } + } + } +} + func (s *SCIMEvents) LoadByOrganizationID( ctx context.Context, conn pg.Conn, diff --git a/pkg/iam/iam_actions.go b/pkg/iam/iam_actions.go index 
4a44acc4b..3cb864b2a 100644 --- a/pkg/iam/iam_actions.go +++ b/pkg/iam/iam_actions.go @@ -96,4 +96,8 @@ const ( // Audit log entry actions ActionAuditLogEntryGet = "iam:audit-log-entry:get" ActionAuditLogEntryList = "iam:audit-log-entry:list" + + // Log export actions + ActionAuditLogExport = "iam:audit-log:export" + ActionSCIMEventExport = "iam:scim-event:export" ) diff --git a/pkg/iam/iam_policies.go b/pkg/iam/iam_policies.go index 2b007e19e..dc79d9062 100644 --- a/pkg/iam/iam_policies.go +++ b/pkg/iam/iam_policies.go @@ -208,9 +208,15 @@ var IAMOwnerPolicy = policy.NewPolicy( policy.Allow( ActionAuditLogEntryGet, ActionAuditLogEntryList, + ActionAuditLogExport, ). WithSID("audit-log-entry-access"). When(policy.Equals("principal.organization_id", "resource.organization_id")), + + // Can export SCIM events (scoped to own organization) + policy.Allow(ActionSCIMEventExport). + WithSID("scim-event-export-access"). + When(policy.Equals("principal.organization_id", "resource.organization_id")), ). WithDescription("Full IAM access for organization owners") @@ -310,15 +316,21 @@ var IAMAdminPolicy = policy.NewPolicy( ). WithSID("deny-scim-management"), - // Can view audit log entries (scoped to own organization) + // Can view and export audit log entries (scoped to own organization) policy.Allow( ActionAuditLogEntryGet, ActionAuditLogEntryList, + ActionAuditLogExport, ). WithSID("audit-log-entry-admin-access"). When( policy.Equals("principal.organization_id", "resource.organization_id"), ), + + // Can export SCIM events (scoped to own organization) + policy.Allow(ActionSCIMEventExport). + WithSID("scim-event-export-admin-access"). + When(policy.Equals("principal.organization_id", "resource.organization_id")), ). 
WithDescription("IAM admin access - can manage members but cannot delete organization or manage SAML/SCIM") diff --git a/pkg/iam/log_export_worker.go b/pkg/iam/log_export_worker.go new file mode 100644 index 000000000..9e5d92023 --- /dev/null +++ b/pkg/iam/log_export_worker.go @@ -0,0 +1,369 @@ +// Copyright (c) 2026 Probo Inc . +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +// OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. 
+ +package iam + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "sync" + "time" + + "go.gearno.de/crypto/uuid" + "go.gearno.de/kit/log" + "go.gearno.de/kit/pg" + emails "go.probo.inc/probo/packages/emails" + "go.probo.inc/probo/pkg/coredata" + "go.probo.inc/probo/pkg/filemanager" + "go.probo.inc/probo/pkg/gid" +) + +const ( + logExportEmailExpiresIn = 24 * time.Hour +) + +type ( + LogExportWorker struct { + pg *pg.Client + fm *filemanager.Service + bucket string + baseURL string + logger *log.Logger + interval time.Duration + staleAfter time.Duration + maxConcurrency int + } + + LogExportWorkerOption func(*LogExportWorker) +) + +func WithLogExportWorkerInterval(d time.Duration) LogExportWorkerOption { + return func(w *LogExportWorker) { w.interval = d } +} + +func WithLogExportWorkerStaleAfter(d time.Duration) LogExportWorkerOption { + return func(w *LogExportWorker) { w.staleAfter = d } +} + +func WithLogExportWorkerMaxConcurrency(n int) LogExportWorkerOption { + return func(w *LogExportWorker) { + if n > 0 { + w.maxConcurrency = n + } + } +} + +func NewLogExportWorker( + pgClient *pg.Client, + fm *filemanager.Service, + bucket string, + baseURL string, + logger *log.Logger, + opts ...LogExportWorkerOption, +) *LogExportWorker { + w := &LogExportWorker{ + pg: pgClient, + fm: fm, + bucket: bucket, + baseURL: baseURL, + logger: logger, + interval: 10 * time.Second, + staleAfter: 5 * time.Minute, + maxConcurrency: 3, + } + for _, opt := range opts { + opt(w) + } + return w +} + +func (w *LogExportWorker) Run(ctx context.Context) error { + var ( + wg sync.WaitGroup + sem = make(chan struct{}, w.maxConcurrency) + ticker = time.NewTicker(w.interval) + ) + defer ticker.Stop() + defer wg.Wait() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + nonCancelableCtx := context.WithoutCancel(ctx) + w.recoverStaleRows(nonCancelableCtx) + for { + if err := w.processNext(ctx, sem, &wg); err != nil { + if !errors.Is(err, 
coredata.ErrResourceNotFound) {
+						w.logger.ErrorCtx(nonCancelableCtx, "cannot process log export", log.Error(err))
+					}
+					break
+				}
+			}
+		}
+	}
+}
+
+// processNext claims the next PENDING export inside a transaction (SKIP LOCKED
+// via LoadNextPendingForUpdateSkipLocked), marks it PROCESSING, and hands it to
+// a bounded goroutine. The semaphore slot is acquired before the claim and
+// released when the goroutine finishes (or immediately if the claim fails).
+// Returns coredata.ErrResourceNotFound when the queue is empty.
+func (w *LogExportWorker) processNext(ctx context.Context, sem chan struct{}, wg *sync.WaitGroup) error {
+	select {
+	case sem <- struct{}{}:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	var (
+		export           coredata.LogExport
+		now              = time.Now()
+		nonCancelableCtx = context.WithoutCancel(ctx)
+	)
+
+	if err := w.pg.WithTx(
+		nonCancelableCtx,
+		func(tx pg.Conn) error {
+			if err := export.LoadNextPendingForUpdateSkipLocked(nonCancelableCtx, tx); err != nil {
+				return err
+			}
+
+			scope := coredata.NewScopeFromObjectID(export.ID)
+			export.Status = coredata.LogExportStatusProcessing
+			// BUG FIX: `new(now)` does not compile — the builtin `new` takes a
+			// type, not a value. Take the address of the local instead.
+			export.StartedAt = &now
+
+			if err := export.Update(nonCancelableCtx, tx, scope); err != nil {
+				return fmt.Errorf("cannot update log export: %w", err)
+			}
+
+			return nil
+		},
+	); err != nil {
+		<-sem
+		return err
+	}
+
+	wg.Add(1)
+	go func(export coredata.LogExport) {
+		defer wg.Done()
+		defer func() { <-sem }()
+
+		if err := w.handle(nonCancelableCtx, &export); err != nil {
+			w.logger.ErrorCtx(
+				nonCancelableCtx,
+				"cannot handle log export",
+				log.Error(err),
+				log.String("log_export_id", export.ID.String()),
+			)
+			if err := w.handleFailure(nonCancelableCtx, &export, err); err != nil {
+				w.logger.ErrorCtx(nonCancelableCtx, "cannot mark log export as failed", log.Error(err))
+			}
+		}
+	}(export)
+
+	return nil
+}
+
+// handle performs one export end-to-end: streams the matching rows as JSONL
+// through an io.Pipe into the file manager (upload runs concurrently with the
+// DB read), then in a single transaction records the file, queues the
+// notification email with a pre-signed download URL, and marks the export
+// COMPLETED. Any error leaves the export to be failed by the caller.
+func (w *LogExportWorker) handle(ctx context.Context, export *coredata.LogExport) error {
+	scope := coredata.NewScopeFromObjectID(export.ID)
+
+	fileKey := uuid.MustNewV4().String()
+	now := time.Now()
+	typeName := "audit-log"
+	if export.Type == coredata.LogExportTypeSCIMEvent {
+		typeName = "scim-event"
+	}
+	fileName := fmt.Sprintf(
+		"%s-export-%s-to-%s.jsonl",
+		typeName,
+		export.FromTime.Format("2006-01-02"),
+		export.ToTime.Format("2006-01-02"),
+	)
+
+	file := coredata.File{
+		ID:             gid.New(export.ID.TenantID(), coredata.FileEntityType),
+		OrganizationID: export.OrganizationID,
+		BucketName:     w.bucket,
+		MimeType:       "application/x-ndjson",
+		FileName:       fileName,
+		FileKey:        fileKey,
+		Visibility:     coredata.FileVisibilityPrivate,
+		CreatedAt:      now,
+		UpdatedAt:      now,
+	}
+
+	pr, pw := io.Pipe()
+
+	var uploadErr error
+	var fileSize int64
+	uploadDone := make(chan struct{})
+
+	go func() {
+		defer close(uploadDone)
+		fileSize, uploadErr = w.fm.PutFile(
+			ctx,
+			&file,
+			pr,
+			map[string]string{
+				"type":            typeName + "-export",
+				"log-export-id":   export.ID.String(),
+				"organization-id": export.OrganizationID.String(),
+			},
+		)
+		// Closing the read side unblocks the writer if the upload aborts early.
+		pr.CloseWithError(uploadErr)
+	}()
+
+	writeErr := w.streamJSONL(ctx, export, scope, pw)
+	if writeErr != nil {
+		_ = pw.CloseWithError(writeErr)
+	} else {
+		_ = pw.Close()
+	}
+
+	<-uploadDone
+
+	if writeErr != nil {
+		return fmt.Errorf("cannot write JSONL: %w", writeErr)
+	}
+	if uploadErr != nil {
+		return fmt.Errorf("cannot upload file to S3: %w", uploadErr)
+	}
+	file.FileSize = fileSize
+
+	downloadURL, err := w.fm.GenerateFileUrl(ctx, &file, logExportEmailExpiresIn)
+	if err != nil {
+		return fmt.Errorf("cannot generate download URL: %w", err)
+	}
+
+	emailPresenter := emails.NewPresenter(w.fm, w.bucket, w.baseURL, export.RecipientName)
+	subject, textBody, htmlBody, err := emailPresenter.RenderLogExport(ctx, downloadURL)
+	if err != nil {
+		return fmt.Errorf("cannot render log export email: %w", err)
+	}
+
+	return w.pg.WithTx(
+		ctx,
+		func(tx pg.Conn) error {
+			if err := file.Insert(ctx, tx, scope); err != nil {
+				return fmt.Errorf("cannot insert file: %w", err)
+			}
+
+			email := coredata.NewEmail(
+				export.RecipientName,
+				export.RecipientEmail,
+				subject,
+				textBody,
+				htmlBody,
+				nil,
+			)
+			if err := email.Insert(ctx, tx); err != nil {
+				return fmt.Errorf("cannot insert email: %w", err)
+			}
+
+			export.FileID = &file.ID
+			export.Status = coredata.LogExportStatusCompleted
+			// BUG FIX: `new(time.Now())` does not compile — the builtin `new`
+			// takes a type, not an expression. Capture the timestamp in a local
+			// and take its address.
+			completedAt := time.Now()
+			export.CompletedAt = &completedAt
+			if err := export.Update(ctx, tx, scope); err != nil {
+				return fmt.Errorf("cannot update log export: %w", err)
+			}
+
+			return nil
+		},
+	)
+}
+
+// streamJSONL writes one JSON document per line to pw for every row of the
+// export's type within [FromTime, ToTime). Runs inside its own transaction
+// because the coredata iterators use a server-side cursor.
+func (w *LogExportWorker) streamJSONL(
+	ctx context.Context,
+	export *coredata.LogExport,
+	scope coredata.Scoper,
+	pw io.Writer,
+) error {
+	return w.pg.WithTx(
+		ctx,
+		func(conn pg.Conn) error {
+			enc := json.NewEncoder(pw)
+
+			switch export.Type {
+			case coredata.LogExportTypeAuditLog:
+				for entry, err := range coredata.AuditLogEntriesByOrganizationIDAndTimeRange(
+					ctx,
+					conn,
+					scope,
+					export.OrganizationID,
+					export.FromTime,
+					export.ToTime,
+				) {
+					if err != nil {
+						return err
+					}
+					if err := enc.Encode(entry); err != nil {
+						return fmt.Errorf("cannot encode audit log entry: %w", err)
+					}
+				}
+				return nil
+
+			case coredata.LogExportTypeSCIMEvent:
+				for event, err := range coredata.SCIMEventsByOrganizationIDAndTimeRange(
+					ctx,
+					conn,
+					scope,
+					export.OrganizationID,
+					export.FromTime,
+					export.ToTime,
+				) {
+					if err != nil {
+						return err
+					}
+					if err := enc.Encode(event); err != nil {
+						return fmt.Errorf("cannot encode SCIM event: %w", err)
+					}
+				}
+				return nil
+
+			default:
+				return fmt.Errorf("unsupported log export type: %q", export.Type)
+			}
+		},
+	)
+}
+
+// handleFailure records the failure message and flips the export to FAILED so
+// the stale-row recovery does not re-queue it.
+func (w *LogExportWorker) handleFailure(ctx context.Context, export *coredata.LogExport, failureErr error) error {
+	return w.pg.WithConn(
+		ctx,
+		func(conn pg.Conn) error {
+			scope := coredata.NewScopeFromObjectID(export.ID)
+			errorMsg := failureErr.Error()
+			export.Error = &errorMsg
+			export.Status = coredata.LogExportStatusFailed
+			// BUG FIX: same `new(time.Now())` misuse as in handle — use a local.
+			failedAt := time.Now()
+			export.CompletedAt = &failedAt
+
+			if err := export.Update(ctx, conn, scope); err != nil {
+				return fmt.Errorf("cannot update log export: %w", err)
+			}
+
+			return nil
+		},
+	)
+}
+
+// recoverStaleRows re-queues PROCESSING exports whose worker appears to have
+// died (started more than w.staleAfter ago). Errors are logged, not returned,
+// so the tick loop keeps running.
+func (w *LogExportWorker) recoverStaleRows(ctx context.Context) {
+	if err := w.pg.WithConn(
+		ctx,
+		func(conn pg.Conn) error {
+			return coredata.ResetStaleLogExports(ctx, conn, w.staleAfter)
+		},
+	); err != nil {
+		w.logger.ErrorCtx(ctx, "cannot recover stale log 
exports", log.Error(err)) + } +} diff --git a/pkg/iam/organization_service.go b/pkg/iam/organization_service.go index 1c9fad9d6..f8296726d 100644 --- a/pkg/iam/organization_service.go +++ b/pkg/iam/organization_service.go @@ -2188,3 +2188,54 @@ func (s *OrganizationService) CountAuditLogEntries( return count, err } + +type RequestLogExportRequest struct { + OrganizationID gid.GID + Type coredata.LogExportType + FromTime time.Time + ToTime time.Time + RecipientEmail mail.Addr + RecipientName string +} + +func (s *OrganizationService) RequestLogExport( + ctx context.Context, + req RequestLogExportRequest, +) (*coredata.LogExport, error) { + if !req.FromTime.Before(req.ToTime) { + return nil, fmt.Errorf("from_time must be before to_time") + } + + scope := coredata.NewScopeFromObjectID(req.OrganizationID) + logExport := &coredata.LogExport{} + + err := s.pg.WithTx( + ctx, + func(tx pg.Conn) error { + now := time.Now() + + logExport = &coredata.LogExport{ + ID: gid.New(scope.GetTenantID(), coredata.LogExportEntityType), + OrganizationID: req.OrganizationID, + Type: req.Type, + Status: coredata.LogExportStatusPending, + FromTime: req.FromTime, + ToTime: req.ToTime, + RecipientEmail: req.RecipientEmail, + RecipientName: req.RecipientName, + CreatedAt: now, + } + + if err := logExport.Insert(ctx, tx, scope); err != nil { + return fmt.Errorf("cannot insert log export: %w", err) + } + + return nil + }, + ) + if err != nil { + return nil, err + } + + return logExport, nil +} diff --git a/pkg/iam/service.go b/pkg/iam/service.go index c3cea7061..82370ca29 100644 --- a/pkg/iam/service.go +++ b/pkg/iam/service.go @@ -53,6 +53,7 @@ type ( Authorizer *Authorizer samlDomainVerifier *SAMLDomainVerifier + logExportWorker *LogExportWorker } Config struct { @@ -171,6 +172,14 @@ func NewService( cfg.DomainVerificationResolverAddr, ) + svc.logExportWorker = NewLogExportWorker( + pgClient, + fm, + cfg.Bucket, + cfg.BaseURL.String(), + cfg.Logger.Named("log-export-worker"), + ) + return 
svc, nil } @@ -215,12 +224,22 @@ func (s *Service) Run(ctx context.Context) error { }, ) + logExportCtx, stopLogExport := context.WithCancel(context.WithoutCancel(ctx)) + wg.Go( + func() { + if err := s.logExportWorker.Run(logExportCtx); err != nil { + cancel(fmt.Errorf("log export worker crashed: %w", err)) + } + }, + ) + <-ctx.Done() stopSAML() stopOIDC() stopDomainVerifier() stopSCIM() + stopLogExport() wg.Wait() diff --git a/pkg/server/api/connect/v1/schema.graphql b/pkg/server/api/connect/v1/schema.graphql index f8f672b83..07a81a493 100644 --- a/pkg/server/api/connect/v1/schema.graphql +++ b/pkg/server/api/connect/v1/schema.graphql @@ -134,6 +134,10 @@ type Mutation { updateSCIMBridge( input: UpdateSCIMBridgeInput! ): UpdateSCIMBridgePayload @session(required: PRESENT) + + requestAuditLogExport( + input: RequestAuditLogExportInput! + ): RequestAuditLogExportPayload @session(required: PRESENT) } type Identity implements Node { @@ -1033,3 +1037,13 @@ type RegenerateSCIMTokenPayload { type UpdateSCIMBridgePayload { scimBridge: SCIMBridge! } + +input RequestAuditLogExportInput { + organizationId: ID! + fromTime: Datetime! + toTime: Datetime! +} + +type RequestAuditLogExportPayload { + logExportId: ID! +} diff --git a/pkg/server/api/connect/v1/v1_resolver.go b/pkg/server/api/connect/v1/v1_resolver.go index 64fef54d8..e6c92842c 100644 --- a/pkg/server/api/connect/v1/v1_resolver.go +++ b/pkg/server/api/connect/v1/v1_resolver.go @@ -1212,6 +1212,35 @@ func (r *mutationResolver) UpdateSCIMBridge(ctx context.Context, input types.Upd }, nil } +// RequestAuditLogExport is the resolver for the requestAuditLogExport field. 
+func (r *mutationResolver) RequestAuditLogExport(ctx context.Context, input types.RequestAuditLogExportInput) (*types.RequestAuditLogExportPayload, error) { + if err := r.authorize(ctx, input.OrganizationID, iam.ActionAuditLogExport); err != nil { + return nil, err + } + + identity := authn.IdentityFromContext(ctx) + + logExport, err := r.iam.OrganizationService.RequestLogExport( + ctx, + iam.RequestLogExportRequest{ + OrganizationID: input.OrganizationID, + Type: coredata.LogExportTypeAuditLog, + FromTime: input.FromTime, + ToTime: input.ToTime, + RecipientEmail: identity.EmailAddress, + RecipientName: identity.FullName, + }, + ) + if err != nil { + r.logger.ErrorCtx(ctx, "cannot request audit log export", log.Error(err)) + return nil, gqlutils.Internal(ctx) + } + + return &types.RequestAuditLogExportPayload{ + LogExportID: logExport.ID, + }, nil +} + // LogoURL is the resolver for the logoUrl field. func (r *organizationResolver) LogoURL(ctx context.Context, obj *types.Organization) (*string, error) { if err := r.authorize(ctx, obj.ID, iam.ActionOrganizationGet, authz.WithSkipAssumptionCheck()); err != nil { diff --git a/pkg/server/api/console/v1/schema.graphql b/pkg/server/api/console/v1/schema.graphql index 355b129aa..3c5aeb054 100644 --- a/pkg/server/api/console/v1/schema.graphql +++ b/pkg/server/api/console/v1/schema.graphql @@ -3893,6 +3893,13 @@ type Mutation { deleteCustomDomain( input: DeleteCustomDomainInput! ): DeleteCustomDomainPayload! + # Log export mutations + requestAuditLogExport( + input: RequestAuditLogExportInput! + ): RequestAuditLogExportPayload! + requestSCIMEventExport( + input: RequestSCIMEventExportInput! + ): RequestSCIMEventExportPayload! } # Input Types @@ -6054,3 +6061,23 @@ type AuditLogEntryEdge { cursor: CursorKey! node: AuditLogEntry! } + +input RequestAuditLogExportInput { + organizationId: ID! + fromTime: Datetime! + toTime: Datetime! +} + +type RequestAuditLogExportPayload { + logExportId: ID! 
+} + +input RequestSCIMEventExportInput { + organizationId: ID! + fromTime: Datetime! + toTime: Datetime! +} + +type RequestSCIMEventExportPayload { + logExportId: ID! +} diff --git a/pkg/server/api/console/v1/v1_resolver.go b/pkg/server/api/console/v1/v1_resolver.go index ae9c47735..15bae6455 100644 --- a/pkg/server/api/console/v1/v1_resolver.go +++ b/pkg/server/api/console/v1/v1_resolver.go @@ -6293,6 +6293,64 @@ func (r *mutationResolver) DeleteCustomDomain(ctx context.Context, input types.D }, nil } +// RequestAuditLogExport is the resolver for the requestAuditLogExport field. +func (r *mutationResolver) RequestAuditLogExport(ctx context.Context, input types.RequestAuditLogExportInput) (*types.RequestAuditLogExportPayload, error) { + if err := r.authorize(ctx, input.OrganizationID, iam.ActionAuditLogExport); err != nil { + return nil, err + } + + identity := authn.IdentityFromContext(ctx) + + logExport, err := r.iam.OrganizationService.RequestLogExport( + ctx, + iam.RequestLogExportRequest{ + OrganizationID: input.OrganizationID, + Type: coredata.LogExportTypeAuditLog, + FromTime: input.FromTime, + ToTime: input.ToTime, + RecipientEmail: identity.EmailAddress, + RecipientName: identity.FullName, + }, + ) + if err != nil { + r.logger.ErrorCtx(ctx, "cannot request audit log export", log.Error(err)) + return nil, gqlutils.Internal(ctx) + } + + return &types.RequestAuditLogExportPayload{ + LogExportID: logExport.ID, + }, nil +} + +// RequestSCIMEventExport is the resolver for the requestSCIMEventExport field. 
+func (r *mutationResolver) RequestSCIMEventExport(ctx context.Context, input types.RequestSCIMEventExportInput) (*types.RequestSCIMEventExportPayload, error) { + if err := r.authorize(ctx, input.OrganizationID, iam.ActionSCIMEventExport); err != nil { + return nil, err + } + + identity := authn.IdentityFromContext(ctx) + + logExport, err := r.iam.OrganizationService.RequestLogExport( + ctx, + iam.RequestLogExportRequest{ + OrganizationID: input.OrganizationID, + Type: coredata.LogExportTypeSCIMEvent, + FromTime: input.FromTime, + ToTime: input.ToTime, + RecipientEmail: identity.EmailAddress, + RecipientName: identity.FullName, + }, + ) + if err != nil { + r.logger.ErrorCtx(ctx, "cannot request SCIM event export", log.Error(err)) + return nil, gqlutils.Internal(ctx) + } + + return &types.RequestSCIMEventExportPayload{ + LogExportID: logExport.ID, + }, nil +} + // Organization is the resolver for the organization field. func (r *obligationResolver) Organization(ctx context.Context, obj *types.Obligation) (*types.Organization, error) { if err := r.authorize(ctx, obj.ID, probo.ActionOrganizationGet); err != nil { diff --git a/pkg/server/api/mcp/v1/schema.resolvers.go b/pkg/server/api/mcp/v1/schema.resolvers.go index 41cb1cb5c..6cf5dae01 100644 --- a/pkg/server/api/mcp/v1/schema.resolvers.go +++ b/pkg/server/api/mcp/v1/schema.resolvers.go @@ -3359,3 +3359,52 @@ func (r *Resolver) GetAuditLogEntryTool(ctx context.Context, req *mcp.CallToolRe AuditLogEntry: types.NewAuditLogEntry(entry), }, nil } + +func (r *Resolver) RequestAuditLogExportTool(ctx context.Context, req *mcp.CallToolRequest, input *types.RequestAuditLogExportInput) (*mcp.CallToolResult, types.RequestAuditLogExportOutput, error) { + r.MustAuthorize(ctx, input.OrganizationID, iam.ActionAuditLogExport) + + identity := authn.IdentityFromContext(ctx) + + logExport, err := r.iamSvc.OrganizationService.RequestLogExport( + ctx, + iam.RequestLogExportRequest{ + OrganizationID: input.OrganizationID, + Type: 
coredata.LogExportTypeAuditLog, + FromTime: input.FromTime, + ToTime: input.ToTime, + RecipientEmail: identity.EmailAddress, + RecipientName: identity.FullName, + }, + ) + if err != nil { + return nil, types.RequestAuditLogExportOutput{}, fmt.Errorf("cannot request audit log export: %w", err) + } + + return nil, types.RequestAuditLogExportOutput{ + LogExportID: logExport.ID, + }, nil +} +func (r *Resolver) RequestSCIMEventExportTool(ctx context.Context, req *mcp.CallToolRequest, input *types.RequestSCIMEventExportInput) (*mcp.CallToolResult, types.RequestSCIMEventExportOutput, error) { + r.MustAuthorize(ctx, input.OrganizationID, iam.ActionSCIMEventExport) + + identity := authn.IdentityFromContext(ctx) + + logExport, err := r.iamSvc.OrganizationService.RequestLogExport( + ctx, + iam.RequestLogExportRequest{ + OrganizationID: input.OrganizationID, + Type: coredata.LogExportTypeSCIMEvent, + FromTime: input.FromTime, + ToTime: input.ToTime, + RecipientEmail: identity.EmailAddress, + RecipientName: identity.FullName, + }, + ) + if err != nil { + return nil, types.RequestSCIMEventExportOutput{}, fmt.Errorf("cannot request SCIM event export: %w", err) + } + + return nil, types.RequestSCIMEventExportOutput{ + LogExportID: logExport.ID, + }, nil +} diff --git a/pkg/server/api/mcp/v1/specification.yaml b/pkg/server/api/mcp/v1/specification.yaml index 8ce6f07b6..a9cda2e1d 100644 --- a/pkg/server/api/mcp/v1/specification.yaml +++ b/pkg/server/api/mcp/v1/specification.yaml @@ -6532,6 +6532,66 @@ components: go.probo.inc/mcpgen/type: time.Time description: When the action was performed + RequestAuditLogExportInput: + type: object + required: + - organization_id + - from_time + - to_time + properties: + organization_id: + $ref: "#/components/schemas/GID" + description: Organization ID + from_time: + type: string + format: date-time + go.probo.inc/mcpgen/type: time.Time + description: Start of the time range (inclusive) + to_time: + type: string + format: date-time + 
go.probo.inc/mcpgen/type: time.Time + description: End of the time range (exclusive) + + RequestAuditLogExportOutput: + type: object + required: + - log_export_id + properties: + log_export_id: + $ref: "#/components/schemas/GID" + description: ID of the created log export + + RequestSCIMEventExportInput: + type: object + required: + - organization_id + - from_time + - to_time + properties: + organization_id: + $ref: "#/components/schemas/GID" + description: Organization ID + from_time: + type: string + format: date-time + go.probo.inc/mcpgen/type: time.Time + description: Start of the time range (inclusive) + to_time: + type: string + format: date-time + go.probo.inc/mcpgen/type: time.Time + description: End of the time range (exclusive) + + RequestSCIMEventExportOutput: + type: object + required: + - log_export_id + properties: + log_export_id: + $ref: "#/components/schemas/GID" + description: ID of the created log export + tools: - name: listOrganizations description: List all organizations the user has access to @@ -7694,3 +7754,23 @@ tools: $ref: "#/components/schemas/ListAuditLogEntriesInput" outputSchema: $ref: "#/components/schemas/ListAuditLogEntriesOutput" + + - name: requestAuditLogExport + description: Request an export of audit log entries for the organization within a time range. The export will be emailed as a JSONL download link. + hints: + readonly: false + idempotent: false + inputSchema: + $ref: "#/components/schemas/RequestAuditLogExportInput" + outputSchema: + $ref: "#/components/schemas/RequestAuditLogExportOutput" + + - name: requestSCIMEventExport + description: Request an export of SCIM events for the organization within a time range. The export will be emailed as a JSONL download link. + hints: + readonly: false + idempotent: false + inputSchema: + $ref: "#/components/schemas/RequestSCIMEventExportInput" + outputSchema: + $ref: "#/components/schemas/RequestSCIMEventExportOutput"