diff --git a/source-mysql/database.go b/source-mysql/database.go index 7e7fdf81ee..92918cd646 100644 --- a/source-mysql/database.go +++ b/source-mysql/database.go @@ -144,7 +144,13 @@ type binlogStatus struct { // queryBinlogStatus fetches the current binary logging position and configuration using // the SHOW MASTER STATUS / SHOW BINARY LOG STATUS query (depending on the server version). +// In read-replica mode it uses SHOW REPLICA STATUS instead since the primary's binlog +// status queries are not available on a read replica. func (db *mysqlDatabase) queryBinlogStatus() (*binlogStatus, error) { + if db.featureFlags["cluster_read_replica"] { + return db.queryReplicaBinlogStatus() + } + // The 'SHOW MASTER STATUS' query is the only form that works on MySQL <8.0.22, but // support was dropped in MySQL 8.4.0 so we have to select the appropriate query for // the server version we're connected to. @@ -187,6 +193,53 @@ func (db *mysqlDatabase) queryBinlogStatus() (*binlogStatus, error) { }, nil } +// queryReplicaBinlogStatus uses SHOW REPLICA STATUS to determine the current binlog +// position on a cluster read replica. It reads the Source_Log_File and Read_Source_Log_Pos +// columns, which represent the position in the source's binlog that the replica's I/O +// thread has read up to. +func (db *mysqlDatabase) queryReplicaBinlogStatus() (*binlogStatus, error) { + var statusQuery = "SHOW REPLICA STATUS;" + + var results, err = db.conn.Execute(statusQuery) + if err != nil { + return nil, fmt.Errorf("error querying replica status: %w", err) + } + if len(results.Values) == 0 { + return nil, fmt.Errorf("error querying replica status: empty result set (is this a replica?)") + } + defer results.Close() + + // Build a map of column name to index for easy lookup + var columnIndex = make(map[string]int) + for idx, field := range results.Fields { + columnIndex[string(field.Name)] = idx + } + + var row = results.Values[0] + fileIdx, ok := columnIndex["Source_Log_File"] + if !ok { + return nil, fmt.Errorf("error querying replica status: column Source_Log_File not found") + } + posIdx, ok := columnIndex["Read_Source_Log_Pos"] + if !ok { + return nil, fmt.Errorf("error querying replica status: column Read_Source_Log_Pos not found") + } + var filename = string(row[fileIdx].AsString()) + var offset = uint32(row[posIdx].AsInt64()) + + logrus.WithFields(logrus.Fields{ + "file": filename, + "offset": offset, + }).Info("queried replica binlog status") + + return &binlogStatus{ + Position: mysql.Position{ + Name: filename, + Pos: offset, + }, + }, nil +} + func (db *mysqlDatabase) queryBinlogPosition() (mysql.Position, error) { var status, err = db.queryBinlogStatus() if err != nil { diff --git a/source-mysql/main.go b/source-mysql/main.go index 0490e87447..61fc9a4007 100644 --- a/source-mysql/main.go +++ b/source-mysql/main.go @@ -49,6 +49,12 @@ var featureFlagDefaults = map[string]bool{ // - Recommended Collection Names // - Internal StreamIDs "case_sensitive_table_names": false, + + // When set, the connector will operate in a mode compatible with Aurora cluster + // reader instances. This skips the `log_bin = ON` prerequisite check and uses + // `SHOW REPLICA STATUS` to determine the current binlog position instead of + // `SHOW BINARY LOG STATUS` / `SHOW MASTER STATUS`. + "cluster_read_replica": false, } type sshForwarding struct { diff --git a/source-mysql/prerequisites.go b/source-mysql/prerequisites.go index 9ef77c8719..4e997f4176 100644 --- a/source-mysql/prerequisites.go +++ b/source-mysql/prerequisites.go @@ -62,6 +62,10 @@ func (db *mysqlDatabase) prerequisiteVersion(_ context.Context) error { } func (db *mysqlDatabase) prerequisiteBinlogEnabled(ctx context.Context) error { + if db.featureFlags["cluster_read_replica"] { + logrus.Info("skipping log_bin check for cluster read replica") + return nil + } var results, err = db.conn.Execute(`SHOW VARIABLES LIKE 'log_bin';`) if err != nil { return fmt.Errorf("unable to query 'log_bin' system variable: %w", err)