From b000b242224e45b43bc2eef0a9e976b2579df57c Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 12 Mar 2026 09:26:51 -0700 Subject: [PATCH 1/6] Replace usage of `Fatale` with context cancellation (#1639) * Replace log.Fatale with context-based error propagation This patch modifies gh-ost to use a cancellable context instead of log.Fatale() in listenOnPanicAbort. When using gh-ost as a library, this allows the calling application to recover from aborts (e.g. log the failure reason) instead of having the entire process terminate via os.Exit(). Now we store the error and cancel a context to signal all goroutines to stop gracefully. * Fix shadowing * Simplify non-blocking poll * Simplify non-blocking poll in migrator.go * Fix error return * Fix hang on blocking channel send * Add defensive fix for other potential blocking channel send deadlocks * Add SendWithContext helper to avoid deadlocks * Fix deadlock on PanicAbort sends * Use checkAbort * Fix migration abort race condition * Remove buffer on PanicAbort channel --- .github/CONTRIBUTING.md | 18 +++ go/base/context.go | 66 +++++++++ go/base/context_test.go | 57 ++++++++ go/logic/applier.go | 15 +- go/logic/migrator.go | 149 ++++++++++++++++---- go/logic/migrator_test.go | 281 ++++++++++++++++++++++++++++++++++++++ go/logic/server.go | 3 +- go/logic/streamer.go | 5 + go/logic/throttler.go | 22 ++- 9 files changed, 580 insertions(+), 36 deletions(-) diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index e681e5a1b..733cf2601 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull - Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests. - Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html). +## Development Guidelines + +### Channel Safety + +When working with channels in goroutines, it's critical to prevent deadlocks that can occur when a channel receiver exits due to an error while senders are still trying to send values. Always use `base.SendWithContext` for channel sends to avoid deadlocks: + +```go +// ✅ CORRECT - Uses helper to prevent deadlock +if err := base.SendWithContext(ctx, ch, value); err != nil { + return err // context was cancelled +} + +// ❌ WRONG - Can deadlock if receiver exits +ch <- value +``` + +Even if the destination channel is buffered, deadlocks could still occur if the buffer fills up and the receiver exits, so it's important to use `SendWithContext` in those cases as well. + ## Resources - [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/) diff --git a/go/base/context.go b/go/base/context.go index 891e27fef..2c8d28d56 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -6,6 +6,7 @@ package base import ( + "context" "fmt" "math" "os" @@ -225,6 +226,16 @@ type MigrationContext struct { InCutOverCriticalSectionFlag int64 PanicAbort chan error + // Context for cancellation signaling across all goroutines + // Stored in struct as it spans the entire migration lifecycle, not per-function. + // context.Context is safe for concurrent use by multiple goroutines. + ctx context.Context //nolint:containedctx + cancelFunc context.CancelFunc + + // Stores the fatal error that triggered abort + AbortError error + abortMutex *sync.Mutex + OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList OriginalTableVirtualColumns *sql.ColumnList @@ -293,6 +304,7 @@ type ContextConfig struct { } func NewMigrationContext() *MigrationContext { + ctx, cancelFunc := context.WithCancel(context.Background()) return &MigrationContext{ Uuid: uuid.NewString(), defaultNumRetries: 60, @@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext { lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), + ctx: ctx, + cancelFunc: cancelFunc, + abortMutex: &sync.Mutex{}, Log: NewDefaultLogger(), } } @@ -982,3 +997,54 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string { func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool { return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength } + +// GetContext returns the migration context for cancellation checking +func (this *MigrationContext) GetContext() context.Context { + return this.ctx +} + +// SetAbortError stores the fatal error that triggered abort +// Only the first error is stored (subsequent errors are ignored) +func (this *MigrationContext) SetAbortError(err error) { + this.abortMutex.Lock() + defer this.abortMutex.Unlock() + if this.AbortError == nil { + this.AbortError = err + } +} + +// GetAbortError retrieves the stored abort error +func (this *MigrationContext) GetAbortError() error { + this.abortMutex.Lock() + defer this.abortMutex.Unlock() + return this.AbortError +} + +// CancelContext cancels the migration context to signal all goroutines to stop +// The cancel function is safe to call multiple times and from multiple goroutines. +func (this *MigrationContext) CancelContext() { + if this.cancelFunc != nil { + this.cancelFunc() + } +} + +// SendWithContext attempts to send a value to a channel, but returns early +// if the context is cancelled. This prevents goroutine deadlocks when the +// channel receiver has exited due to an error. +// +// Use this instead of bare channel sends (ch <- val) in goroutines to ensure +// proper cleanup when the migration is aborted. +// +// Example: +// +// if err := base.SendWithContext(ctx, ch, value); err != nil { +// return err // context was cancelled +// } +func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error { + select { + case ch <- val: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/go/base/context_test.go b/go/base/context_test.go index f8bce6f27..a9f62150d 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -6,8 +6,10 @@ package base import ( + "errors" "os" "strings" + "sync" "testing" "time" @@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) { } } } + +func TestSetAbortError_StoresFirstError(t *testing.T) { + ctx := NewMigrationContext() + + err1 := errors.New("first error") + err2 := errors.New("second error") + + ctx.SetAbortError(err1) + ctx.SetAbortError(err2) + + got := ctx.GetAbortError() + if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected first error %v, got %v", err1, got) + } +} + +func TestSetAbortError_ThreadSafe(t *testing.T) { + ctx := NewMigrationContext() + + var wg sync.WaitGroup + errs := []error{ + errors.New("error 1"), + errors.New("error 2"), + errors.New("error 3"), + } + + // Launch 3 goroutines trying to set error concurrently + for _, err := range errs { + wg.Add(1) + go func(e error) { + defer wg.Done() + ctx.SetAbortError(e) + }(err) + } + + wg.Wait() + + // Should store exactly one of the errors + got := ctx.GetAbortError() + if got == nil { + t.Fatal("Expected error to be stored, got nil") + } + + // Verify it's one of the errors we sent + found := false + for _, err := range errs { + if got == err { //nolint:errorlint // Testing pointer equality for sentinel error + found = true + break + } + } + if !found { + t.Errorf("Stored error %v not in list of sent errors", got) + } +} diff --git a/go/logic/applier.go b/go/logic/applier.go index 58761d844..6c09dd61e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -695,7 +695,17 @@ func (this *Applier) InitiateHeartbeat() { ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) defer ticker.Stop() - for range ticker.C { + for { + // Check for context cancellation each iteration + ctx := this.migrationContext.GetContext() + select { + case <-ctx.Done(): + this.migrationContext.Log.Debugf("Heartbeat injection cancelled") + return + case <-ticker.C: + // Process heartbeat + } + if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -706,7 +716,8 @@ func (this *Applier) InitiateHeartbeat() { continue } if err := injectHeartbeat(); err != nil { - this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err) + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)) return } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index aa9a97c1c..ca5f5a729 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -163,7 +163,8 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // there's an error. Let's try again. } if len(notFatalHint) == 0 { - this.migrationContext.PanicAbort <- err + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } return err } @@ -191,7 +192,8 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro } } if len(notFatalHint) == 0 { - this.migrationContext.PanicAbort <- err + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } return err } @@ -200,14 +202,19 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro // consumes and drops any further incoming events that may be left hanging. func (this *Migrator) consumeRowCopyComplete() { if err := <-this.rowCopyComplete; err != nil { - this.migrationContext.PanicAbort <- err + // Abort synchronously to ensure checkAbort() sees the error immediately + this.abort(err) + // Don't mark row copy as complete if there was an error + return } atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) this.migrationContext.MarkRowCopyEndTime() go func() { for err := range this.rowCopyComplete { if err != nil { - this.migrationContext.PanicAbort <- err + // Abort synchronously to ensure the error is stored immediately + this.abort(err) + return } } }() @@ -238,14 +245,14 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e case Migrated, ReadMigrationRangeValues: // no-op event case GhostTableMigrated: - this.ghostTableMigrated <- true + // Use helper to prevent deadlock if migration aborts before receiver is ready + _ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true) case AllEventsUpToLockProcessed: var applyEventFunc tableWriteFunc = func() error { - this.allEventsUpToLockProcessed <- &lockProcessedStruct{ + return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{ state: changelogStateString, coords: dmlEntry.Coordinates.Clone(), - } - return nil + }) } // at this point we know all events up to lock have been read from the streamer, // because the streamer works sequentially. So those events are either already handled, @@ -253,7 +260,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. go func() { - this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits + _ = base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByFunc(&applyEventFunc)) }() default: return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -277,10 +285,24 @@ func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e } } -// listenOnPanicAbort aborts on abort request +// abort stores the error, cancels the context, and logs the abort. +// This is the common abort logic used by both listenOnPanicAbort and +// consumeRowCopyComplete to ensure consistent error handling. +func (this *Migrator) abort(err error) { + // Store the error for Migrate() to return + this.migrationContext.SetAbortError(err) + + // Cancel the context to signal all goroutines to stop + this.migrationContext.CancelContext() + + // Log the error (but don't panic or exit) + this.migrationContext.Log.Errorf("Migration aborted: %v", err) +} + +// listenOnPanicAbort listens for fatal errors and initiates graceful shutdown func (this *Migrator) listenOnPanicAbort() { err := <-this.migrationContext.PanicAbort - this.migrationContext.Log.Fatale(err) + this.abort(err) } // validateAlterStatement validates the `alter` statement meets criteria. @@ -348,10 +370,36 @@ func (this *Migrator) createFlagFiles() (err error) { return nil } +// checkAbort returns abort error if migration was aborted +func (this *Migrator) checkAbort() error { + if abortErr := this.migrationContext.GetAbortError(); abortErr != nil { + return abortErr + } + + ctx := this.migrationContext.GetContext() + if ctx != nil { + select { + case <-ctx.Done(): + // Context cancelled but no abort error stored yet + if abortErr := this.migrationContext.GetAbortError(); abortErr != nil { + return abortErr + } + return ctx.Err() + default: + // Not cancelled + } + } + return nil +} + // Migrate executes the complete migration logic. This is *the* major gh-ost function. func (this *Migrator) Migrate() (err error) { this.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.StartTime = time.Now() + + // Ensure context is cancelled on exit (cleanup) + defer this.migrationContext.CancelContext() + if this.migrationContext.Hostname, err = os.Hostname(); err != nil { return err } @@ -375,6 +423,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } // If we are resuming, we will initiateStreaming later when we know // the binlog coordinates to resume streaming from. // If not resuming, the streamer must be initiated before the applier, @@ -383,10 +434,16 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateStreaming(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } } if err := this.initiateApplier(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.createFlagFiles(); err != nil { return err } @@ -493,6 +550,10 @@ func (this *Migrator) Migrate() (err error) { this.migrationContext.Log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() this.migrationContext.Log.Infof("Row copy complete") + // Check if row copy was aborted due to error + if err := this.checkAbort(); err != nil { + return err + } if err := this.hooksExecutor.onRowCopyComplete(); err != nil { return err } @@ -532,6 +593,10 @@ func (this *Migrator) Migrate() (err error) { return err } this.migrationContext.Log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + // Final check for abort before declaring success + if err := this.checkAbort(); err != nil { + return err + } return nil } @@ -543,6 +608,10 @@ func (this *Migrator) Revert() error { sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OldTableName)) this.migrationContext.StartTime = time.Now() + + // Ensure context is cancelled on exit (cleanup) + defer this.migrationContext.CancelContext() + var err error if this.migrationContext.Hostname, err = os.Hostname(); err != nil { return err @@ -561,9 +630,15 @@ func (this *Migrator) Revert() error { if err := this.initiateInspector(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.initiateApplier(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.createFlagFiles(); err != nil { return err } @@ -588,6 +663,9 @@ func (this *Migrator) Revert() error { if err := this.initiateStreaming(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.hooksExecutor.onValidated(); err != nil { return err } @@ -1293,7 +1371,8 @@ func (this *Migrator) initiateStreaming() error { this.migrationContext.Log.Debugf("Beginning streaming") err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err != nil { - this.migrationContext.PanicAbort <- err + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } this.migrationContext.Log.Debugf("Done streaming") }() @@ -1319,8 +1398,9 @@ func (this *Migrator) addDMLEventsListener() error { this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEntry *binlog.BinlogEntry) error { - this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry) - return nil + // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits + // This is critical because this callback blocks the event streamer + return base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByDML(dmlEntry)) }, ) return err @@ -1398,7 +1478,7 @@ func (this *Migrator) initiateApplier() error { // a chunk of rows onto the ghost table. func (this *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { - this.rowCopyComplete <- err + _ = base.SendWithContext(this.migrationContext.GetContext(), this.rowCopyComplete, err) return this.migrationContext.Log.Errore(err) } if this.migrationContext.Noop { @@ -1413,6 +1493,9 @@ func (this *Migrator) iterateChunks() error { var hasNoFurtherRangeFlag int64 // Iterate per chunk: for { + if err := this.checkAbort(); err != nil { + return terminateRowIteration(err) + } if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { // Done // There's another such check down the line @@ -1459,7 +1542,7 @@ func (this *Migrator) iterateChunks() error { this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) } joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ") - terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) + return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) } } @@ -1482,7 +1565,14 @@ func (this *Migrator) iterateChunks() error { return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() - this.copyRowsQueue <- copyRowsFunc + // Use helper to prevent deadlock if executeWriteFuncs exits + if err := base.SendWithContext(this.migrationContext.GetContext(), this.copyRowsQueue, copyRowsFunc); err != nil { + // Context cancelled, check for abort and exit + if abortErr := this.checkAbort(); abortErr != nil { + return terminateRowIteration(abortErr) + } + return terminateRowIteration(err) + } } } @@ -1563,20 +1653,18 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { this.applier.LastIterationRangeMutex.Unlock() for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - this.applier.CurrentCoordinatesMutex.Lock() - if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { - id, err := this.applier.WriteCheckpoint(chk) - chk.Id = id - this.applier.CurrentCoordinatesMutex.Unlock() - return chk, err - } + if err := ctx.Err(); err != nil { + return nil, err + } + this.applier.CurrentCoordinatesMutex.Lock() + if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { + id, err := this.applier.WriteCheckpoint(chk) + chk.Id = id this.applier.CurrentCoordinatesMutex.Unlock() - time.Sleep(500 * time.Millisecond) + return chk, err } + this.applier.CurrentCoordinatesMutex.Unlock() + time.Sleep(500 * time.Millisecond) } } @@ -1649,6 +1737,9 @@ func (this *Migrator) executeWriteFuncs() error { return nil } for { + if err := this.checkAbort(); err != nil { + return err + } if atomic.LoadInt64(&this.finishedMigrating) > 0 { return nil } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index c4fd49233..42b6fed37 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -931,3 +931,284 @@ func (suite *MigratorTestSuite) TestRevert() { func TestMigrator(t *testing.T) { suite.Run(t, new(MigratorTestSuite)) } + +func TestPanicAbort_PropagatesError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Send an error to PanicAbort + testErr := errors.New("test abort error") + go func() { + migrationContext.PanicAbort <- testErr + }() + + // Wait a bit for error to be processed + time.Sleep(100 * time.Millisecond) + + // Verify error was stored + got := migrationContext.GetAbortError() + if got != testErr { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected error %v, got %v", testErr, got) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success - context was cancelled + default: + t.Error("Expected context to be cancelled") + } +} + +func TestPanicAbort_FirstErrorWins(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Send first error + err1 := errors.New("first error") + go func() { + migrationContext.PanicAbort <- err1 + }() + + // Wait for first error to be processed + time.Sleep(50 * time.Millisecond) + + // Try to send second error (should be ignored) + err2 := errors.New("second error") + migrationContext.SetAbortError(err2) + + // Verify only first error is stored + got := migrationContext.GetAbortError() + if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected first error %v, got %v", err1, got) + } +} + +func TestAbort_AfterRowCopy(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Give listenOnPanicAbort time to start + time.Sleep(20 * time.Millisecond) + + // Simulate row copy error by sending to rowCopyComplete in a goroutine + // (unbuffered channel, so send must be async) + testErr := errors.New("row copy failed") + go func() { + migrator.rowCopyComplete <- testErr + }() + + // Consume the error (simulating what Migrate() does) + // This is a blocking call that waits for the error + migrator.consumeRowCopyComplete() + + // Wait for the error to be processed by listenOnPanicAbort + time.Sleep(50 * time.Millisecond) + + // Check that error was stored + if got := migrationContext.GetAbortError(); got == nil { + t.Fatal("Expected abort error to be stored after row copy error") + } else if got.Error() != "row copy failed" { + t.Errorf("Expected 'row copy failed', got %v", got) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success + case <-time.After(1 * time.Second): + t.Error("Expected context to be cancelled after row copy error") + } +} + +func TestAbort_DuringInspection(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Simulate error during inspection phase + testErr := errors.New("inspection failed") + go func() { + time.Sleep(10 * time.Millisecond) + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Call checkAbort (simulating what Migrate() does after initiateInspector) + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error after abort during inspection") + } + + if err.Error() != "inspection failed" { + t.Errorf("Expected 'inspection failed', got %v", err) + } +} + +func TestAbort_DuringStreaming(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Simulate error from streaming goroutine + testErr := errors.New("streaming error") + go func() { + time.Sleep(10 * time.Millisecond) + // Use select pattern like actual code does + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Verify error stored and context cancelled + if got := migrationContext.GetAbortError(); got == nil { + t.Fatal("Expected abort error to be stored") + } else if got.Error() != "streaming error" { + t.Errorf("Expected 'streaming error', got %v", got) + } + + // Verify checkAbort catches it + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error after streaming abort") + } +} + +func TestRetryExhaustion_TriggersAbort(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(2) // Only 2 retries + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Operation that always fails + callCount := 0 + operation := func() error { + callCount++ + return errors.New("persistent failure") + } + + // Call retryOperation (with notFatalHint=false so it sends to PanicAbort) + err := migrator.retryOperation(operation) + + // Should have called operation MaxRetries times + if callCount != 2 { + t.Errorf("Expected 2 retry attempts, got %d", callCount) + } + + // Should return the error + if err == nil { + t.Fatal("Expected retryOperation to return error") + } + + // Wait for abort to be processed + time.Sleep(100 * time.Millisecond) + + // Verify error was sent to PanicAbort and stored + if got := migrationContext.GetAbortError(); got == nil { + t.Error("Expected abort error to be stored after retry exhaustion") + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success + default: + t.Error("Expected context to be cancelled after retry exhaustion") + } +} + +func TestRevert_AbortsOnError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.Revert = true + migrationContext.OldTableName = "_test_del" + migrationContext.OriginalTableName = "test" + migrationContext.DatabaseName = "testdb" + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Simulate error during revert + testErr := errors.New("revert failed") + go func() { + time.Sleep(10 * time.Millisecond) + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Verify checkAbort catches it + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error during revert") + } + + if err.Error() != "revert failed" { + t.Errorf("Expected 'revert failed', got %v", err) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success + default: + t.Error("Expected context to be cancelled during revert abort") + } +} + +func TestCheckAbort_ReturnsNilWhenNoError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // No error has occurred + err := migrator.checkAbort() + if err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestCheckAbort_DetectsContextCancellation(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Cancel context directly (without going through PanicAbort) + migrationContext.CancelContext() + + // checkAbort should detect the cancellation + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error when context is cancelled") + } +} diff --git a/go/logic/server.go b/go/logic/server.go index 45e5b2bd4..74097acb7 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -450,7 +450,8 @@ help # This message return NoPrintStatusRule, err } err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.") - this.migrationContext.PanicAbort <- err + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) return NoPrintStatusRule, err } default: diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 63afc3f3d..1c2635138 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -186,7 +186,12 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { // The next should block and execute forever, unless there's a serious error. var successiveFailures int var reconnectCoords mysql.BinlogCoordinates + ctx := this.migrationContext.GetContext() for { + // Check for context cancellation each iteration + if err := ctx.Err(); err != nil { + return err + } if canStopStreaming() { return nil } diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 7a6534baf..1ca40f957 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -362,7 +362,9 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // Regardless of throttle, we take opportunity to check for panic-abort if this.migrationContext.PanicFlagFile != "" { if base.FileExists(this.migrationContext.PanicFlagFile) { - this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)) + return nil } } @@ -385,7 +387,9 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)) + return nil } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) @@ -393,7 +397,8 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) <-timer.C if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold) + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)) } }() } @@ -481,7 +486,16 @@ func (this *Throttler) initiateThrottlerChecks() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - for range ticker.C { + for { + // Check for context cancellation each iteration + ctx := this.migrationContext.GetContext() + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Process throttle check + } + if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } From 67cc636b9ff3eb08ae118a610951747d5e7e5e8c Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 17 Mar 2026 09:12:15 -0700 Subject: [PATCH 2/6] Improve tests for various error scenarios (#1642) * Improve tests for various error scenarios - Regex meta characters in index names should not break warning detection (required code fix) - Improve tests that only checked number of rows (need to validate data as well) - Test positive case allowing ignored duplicates on migration key - Test behavior with PanicOnWarnings disabled * Address Copilot feedback * Add test for warnings on composite unique keys * Add test for updating pk with duplicate * Improve replica test debugging - Print log excerpt on failure - Upload full log artifacts on failure * Reduce flakiness in update-pk test * Revise test * More robust test fix * Make MySQL wait strategy less flaky Removed the `wait.ForExposedPort()` override from test files. The tests will now use the MySQL module's default wait strategy (`wait.ForLog("port: 3306 MySQL Community Server")`), which properly waits for MySQL to be ready to accept connections. Otherwise the port may be exposed, but MySQL is still initializing and not ready to accept connections. * Customize update-pk integration test Add support for test-specific execution so that we can guarantee that we're specifically testing the DML apply phase * Fix regression in integration test harness * Add test timeouts and fix error propagation Prevent indefinite test hangs by adding 120-second timeout and duration reporting. Fix silent error drops by propagating errors from background write goroutines to PanicAbort channel. Check for abort in sleepWhileTrue loop and handle its error in cutOver. --- .github/workflows/replica-tests.yml | 14 + go/logic/applier.go | 37 +- go/logic/applier_test.go | 403 +++++++++++++++++- go/logic/migrator.go | 24 +- go/logic/migrator_test.go | 2 - go/logic/streamer_test.go | 2 - .../create.sql | 10 + .../expect_failure | 1 + .../extra_args | 1 + .../test.sh | 54 +++ localtests/test.sh | 193 ++++++--- 11 files changed, 656 insertions(+), 85 deletions(-) create mode 100644 localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/create.sql create mode 100644 localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure create mode 100644 localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/extra_args create mode 100755 localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/test.sh diff --git a/.github/workflows/replica-tests.yml b/.github/workflows/replica-tests.yml index 957d7a176..5a9bfb7f7 100644 --- a/.github/workflows/replica-tests.yml +++ b/.github/workflows/replica-tests.yml @@ -28,6 +28,20 @@ jobs: - name: Run tests run: script/docker-gh-ost-replica-tests run + - name: Set artifact name + if: failure() + run: | + ARTIFACT_NAME=$(echo "${{ matrix.image }}" | tr '/:' '-') + echo "ARTIFACT_NAME=test-logs-${ARTIFACT_NAME}" >> $GITHUB_ENV + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: ${{ env.ARTIFACT_NAME }} + path: /tmp/gh-ost-test.* + retention-days: 7 + - name: Teardown environment if: always() run: script/docker-gh-ost-replica-tests down diff --git a/go/logic/applier.go b/go/logic/applier.go index 6c09dd61e..ec2d7be10 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -94,6 +94,21 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { } } +// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings +// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions, +// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid +// regex syntax errors. +func (this *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) { + escapedTable := regexp.QuoteMeta(this.migrationContext.GetGhostTableName()) + escapedKey := regexp.QuoteMeta(this.migrationContext.UniqueKey.NameInGhostTable) + migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey) + migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern) + if err != nil { + return nil, fmt.Errorf("failed to compile migration key pattern: %w", err) + } + return migrationKeyRegex, nil +} + func (this *Applier) InitDBConnections() (err error) { applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri) @@ -928,6 +943,12 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return nil, err } + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + if err != nil { + return nil, err + } + var sqlWarnings []string for rows.Next() { var level, message string @@ -936,10 +957,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") continue } - // Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix - migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable) - matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message) - if strings.Contains(message, "Duplicate entry") && matched { + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { continue } sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) @@ -1570,6 +1588,12 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return rollback(err) } + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + if err != nil { + return rollback(err) + } + var sqlWarnings []string for rows.Next() { var level, message string @@ -1578,10 +1602,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") continue } - // Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix - migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable) - matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message) - if strings.Contains(message, "Duplicate entry") && matched { + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { // Duplicate entry on migration unique key is expected during binlog replay // (row was already copied during bulk copy phase) continue diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 9c761373a..6b02e0c02 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -23,7 +23,6 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" - "github.com/testcontainers/testcontainers-go/wait" ) func TestApplierGenerateSqlModeQuery(t *testing.T) { @@ -213,7 +212,6 @@ func (suite *ApplierTestSuite) SetupSuite() { testmysql.WithDatabase(testMysqlDatabase), testmysql.WithUsername(testMysqlUser), testmysql.WithPassword(testMysqlPass), - testcontainers.WithWaitStrategy(wait.ForExposedPort()), testmysql.WithConfigFile("my.cnf.test"), ) suite.Require().NoError(err) @@ -782,23 +780,131 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigration suite.Require().Error(err) suite.Require().Contains(err.Error(), "Duplicate entry") - // Verify that the ghost table still has only 3 rows (no data loss) - rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName() + " ORDER BY id") + // Verify that the ghost table still has only the original 3 rows with correct data (no data loss) + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") suite.Require().NoError(err) defer rows.Close() - var count int + var results []struct { + id int + email string + } for rows.Next() { var id int var email string err = rows.Scan(&id, &email) suite.Require().NoError(err) - count += 1 + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + // All 3 original rows should still be present with correct data + suite.Require().Len(results, 3) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("user1@example.com", results[0].email) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("user2@example.com", results[1].email) + suite.Require().Equal(3, results[2].id) + suite.Require().Equal("user3@example.com", results[2].email) +} + +func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateCompositeUniqueKey() { + ctx := context.Background() + + var err error + + // Create table with id, email, and username columns + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), username VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with same schema plus a composite unique index on (email, username) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), username VARCHAR(100), UNIQUE KEY email_username_unique (email, username));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email", "username"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email", "username"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email", "username"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table (simulating bulk copy phase) + // alice@example.com + bob is ok due to composite unique index + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email, username) VALUES (1, 'alice@example.com', 'alice'), (2, 'alice@example.com', 'bob'), (3, 'charlie@example.com', 'charlie');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: try to insert a row with duplicate composite key (email + username) + // This should fail with a warning because the ghost table has a composite unique index + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com", "alice"}), // duplicate (email, username) + }, + } + + // This should return an error when PanicOnWarnings is enabled + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "Duplicate entry") + + // Verify that the ghost table still has only the original 3 rows with correct data (no data loss) + rows, err := suite.db.Query("SELECT id, email, username FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + username string + } + for rows.Next() { + var id int + var email string + var username string + err = rows.Scan(&id, &email, &username) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + username string + }{id, email, username}) } suite.Require().NoError(rows.Err()) - // All 3 original rows should still be present - suite.Require().Equal(3, count) + // All 3 original rows should still be present with correct data + suite.Require().Len(results, 3) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal("alice", results[0].username) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("alice@example.com", results[1].email) + suite.Require().Equal("bob", results[1].username) + suite.Require().Equal(3, results[2].id) + suite.Require().Equal("charlie@example.com", results[2].email) + suite.Require().Equal("charlie", results[2].username) } // TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where: @@ -980,6 +1086,287 @@ func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() { suite.Require().NoError(rows.Err()) } +// TestDuplicateOnMigrationKeyAllowedInBinlogReplay tests the positive case where +// a duplicate on the migration unique key during binlog replay is expected and should be allowed +func (suite *ApplierTestSuite) TestDuplicateOnMigrationKeyAllowedInBinlogReplay() { + ctx := context.Background() + + var err error + + // Create table with id and email columns, where id is the primary key + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with same schema plus a new unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table (simulating bulk copy phase) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: try to insert the same row again (duplicate on PRIMARY KEY - the migration key) + // This is expected during binlog replay when a row was already copied during bulk copy + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY KEY + }, + } + + // This should succeed - duplicate on migration unique key is expected and should be filtered out + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + // Verify that the ghost table still has only the original 2 rows with correct data + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + } + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + // Should still have exactly 2 rows with correct data + suite.Require().Len(results, 2) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("bob@example.com", results[1].email) +} + +// TestRegexMetacharactersInIndexName tests that index names with regex metacharacters +// are properly escaped. We test with a plus sign in the index name, which without +// QuoteMeta would be treated as a regex quantifier (one or more of 'x' in this case). +// This test verifies the pattern matches ONLY the exact index name, not a regex pattern. +func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() { + ctx := context.Background() + + var err error + + // Create tables with an index name containing a plus sign + // Without QuoteMeta, "idx+email" would be treated as a regex pattern where + is a quantifier + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY `idx+email` (email));", getTestTableName())) + suite.Require().NoError(err) + + // MySQL allows + in index names when quoted + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY `idx+email` (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "idx+email", + NameInGhostTable: "idx+email", + Columns: *sql.NewColumnList([]string{"email"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Test: duplicate on idx+email (the migration key) should be allowed + // This verifies our regex correctly identifies "idx+email" as the migration key + // Without regexp.QuoteMeta, the + would be treated as a regex quantifier and might not match correctly + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{3, "alice@example.com"}), + }, + } + + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err, "Duplicate on idx+email (migration key) should be allowed with PanicOnWarnings enabled") + + // Test: duplicate on PRIMARY (not the migration key) should fail + dmlEvents = []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{1, "charlie@example.com"}), + }, + } + + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err, "Duplicate on PRIMARY (not migration key) should fail with PanicOnWarnings enabled") + suite.Require().Contains(err.Error(), "Duplicate entry") + + // Verify final state - should still have only the original 2 rows + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + } + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + suite.Require().Len(results, 2) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("bob@example.com", results[1].email) +} + +// TestPanicOnWarningsDisabled tests that when PanicOnWarnings is false, +// warnings are not checked and duplicates are silently ignored +func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() { + ctx := context.Background() + + var err error + + // Create table with id and email columns + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + // PanicOnWarnings is false (default) + migrationContext.PanicOnWarnings = false + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: insert duplicate email on non-migration index + // With PanicOnWarnings disabled, this should succeed (INSERT IGNORE skips it) + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{3, "alice@example.com"}), // duplicate email + }, + } + + // Should succeed because PanicOnWarnings is disabled + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + // Verify that only 2 original rows exist with correct data (the duplicate was silently ignored) + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + } + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + // Should still have exactly 2 original rows (id=3 was silently ignored) + suite.Require().Len(results, 2) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("bob@example.com", results[1].email) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index ca5f5a729..e3e6d429d 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -124,6 +124,10 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { // (or fails with error) func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { for { + // Check for abort before continuing + if err := this.checkAbort(); err != nil { + return err + } shouldSleep, err := operation() if err != nil { return err @@ -539,7 +543,12 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { return err } - go this.executeWriteFuncs() + go func() { + if err := this.executeWriteFuncs(); err != nil { + // Send error to PanicAbort to trigger abort + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + }() go this.iterateChunks() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() @@ -679,7 +688,12 @@ func (this *Migrator) Revert() error { this.initiateThrottler() go this.initiateStatus() - go this.executeDMLWriteFuncs() + go func() { + if err := this.executeDMLWriteFuncs(); err != nil { + // Send error to PanicAbort to trigger abort + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + }() this.printStatus(ForcePrintStatusRule) var retrier func(func() error, ...bool) error @@ -755,7 +769,7 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.MarkPointOfInterest() this.migrationContext.Log.Debugf("checking for cut-over postpone") - this.sleepWhileTrue( + if err := this.sleepWhileTrue( func() (bool, error) { heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond @@ -783,7 +797,9 @@ func (this *Migrator) cutOver() (err error) { } return false, nil }, - ) + ); err != nil { + return err + } atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) this.migrationContext.MarkPointOfInterest() this.migrationContext.Log.Debugf("checking for cut-over postpone: complete") diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 42b6fed37..7b02c6b3f 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -32,7 +32,6 @@ import ( "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" ) func TestMigratorOnChangelogEvent(t *testing.T) { @@ -302,7 +301,6 @@ func (suite *MigratorTestSuite) SetupSuite() { testmysql.WithDatabase(testMysqlDatabase), testmysql.WithUsername(testMysqlUser), testmysql.WithPassword(testMysqlPass), - testcontainers.WithWaitStrategy(wait.ForExposedPort()), testmysql.WithConfigFile("my.cnf.test"), ) suite.Require().NoError(err) diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go index 2c5d3886b..8e0b57f80 100644 --- a/go/logic/streamer_test.go +++ b/go/logic/streamer_test.go @@ -13,7 +13,6 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/mysql" - "github.com/testcontainers/testcontainers-go/wait" "golang.org/x/sync/errgroup" ) @@ -31,7 +30,6 @@ func (suite *EventsStreamerTestSuite) SetupSuite() { mysql.WithDatabase(testMysqlDatabase), mysql.WithUsername(testMysqlUser), mysql.WithPassword(testMysqlPass), - testcontainers.WithWaitStrategy(wait.ForExposedPort()), ) suite.Require().NoError(err) diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/create.sql b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/create.sql new file mode 100644 index 000000000..444092cb2 --- /dev/null +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/create.sql @@ -0,0 +1,10 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + email varchar(100) not null, + primary key (id) +) auto_increment=1; + +insert into gh_ost_test (email) values ('alice@example.com'); +insert into gh_ost_test (email) values ('bob@example.com'); +insert into gh_ost_test (email) values ('charlie@example.com'); diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure new file mode 100644 index 000000000..a54788d01 --- /dev/null +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure @@ -0,0 +1 @@ +Warnings detected during DML event application diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/extra_args b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/extra_args new file mode 100644 index 000000000..04c41a471 --- /dev/null +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/extra_args @@ -0,0 +1 @@ +--panic-on-warnings --alter "ADD UNIQUE KEY email_unique (email)" --postpone-cut-over-flag-file=/tmp/gh-ost-test.postpone-cutover diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/test.sh b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/test.sh new file mode 100755 index 000000000..9f33be9a3 --- /dev/null +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/test.sh @@ -0,0 +1,54 @@ +#!/bin/bash +# Custom test: inject conflicting data AFTER row copy completes +# This tests the DML event application code path, not row copy + +# Create postpone flag file (referenced in extra_args) +postpone_flag_file=/tmp/gh-ost-test.postpone-cutover +touch $postpone_flag_file + +# Build gh-ost command using framework function +build_ghost_command + +# Run in background +echo_dot +# Clear log file before starting gh-ost +echo > $test_logfile +bash -c "$cmd" >>$test_logfile 2>&1 & +ghost_pid=$! + +# Wait for row copy to complete +echo_dot +row_copy_complete=false +for i in {1..30}; do + if grep -q "Row copy complete" $test_logfile; then + row_copy_complete=true + break + fi + ps -p $ghost_pid > /dev/null || { echo; echo "ERROR gh-ost exited early"; rm -f $postpone_flag_file; return 1; } + sleep 1; echo_dot +done + +if ! $row_copy_complete; then + echo; echo "ERROR row copy did not complete within expected time" + rm -f $postpone_flag_file + return 1 +fi + +# Inject conflicting SQL after row copy (UPDATE with PK change creates DELETE+INSERT in binlog) +echo_dot +gh-ost-test-mysql-master test -e "update gh_ost_test set id = 200, email = 'alice@example.com' where id = 2" + +# Wait for binlog event to replicate and be applied +sleep 10; echo_dot + +# Complete cutover by removing postpone flag +rm -f $postpone_flag_file + +# Wait for gh-ost to complete +wait $ghost_pid +execution_result=$? +rm -f $postpone_flag_file + +# Validate using framework function +validate_expected_failure +return $? diff --git a/localtests/test.sh b/localtests/test.sh index 404eeece3..d918d473b 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -30,6 +30,8 @@ replica_port= original_sql_mode= current_gtid_mode= sysbench_pid= +test_timeout=120 +test_failure_log_tail_lines=50 OPTIND=1 while getopts "b:s:dtg" OPTION; do @@ -107,12 +109,6 @@ verify_master_and_replica() { fi } -exec_cmd() { - echo "$@" - command "$@" 1>$test_logfile 2>&1 - return $? -} - echo_dot() { echo -n "." } @@ -141,6 +137,79 @@ start_replication() { done } +build_ghost_command() { + # Build gh-ost command with all standard options + # Expects: ghost_binary, replica_host, replica_port, master_host, master_port, + # table_name, storage_engine, throttle_flag_file, extra_args + cmd="GOTRACEBACK=crash $ghost_binary \ + --user=gh-ost \ + --password=gh-ost \ + --host=$replica_host \ + --port=$replica_port \ + --assume-master-host=${master_host}:${master_port} \ + --database=test \ + --table=${table_name} \ + --storage-engine=${storage_engine} \ + --alter='engine=${storage_engine}' \ + --exact-rowcount \ + --assume-rbr \ + --skip-metadata-lock-check \ + --initially-drop-old-table \ + --initially-drop-ghost-table \ + --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _${table_name}_ghc' \ + --throttle-flag-file=$throttle_flag_file \ + --serve-socket-file=/tmp/gh-ost.test.sock \ + --initially-drop-socket-file \ + --test-on-replica \ + --default-retries=3 \ + --chunk-size=10 \ + --verbose \ + --debug \ + --stack \ + --checkpoint \ + --execute ${extra_args[@]}" +} + +print_log_excerpt() { + echo "=== Last $test_failure_log_tail_lines lines of $test_logfile ===" + tail -n $test_failure_log_tail_lines $test_logfile + echo "=== End log excerpt ===" +} + +validate_expected_failure() { + # Check if test expected to fail and validate error message + # Expects: tests_path, test_name, execution_result, test_logfile + if [ -f $tests_path/$test_name/expect_failure ]; then + if [ $execution_result -eq 0 ]; then + echo + echo "ERROR $test_name execution was expected to exit on error but did not." + print_log_excerpt + return 1 + fi + if [ -s $tests_path/$test_name/expect_failure ]; then + # 'expect_failure' file has content. We expect to find this content in the log. + expected_error_message="$(cat $tests_path/$test_name/expect_failure)" + if grep -q "$expected_error_message" $test_logfile; then + return 0 + fi + echo + echo "ERROR $test_name execution was expected to exit with error message '${expected_error_message}' but did not." + print_log_excerpt + return 1 + fi + # 'expect_failure' file has no content. We generally agree that the failure is correct + return 0 + fi + + if [ $execution_result -ne 0 ]; then + echo + echo "ERROR $test_name execution failure. cat $test_logfile:" + cat $test_logfile + return 1 + fi + return 0 +} + sysbench_prepare() { local mysql_host="$1" local mysql_port="$2" @@ -225,7 +294,7 @@ test_single() { cat $tests_path/$test_name/create.sql return 1 fi - + if [ -f $tests_path/$test_name/before.sql ]; then gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql @@ -259,6 +328,37 @@ test_single() { table_name="gh_ost_test" ghost_table_name="_gh_ost_test_gho" + + # Check for custom test script + if [ -f $tests_path/$test_name/test.sh ]; then + # Run the custom test script in a subshell with timeout monitoring + # The subshell inherits all functions and variables from the current shell + (source $tests_path/$test_name/test.sh) & + test_pid=$! + + # Monitor the test with timeout + timeout_counter=0 + while kill -0 $test_pid 2>/dev/null; do + if [ $timeout_counter -ge $test_timeout ]; then + kill -TERM $test_pid 2>/dev/null + sleep 1 + kill -KILL $test_pid 2>/dev/null + wait $test_pid 2>/dev/null + echo + echo "ERROR $test_name execution timed out" + print_log_excerpt + return 1 + fi + sleep 1 + ((timeout_counter++)) + done + + # Get the exit code + wait $test_pid 2>/dev/null + execution_result=$? + return $execution_result + fi + # test with sysbench oltp write load if [[ "$test_name" == "sysbench" ]]; then if ! command -v sysbench &>/dev/null; then @@ -279,42 +379,24 @@ test_single() { fi trap cleanup SIGINT - # - cmd="GOTRACEBACK=crash $ghost_binary \ - --user=gh-ost \ - --password=gh-ost \ - --host=$replica_host \ - --port=$replica_port \ - --assume-master-host=${master_host}:${master_port} - --database=test \ - --table=${table_name} \ - --storage-engine=${storage_engine} \ - --alter='engine=${storage_engine}' \ - --exact-rowcount \ - --assume-rbr \ - --skip-metadata-lock-check \ - --initially-drop-old-table \ - --initially-drop-ghost-table \ - --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _${table_name}_ghc' \ - --throttle-flag-file=$throttle_flag_file \ - --serve-socket-file=/tmp/gh-ost.test.sock \ - --initially-drop-socket-file \ - --test-on-replica \ - --default-retries=3 \ - --chunk-size=10 \ - --verbose \ - --debug \ - --stack \ - --checkpoint \ - --execute ${extra_args[@]}" + # Build and execute gh-ost command + build_ghost_command echo_dot echo $cmd >$exec_command_file echo_dot - bash $exec_command_file 1>$test_logfile 2>&1 + timeout $test_timeout bash $exec_command_file >$test_logfile 2>&1 execution_result=$? cleanup + # Check for timeout (exit code 124) + if [ $execution_result -eq 124 ]; then + echo + echo "ERROR $test_name execution timed out" + print_log_excerpt + return 1 + fi + if [ -f $tests_path/$test_name/sql_mode ]; then gh-ost-test-mysql-master --default-character-set=utf8mb4 test -e "set @@global.sql_mode='${original_sql_mode}'" gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "set @@global.sql_mode='${original_sql_mode}'" @@ -329,33 +411,17 @@ test_single() { gh-ost-test-mysql-master --default-character-set=utf8mb4 test <$tests_path/$test_name/destroy.sql fi - if [ -f $tests_path/$test_name/expect_failure ]; then - if [ $execution_result -eq 0 ]; then - echo - echo "ERROR $test_name execution was expected to exit on error but did not. cat $test_logfile" - return 1 - fi - if [ -s $tests_path/$test_name/expect_failure ]; then - # 'expect_failure' file has content. We expect to find this content in the log. - expected_error_message="$(cat $tests_path/$test_name/expect_failure)" - if grep -q "$expected_error_message" $test_logfile; then - return 0 - fi - echo - echo "ERROR $test_name execution was expected to exit with error message '${expected_error_message}' but did not. cat $test_logfile" - return 1 - fi - # 'expect_failure' file has no content. We generally agree that the failure is correct - return 0 + # Validate expected failure or success + if ! validate_expected_failure; then + return 1 fi - if [ $execution_result -ne 0 ]; then - echo - echo "ERROR $test_name execution failure. cat $test_logfile:" - cat $test_logfile - return 1 + # If this was an expected failure test, we're done (no need to validate structure/checksums) + if [ -f $tests_path/$test_name/expect_failure ]; then + return 0 fi + # Test succeeded - now validate structure and checksums gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "show create table ${ghost_table_name}\G" -ss >$ghost_structure_output_file if [ -f $tests_path/$test_name/expect_table_structure ]; then @@ -409,14 +475,19 @@ test_all() { test_dirs=$(find "$tests_path" -mindepth 1 -maxdepth 1 ! -path . -type d | grep "$test_pattern" | sort) while read -r test_dir; do test_name=$(basename "$test_dir") + local test_start_time=$(date +%s) if ! test_single "$test_name"; then + local test_end_time=$(date +%s) + local test_duration=$((test_end_time - test_start_time)) create_statement=$(gh-ost-test-mysql-replica test -t -e "show create table ${ghost_table_name} \G") echo "$create_statement" >>$test_logfile - echo "+ FAIL" + echo "+ FAIL (${test_duration}s)" return 1 else + local test_end_time=$(date +%s) + local test_duration=$((test_end_time - test_start_time)) echo - echo "+ pass" + echo "+ pass (${test_duration}s)" fi mysql_version="$(gh-ost-test-mysql-replica -e "select @@version")" replica_terminology="slave" From b9652c33674625f12ee1b6ee2c83d94726633051 Mon Sep 17 00:00:00 2001 From: yosefbs Date: Wed, 18 Mar 2026 23:51:31 +0200 Subject: [PATCH 3/6] Add retry logic for instant DDL on lock wait timeout (#1651) * Add retry logic for instant DDL on lock wait timeout When attempting instant DDL, a lock wait timeout (errno 1205) may occur if a long-running transaction holds a metadata lock. Rather than failing immediately, retry the operation up to 5 times with linear backoff. Non-timeout errors (e.g. ALGORITHM=INSTANT not supported) still return immediately without retrying. * Fix int-to-Duration type mismatch in retry backoff --------- Co-authored-by: ybs-me --- go/logic/applier.go | 27 +++++++++++++++- go/logic/applier_test.go | 68 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index ec2d7be10..179d75302 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -305,7 +305,32 @@ func (this *Applier) AttemptInstantDDL() error { return err } // We don't need a trx, because for instant DDL the SQL mode doesn't matter. - _, err := this.db.Exec(query) + return retryOnLockWaitTimeout(func() error { + _, err := this.db.Exec(query) + return err + }, this.migrationContext.Log) +} + +// retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout +// (errno 1205). Non-timeout errors return immediately. This is used for instant +// DDL attempts where the operation may be blocked by a long-running transaction. +func retryOnLockWaitTimeout(operation func() error, logger base.Logger) error { + const maxRetries = 5 + var err error + for i := 0; i < maxRetries; i++ { + if i != 0 { + logger.Infof("Retrying after lock wait timeout (attempt %d/%d)", i+1, maxRetries) + RetrySleepFn(time.Duration(i) * 5 * time.Second) + } + err = operation() + if err == nil { + return nil + } + var mysqlErr *drivermysql.MySQLError + if !errors.As(err, &mysqlErr) || mysqlErr.Number != 1205 { + return err + } + } return err } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6b02e0c02..fcef5563b 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -8,9 +8,12 @@ package logic import ( "context" gosql "database/sql" + "errors" "strings" "testing" + "time" + drivermysql "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -198,6 +201,71 @@ func TestApplierInstantDDL(t *testing.T) { }) } +func TestRetryOnLockWaitTimeout(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + RetrySleepFn = func(d time.Duration) {} // no-op for tests + + logger := base.NewMigrationContext().Log + + lockWaitTimeoutErr := &drivermysql.MySQLError{Number: 1205, Message: "Lock wait timeout exceeded"} + nonRetryableErr := &drivermysql.MySQLError{Number: 1845, Message: "ALGORITHM=INSTANT is not supported"} + + t.Run("success on first attempt", func(t *testing.T) { + calls := 0 + err := retryOnLockWaitTimeout(func() error { + calls++ + return nil + }, logger) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) + + t.Run("retry on lock wait timeout then succeed", func(t *testing.T) { + calls := 0 + err := retryOnLockWaitTimeout(func() error { + calls++ + if calls < 3 { + return lockWaitTimeoutErr + } + return nil + }, logger) + require.NoError(t, err) + require.Equal(t, 3, calls) + }) + + t.Run("non-retryable error returns immediately", func(t *testing.T) { + calls := 0 + err := retryOnLockWaitTimeout(func() error { + calls++ + return nonRetryableErr + }, logger) + require.ErrorIs(t, err, nonRetryableErr) + require.Equal(t, 1, calls) + }) + + t.Run("non-mysql error returns immediately", func(t *testing.T) { + calls := 0 + genericErr := errors.New("connection refused") + err := retryOnLockWaitTimeout(func() error { + calls++ + return genericErr + }, logger) + require.ErrorIs(t, err, genericErr) + require.Equal(t, 1, calls) + }) + + t.Run("exhausts all retries", func(t *testing.T) { + calls := 0 + err := retryOnLockWaitTimeout(func() error { + calls++ + return lockWaitTimeoutErr + }, logger) + require.ErrorIs(t, err, lockWaitTimeoutErr) + require.Equal(t, 5, calls) + }) +} + type ApplierTestSuite struct { suite.Suite From 0d5c7370a74e7e9d85b98e32d813d07b043ab8d0 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Tue, 24 Mar 2026 22:21:05 +0100 Subject: [PATCH 4/6] Fix local tests by making .gopath writable to avoid toolchain rm permission errors (#1644) --- script/bootstrap | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/script/bootstrap b/script/bootstrap index 573313a75..96bf45675 100755 --- a/script/bootstrap +++ b/script/bootstrap @@ -11,7 +11,9 @@ set -e # up so it points back to us and go is none the wiser set -x -rm -rf .gopath -mkdir -p .gopath/src/github.com/github -ln -s "$PWD" .gopath/src/github.com/github/gh-ost +if [ ! -L .gopath/src/github.com/github/gh-ost ]; then + rm -rf .gopath + mkdir -p .gopath/src/github.com/github + ln -s "$PWD" .gopath/src/github.com/github/gh-ost +fi export GOPATH=$PWD/.gopath:$GOPATH From 8f274f7196ce75373ae72dc4b9b329d5e243f1ad Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Fri, 27 Mar 2026 09:59:44 -0700 Subject: [PATCH 5/6] Fix handling of warnings on DML batches (#1643) * Handle warnings in middle of DML batch * Add integration test for batch warnings * Update expected failure message for update-pk test --------- Co-authored-by: meiji163 --- go/cmd/gh-ost/main.go | 3 + go/logic/applier.go | 209 ++++++++++++------ go/logic/applier_test.go | 110 +++++++++ .../panic-on-warnings-batch-middle/create.sql | 11 + .../expect_failure | 1 + .../panic-on-warnings-batch-middle/extra_args | 1 + .../panic-on-warnings-batch-middle/test.sh | 57 +++++ .../expect_failure | 2 +- 8 files changed, 324 insertions(+), 70 deletions(-) create mode 100644 localtests/panic-on-warnings-batch-middle/create.sql create mode 100644 localtests/panic-on-warnings-batch-middle/expect_failure create mode 100644 localtests/panic-on-warnings-batch-middle/extra_args create mode 100755 localtests/panic-on-warnings-batch-middle/test.sh diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index fae519680..567137fd5 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -316,6 +316,9 @@ func main() { if migrationContext.CheckpointIntervalSeconds < 10 { migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10") } + if migrationContext.CountTableRows && migrationContext.PanicOnWarnings { + migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection") + } switch *cutOver { case "atomic", "default", "": diff --git a/go/logic/applier.go b/go/logic/applier.go index 179d75302..709fd08da 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1522,6 +1522,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))} } +// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS +// interleaved after each statement to detect warnings from any statement in the batch. +// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements +// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch). +func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) { + // Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ... + var queryBuilder strings.Builder + args := make([]interface{}, 0) + + for _, buildResult := range buildResults { + queryBuilder.WriteString(buildResult.query) + queryBuilder.WriteString(";\nSHOW WARNINGS;\n") + args = append(args, buildResult.args...) + } + + query := queryBuilder.String() + + // Execute the multi-statement query + rows, err := tx.QueryContext(ctx, query, args...) + if err != nil { + return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args) + } + defer rows.Close() + + var totalDelta int64 + + // QueryContext with multi-statement queries returns rows positioned at the first result set + // that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results. + // Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message) + cols, err := rows.Columns() + if err != nil { + return 0, fmt.Errorf("failed to get columns: %w", err) + } + + // If somehow we're not at a result set with columns, try to advance + if len(cols) == 0 { + if !rows.NextResultSet() { + return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement") + } + } + + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + if err != nil { + return 0, err + } + + // Iterate through SHOW WARNINGS result sets. + // DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS. + // Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ... + for i := 0; i < len(buildResults); i++ { + // We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS). + // Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation. + // This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1). + totalDelta += buildResults[i].rowsDelta + + // Read warnings from this statement's SHOW WARNINGS result set + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + // Scan failure means we cannot reliably read warnings. + // Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip. + return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err) + } + + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + // Duplicate entry on migration unique key is expected during binlog replay + // (row was already copied during bulk copy phase) + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + + // Check for errors that occurred while iterating through warnings + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err) + } + + if len(sqlWarnings) > 0 { + return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings) + } + + // Move to the next statement's SHOW WARNINGS result set + // For the last statement, there's no next result set + // DML statements don't create result sets, so we only need one NextResultSet call + // to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1) + if i < len(buildResults)-1 { + if !rows.NextResultSet() { + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err) + } + return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2) + } + } + } + + return totalDelta, nil +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 @@ -1561,82 +1662,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } } - // We batch together the DML queries into multi-statements to minimize network trips. - // We have to use the raw driver connection to access the rows affected - // for each statement in the multi-statement. - execErr := conn.Raw(func(driverConn any) error { - ex := driverConn.(driver.ExecerContext) - nvc := driverConn.(driver.NamedValueChecker) - - multiArgs := make([]driver.NamedValue, 0, nArgs) - multiQueryBuilder := strings.Builder{} - for _, buildResult := range buildResults { - for _, arg := range buildResult.args { - nv := driver.NamedValue{Value: driver.Value(arg)} - nvc.CheckNamedValue(&nv) - multiArgs = append(multiArgs, nv) - } - - multiQueryBuilder.WriteString(buildResult.query) - multiQueryBuilder.WriteString(";\n") - } - - res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) - if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) - return err - } - - mysqlRes := res.(drivermysql.Result) - - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). - // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - for i, rowsAffected := range mysqlRes.AllRowsAffected() { - totalDelta += buildResults[i].rowsDelta * rowsAffected - } - return nil - }) - - if execErr != nil { - return rollback(execErr) - } - - // Check for warnings when PanicOnWarnings is enabled + // When PanicOnWarnings is enabled, we need to check warnings after each statement + // in the batch. SHOW WARNINGS only shows warnings from the last statement in a + // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if this.migrationContext.PanicOnWarnings { - //nolint:execinquery - rows, err := tx.Query("SHOW WARNINGS") - if err != nil { - return rollback(err) - } - defer rows.Close() - if err = rows.Err(); err != nil { - return rollback(err) - } - - // Compile regex once before loop to avoid performance penalty and handle errors properly - migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults) if err != nil { return rollback(err) } + } else { + // Fast path: batch together DML queries into multi-statements to minimize network trips. + // We use the raw driver connection to access the rows affected for each statement. + execErr := conn.Raw(func(driverConn any) error { + ex := driverConn.(driver.ExecerContext) + nvc := driverConn.(driver.NamedValueChecker) + + multiArgs := make([]driver.NamedValue, 0, nArgs) + multiQueryBuilder := strings.Builder{} + for _, buildResult := range buildResults { + for _, arg := range buildResult.args { + nv := driver.NamedValue{Value: driver.Value(arg)} + nvc.CheckNamedValue(&nv) + multiArgs = append(multiArgs, nv) + } + + multiQueryBuilder.WriteString(buildResult.query) + multiQueryBuilder.WriteString(";\n") + } - var sqlWarnings []string - for rows.Next() { - var level, message string - var code int - if err := rows.Scan(&level, &code, &message); err != nil { - this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") - continue + res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) + if err != nil { + err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) + return err } - if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { - // Duplicate entry on migration unique key is expected during binlog replay - // (row was already copied during bulk copy phase) - continue + + mysqlRes := res.(drivermysql.Result) + + // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). + // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event + for i, rowsAffected := range mysqlRes.AllRowsAffected() { + totalDelta += buildResults[i].rowsDelta * rowsAffected } - sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) - } - if len(sqlWarnings) > 0 { - warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings) - return rollback(errors.New(warningMsg)) + return nil + }) + + if execErr != nil { + return rollback(execErr) } } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index fcef5563b..a9ecb889d 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1435,6 +1435,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() { suite.Require().Equal("bob@example.com", results[1].email) } +// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction +// and that if one fails due to a warning, the entire batch is rolled back - including events that +// come AFTER the failure. This proves true transaction atomicity. +func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { + ctx := context.Background() + + var err error + + // Create table with id and email columns + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate multiple binlog events in a batch: + // 1. Duplicate on PRIMARY KEY (allowed - expected during binlog replay) + // 2. Duplicate on email index (should fail) ← FAILURE IN MIDDLE + // 3. Valid insert (would succeed) ← SUCCESS AFTER FAILURE + // + // The critical test: Even though event #3 would succeed on its own, it must be rolled back + // because event #2 failed. This proves the entire batch is truly atomic. + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY (normally allowed) + }, + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com"}), // duplicate email (FAILS) + }, + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), // valid insert (would succeed) + }, + } + + // Should fail due to the second event + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "Duplicate entry") + + // Verify that the entire batch was rolled back - still only the original 2 rows + // Critically: id=2 (bob@example.com) from event #3 should NOT be present + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + } + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + // Should still have exactly 2 original rows (entire batch was rolled back) + // This proves that even event #3 (which would have succeeded) was rolled back + suite.Require().Len(results, 2) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal(3, results[1].id) + suite.Require().Equal("charlie@example.com", results[1].email) + // Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/localtests/panic-on-warnings-batch-middle/create.sql b/localtests/panic-on-warnings-batch-middle/create.sql new file mode 100644 index 000000000..e4883ca66 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/create.sql @@ -0,0 +1,11 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + email varchar(255) not null, + primary key (id) +) auto_increment=1; + +-- Insert initial data - all unique emails +insert into gh_ost_test (email) values ('alice@example.com'); +insert into gh_ost_test (email) values ('bob@example.com'); +insert into gh_ost_test (email) values ('charlie@example.com'); diff --git a/localtests/panic-on-warnings-batch-middle/expect_failure b/localtests/panic-on-warnings-batch-middle/expect_failure new file mode 100644 index 000000000..11800efb0 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/expect_failure @@ -0,0 +1 @@ +ERROR warnings detected in statement 1 of 2 diff --git a/localtests/panic-on-warnings-batch-middle/extra_args b/localtests/panic-on-warnings-batch-middle/extra_args new file mode 100644 index 000000000..55cdcd0e8 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/extra_args @@ -0,0 +1 @@ +--default-retries=1 --panic-on-warnings --alter "add unique index email_idx(email)" --postpone-cut-over-flag-file=/tmp/gh-ost-test.postpone-cutover diff --git a/localtests/panic-on-warnings-batch-middle/test.sh b/localtests/panic-on-warnings-batch-middle/test.sh new file mode 100755 index 000000000..cbe3829c7 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/test.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# Custom test: inject batched DML events AFTER row copy completes +# Tests that warnings in the middle of a DML batch are detected + +# Create postpone flag file (referenced in extra_args) +postpone_flag_file=/tmp/gh-ost-test.postpone-cutover +touch $postpone_flag_file + +# Set table names (required by build_ghost_command) +table_name="gh_ost_test" +ghost_table_name="_gh_ost_test_gho" + +# Build gh-ost command using framework function +build_ghost_command + +# Run in background +echo_dot +echo > $test_logfile +bash -c "$cmd" >>$test_logfile 2>&1 & +ghost_pid=$! + +# Wait for row copy to complete +echo_dot +for i in {1..30}; do + grep -q "Row copy complete" $test_logfile && break + ps -p $ghost_pid > /dev/null || { echo; echo "ERROR gh-ost exited early"; rm -f $postpone_flag_file; return 1; } + sleep 1; echo_dot +done + +# Inject batched DML events that will create warnings +# These must be in a single transaction to be batched during binlog replay +echo_dot +gh-ost-test-mysql-master test << 'EOF' +BEGIN; +-- INSERT with duplicate PRIMARY KEY - warning on migration key (filtered by gh-ost) +INSERT IGNORE INTO gh_ost_test (id, email) VALUES (1, 'duplicate_pk@example.com'); +-- INSERT with duplicate email - warning on unique index (should trigger failure) +INSERT IGNORE INTO gh_ost_test (email) VALUES ('alice@example.com'); +-- INSERT with unique data - would succeed if not for previous warning +INSERT IGNORE INTO gh_ost_test (email) VALUES ('new@example.com'); +COMMIT; +EOF + +# Wait for binlog events to replicate and be applied +sleep 10; echo_dot + +# Complete cutover by removing postpone flag +rm -f $postpone_flag_file + +# Wait for gh-ost to complete +wait $ghost_pid +execution_result=$? +rm -f $postpone_flag_file + +# Validate using framework function +validate_expected_failure +return $? diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure index a54788d01..fb8dc562a 100644 --- a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure @@ -1 +1 @@ -Warnings detected during DML event application +ERROR warnings detected in statement 1 of 1 From 8bc63f089aa82a373d97407ac815178e38890b72 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 2 Apr 2026 12:22:45 -0700 Subject: [PATCH 6/6] Fix abort/retry interaction (#1655) * Add failing test for retry + abort issue * Fix retry after abort issue * Skip retries for warning errors Warning errors indicate data consistency issues that won't resolve on retry, so attempting to retry them is futile and causes unnecessary delays. This change detects warning errors early and aborts immediately instead of retrying. * Fix test expectation --- go/logic/migrator.go | 22 ++ go/logic/migrator_test.go | 211 ++++++++++++++++++ .../expect_failure | 2 +- 3 files changed, 234 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e3e6d429d..f32d859b8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -160,10 +160,21 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // sleep after previous iteration RetrySleepFn(1 * time.Second) } + // Check for abort/context cancellation before each retry + if abortErr := this.checkAbort(); abortErr != nil { + return abortErr + } err = operation() if err == nil { return nil } + // Check if this is an unrecoverable error (data consistency issues won't resolve on retry) + if strings.Contains(err.Error(), "warnings detected") { + if len(notFatalHint) == 0 { + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + return err + } // there's an error. Let's try again. } if len(notFatalHint) == 0 { @@ -190,10 +201,21 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro if i != 0 { RetrySleepFn(time.Duration(interval) * time.Second) } + // Check for abort/context cancellation before each retry + if abortErr := this.checkAbort(); abortErr != nil { + return abortErr + } err = operation() if err == nil { return nil } + // Check if this is an unrecoverable error (data consistency issues won't resolve on retry) + if strings.Contains(err.Error(), "warnings detected") { + if len(notFatalHint) == 0 { + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + return err + } } if len(notFatalHint) == 0 { // Use helper to prevent deadlock if listenOnPanicAbort already exited diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 7b02c6b3f..f731035e1 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -714,6 +714,118 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) { assert.Equal(t, tries, 100) } +func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + // No sleep needed for this test + } + + var tries = 0 + retryable := func() error { + tries++ + if tries == 5 { + // Cancel context on 5th try + migrationContext.CancelContext() + } + return errors.New("Simulated error") + } + + result := migrator.retryOperation(retryable, false) + assert.Error(t, result) + // Should abort after 6 tries: 5 failures + 1 checkAbort detection + assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries) + // Verify we got context cancellation error + assert.Contains(t, result.Error(), "context canceled") +} + +func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrationContext.SetExponentialBackoffMaxInterval(42) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + // No sleep needed for this test + } + + var tries = 0 + retryable := func() error { + tries++ + if tries == 5 { + // Cancel context on 5th try + migrationContext.CancelContext() + } + return errors.New("Simulated error") + } + + result := migrator.retryOperationWithExponentialBackoff(retryable, false) + assert.Error(t, result) + // Should abort after 6 tries: 5 failures + 1 checkAbort detection + assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries) + // Verify we got context cancellation error + assert.Contains(t, result.Error(), "context canceled") +} + +func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + t.Fatal("Should not sleep/retry for warning errors") + } + + var tries = 0 + retryable := func() error { + tries++ + return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]") + } + + result := migrator.retryOperation(retryable, false) + assert.Error(t, result) + // Should only try once - no retries for warnings + assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error") + assert.Contains(t, result.Error(), "warnings detected") +} + +func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrationContext.SetExponentialBackoffMaxInterval(42) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + t.Fatal("Should not sleep/retry for warning errors") + } + + var tries = 0 + retryable := func() error { + tries++ + return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]") + } + + result := migrator.retryOperationWithExponentialBackoff(retryable, false) + assert.Error(t, result) + // Should only try once - no retries for warnings + assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error") + assert.Contains(t, result.Error(), "warnings detected") +} + func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() { ctx := context.Background() @@ -1210,3 +1322,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) { t.Fatal("Expected checkAbort to return error when context is cancelled") } } + +func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() { + ctx := context.Background() + + // Create table with email column (no unique constraint initially) + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName())) + suite.Require().NoError(err) + + // Insert initial rows with unique email values - passes pre-flight validation + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName())) + suite.Require().NoError(err) + + // Verify we have 3 rows + var count int + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(3, count) + + // Create postpone flag file + tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test") + suite.Require().NoError(err) + defer os.RemoveAll(tmpDir) + postponeFlagFile := filepath.Join(tmpDir, "postpone.flag") + err = os.WriteFile(postponeFlagFile, []byte{}, 0644) + suite.Require().NoError(err) + + // Start migration in goroutine + done := make(chan error, 1) + go func() { + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + if err != nil { + done <- err + return + } + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)" + migrationContext.HeartbeatIntervalMilliseconds = 100 + migrationContext.PostponeCutOverFlagFile = postponeFlagFile + migrationContext.PanicOnWarnings = true + + // High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted + migrationContext.SetDefaultNumRetries(30) + migrationContext.CutOverExponentialBackoff = true + migrationContext.SetExponentialBackoffMaxInterval(128) + + migrator := NewMigrator(migrationContext, "0.0.0") + + //nolint:contextcheck + done <- migrator.Migrate() + }() + + // Wait for migration to reach postponed state + // TODO replace this with an actual check for postponed state + time.Sleep(3 * time.Second) + + // Now insert a duplicate email value while migration is postponed + // This simulates data arriving during migration that would violate the unique constraint + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName())) + suite.Require().NoError(err) + + // Verify we now have 4 rows (including the duplicate) + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(4, count) + + // Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate + err = os.Remove(postponeFlagFile) + suite.Require().NoError(err) + + // Wait for Migrate() to return - with timeout to detect if it hangs + select { + case migrateErr := <-done: + // Success - Migrate() returned + // It should return an error due to the duplicate + suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation") + suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry") + case <-time.After(5 * time.Minute): + suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop") + } + + // Verify all 4 rows are still in the original table (no silent data loss) + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(4, count, "Original table should still have all 4 rows") + + // Verify both user1@example.com entries still exist + var duplicateCount int + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount) + suite.Require().NoError(err) + suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries") +} diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure index fb8dc562a..5a6e5411e 100644 --- a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure @@ -1 +1 @@ -ERROR warnings detected in statement 1 of 1 +ERROR warnings detected in statement