Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
uuid "github.com/google/uuid"
"github.com/google/uuid"
"golang.org/x/net/context"
)

Expand All @@ -31,6 +31,16 @@ type GoMySQLReader struct {
// LastTrxCoords are the coordinates of the last transaction completely read.
// If using the file coordinates it is binlog position of the transaction's XID event.
LastTrxCoords mysql.BinlogCoordinates
// currentTrxCoords is set once per GTIDEvent and shared by all RowsEvents within
// the same transaction. It points to currentCoordinates (a lazy *GTIDBinlogCoordinates),
// which is replaced at the next GTIDEvent — so old entries retain valid references.
// Only accessed from within the StreamEvents goroutine; no mutex needed.
currentTrxCoords mysql.BinlogCoordinates
// lastCommittedCoords is the GTIDBinlogCoordinates from the most recently seen
// XIDEvent (or the initial coordinates). WithPendingGTID aliases its GTIDSet as
// the base for each new in-flight coord, so the Clone is deferred until actually
// needed. Only written from within StreamEvents; no mutex needed.
lastCommittedCoords *mysql.GTIDBinlogCoordinates
}

func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
Expand Down Expand Up @@ -68,6 +78,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
// Start sync with specified GTID set or binlog file and position
if this.migrationContext.UseGTIDs {
coords := coordinates.(*mysql.GTIDBinlogCoordinates)
this.lastCommittedCoords = coords
this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet)
} else {
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
Expand All @@ -86,23 +97,31 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates
}

func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
currentCoords := this.GetCurrentBinlogCoordinates()
var currentCoords mysql.BinlogCoordinates
if this.migrationContext.UseGTIDs && this.currentTrxCoords != nil {
currentCoords = this.currentTrxCoords
} else {
currentCoords = this.GetCurrentBinlogCoordinates()
}

dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
return fmt.Errorf("Unknown DML type: %v", ev.Header.EventType)
}

// Convert schema and table names once per RowsEvent, not per row
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we don't need this specific comment

schemaName := string(rowsEvent.Table.Schema)
tableName := string(rowsEvent.Table.Table)

for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
// An update has two rows (WHERE+SET)
// We do both at the same time
continue
}
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
)
binlogEntry.DmlEvent = NewBinlogDMLEvent(schemaName, tableName, dml)

switch dml {
case InsertDML:
{
Expand Down Expand Up @@ -166,14 +185,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil {
return err
}
pending := this.lastCommittedCoords.WithPendingGTID(sid, event.GNO)
this.currentCoordinatesMutex.Lock()
if this.LastTrxCoords != nil {
this.currentCoordinates = this.LastTrxCoords.Clone()
}
coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
coords.GTIDSet.AddSet(trxGset)
this.currentCoordinates = pending
this.currentCoordinatesMutex.Unlock()
this.currentTrxCoords = pending
case *replication.RotateEvent:
if this.migrationContext.UseGTIDs {
continue
Expand All @@ -185,7 +201,16 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
this.currentCoordinatesMutex.Unlock()
case *replication.XIDEvent:
if this.migrationContext.UseGTIDs {
this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
// go-mysql allocates a fresh MysqlGTIDSet for every XIDEvent it decodes
// from the binlog stream, so we can alias event.GSet directly without
// cloning it. The pointer is then shared by LastTrxCoords and
// lastCommittedCoords. lastCommittedCoords is subsequently used as the
// base inside WithPendingGTID: it is cloned there only if a comparison
// or string representation is actually requested, and never mutated.
// Any future code that modifies the set after this point must Clone first.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any future code that modifies the set after this point must Clone first. I don't really like this being available for potential accidents in the future

committed := &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
this.LastTrxCoords = committed
this.lastCommittedCoords = committed
} else {
this.LastTrxCoords = this.currentCoordinates.Clone()
}
Expand Down
208 changes: 208 additions & 0 deletions go/binlog/streaming_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package binlog

import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/github/gh-ost/go/base"
	"github.com/github/gh-ost/go/mysql"
	gomysql "github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	guuid "github.com/google/uuid"
)

// Workload shape shared by both streaming benchmarks.
const (
	benchTxCount   = 1_000
	benchRowsPerTx = 5
	// benchNumUUIDs is the number of distinct server UUIDs placed into the
	// synthetic executed-GTID set.
	benchNumUUIDs = 182
)

// benchServerSID is a fixed synthetic UUID used as the "active" server in benchmarks.
var benchServerSID = guuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")

// buildSyntheticGTIDSet generates a realistic-sized GTID set with numUUIDs unique
// server UUIDs, each with a transaction range of 1 to a varying upper bound.
// buildSyntheticGTIDSet generates a realistic-sized GTID set with numUUIDs unique
// server UUIDs, each with a transaction range of 1 to a varying upper bound.
// The benchmark server's UUID is always included so the initial set size stays
// consistent throughout the benchmark (no extra map entry created on the first
// AddSet for benchServerSID).
func buildSyntheticGTIDSet(numUUIDs int) *gomysql.MysqlGTIDSet {
	// Final size is known up front: numUUIDs synthetic servers plus the
	// benchmark server UUID — pre-size the map to avoid rehash growth.
	set := &gomysql.MysqlGTIDSet{
		Sets: make(map[string]*gomysql.UUIDSet, numUUIDs+1),
	}
	for i := 0; i < numUUIDs; i++ {
		// Deterministic UUIDs seeded from the index so runs are reproducible.
		sid := guuid.MustParse(fmt.Sprintf("%08x-0000-4000-8000-%012x", i, i))
		gno := int64(1_000_000 + i*100_000)
		set.Sets[sid.String()] = gomysql.NewUUIDSet(sid, gomysql.Interval{Start: 1, Stop: gno + 1})
	}
	set.Sets[benchServerSID.String()] = gomysql.NewUUIDSet(benchServerSID, gomysql.Interval{Start: 1, Stop: 2})
	return set
}

// buildGTIDEvents constructs the synthetic binlog stream for the GTID-mode
// benchmark. Each of the benchTxCount transactions contributes one GTID event,
// benchRowsPerTx row events, and an XID event that carries a clone of the
// executed-GTID set accumulated so far.
func buildGTIDEvents(initialSet *gomysql.MysqlGTIDSet) []*replication.BinlogEvent {
	out := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+2))
	executed := initialSet.Clone().(*gomysql.MysqlGTIDSet)
	serverSID := benchServerSID
	// Error deliberately ignored: MarshalBinary on a uuid.UUID does not fail
	// for a valid parsed UUID.
	rawSID, _ := serverSID.MarshalBinary()

	for tx := 0; tx < benchTxCount; tx++ {
		gno := int64(tx) + 1

		out = append(out, &replication.BinlogEvent{
			Header: &replication.EventHeader{EventType: replication.GTID_EVENT},
			Event:  &replication.GTIDEvent{SID: rawSID, GNO: gno},
		})

		for row := 0; row < benchRowsPerTx; row++ {
			out = append(out, &replication.BinlogEvent{
				Header: &replication.EventHeader{
					EventType: replication.WRITE_ROWS_EVENTv2,
					LogPos:    uint32(tx*1000 + row + 1),
					EventSize: 100,
				},
				Event: &replication.RowsEvent{
					Table: &replication.TableMapEvent{
						Schema: []byte("mydb"),
						Table:  []byte("orders"),
					},
					Rows: [][]interface{}{{int64(tx), "value"}},
				},
			})
		}

		// Fold this transaction's GTID into the running executed set, then
		// snapshot it (Clone) for the XID event, mirroring how the server
		// ships a fresh GSet per XID.
		executed.AddSet(gomysql.NewUUIDSet(serverSID, gomysql.Interval{Start: gno, Stop: gno + 1}))

		out = append(out, &replication.BinlogEvent{
			Header: &replication.EventHeader{EventType: replication.XID_EVENT},
			Event:  &replication.XIDEvent{GSet: executed.Clone()},
		})
	}
	return out
}

// buildFileEvents constructs the synthetic binlog stream for the
// file/position-mode benchmark: benchRowsPerTx row events followed by a bare
// XID event per transaction, with no GTID events at all.
func buildFileEvents() []*replication.BinlogEvent {
	out := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+1))

	for tx := 0; tx < benchTxCount; tx++ {
		for row := 0; row < benchRowsPerTx; row++ {
			out = append(out, &replication.BinlogEvent{
				Header: &replication.EventHeader{
					EventType: replication.WRITE_ROWS_EVENTv2,
					LogPos:    uint32(tx*1000 + row + 1),
					EventSize: 100,
				},
				Event: &replication.RowsEvent{
					Table: &replication.TableMapEvent{
						Schema: []byte("mydb"),
						Table:  []byte("orders"),
					},
					Rows: [][]interface{}{{int64(tx), "value"}},
				},
			})
		}

		out = append(out, &replication.BinlogEvent{
			Header: &replication.EventHeader{
				EventType: replication.XID_EVENT,
				LogPos:    uint32(tx*1000 + benchRowsPerTx + 1),
			},
			Event: &replication.XIDEvent{},
		})
	}
	return out
}

// feedAndRun feeds events into a fresh streamer concurrently with StreamEvents.
// This avoids b.N scaling issues caused by heavy pre-fill setup dominating over
// the (very fast) file-mode processing time.
// feedAndRun feeds events into a fresh streamer concurrently with StreamEvents.
// This avoids b.N scaling issues caused by heavy pre-fill setup dominating over
// the (very fast) file-mode processing time. A spinner on stderr reports
// progress every 500ms so slow benchmark runs are visibly alive.
func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication.BinlogEvent, initialCoords mysql.BinlogCoordinates) {
	b.ReportAllocs()

	var iterations atomic.Int64
	done := make(chan struct{})

	// Progress spinner; exits when done is closed after the benchmark loop.
	go func() {
		spinner := []string{"|", "/", "-", "\\"}
		tick := time.NewTicker(500 * time.Millisecond)
		defer tick.Stop()
		frame := 0
		for {
			select {
			case <-done:
				fmt.Fprintf(os.Stderr, "\r%-30s done (%d iters) \n", label, iterations.Load())
				return
			case <-tick.C:
				fmt.Fprintf(os.Stderr, "\r%-30s %s iter %d", label, spinner[frame%4], iterations.Load())
				frame++
			}
		}
	}()

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		s := replication.NewBinlogStreamer()

		ctx := &base.MigrationContext{}
		ctx.UseGTIDs = useGTIDs
		reader := &GoMySQLReader{
			migrationContext:        ctx,
			currentCoordinatesMutex: &sync.Mutex{},
			currentCoordinates:      initialCoords.Clone(),
			binlogStreamer:          s,
		}
		if useGTIDs {
			reader.lastCommittedCoords = initialCoords.(*mysql.GTIDBinlogCoordinates)
		}
		// Small buffer — entries flow through as the drain goroutine consumes them.
		entriesCh := make(chan *BinlogEntry, 100)

		// Feed events concurrently so AddEventToStreamer never blocks.
		var feedDone sync.WaitGroup
		feedDone.Add(1)
		go func() {
			defer feedDone.Done()
			for _, ev := range events {
				s.AddEventToStreamer(ev)
			}
			// io.EOF terminates StreamEvents once all events are consumed.
			s.AddErrorToStreamer(io.EOF)
		}()

		// Drain entries so StreamEvents never blocks writing to entriesCh.
		var drainDone sync.WaitGroup
		drainDone.Add(1)
		go func() {
			defer drainDone.Done()
			for range entriesCh {
			}
		}()

		// StreamEvents ends via the injected io.EOF; any other error means the
		// benchmark harness itself is broken, so fail loudly rather than
		// silently timing a partial run.
		if err := reader.StreamEvents(func() bool { return false }, entriesCh); err != nil && !errors.Is(err, io.EOF) {
			b.Fatalf("%s: StreamEvents: %v", label, err)
		}
		feedDone.Wait()
		close(entriesCh)
		drainDone.Wait()

		iterations.Add(1)
	}

	// Spinner shutdown is bookkeeping, not benchmarked work.
	b.StopTimer()
	close(done)
}

// BenchmarkStreamingGTID measures StreamEvents throughput in GTID mode with a
// realistically sized executed-GTID set (benchNumUUIDs distinct server UUIDs).
func BenchmarkStreamingGTID(b *testing.B) {
	baseSet := buildSyntheticGTIDSet(benchNumUUIDs)
	coords := &mysql.GTIDBinlogCoordinates{GTIDSet: baseSet}
	feedAndRun(b, fmt.Sprintf("GTID (%d UUIDs)", benchNumUUIDs), true, buildGTIDEvents(baseSet), coords)
}

func BenchmarkStreamingFile(b *testing.B) {
events := buildFileEvents()
initialCoords := &mysql.FileBinlogCoordinates{LogFile: "mysql-bin.000001", LogPos: 0}
feedAndRun(b, "File", false, events, initialCoords)
}
Loading
Loading