diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index d690a9f65..743e58bea 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -17,7 +17,7 @@ import ( gomysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" - uuid "github.com/google/uuid" + "github.com/google/uuid" "golang.org/x/net/context" ) @@ -31,6 +31,16 @@ type GoMySQLReader struct { // LastTrxCoords are the coordinates of the last transaction completely read. // If using the file coordinates it is binlog position of the transaction's XID event. LastTrxCoords mysql.BinlogCoordinates + // currentTrxCoords is set once per GTIDEvent and shared by all RowsEvents within + // the same transaction. It points to currentCoordinates (a lazy *GTIDBinlogCoordinates), + // which is replaced at the next GTIDEvent — so old entries retain valid references. + // Only accessed from within the StreamEvents goroutine; no mutex needed. + currentTrxCoords mysql.BinlogCoordinates + // lastCommittedCoords is the GTIDBinlogCoordinates from the most recently seen + // XIDEvent (or the initial coordinates). WithPendingGTID aliases its GTIDSet as + // the base for each new in-flight coord, so the Clone is deferred until actually + // needed. Only written from within StreamEvents; no mutex needed. 
+ lastCommittedCoords *mysql.GTIDBinlogCoordinates } func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { @@ -68,7 +78,8 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin // Start sync with specified GTID set or binlog file and position if this.migrationContext.UseGTIDs { coords := coordinates.(*mysql.GTIDBinlogCoordinates) - this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet) + this.lastCommittedCoords = coords + this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet()) } else { coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{ @@ -86,11 +97,21 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates } func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { - currentCoords := this.GetCurrentBinlogCoordinates() + var currentCoords mysql.BinlogCoordinates + if this.migrationContext.UseGTIDs && this.currentTrxCoords != nil { + currentCoords = this.currentTrxCoords + } else { + currentCoords = this.GetCurrentBinlogCoordinates() + } + dml := ToEventDML(ev.Header.EventType.String()) if dml == NotDML { - return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) + return fmt.Errorf("Unknown DML type: %v", ev.Header.EventType) } + + schemaName := string(rowsEvent.Table.Schema) + tableName := string(rowsEvent.Table.Table) + for i, row := range rowsEvent.Rows { if dml == UpdateDML && i%2 == 1 { // An update has two rows (WHERE+SET) @@ -98,11 +119,8 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven continue } binlogEntry := NewBinlogEntryAt(currentCoords) - binlogEntry.DmlEvent = NewBinlogDMLEvent( - string(rowsEvent.Table.Schema), - string(rowsEvent.Table.Table), - dml, - ) + binlogEntry.DmlEvent = 
NewBinlogDMLEvent(schemaName, tableName, dml) + switch dml { case InsertDML: { @@ -166,14 +184,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } + pending := this.lastCommittedCoords.WithPendingGTID(sid, event.GNO) this.currentCoordinatesMutex.Lock() - if this.LastTrxCoords != nil { - this.currentCoordinates = this.LastTrxCoords.Clone() - } - coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates) - trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1}) - coords.GTIDSet.AddSet(trxGset) + this.currentCoordinates = pending this.currentCoordinatesMutex.Unlock() + this.currentTrxCoords = pending case *replication.RotateEvent: if this.migrationContext.UseGTIDs { continue @@ -185,7 +200,12 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha this.currentCoordinatesMutex.Unlock() case *replication.XIDEvent: if this.migrationContext.UseGTIDs { - this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} + // go-mysql allocates a fresh MysqlGTIDSet for every XIDEvent, so we can + // alias event.GSet directly. gtidSet is unexported; mutation from outside + // the mysql package is not possible. 
+ committed := mysql.NewGTIDBinlogCoordinatesFromSet(event.GSet.(*gomysql.MysqlGTIDSet)) + this.LastTrxCoords = committed + this.lastCommittedCoords = committed } else { this.LastTrxCoords = this.currentCoordinates.Clone() } diff --git a/go/binlog/streaming_bench_test.go b/go/binlog/streaming_bench_test.go new file mode 100644 index 000000000..fca5e83e3 --- /dev/null +++ b/go/binlog/streaming_bench_test.go @@ -0,0 +1,208 @@ +package binlog + +import ( + "fmt" + "io" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/mysql" + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + guuid "github.com/google/uuid" +) + +const ( + benchTxCount = 1_000 + benchRowsPerTx = 5 +) + +const benchNumUUIDs = 182 + +// benchServerSID is a fixed synthetic UUID used as the "active" server in benchmarks. +var benchServerSID = guuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + +// buildSyntheticGTIDSet generates a realistic-sized GTID set with numUUIDs unique +// server UUIDs, each with a transaction range of 1 to a varying upper bound. +func buildSyntheticGTIDSet(numUUIDs int) *gomysql.MysqlGTIDSet { + set := new(gomysql.MysqlGTIDSet) + set.Sets = make(map[string]*gomysql.UUIDSet) + for i := 0; i < numUUIDs; i++ { + // Deterministic UUIDs seeded from index + sid := guuid.MustParse(fmt.Sprintf("%08x-0000-4000-8000-%012x", i, i)) + gno := int64(1_000_000 + i*100_000) + uuidSet := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: 1, Stop: gno + 1}) + set.Sets[sid.String()] = uuidSet + } + // Include the benchmark server UUID so the initial set size stays consistent + // throughout the benchmark (no extra map entry created on first AddSet). 
+ set.Sets[benchServerSID.String()] = gomysql.NewUUIDSet(benchServerSID, gomysql.Interval{Start: 1, Stop: 2}) + return set +} + +func buildGTIDEvents(initialSet *gomysql.MysqlGTIDSet) []*replication.BinlogEvent { + events := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+2)) + accSet := initialSet.Clone().(*gomysql.MysqlGTIDSet) + sid := benchServerSID + sidBytes, _ := sid.MarshalBinary() + + for i := 0; i < benchTxCount; i++ { + gno := int64(i + 1) + + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.GTID_EVENT}, + Event: &replication.GTIDEvent{SID: sidBytes, GNO: gno}, + }) + + for r := 0; r < benchRowsPerTx; r++ { + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.WRITE_ROWS_EVENTv2, + LogPos: uint32(i*1000 + r + 1), + EventSize: 100, + }, + Event: &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + Schema: []byte("mydb"), + Table: []byte("orders"), + }, + Rows: [][]interface{}{{int64(i), "value"}}, + }, + }) + } + + trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: gno, Stop: gno + 1}) + accSet.AddSet(trxGset) + + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.XID_EVENT}, + Event: &replication.XIDEvent{GSet: accSet.Clone()}, + }) + } + return events +} + +func buildFileEvents() []*replication.BinlogEvent { + events := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+1)) + + for i := 0; i < benchTxCount; i++ { + for r := 0; r < benchRowsPerTx; r++ { + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.WRITE_ROWS_EVENTv2, + LogPos: uint32(i*1000 + r + 1), + EventSize: 100, + }, + Event: &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + Schema: []byte("mydb"), + Table: []byte("orders"), + }, + Rows: [][]interface{}{{int64(i), "value"}}, + }, + }) + } + + events 
= append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.XID_EVENT, + LogPos: uint32(i*1000 + benchRowsPerTx + 1), + }, + Event: &replication.XIDEvent{}, + }) + } + return events +} + +// feedAndRun feeds events into a fresh streamer concurrently with StreamEvents. +// This avoids b.N scaling issues caused by heavy pre-fill setup dominating over +// the (very fast) file-mode processing time. +func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication.BinlogEvent, initialCoords mysql.BinlogCoordinates) { + b.ReportAllocs() + + var iterations atomic.Int64 + done := make(chan struct{}) + + go func() { + spinner := []string{"|", "/", "-", "\\"} + tick := time.NewTicker(500 * time.Millisecond) + defer tick.Stop() + frame := 0 + for { + select { + case <-done: + fmt.Fprintf(os.Stderr, "\r%-30s done (%d iters) \n", label, iterations.Load()) + return + case <-tick.C: + fmt.Fprintf(os.Stderr, "\r%-30s %s iter %d", label, spinner[frame%4], iterations.Load()) + frame++ + } + } + }() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Small channel — events flow through as StreamEvents consumes them. + s := replication.NewBinlogStreamer() + + ctx := &base.MigrationContext{} + ctx.UseGTIDs = useGTIDs + reader := &GoMySQLReader{ + migrationContext: ctx, + currentCoordinatesMutex: &sync.Mutex{}, + currentCoordinates: initialCoords.Clone(), + binlogStreamer: s, + } + if useGTIDs { + reader.lastCommittedCoords = initialCoords.(*mysql.GTIDBinlogCoordinates) + } + entriesCh := make(chan *BinlogEntry, 100) + + // Feed events concurrently so AddEventToStreamer never blocks. + var feedDone sync.WaitGroup + feedDone.Add(1) + go func() { + defer feedDone.Done() + for _, ev := range events { + s.AddEventToStreamer(ev) + } + s.AddErrorToStreamer(io.EOF) + }() + + // Drain entries so StreamEvents never blocks writing to entriesCh. 
+ var drainDone sync.WaitGroup + drainDone.Add(1) + go func() { + defer drainDone.Done() + for range entriesCh { + } + }() + + reader.StreamEvents(func() bool { return false }, entriesCh) + feedDone.Wait() + close(entriesCh) + drainDone.Wait() + + iterations.Add(1) + } + + close(done) +} + +func BenchmarkStreamingGTID(b *testing.B) { + initialSet := buildSyntheticGTIDSet(benchNumUUIDs) + events := buildGTIDEvents(initialSet) + initialCoords := mysql.NewGTIDBinlogCoordinatesFromSet(initialSet) + feedAndRun(b, fmt.Sprintf("GTID (%d UUIDs)", benchNumUUIDs), true, events, initialCoords) +} + +func BenchmarkStreamingFile(b *testing.B) { + events := buildFileEvents() + initialCoords := &mysql.FileBinlogCoordinates{LogFile: "mysql-bin.000001", LogPos: 0} + feedAndRun(b, "File", false, events, initialCoords) +} diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 63afc3f3d..ec50c3a24 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -153,7 +153,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error { if err != nil { return err } - this.initialBinlogCoordinates = &mysql.GTIDBinlogCoordinates{GTIDSet: gtidSet.(*gomysql.MysqlGTIDSet)} + this.initialBinlogCoordinates = mysql.NewGTIDBinlogCoordinatesFromSet(gtidSet.(*gomysql.MysqlGTIDSet)) } else { this.initialBinlogCoordinates = &mysql.FileBinlogCoordinates{ LogFile: m.GetString("File"), diff --git a/go/mysql/binlog_file_test.go b/go/mysql/binlog_file_test.go index 50d513698..5a85c946a 100644 --- a/go/mysql/binlog_file_test.go +++ b/go/mysql/binlog_file_test.go @@ -48,14 +48,14 @@ func TestBinlogCoordinates(t *testing.T) { 48e2bc1d-d66d-11e8-bf56-a0369f9437b8:1, 492e2980-4518-11e9-92c6-e4434b3eca94:1-4926754399`) - c5 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} - c6 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} - c7 := GTIDBinlogCoordinates{GTIDSet: gtidSet2.(*gomysql.MysqlGTIDSet)} - c8 := GTIDBinlogCoordinates{GTIDSet: 
gtidSet3.(*gomysql.MysqlGTIDSet)} - c9 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig1.(*gomysql.MysqlGTIDSet)} - c10 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig2.(*gomysql.MysqlGTIDSet)} - - require.True(t, c5.Equals(&c6)) + c5 := NewGTIDBinlogCoordinatesFromSet(gtidSet1.(*gomysql.MysqlGTIDSet)) + c6 := NewGTIDBinlogCoordinatesFromSet(gtidSet1.(*gomysql.MysqlGTIDSet)) + c7 := NewGTIDBinlogCoordinatesFromSet(gtidSet2.(*gomysql.MysqlGTIDSet)) + c8 := NewGTIDBinlogCoordinatesFromSet(gtidSet3.(*gomysql.MysqlGTIDSet)) + c9 := NewGTIDBinlogCoordinatesFromSet(gtidSetBig1.(*gomysql.MysqlGTIDSet)) + c10 := NewGTIDBinlogCoordinatesFromSet(gtidSetBig2.(*gomysql.MysqlGTIDSet)) + + require.True(t, c5.Equals(c6)) require.True(t, c1.Equals(&c2)) require.False(t, c1.Equals(&c3)) require.False(t, c1.Equals(&c4)) @@ -70,10 +70,10 @@ func TestBinlogCoordinates(t *testing.T) { require.True(t, c1.SmallerThanOrEquals(&c3)) require.True(t, c1.SmallerThanOrEquals(&c2)) require.True(t, c1.SmallerThanOrEquals(&c3)) - require.True(t, c6.SmallerThanOrEquals(&c7)) - require.True(t, c7.SmallerThanOrEquals(&c8)) - require.True(t, c9.SmallerThanOrEquals(&c9)) - require.True(t, c9.SmallerThanOrEquals(&c10)) + require.True(t, c6.SmallerThanOrEquals(c7)) + require.True(t, c7.SmallerThanOrEquals(c8)) + require.True(t, c9.SmallerThanOrEquals(c9)) + require.True(t, c9.SmallerThanOrEquals(c10)) } func TestBinlogCoordinatesAsKey(t *testing.T) { diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index d7b86c04f..4e4411e40 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -6,82 +6,104 @@ package mysql import ( + "fmt" + "sync" + gomysql "github.com/go-mysql-org/go-mysql/mysql" + uuid "github.com/google/uuid" ) // GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format. +// In pending mode (pendingGNO != 0), gtidSet is the base from the last committed +// transaction and pendingSID:pendingGNO is the in-flight GTID. 
The materialized +// set is computed lazily and cached on first use. type GTIDBinlogCoordinates struct { - GTIDSet *gomysql.MysqlGTIDSet - UUIDSet *gomysql.UUIDSet + gtidSet *gomysql.MysqlGTIDSet + pendingSID uuid.UUID + pendingGNO int64 + once sync.Once + resolved *gomysql.MysqlGTIDSet } -// NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct. func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) { set, err := gomysql.ParseMysqlGTIDSet(gtidSet) - return &GTIDBinlogCoordinates{ - GTIDSet: set.(*gomysql.MysqlGTIDSet), - }, err + return &GTIDBinlogCoordinates{gtidSet: set.(*gomysql.MysqlGTIDSet)}, err +} + +func NewGTIDBinlogCoordinatesFromSet(set *gomysql.MysqlGTIDSet) *GTIDBinlogCoordinates { + return &GTIDBinlogCoordinates{gtidSet: set} +} + +func (g *GTIDBinlogCoordinates) GTIDSet() *gomysql.MysqlGTIDSet { + return g.gtidSet +} + +// WithPendingGTID returns a new pending coordinate using g.gtidSet as the base. +// g.gtidSet is aliased without cloning; g must not be modified after this call. +func (g *GTIDBinlogCoordinates) WithPendingGTID(sid uuid.UUID, gno int64) *GTIDBinlogCoordinates { + return &GTIDBinlogCoordinates{gtidSet: g.resolvedGTIDSet(), pendingSID: sid, pendingGNO: gno} } -// DisplayString returns a user-friendly string representation of these current UUID set or the full GTID set. -func (this *GTIDBinlogCoordinates) DisplayString() string { - if this.UUIDSet != nil { - return this.UUIDSet.String() +func (g *GTIDBinlogCoordinates) resolvedGTIDSet() *gomysql.MysqlGTIDSet { + if g.pendingGNO != 0 { + g.once.Do(func() { + set := g.gtidSet.Clone().(*gomysql.MysqlGTIDSet) + set.AddGTID(g.pendingSID, g.pendingGNO) + g.resolved = set + }) + return g.resolved } - return this.String() + return g.gtidSet } -// String returns a user-friendly string representation of these full GTID set. 
-func (this GTIDBinlogCoordinates) String() string { - return this.GTIDSet.String() +// DisplayString returns sid:gno in pending mode, otherwise the full GTID set string. +func (g *GTIDBinlogCoordinates) DisplayString() string { + if g.pendingGNO != 0 { + return fmt.Sprintf("%s:%d", g.pendingSID, g.pendingGNO) + } + return g.String() +} + +func (g *GTIDBinlogCoordinates) String() string { + return g.resolvedGTIDSet().String() } -// Equals tests equality of this coordinate and another one. -func (this *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { - if other == nil || this.IsEmpty() || other.IsEmpty() { +func (g *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { + if other == nil || g.IsEmpty() || other.IsEmpty() { return false } - otherCoords, ok := other.(*GTIDBinlogCoordinates) if !ok { return false } - - return this.GTIDSet.Equal(otherCoords.GTIDSet) + return g.resolvedGTIDSet().Equal(otherCoords.resolvedGTIDSet()) } -// IsEmpty returns true if the GTID set is empty. -func (this *GTIDBinlogCoordinates) IsEmpty() bool { - return this.GTIDSet == nil +func (g *GTIDBinlogCoordinates) IsEmpty() bool { + return g.gtidSet == nil } -// SmallerThan returns true if this coordinate is strictly smaller than the other. -func (this *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { - if other == nil || this.IsEmpty() || other.IsEmpty() { +func (g *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { + if other == nil || g.IsEmpty() || other.IsEmpty() { return false } otherCoords, ok := other.(*GTIDBinlogCoordinates) if !ok { return false } - // if 'this' does not contain the same sets we assume we are behind 'other'. // there are probably edge cases where this isn't true - return !this.GTIDSet.Contain(otherCoords.GTIDSet) + return !g.resolvedGTIDSet().Contain(otherCoords.resolvedGTIDSet()) } -// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. 
-func (this *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { - return this.Equals(other) || this.SmallerThan(other) +func (g *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { + return g.Equals(other) || g.SmallerThan(other) } -func (this *GTIDBinlogCoordinates) Clone() BinlogCoordinates { +func (g *GTIDBinlogCoordinates) Clone() BinlogCoordinates { out := &GTIDBinlogCoordinates{} - if this.GTIDSet != nil { - out.GTIDSet = this.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) - } - if this.UUIDSet != nil { - out.UUIDSet = this.UUIDSet.Clone() + if g.gtidSet != nil { + out.gtidSet = g.resolvedGTIDSet().Clone().(*gomysql.MysqlGTIDSet) } return out }