diff --git a/frontend/src/views/settings/webhooks.vue b/frontend/src/views/settings/webhooks.vue
new file mode 100644
index 000000000..4763d76a8
--- /dev/null
+++ b/frontend/src/views/settings/webhooks.vue
@@ -0,0 +1,174 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ props.option }}
+
+
+ {{ $t('globals.messages.emptyState') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('globals.buttons.addNew') }}
+
+
+
+
+
diff --git a/i18n/en.json b/i18n/en.json
index a87b08487..7ad7398aa 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -251,6 +251,8 @@
"globals.terms.tx": "Transactional | Transactional",
"globals.terms.user": "User | Users",
"globals.terms.users": "Users",
+ "globals.terms.webhook": "Webhook | Webhooks",
+ "globals.terms.webhooks": "Webhooks",
"globals.terms.year": "Year | Years",
"globals.terms.import": "Import",
"globals.terms.url": "URL",
@@ -513,6 +515,10 @@
"settings.performance.slidingWindowHelp": "Limit the total number of messages that are sent out in given period. On reaching this limit, messages are be held from sending until the time window clears.",
"settings.performance.slidingWindowRate": "Max. messages",
"settings.performance.slidingWindowRateHelp": "Maximum number of messages to send within the window duration.",
+ "settings.performance.webhookWorkers": "Webhook workers",
+ "settings.performance.webhookWorkersHelp": "Number of concurrent worker goroutines that process webhook deliveries from the queue.",
+ "settings.performance.webhookBatchSize": "Webhook batch size",
+ "settings.performance.webhookBatchSizeHelp": "Number of webhook log entries to fetch and process per batch by each worker.",
"settings.privacy.allowBlocklist": "Allow blocklisting",
"settings.privacy.allowBlocklistHelp": "Allow subscribers to unsubscribe from all mailing lists and mark themselves as blocklisted?",
"settings.privacy.allowExport": "Allow exporting",
@@ -569,6 +575,25 @@
"settings.smtp.testEnterEmail": "Re-enter password to test",
"settings.smtp.toEmail": "To e-mail",
"settings.title": "Settings",
+ "settings.webhooks.authBasic": "Basic Auth",
+ "settings.webhooks.authNone": "None",
+ "settings.webhooks.authToken": "Bearer Token",
+ "settings.webhooks.authType": "Authentication",
+ "settings.webhooks.authTypeHelp": "Method to authenticate webhook requests.",
+ "settings.webhooks.events": "Events",
+ "settings.webhooks.eventsHelp": "Select events that will trigger this webhook.",
+ "settings.webhooks.token": "Token",
+ "settings.webhooks.tokenHelp": "Bearer token for authentication. The token will be sent in the Authorization header.",
+ "settings.webhooks.maxRetries": "Max retries",
+ "settings.webhooks.maxRetriesHelp": "Maximum number of retry attempts on failure (uses exponential backoff).",
+ "settings.webhooks.name": "Webhooks",
+ "settings.webhooks.nameHelp": "A unique name to identify this webhook.",
+ "settings.webhooks.password": "Password",
+ "settings.webhooks.timeout": "Request timeout",
+ "settings.webhooks.timeoutHelp": "Timeout for webhook HTTP requests. Example: 30s, 1m.",
+ "settings.webhooks.url": "URL",
+ "settings.webhooks.urlHelp": "The endpoint URL that will receive webhook events.",
+ "settings.webhooks.username": "Username",
"settings.updateAvailable": "A new update {version} is available.",
"subscribers.advancedQuery": "Advanced",
"subscribers.advancedQueryHelp": "Partial SQL expression to query subscriber attributes",
diff --git a/internal/core/bounces.go b/internal/core/bounces.go
index 1fc3732c1..6126f7eb2 100644
--- a/internal/core/bounces.go
+++ b/internal/core/bounces.go
@@ -81,9 +81,13 @@ func (c *Core) RecordBounce(b models.Bounce) error {
}
c.log.Printf("error recording bounce: %v", err)
+ return err
}
- return err
+ // Trigger webhook for bounce event.
+ c.TriggerWebhook(models.EventSubscriberBounced, b)
+
+ return nil
}
// BlocklistBouncedSubscribers blocklists all bounced subscribers.
diff --git a/internal/core/campaigns.go b/internal/core/campaigns.go
index 14aca6383..75c02a6c3 100644
--- a/internal/core/campaigns.go
+++ b/internal/core/campaigns.go
@@ -299,6 +299,19 @@ func (c *Core) UpdateCampaignStatus(id int, status string) (models.Campaign, err
}
cm.Status = status
+
+ // Trigger webhooks for campaign status changes.
+ switch status {
+ case models.CampaignStatusRunning:
+ c.TriggerWebhook(models.EventCampaignStarted, cm)
+ case models.CampaignStatusPaused:
+ c.TriggerWebhook(models.EventCampaignPaused, cm)
+ case models.CampaignStatusCancelled:
+ c.TriggerWebhook(models.EventCampaignCancelled, cm)
+ case models.CampaignStatusFinished:
+ c.TriggerWebhook(models.EventCampaignFinished, cm)
+ }
+
return cm, nil
}
diff --git a/internal/core/core.go b/internal/core/core.go
index c612e5766..466ff9d1d 100644
--- a/internal/core/core.go
+++ b/internal/core/core.go
@@ -6,6 +6,7 @@ package core
import (
"bytes"
+ "context"
"fmt"
"log"
"net/http"
@@ -37,6 +38,8 @@ type Core struct {
db *sqlx.DB
q *models.Queries
log *log.Logger
+
+ webhookTrigger chan (webhookTriggerEvent)
}
// Constants represents constant config.
@@ -52,6 +55,12 @@ type Constants struct {
// Hooks contains external function hooks that are required by the core package.
type Hooks struct {
SendOptinConfirmation func(models.Subscriber, []int) (int, error)
+ TriggerWebhook func(event string, data any) error
+}
+
+type webhookTriggerEvent struct {
+ Event string
+ Data any
}
// Opt contains the controllers required to start the core.
@@ -84,6 +93,8 @@ func New(o *Opt, h *Hooks) *Core {
db: o.DB,
q: o.Queries,
log: o.Log,
+
+ webhookTrigger: make(chan webhookTriggerEvent, 1<<16), // explicit 64k to avoid memory exhaustion
}
}
@@ -207,3 +218,28 @@ func sanitizeSQLExp(q string) string {
func strHasLen(str string, min, max int) bool {
return len(str) >= min && len(str) <= max
}
+
+// TriggerWebhook triggers a webhook event if the hook is set.
+func (c *Core) TriggerWebhook(event string, data any) {
+ c.log.Printf("trigger webhook for event %s", event)
+ c.webhookTrigger <- webhookTriggerEvent{
+ Event: event,
+ Data: data,
+ }
+}
+
+// PersistWebhookLogs will receive each webhook trigger and write them to db
+// to survive restart and allow webhook worker pool to pick up
+func (c *Core) PersistWebhookLogs(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ c.log.Println("shutdown signal received. stopping persist webhook_log goroutine.")
+ return
+ case ev := <-c.webhookTrigger:
+ if err := c.h.TriggerWebhook(ev.Event, ev.Data); err != nil {
+ c.log.Printf("error triggering webhook %s: %v", ev.Event, err)
+ }
+ }
+ }
+}
diff --git a/internal/core/subscribers.go b/internal/core/subscribers.go
index 95332f617..d7e2d785b 100644
--- a/internal/core/subscribers.go
+++ b/internal/core/subscribers.go
@@ -343,8 +343,15 @@ func (c *Core) InsertSubscriber(sub models.Subscriber, listIDs []int, listUUIDs
}
hasOptin = num > 0
+ if hasOptin {
+ // Trigger webhook for optin start.
+ c.TriggerWebhook(models.EventSubscriberOptinStart, out)
+ }
}
+ // Trigger webhook for new subscriber creation.
+ c.TriggerWebhook(models.EventSubscriberCreated, out)
+
return out, hasOptin, nil
}
@@ -379,6 +386,9 @@ func (c *Core) UpdateSubscriber(id int, sub models.Subscriber) (models.Subscribe
return models.Subscriber{}, err
}
+ // Trigger webhook for subscriber update.
+ c.TriggerWebhook(models.EventSubscriberUpdated, out)
+
return out, nil
}
@@ -431,8 +441,15 @@ func (c *Core) UpdateSubscriberWithLists(id int, sub models.Subscriber, listIDs
return out, hasOptin, err
}
hasOptin = num > 0
+ if hasOptin {
+ // Trigger webhook for optin start.
+ c.TriggerWebhook(models.EventSubscriberOptinStart, out)
+ }
}
+ // Trigger webhook for subscriber update.
+ c.TriggerWebhook(models.EventSubscriberUpdated, out)
+
return out, hasOptin, nil
}
@@ -473,6 +490,12 @@ func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
+ // Trigger webhook for subscriber deletion with IDs.
+ c.TriggerWebhook(models.EventSubscriberDeleted, map[string]any{
+ "ids": subIDs,
+ "uuids": subUUIDs,
+ })
+
return nil
}
@@ -496,6 +519,13 @@ func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) e
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
+ // Trigger webhook for unsubscribe.
+ c.TriggerWebhook(models.EventSubscriberUnsubscribed, map[string]any{
+ "subscriber_uuid": subUUID,
+ "campaign_uuid": campUUID,
+ "blocklisted": blocklist,
+ })
+
return nil
}
@@ -511,6 +541,12 @@ func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string, met
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
+ // Trigger webhook for optin finish.
+ c.TriggerWebhook(models.EventSubscriberOptinFinish, map[string]any{
+ "subscriber_uuid": subUUID,
+ "list_uuids": listUUIDs,
+ })
+
return nil
}
diff --git a/internal/migrations/v6.1.0.go b/internal/migrations/v6.1.0.go
new file mode 100644
index 000000000..0c1393009
--- /dev/null
+++ b/internal/migrations/v6.1.0.go
@@ -0,0 +1,60 @@
+package migrations
+
+import (
+ "log"
+
+ "github.com/jmoiron/sqlx"
+ "github.com/knadh/koanf/v2"
+ "github.com/knadh/stuffbin"
+)
+
+// V6_1_0 adds webhook_logs table for persistent webhook delivery with background workers.
+func V6_1_0(db *sqlx.DB, fs stuffbin.FileSystem, ko *koanf.Koanf, lo *log.Logger) error {
+ // Create webhook_log_status enum type.
+ _, err := db.Exec(`
+ DO $$ BEGIN
+ IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'webhook_log_status') THEN
+ CREATE TYPE webhook_log_status AS ENUM ('triggered', 'processing', 'completed', 'failed');
+ END IF;
+ END $$;
+ `)
+ if err != nil {
+ return err
+ }
+
+ // Create webhook_logs table.
+ _, err = db.Exec(`
+ CREATE TABLE IF NOT EXISTS webhook_logs (
+ id SERIAL PRIMARY KEY,
+ webhook_id TEXT NOT NULL,
+ event TEXT NOT NULL,
+ payload JSONB NOT NULL DEFAULT '{}',
+ status webhook_log_status NOT NULL DEFAULT 'triggered',
+ retries INT NOT NULL DEFAULT 0,
+ last_retried_at TIMESTAMP WITH TIME ZONE,
+ response JSONB NOT NULL DEFAULT '{}',
+ note TEXT,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_webhook_logs_webhook_id ON webhook_logs(webhook_id);
+ CREATE INDEX IF NOT EXISTS idx_webhook_logs_status ON webhook_logs(status);
+ CREATE INDEX IF NOT EXISTS idx_webhook_logs_created_at ON webhook_logs(created_at);
+ CREATE INDEX IF NOT EXISTS idx_webhook_logs_status_created ON webhook_logs(status, created_at);
+ `)
+ if err != nil {
+ return err
+ }
+
+ // Add webhook workers setting.
+ _, err = db.Exec(`
+ INSERT INTO settings (key, value, updated_at) VALUES ('app.webhook_workers', '2', NOW()) ON CONFLICT (key) DO NOTHING;
+ INSERT INTO settings (key, value, updated_at) VALUES ('app.webhook_batch_size', '50', NOW()) ON CONFLICT (key) DO NOTHING;
+ `)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/subimporter/importer.go b/internal/subimporter/importer.go
index f318bc39c..b0c415588 100644
--- a/internal/subimporter/importer.go
+++ b/internal/subimporter/importer.go
@@ -53,6 +53,8 @@ type Importer struct {
db *sql.DB
i18n *i18n.I18n
+ triggerWebhook func(event string, data any)
+
domainBlocklist map[string]struct{}
hasBlocklistWildcards bool
hasBlocklist bool
@@ -73,6 +75,8 @@ type Options struct {
UpdateListDateStmt *sql.Stmt
PostCB func(subject string, data any) error
+ TriggerWebhook func(event string, data any)
+
DomainBlocklist []string
DomainAllowlist []string
}
@@ -159,6 +163,8 @@ func New(opt Options, db *sql.DB, i *i18n.I18n) *Importer {
im.hasAllowlistWildcards = hasWildcards
im.hasAllowlist = len(mp) > 0
+ im.triggerWebhook = opt.TriggerWebhook
+
return &im
}
@@ -279,6 +285,8 @@ func (s *Session) Start() {
cur = 0
)
+ s.im.triggerWebhook(models.EventBatchImportStarted, s.opt)
+
listIDs := make([]int, len(s.opt.ListIDs))
copy(listIDs, s.opt.ListIDs)
@@ -340,6 +348,7 @@ func (s *Session) Start() {
s.log.Printf("error updating lists date: %v", err)
}
s.im.sendNotif(StatusFinished)
+ s.im.triggerWebhook(models.EventBatchImportCompleted, s.opt)
return
}
@@ -349,6 +358,7 @@ func (s *Session) Start() {
s.im.setStatus(StatusFailed)
s.log.Printf("error committing to DB: %v", err)
s.im.sendNotif(StatusFailed)
+ s.im.triggerWebhook(models.EventBatchImportFailed, s.opt)
return
}
diff --git a/internal/webhooks/webhooks.go b/internal/webhooks/webhooks.go
new file mode 100644
index 000000000..3d951c262
--- /dev/null
+++ b/internal/webhooks/webhooks.go
@@ -0,0 +1,138 @@
+// Package webhooks implements an outgoing webhook delivery system for listmonk.
+// It creates webhook log entries that are processed by background workers.
+package webhooks
+
+import (
+ "encoding/json"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/knadh/listmonk/models"
+)
+
+// Webhook represents a webhook configuration loaded from settings.
+type Webhook struct {
+ UUID string
+ Enabled bool
+ Name string
+ URL string
+ Events map[string]struct{} // O(1) lookup
+ AuthType string
+ AuthBasicUser string
+ AuthBasicPass string
+ AuthToken string
+ MaxRetries int
+ Timeout time.Duration
+}
+
+// Manager handles webhook event triggering by creating log entries.
+type Manager struct {
+ webhooks []Webhook
+ log *log.Logger
+ mu sync.RWMutex
+ versionString string
+
+ // Database query for creating webhook logs.
+ createLogStmt *models.Queries
+}
+
+// New creates a new webhook manager.
+func New(log *log.Logger, versionString string, queries *models.Queries) *Manager {
+ return &Manager{
+ webhooks: []Webhook{},
+ log: log,
+ versionString: versionString,
+ createLogStmt: queries,
+ }
+}
+
+// Load loads webhooks from settings into memory.
+func (m *Manager) Load(settings []models.Webhook) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ m.webhooks = make([]Webhook, 0, len(settings))
+ for _, s := range settings {
+ if !s.Enabled {
+ continue
+ }
+
+ // Parse timeout with default.
+ timeout, err := time.ParseDuration(s.Timeout)
+ if err != nil || timeout <= 0 {
+ timeout = 30 * time.Second
+ }
+
+ // Default max retries.
+ maxRetries := s.MaxRetries
+ if maxRetries <= 0 {
+ maxRetries = 3
+ }
+
+ events := make(map[string]struct{})
+ for _, ev := range s.Events {
+ events[ev] = struct{}{}
+ }
+
+ m.webhooks = append(m.webhooks, Webhook{
+ UUID: s.UUID,
+ Enabled: s.Enabled,
+ Name: s.Name,
+ URL: s.URL,
+ Events: events,
+ AuthType: s.AuthType,
+ AuthBasicUser: s.AuthBasicUser,
+ AuthBasicPass: s.AuthBasicPass,
+ AuthToken: s.AuthToken,
+ MaxRetries: maxRetries,
+ Timeout: timeout,
+ })
+ }
+}
+
+// Trigger creates webhook log entries for all webhooks subscribed to the given event.
+// The logs are processed asynchronously by background workers.
+func (m *Manager) Trigger(event string, data any) error {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ // Build the event payload once.
+ payload := models.WebhookEvent{
+ Event: event,
+ Timestamp: time.Now().UTC(),
+ Data: data,
+ }
+
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ m.log.Printf("error marshaling webhook payload: %v", err)
+ return err
+ }
+
+ // Create webhook log entries for subscribed webhooks.
+ for _, wh := range m.webhooks {
+ if !m.isSubscribed(wh, event) {
+ continue
+ }
+
+ // Create a webhook log entry.
+ if _, err := m.createLogStmt.CreateWebhookLog.Exec(wh.UUID, event, payloadBytes); err != nil {
+ m.log.Printf("error creating webhook log for %s: %v", wh.Name, err)
+ continue
+ }
+ }
+
+ return nil
+}
+
+// isSubscribed checks if a webhook is subscribed to the given event.
+func (m *Manager) isSubscribed(wh Webhook, event string) bool {
+ _, exists := wh.Events[event]
+ return exists
+}
+
+// Close is a no-op for the settings-based manager.
+func (m *Manager) Close() {
+ // No cleanup needed for settings-based manager.
+}
diff --git a/internal/webhooks/worker.go b/internal/webhooks/worker.go
new file mode 100644
index 000000000..837d06358
--- /dev/null
+++ b/internal/webhooks/worker.go
@@ -0,0 +1,310 @@
+package webhooks
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/jmoiron/sqlx"
+ "github.com/knadh/listmonk/models"
+)
+
+// WorkerConfig holds the configuration for webhook workers.
+type WorkerConfig struct {
+ NumWorkers int
+ BatchSize int
+}
+
+// WorkerPool manages a pool of webhook delivery workers.
+type WorkerPool struct {
+ cfg WorkerConfig
+ db *sqlx.DB
+ queries *models.Queries
+ webhooks map[string]Webhook // Webhook configs indexed by UUID
+ webhooksMu sync.RWMutex
+ log *log.Logger
+ versionString string
+
+ // Control channels
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+}
+
+// NewWorkerPool creates a new webhook worker pool.
+func NewWorkerPool(cfg WorkerConfig, db *sqlx.DB, queries *models.Queries, log *log.Logger, versionString string) *WorkerPool {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ if cfg.NumWorkers < 1 {
+ cfg.NumWorkers = 2
+ }
+ if cfg.BatchSize < 1 {
+ cfg.BatchSize = 50
+ }
+
+ return &WorkerPool{
+ cfg: cfg,
+ db: db,
+ queries: queries,
+ webhooks: make(map[string]Webhook),
+ log: log,
+ versionString: versionString,
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+// LoadWebhooks loads webhook configurations into the worker pool.
+func (p *WorkerPool) LoadWebhooks(settings []models.Webhook) {
+ p.webhooksMu.Lock()
+ defer p.webhooksMu.Unlock()
+
+ numWebhooks := len(settings)
+
+ p.webhooks = make(map[string]Webhook, numWebhooks)
+ for _, s := range settings {
+ if !s.Enabled {
+ continue
+ }
+
+ // Parse timeout with default.
+ timeout, err := time.ParseDuration(s.Timeout)
+ if err != nil || timeout <= 0 {
+ timeout = 30 * time.Second
+ }
+
+ // Default max retries.
+ maxRetries := s.MaxRetries
+ if maxRetries <= 0 {
+ maxRetries = 3
+ }
+
+ events := make(map[string]struct{})
+ for _, ev := range s.Events {
+ events[ev] = struct{}{}
+ }
+
+ p.webhooks[s.UUID] = Webhook{
+ UUID: s.UUID,
+ Enabled: s.Enabled,
+ Name: s.Name,
+ URL: s.URL,
+ Events: events,
+ AuthType: s.AuthType,
+ AuthBasicUser: s.AuthBasicUser,
+ AuthBasicPass: s.AuthBasicPass,
+ AuthToken: s.AuthToken,
+ MaxRetries: maxRetries,
+ Timeout: timeout,
+ }
+ }
+}
+
+// Run starts the worker pool. This is a blocking call.
+func (p *WorkerPool) Run() {
+ // Reset any stale processing logs on startup.
+ if _, err := p.queries.ResetStaleProcessingLogs.Exec(); err != nil {
+ p.log.Printf("error resetting stale webhook logs: %v", err)
+ }
+
+ // Start worker goroutines.
+ for i := 0; i < p.cfg.NumWorkers; i++ {
+ p.wg.Add(1)
+ go p.worker(i)
+ }
+
+ p.log.Printf("started %d webhook workers with batch size %d", p.cfg.NumWorkers, p.cfg.BatchSize)
+
+ // Wait for all workers to complete.
+ p.wg.Wait()
+}
+
+// Close gracefully shuts down the worker pool.
+func (p *WorkerPool) Close() {
+ p.cancel()
+ p.wg.Wait()
+ p.log.Printf("webhook worker pool stopped")
+}
+
+// worker is a single worker goroutine that processes webhook logs.
+func (p *WorkerPool) worker(id int) {
+ defer p.wg.Done()
+
+ ticker := time.NewTicker(time.Second * 2)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-p.ctx.Done():
+ return
+ case <-ticker.C:
+ p.processBatch()
+ }
+ }
+}
+
+// processBatch fetches and processes a batch of pending webhook logs.
+func (p *WorkerPool) processBatch() {
+ // Fetch a batch of pending logs (already locked with SELECT FOR UPDATE SKIP LOCKED).
+ var logs []models.WebhookLog
+ if err := p.queries.GetPendingWebhookLogs.Select(&logs, p.cfg.BatchSize); err != nil {
+ p.log.Printf("error fetching webhook logs: %v", err)
+ return
+ }
+
+ if len(logs) == 0 {
+ return
+ }
+
+ // Process each log.
+ for _, wl := range logs {
+ p.processLog(wl)
+ }
+}
+
+// processLog processes a single webhook log entry.
+func (p *WorkerPool) processLog(wl models.WebhookLog) {
+ // Get the webhook configuration.
+ p.webhooksMu.RLock()
+ wh, exists := p.webhooks[wl.WebhookID]
+ p.webhooksMu.RUnlock()
+
+ // If webhook doesn't exist (deleted), mark as failed.
+ if !exists {
+ resp := models.WebhookResponse{}
+ note := "webhook configuration not found (may have been deleted)"
+ if _, err := p.queries.UpdateWebhookLogFailed.Exec(wl.ID, resp, note); err != nil {
+ p.log.Printf("error marking webhook log %d as failed: %v", wl.ID, err)
+ }
+ return
+ }
+
+ // Attempt delivery with retries.
+ p.attemptDelivery(wl, wh)
+}
+
+// attemptDelivery attempts to deliver a webhook with retry logic.
+func (p *WorkerPool) attemptDelivery(wl models.WebhookLog, wh Webhook) {
+ // Get the payload bytes.
+ payloadBytes, err := json.Marshal(wl.Payload)
+ if err != nil {
+ resp := models.WebhookResponse{}
+ note := fmt.Sprintf("error marshaling payload: %v", err)
+ if _, err := p.queries.UpdateWebhookLogFailed.Exec(wl.ID, resp, note); err != nil {
+ p.log.Printf("error marking webhook log %d as failed: %v", wl.ID, err)
+ }
+ return
+ }
+
+ // Check if context is cancelled.
+ select {
+ case <-p.ctx.Done():
+ // Reset the log to triggered so it can be picked up again.
+ if _, err := p.queries.MarkWebhookLogTriggered.Exec(wl.ID); err != nil {
+ p.log.Printf("error resetting webhook log %d: %v", wl.ID, err)
+ }
+ return
+ default:
+ }
+
+ // Check if we must wait for the next tick due to earlier retry failure
+ if wl.Retries > 0 {
+ backoff := time.Duration(1<
30*time.Second {
+ backoff = 30 * time.Second
+ }
+ now := time.Now()
+ if wl.LastRetriedAt.Valid && !now.After(wl.LastRetriedAt.Time.Add(backoff)) {
+ // we're trying too soon.. queue for later retry
+ return
+ }
+ }
+
+ // Attempt delivery.
+ resp, err := p.send(wh, wl.Event, payloadBytes)
+ if err == nil {
+ // Success - mark as completed.
+ if _, err := p.queries.UpdateWebhookLogSuccess.Exec(wl.ID, resp); err != nil {
+ p.log.Printf("error marking webhook log %d as success: %v", wl.ID, err)
+ }
+ if wl.Retries > 0 {
+ p.log.Printf("webhook %s (log %d) delivered after %d retries", wh.Name, wl.ID, wl.Retries)
+ }
+ return
+ }
+
+ if wl.Retries >= wh.MaxRetries {
+ // All retries exhausted - mark as failed.
+ resp := models.WebhookResponse{}
+ note := fmt.Sprintf("delivery failed after %d attempts", wh.MaxRetries+1)
+ if _, err := p.queries.UpdateWebhookLogFailed.Exec(wl.ID, resp, note); err != nil {
+ p.log.Printf("error marking webhook log %d as failed: %v", wl.ID, err)
+ }
+ p.log.Printf("webhook %s (log %d) delivery failed after %d attempts", wh.Name, wl.ID, wh.MaxRetries+1)
+ return
+ }
+
+ // Log the failure.
+ p.log.Printf("webhook %s (log %d) delivery attempt %d failed: %v", wh.Name, wl.ID, wl.Retries+wl.Retries+1, err)
+
+ // Update retry count.
+ note := fmt.Sprintf("attempt %d failed: %v", wl.Retries+wl.Retries+1, err)
+ if _, err := p.queries.UpdateWebhookLogRetry.Exec(wl.ID, resp, note); err != nil {
+ p.log.Printf("error updating webhook log %d retry: %v", wl.ID, err)
+ }
+}
+
+// send makes an HTTP request to deliver the webhook.
+func (p *WorkerPool) send(wh Webhook, event string, payload []byte) (models.WebhookResponse, error) {
+ resp := models.WebhookResponse{}
+
+ req, err := http.NewRequest(http.MethodPost, wh.URL, bytes.NewReader(payload))
+ if err != nil {
+ return resp, fmt.Errorf("creating request: %w", err)
+ }
+
+ // Set headers.
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("User-Agent", fmt.Sprintf("listmonk/%s", p.versionString))
+ req.Header.Set("X-Listmonk-Event", event)
+
+ // Apply authentication.
+ switch wh.AuthType {
+ case models.WebhookAuthTypeBasic:
+ req.SetBasicAuth(wh.AuthBasicUser, wh.AuthBasicPass)
+
+ case models.WebhookAuthTypeToken:
+ req.Header.Set("Authorization", "Bearer "+wh.AuthToken)
+ }
+
+ // Create a client with the specific timeout.
+ client := &http.Client{Timeout: wh.Timeout}
+
+ // Make the request.
+ httpResp, err := client.Do(req)
+ if err != nil {
+ return resp, fmt.Errorf("request failed: %w", err)
+ }
+ defer func() {
+ io.Copy(io.Discard, httpResp.Body)
+ httpResp.Body.Close()
+ }()
+
+ // Read response body (limit to 1KB to avoid memory issues).
+ bodyBytes, _ := io.ReadAll(io.LimitReader(httpResp.Body, 1024))
+ resp.StatusCode = httpResp.StatusCode
+ resp.Body = string(bodyBytes)
+
+ // Check if delivery was successful (2xx status).
+ if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
+ return resp, fmt.Errorf("non-2xx status: %d", httpResp.StatusCode)
+ }
+
+ return resp, nil
+}
diff --git a/models/queries.go b/models/queries.go
index 0703b6c37..c280939a1 100644
--- a/models/queries.go
+++ b/models/queries.go
@@ -134,6 +134,16 @@ type Queries struct {
DeleteRole *sqlx.Stmt `query:"delete-role"`
UpsertListPermissions *sqlx.Stmt `query:"upsert-list-permissions"`
DeleteListPermission *sqlx.Stmt `query:"delete-list-permission"`
+
+ // Webhook log queries.
+ CreateWebhookLog *sqlx.Stmt `query:"create-webhook-log"`
+ GetPendingWebhookLogs *sqlx.Stmt `query:"get-pending-webhook-logs"`
+ UpdateWebhookLogSuccess *sqlx.Stmt `query:"update-webhook-log-success"`
+ UpdateWebhookLogRetry *sqlx.Stmt `query:"update-webhook-log-retry"`
+ UpdateWebhookLogFailed *sqlx.Stmt `query:"update-webhook-log-failed"`
+ MarkWebhookLogTriggered *sqlx.Stmt `query:"mark-webhook-log-triggered"`
+ ResetStaleProcessingLogs *sqlx.Stmt `query:"reset-stale-processing-logs"`
+ DeleteOldWebhookLogs *sqlx.Stmt `query:"delete-old-webhook-logs"`
}
// compileSubscriberQueryTpl takes an arbitrary WHERE expressions
diff --git a/models/settings.go b/models/settings.go
index 0051cb177..48af2cb09 100644
--- a/models/settings.go
+++ b/models/settings.go
@@ -28,6 +28,9 @@ type Settings struct {
AppMessageSlidingWindowDuration string `json:"app.message_sliding_window_duration"`
AppMessageSlidingWindowRate int `json:"app.message_sliding_window_rate"`
+ AppWebhookWorkers int `json:"app.webhook_workers"`
+ AppWebhookBatchSize int `json:"app.webhook_batch_size"`
+
PrivacyIndividualTracking bool `json:"privacy.individual_tracking"`
PrivacyUnsubHeader bool `json:"privacy.unsubscribe_header"`
PrivacyAllowBlocklist bool `json:"privacy.allow_blocklist"`
@@ -110,6 +113,8 @@ type Settings struct {
MaxMsgRetries int `json:"max_msg_retries"`
} `json:"messengers"`
+ Webhooks []Webhook `json:"webhooks"`
+
BounceEnabled bool `json:"bounce.enabled"`
BounceEnableWebhooks bool `json:"bounce.webhooks_enabled"`
BounceActions map[string]struct {
diff --git a/models/webhooks.go b/models/webhooks.go
new file mode 100644
index 000000000..eefbd21eb
--- /dev/null
+++ b/models/webhooks.go
@@ -0,0 +1,143 @@
+package models
+
+import (
+ "database/sql/driver"
+ "encoding/json"
+ "time"
+
+ "gopkg.in/volatiletech/null.v6"
+)
+
+// Webhook event types.
+const (
+ // Subscriber events.
+ EventSubscriberCreated = "subscriber.created"
+ EventSubscriberUpdated = "subscriber.updated"
+ EventSubscriberDeleted = "subscriber.deleted"
+ EventSubscriberOptinStart = "subscriber.optin_start"
+ EventSubscriberOptinFinish = "subscriber.optin_finish"
+
+ // Batch import events.
+ EventBatchImportStarted = "batch_import.started"
+ EventBatchImportCompleted = "batch_import.completed"
+ EventBatchImportFailed = "batch_import.failed"
+
+ // Subscription events.
+ EventSubscriberAddedToList = "subscriber.added_to_list"
+ EventSubscriberRemovedFromList = "subscriber.removed_from_list"
+ EventSubscriberUnsubscribed = "subscriber.unsubscribed"
+
+ // Bounce events.
+ EventSubscriberBounced = "subscriber.bounced"
+
+ // Campaign events.
+ EventCampaignStarted = "campaign.started"
+ EventCampaignPaused = "campaign.paused"
+ EventCampaignCancelled = "campaign.cancelled"
+ EventCampaignFinished = "campaign.finished"
+)
+
+// Webhook auth types.
+const (
+ WebhookAuthTypeNone = "none"
+ WebhookAuthTypeBasic = "basic"
+ WebhookAuthTypeToken = "token"
+)
+
+// Webhook log status types.
+const (
+ WebhookLogStatusTriggered = "triggered"
+ WebhookLogStatusProcessing = "processing"
+ WebhookLogStatusCompleted = "completed"
+ WebhookLogStatusFailed = "failed"
+)
+
+// Webhook is the configured endpoint to send events to.
+type Webhook struct {
+ UUID string `json:"uuid"`
+ Enabled bool `json:"enabled"`
+ Name string `json:"name"`
+ URL string `json:"url"`
+ Events []string `json:"events"`
+ AuthType string `json:"auth_type"`
+ AuthBasicUser string `json:"auth_basic_user"`
+ AuthBasicPass string `json:"auth_basic_pass,omitempty"`
+ AuthToken string `json:"auth_token,omitempty"`
+ MaxRetries int `json:"max_retries"`
+ Timeout string `json:"timeout"`
+}
+
+// WebhookEvent represents an event payload to be sent to webhooks.
+type WebhookEvent struct {
+ Event string `json:"event"`
+ Timestamp time.Time `json:"timestamp"`
+ Data any `json:"data"`
+}
+
+// WebhookLog represents a webhook delivery log entry.
+type WebhookLog struct {
+ ID int `db:"id" json:"id"`
+ WebhookID string `db:"webhook_id" json:"webhook_id"`
+ Event string `db:"event" json:"event"`
+ Payload JSON `db:"payload" json:"payload"`
+ Status string `db:"status" json:"status"`
+ Retries int `db:"retries" json:"retries"`
+ LastRetriedAt null.Time `db:"last_retried_at" json:"last_retried_at"`
+ Response WebhookResponse `db:"response" json:"response"`
+ Note null.String `db:"note" json:"note"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
+}
+
+// WebhookResponse stores the HTTP response details from webhook delivery.
+type WebhookResponse struct {
+ StatusCode int `json:"status_code,omitempty"`
+ Body string `json:"body,omitempty"`
+}
+
+// Scan implements the sql.Scanner interface for WebhookResponse.
+func (r *WebhookResponse) Scan(src interface{}) error {
+ if src == nil {
+ *r = WebhookResponse{}
+ return nil
+ }
+
+ var b []byte
+ switch v := src.(type) {
+ case []byte:
+ b = v
+ case string:
+ b = []byte(v)
+ default:
+ return nil
+ }
+
+ return json.Unmarshal(b, r)
+}
+
+// Value implements the driver.Valuer interface for WebhookResponse.
+func (r WebhookResponse) Value() (driver.Value, error) {
+ return json.Marshal(r)
+}
+
+// AllWebhookEvents returns a list of all available webhook events.
+func AllWebhookEvents() []string {
+ return []string{
+ EventBatchImportCompleted,
+ EventBatchImportFailed,
+ EventBatchImportStarted,
+ EventCampaignCancelled,
+ EventCampaignFinished,
+ EventCampaignPaused,
+ EventCampaignStarted,
+ EventSubscriberAddedToList,
+ EventSubscriberBounced,
+ EventSubscriberCreated,
+ EventSubscriberDeleted,
+ EventSubscriberOptinFinish,
+ EventSubscriberOptinStart,
+ EventSubscriberRemovedFromList,
+ EventSubscriberUnsubscribed,
+ EventSubscriberUpdated,
+ }
+}
diff --git a/queries/webhooks.sql b/queries/webhooks.sql
new file mode 100644
index 000000000..a926737d9
--- /dev/null
+++ b/queries/webhooks.sql
@@ -0,0 +1,72 @@
+-- webhooks
+
+-- name: create-webhook-log
+-- Creates a new webhook log entry with triggered status.
+INSERT INTO webhook_logs (webhook_id, event, payload, status)
+VALUES ($1, $2, $3, 'triggered')
+RETURNING id;
+
+-- name: get-pending-webhook-logs
+-- Fetches a batch of triggered webhook logs and locks them for processing.
+-- Uses SKIP LOCKED to allow concurrent workers to process different batches.
+UPDATE webhook_logs
+SET status = 'processing', updated_at = NOW()
+WHERE id IN (
+ SELECT id FROM webhook_logs
+ WHERE status = 'triggered'
+ ORDER BY created_at ASC
+ LIMIT $1
+ FOR UPDATE SKIP LOCKED
+)
+RETURNING id, webhook_id, event, payload, retries, created_at, updated_at;
+
+-- name: update-webhook-log-success
+-- Marks a webhook log as completed with response data.
+UPDATE webhook_logs
+SET status = 'completed',
+ response = $2,
+ updated_at = NOW()
+WHERE id = $1;
+
+-- name: update-webhook-log-retry
+-- Updates a webhook log after a failed attempt, incrementing retry count.
+UPDATE webhook_logs
+SET retries = retries + 1,
+ last_retried_at = NOW(),
+ response = $2,
+ note = $3,
+ updated_at = NOW()
+WHERE id = $1;
+
+-- name: update-webhook-log-failed
+-- Marks a webhook log as failed after all retries exhausted.
+UPDATE webhook_logs
+SET status = 'failed',
+ retries = retries + 1,
+ last_retried_at = NOW(),
+ response = $2,
+ note = $3,
+ updated_at = NOW()
+WHERE id = $1;
+
+-- name: mark-webhook-log-triggered
+-- Resets a processing webhook log back to triggered status (for recovery after crash).
+UPDATE webhook_logs
+SET status = 'triggered',
+ updated_at = NOW()
+WHERE id = $1;
+
+-- name: reset-stale-processing-logs
+-- Resets webhook logs that have been stuck in processing status for too long (recovery after crash).
+-- This should be called on app startup.
+UPDATE webhook_logs
+SET status = 'triggered',
+ updated_at = NOW()
+WHERE status = 'processing'
+ AND updated_at < NOW() - INTERVAL '5 minutes';
+
+-- name: delete-old-webhook-logs
+-- Deletes old completed and failed webhook logs older than specified days.
+DELETE FROM webhook_logs
+WHERE status IN ('completed', 'failed')
+ AND created_at < NOW() - ($1 || ' days')::INTERVAL;
diff --git a/schema.sql b/schema.sql
index 308161b46..504df476a 100644
--- a/schema.sql
+++ b/schema.sql
@@ -12,6 +12,7 @@ DROP TYPE IF EXISTS user_type CASCADE; CREATE TYPE user_type AS ENUM ('user', 'a
DROP TYPE IF EXISTS user_status CASCADE; CREATE TYPE user_status AS ENUM ('enabled', 'disabled');
DROP TYPE IF EXISTS role_type CASCADE; CREATE TYPE role_type AS ENUM ('user', 'list');
DROP TYPE IF EXISTS twofa_type CASCADE; CREATE TYPE twofa_type AS ENUM ('none', 'totp');
+DROP TYPE IF EXISTS webhook_log_status CASCADE; CREATE TYPE webhook_log_status AS ENUM ('triggered', 'processing', 'completed', 'failed');
CREATE EXTENSION IF NOT EXISTS pgcrypto;
@@ -294,7 +295,9 @@ INSERT INTO settings (key, value) VALUES
('appearance.admin.custom_js', '""'),
('appearance.public.custom_css', '""'),
('appearance.public.custom_js', '""'),
- ('maintenance.db', '{"vacuum": false, "vacuum_cron_interval": "0 2 * * *"}');
+ ('maintenance.db', '{"vacuum": false, "vacuum_cron_interval": "0 2 * * *"}'),
+ ('app.webhook_workers', '2'),
+ ('app.webhook_batch_size', '50');
-- bounces
DROP TABLE IF EXISTS bounces CASCADE;
@@ -357,6 +360,26 @@ CREATE TABLE sessions (
);
DROP INDEX IF EXISTS idx_sessions; CREATE INDEX idx_sessions ON sessions (id, created_at);
+-- webhook_logs
+DROP TABLE IF EXISTS webhook_logs CASCADE;
+CREATE TABLE webhook_logs (
+ id SERIAL PRIMARY KEY,
+ webhook_id TEXT NOT NULL,
+ event TEXT NOT NULL,
+ payload JSONB NOT NULL DEFAULT '{}',
+ status webhook_log_status NOT NULL DEFAULT 'triggered',
+ retries INT NOT NULL DEFAULT 0,
+ last_retried_at TIMESTAMP WITH TIME ZONE,
+ response JSONB NOT NULL DEFAULT '{}',
+ note TEXT,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+DROP INDEX IF EXISTS idx_webhook_logs_webhook_id; CREATE INDEX idx_webhook_logs_webhook_id ON webhook_logs(webhook_id);
+DROP INDEX IF EXISTS idx_webhook_logs_status; CREATE INDEX idx_webhook_logs_status ON webhook_logs(status);
+DROP INDEX IF EXISTS idx_webhook_logs_created_at; CREATE INDEX idx_webhook_logs_created_at ON webhook_logs(created_at);
+DROP INDEX IF EXISTS idx_webhook_logs_status_created; CREATE INDEX idx_webhook_logs_status_created ON webhook_logs(status, created_at);
+
-- materialized views
-- dashboard stats