Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions source-mysql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ type binlogStatus struct {

// queryBinlogStatus fetches the current binary logging position and configuration using
// the SHOW MASTER STATUS / SHOW BINARY LOG STATUS query (depending on the server version).
// In read-replica mode it uses SHOW REPLICA STATUS instead since the primary's binlog
// status queries are not available on a read replica.
func (db *mysqlDatabase) queryBinlogStatus() (*binlogStatus, error) {
if db.featureFlags["cluster_read_replica"] {
return db.queryReplicaBinlogStatus()
}

// The 'SHOW MASTER STATUS' query is the only form that works on MySQL <8.0.22, but
// support was dropped in MySQL 8.4.0 so we have to select the appropriate query for
// the server version we're connected to.
Expand Down Expand Up @@ -187,6 +193,53 @@ func (db *mysqlDatabase) queryBinlogStatus() (*binlogStatus, error) {
}, nil
}

// queryReplicaBinlogStatus uses SHOW REPLICA STATUS to determine the current binlog
// position on a cluster read replica. It reads the Source_Log_File and Read_Source_Log_Pos
// columns, which represent the position in the source's binlog that the replica's I/O
// thread has read up to.
func (db *mysqlDatabase) queryReplicaBinlogStatus() (*binlogStatus, error) {
var statusQuery = "SHOW REPLICA STATUS;"

var results, err = db.conn.Execute(statusQuery)
if err != nil {
return nil, fmt.Errorf("error querying replica status: %w", err)
}
if len(results.Values) == 0 {
return nil, fmt.Errorf("error querying replica status: empty result set (is this a replica?)")
}
defer results.Close()

// Build a map of column name to index for easy lookup
var columnIndex = make(map[string]int)
for idx, field := range results.Fields {
columnIndex[string(field.Name)] = idx
}

var row = results.Values[0]
fileIdx, ok := columnIndex["Source_Log_File"]
if !ok {
return nil, fmt.Errorf("error querying replica status: column Source_Log_File not found")
}
posIdx, ok := columnIndex["Read_Source_Log_Pos"]
if !ok {
return nil, fmt.Errorf("error querying replica status: column Read_Source_Log_Pos not found")
}
var filename = string(row[fileIdx].AsString())
var offset = uint32(row[posIdx].AsInt64())

logrus.WithFields(logrus.Fields{
"file": filename,
"offset": offset,
}).Info("queried replica binlog status")

return &binlogStatus{
Position: mysql.Position{
Name: filename,
Pos: offset,
},
}, nil
}

func (db *mysqlDatabase) queryBinlogPosition() (mysql.Position, error) {
var status, err = db.queryBinlogStatus()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions source-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ var featureFlagDefaults = map[string]bool{
// - Recommended Collection Names
// - Internal StreamIDs
"case_sensitive_table_names": false,

// When set, the connector will operate in a mode compatible with Aurora cluster
// reader instances. This skips the `log_bin = ON` prerequisite check and uses
// `SHOW REPLICA STATUS` to determine the current binlog position instead of
// `SHOW BINARY LOG STATUS` / `SHOW MASTER STATUS`.
"cluster_read_replica": false,
}

type sshForwarding struct {
Expand Down
4 changes: 4 additions & 0 deletions source-mysql/prerequisites.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (db *mysqlDatabase) prerequisiteVersion(_ context.Context) error {
}

func (db *mysqlDatabase) prerequisiteBinlogEnabled(ctx context.Context) error {
if db.featureFlags["cluster_read_replica"] {
logrus.Info("skipping log_bin check for cluster read replica")
return nil
}
var results, err = db.conn.Execute(`SHOW VARIABLES LIKE 'log_bin';`)
if err != nil {
return fmt.Errorf("unable to query 'log_bin' system variable: %w", err)
Expand Down
Loading