Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ linters:
- copyloopvar
- dupl
- errcheck
- errorlint
- funlen
- gocritic
- gocyclo
Expand All @@ -25,6 +26,10 @@ linters:
settings:
dupl:
threshold: 400
errorlint:
errorf: true
asserts: true
comparison: true
funlen:
lines: 150
statements: 100
Expand Down
82 changes: 41 additions & 41 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (app *App) connectDCS() error {
// TODO: support other DCS systems
app.dcs, err = dcs.NewZookeeper(app.baseContext(), &app.config.Zookeeper, app.logger)
if err != nil {
return fmt.Errorf("failed to connect to zkDCS: %s", err.Error())
return fmt.Errorf("failed to connect to zkDCS: %w", err)
}
return nil
}
Expand All @@ -155,7 +155,7 @@ func (app *App) newDBCluster() error {
var err error
app.cluster, err = mysql.NewCluster(app.config, app.logger, app.dcs)
if err != nil {
return fmt.Errorf("failed to create database cluster %s", err.Error())
return fmt.Errorf("failed to create database cluster %w", err)
}
return nil
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (app *App) checkHAReplicasRunning(local *mysql.Node) (replicasRunning bool,
}
ssstatus, err := node.SemiSyncStatus()
if err != nil {
return fmt.Errorf("%s %v", host, err)
return fmt.Errorf("%s %w", host, err)
}
if ssstatus.SlaveEnabled == 0 {
return fmt.Errorf("replica %s is not semi-sync", host)
Expand Down Expand Up @@ -459,8 +459,8 @@ func (app *App) stateLost() appState {
if localNodeState.IsMaster {
err = node.SetReadOnlyWithForce(app.config.ExcludeUsers, true)

merr, ok := err.(*mysql_driver.MySQLError)
if !errors.Is(err, context.DeadlineExceeded) && (!ok || merr.Number != 1205) { // Error 1205: Lock wait timeout exceeded; try restarting transaction
var merr *mysql_driver.MySQLError
if !errors.Is(err, context.DeadlineExceeded) && (!errors.As(err, &merr) || merr.Number != 1205) { // Error 1205: Lock wait timeout exceeded; try restarting transaction
app.logger.Error().Err(err).Msgf("failed to set node %s read-only", node.Host())
return stateLost
}
Expand Down Expand Up @@ -519,10 +519,10 @@ func (app *App) stateMaintenance() appState {
app.writeMaintenanceFile()
}
maintenance, err := app.GetMaintenance()
if err != nil && err != dcs.ErrNotFound {
if err != nil && !errors.Is(err, dcs.ErrNotFound) {
return stateMaintenance
}
if err == dcs.ErrNotFound || maintenance.ShouldLeave {
if errors.Is(err, dcs.ErrNotFound) || maintenance.ShouldLeave {
return app.tryLeaveMaintenance()
}

Expand Down Expand Up @@ -587,7 +587,7 @@ func (app *App) stateManager() appState {

// check if we are in maintenance
maintenance, err := app.GetMaintenance()
if err != nil && err != dcs.ErrNotFound {
if err != nil && !errors.Is(err, dcs.ErrNotFound) {
app.logger.Error().Err(err).Msg("failed to get maintenance from zk")

// If maintenance file doesn't exist we were in light maintenance mode
Expand Down Expand Up @@ -662,7 +662,7 @@ func (app *App) stateManager() appState {
return stateManager
}
err = app.performSwitchover(clusterState, activeNodes, switchover, master)
if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound {
if errors.Is(app.dcs.Get(pathCurrentSwitch, new(Switchover)), dcs.ErrNotFound) {
app.logger.Error().Msgf("switchover was aborted")
} else {
if err != nil {
Expand All @@ -682,7 +682,7 @@ func (app *App) stateManager() appState {
}
return stateManager
}
} else if err != dcs.ErrNotFound {
} else if !errors.Is(err, dcs.ErrNotFound) {
app.logger.Error().Err(err).Msg("")
return stateManager
}
Expand Down Expand Up @@ -911,7 +911,7 @@ func (app *App) approveFailover(clusterState, clusterStateDcs map[string]*nodest

var lastSwitchover Switchover
err = app.dcs.Get(pathLastSwitch, &lastSwitchover)
if err != dcs.ErrNotFound {
if !errors.Is(err, dcs.ErrNotFound) {
if err != nil {
return err
}
Expand All @@ -936,7 +936,7 @@ func (app *App) stateCandidate() appState {
return stateCandidate
}
maintenance, err := app.GetMaintenance()
if err != nil && err != dcs.ErrNotFound {
if err != nil && !errors.Is(err, dcs.ErrNotFound) {
app.logger.Error().Err(err).Msg("candidate: failed to get maintenance from zk")
return stateCandidate
}
Expand Down Expand Up @@ -1253,15 +1253,15 @@ func (app *App) disableSemiSyncOnSlaves(becomeInactive, becomeDataLag []string)
for _, host := range becomeInactive {
err := app.disableSemiSyncOnSlave(host, true)
if err != nil {
err = fmt.Errorf("[%s]: %s", host, err)
err = fmt.Errorf("[%s]: %w", host, err)
nodeErrors = append(nodeErrors, err)
}
}

for _, host := range becomeDataLag {
err := app.disableSemiSyncOnSlave(host, false)
if err != nil {
err = fmt.Errorf("[%s]: %s", host, err)
err = fmt.Errorf("[%s]: %w", host, err)
nodeErrors = append(nodeErrors, err)
continue
}
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
if app.config.ForceSwitchover {
err := node.SetOfflineForce()
if err != nil {
return fmt.Errorf("failed to set node %s force offline: %v", host, err)
return fmt.Errorf("failed to set node %s force offline: %w", host, err)
}

defer func() {
Expand All @@ -1393,7 +1393,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
if err != nil || app.emulateError("freeze_ro") {
app.logger.Info().Msgf("switchover: failed to set node %s read-only, trying kill bad queries: %v", host, err)
if err := node.SetReadOnlyWithForce(app.config.ExcludeUsers, true); err != nil {
return fmt.Errorf("failed to set node %s read-only: %v", host, err)
return fmt.Errorf("failed to set node %s read-only: %w", host, err)
}
}
app.logger.Info().Msgf("switchover: host %s set read-only", host)
Expand All @@ -1403,11 +1403,11 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
// if master was not among activeNodes - there will be no key in errs
// MasterTransition may not be set if issued from worker
if err, ok := errs[oldMaster]; ok && err != nil && switchover.MasterTransition != FailoverTransition {
err = fmt.Errorf("switchover: failed to set old master %s read-only %s", oldMaster, err)
err = fmt.Errorf("switchover: failed to set old master %s read-only %w", oldMaster, err)
app.logger.Info().Msg(err.Error())
switchErr := app.FinishSwitchover(switchover, err)
if switchErr != nil {
return fmt.Errorf("switchover: failed to reject switchover %s", switchErr)
return fmt.Errorf("switchover: failed to reject switchover %w", switchErr)
}
app.logger.Info().Msg("switchover: rejected")
return err
Expand All @@ -1419,7 +1419,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
if clusterState[oldMaster].PingOk {
err := app.externalReplication.Stop(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while stopping external replication on old master: %s", err, oldMaster)
return fmt.Errorf("got error: %w while stopping external replication on old master: %s", err, oldMaster)
}
}

Expand Down Expand Up @@ -1522,7 +1522,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
}
caught, err := app.waitForCatchUp(newMasterNode, mostRecentGtidSet, app.config.SlaveCatchUpTimeout, time.Second)
if err != nil || app.emulateError("catchup_master_status") {
return fmt.Errorf("failed to get gtid executed from %s: %s", newMaster, err)
return fmt.Errorf("failed to get gtid executed from %s: %w", newMaster, err)
}
if !caught || app.emulateError("catchup_failed") {
return fmt.Errorf("new master %s failed to catch up %s within %s",
Expand All @@ -1547,7 +1547,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
app.logger.Info().Msg("switchover: phase 5: turn to the new master")
err = app.cluster.Get(newMaster).SetOnline()
if err != nil {
return fmt.Errorf("got error on setting new master %s online %v", newMaster, err)
return fmt.Errorf("got error on setting new master %s online %w", newMaster, err)
}
errs = util.RunParallel(func(host string) error {
if host == newMaster || !clusterState[host].PingOk {
Expand All @@ -1571,25 +1571,25 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
if err != nil || oldMasterSlaveStatus == nil || isSlavePermanentlyLost(oldMasterSlaveStatus, mostRecentGtidSet) {
err = app.SetRecovery(oldMaster)
if err != nil {
return fmt.Errorf("failed to set old master %s to recovery: %v", oldMaster, err)
return fmt.Errorf("failed to set old master %s to recovery: %w", oldMaster, err)
}
} else {
app.logger.Info().Msgf("switchover: old master %s does not need recovery", oldMaster)
err = app.externalReplication.Reset(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while resetting external replication on old master: %s", err, oldMaster)
return fmt.Errorf("got error: %w while resetting external replication on old master: %s", err, oldMaster)
}
}

// promote new master
app.logger.Info().Msg("switchover: phase 6: promote new master")
err = newMasterNode.StopSlave()
if err != nil || app.emulateError("promote_stop_slave") {
return fmt.Errorf("failed to stop slave on new master %s: %s", newMaster, err)
return fmt.Errorf("failed to stop slave on new master %s: %w", newMaster, err)
}
err = newMasterNode.ResetSlaveAll()
if err != nil || app.emulateError("promote_reset_slave") {
return fmt.Errorf("failed to promote new master %s: %s", newMaster, err)
return fmt.Errorf("failed to promote new master %s: %w", newMaster, err)
}
app.logger.Info().Msgf("switchover: new master %s promoted", newMaster)

Expand All @@ -1603,14 +1603,14 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
// set new master writable
err = newMasterNode.SetWritable()
if err != nil || app.emulateError("promote_set_writable") {
return fmt.Errorf("failed to set new master %s writable: %s", newMaster, err)
return fmt.Errorf("failed to set new master %s writable: %w", newMaster, err)
}
app.logger.Info().Msgf("switchover: new master %s set writable", newMaster)

// reenable events
events, err := newMasterNode.ReenableEventsRetry()
if err != nil || app.emulateError("promote_reenable_events") {
return fmt.Errorf("failed to reenable slaveside disabled events on %s: %s", newMaster, err)
return fmt.Errorf("failed to reenable slaveside disabled events on %s: %w", newMaster, err)
}
if len(events) > 0 {
app.logger.Info().Msgf("switchover: events reenabled on %s: %v", newMaster, events)
Expand All @@ -1625,7 +1625,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
// set new master in dcs
err = app.dcs.Set(pathMasterNode, newMaster)
if err != nil || app.emulateError("promote_set_to_dcs") {
return fmt.Errorf("failed to set new master to dcs: %s", err)
return fmt.Errorf("failed to set new master to dcs: %w", err)
}

return nil
Expand Down Expand Up @@ -2239,19 +2239,19 @@ func (app *App) performChangeMaster(host, master string) error {
node := app.cluster.Get(host)
err := node.StopSlave()
if err != nil {
return fmt.Errorf("failed to stop slave on host %s: %s", host, err)
return fmt.Errorf("failed to stop slave on host %s: %w", host, err)
}
app.logger.Info().Msgf("changemaster: host %s replication stopped", host)

err = node.ChangeMaster(master)
if err != nil {
return fmt.Errorf("failed to change master on host %s: %s", host, err)
return fmt.Errorf("failed to change master on host %s: %w", host, err)
}
app.logger.Info().Msgf("changemaster: host %s turned to the new master %s", host, master)

err = node.StartSlave()
if err != nil {
return fmt.Errorf("failed to start slave on host %s: %s", host, err)
return fmt.Errorf("failed to start slave on host %s: %w", host, err)
}

deadline := time.Now().Add(app.config.WaitReplicationStartTimeout)
Expand Down Expand Up @@ -2436,7 +2436,7 @@ func (app *App) getClusterStateFromDcs() (map[string]*nodestate.NodeState, error
getter := func(host string) (*nodestate.NodeState, error) {
nodeState := new(nodestate.NodeState)
err := app.dcs.Get(dcs.JoinPath(pathHealthPrefix, host), nodeState)
if err != nil && err != dcs.ErrNotFound {
if err != nil && !errors.Is(err, dcs.ErrNotFound) {
return nil, err
}
nodeState.ShowOnlyGTIDDiff = app.config.ShowOnlyGTIDDiff
Expand All @@ -2457,7 +2457,7 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
return true, nil
}
switchover := new(Switchover)
if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound {
if errors.Is(app.dcs.Get(pathCurrentSwitch, switchover), dcs.ErrNotFound) {
return false, nil
}
if app.CheckAsyncSwitchAllowed(node, switchover) {
Expand Down Expand Up @@ -2491,12 +2491,12 @@ func (app *App) stopReplicationOnMaster(masterNode *mysql.Node) error {
app.logger.Info().Msgf("setting master %s offline", host)
err := masterNode.SetOffline()
if err != nil {
return fmt.Errorf("cannot set master %s offline: %s", host, err)
return fmt.Errorf("cannot set master %s offline: %w", host, err)
}
app.logger.Info().Msgf("disable semi-sync on master %s", host)
err = masterNode.SemiSyncDisable()
if err != nil {
return fmt.Errorf("failed to disable semi-sync on master %s: %v", host, err)
return fmt.Errorf("failed to disable semi-sync on master %s: %w", host, err)
}

return nil
Expand Down Expand Up @@ -2530,11 +2530,11 @@ func (app *App) getNodePositions(activeNodes []string) ([]nodePosition, error) {
node := app.cluster.Get(host)
sstatus, err := node.GetReplicaStatus()
if err != nil || app.emulateError("freeze_slave_status") {
return fmt.Errorf("failed to get slave status on host %s: %s", host, err)
return fmt.Errorf("failed to get slave status on host %s: %w", host, err)
}
lag, err := node.ReplicationLag(sstatus)
if err != nil || app.emulateError("freeze_slave_status2") {
return fmt.Errorf("failed to get slave replication lag on host %s: %s", host, err)
return fmt.Errorf("failed to get slave replication lag on host %s: %w", host, err)
}
if lag == nil {
// we can treat unknown lag as infinite for the replica compare purpose
Expand All @@ -2548,24 +2548,24 @@ func (app *App) getNodePositions(activeNodes []string) ([]nodePosition, error) {
app.logger.Info().Msgf("switchover: current master found: %s", host)
gtidset, err = node.GTIDExecutedParsed()
if err != nil || app.emulateError("freeze_master_status") {
return fmt.Errorf("failed to get master status on host %s: %s", host, err)
return fmt.Errorf("failed to get master status on host %s: %w", host, err)
}
} else {
gtidset = gtids.ParseGtidSet(sstatus.GetExecutedGtidSet())
if sstatus.GetRetrievedGtidSet() != "" {
// slave may have downloaded but not applied transactions
err := gtidset.Update(sstatus.GetRetrievedGtidSet())
if err != nil {
return fmt.Errorf("failed to parse RetrievedGtidSet from host %s: %s", host, err)
return fmt.Errorf("failed to parse RetrievedGtidSet from host %s: %w", host, err)
}
}
}

var nc mysql.NodeConfiguration
err = app.dcs.Get(dcs.JoinPath(pathHANodes, host), &nc)
if err != nil {
if err != dcs.ErrNotFound && err != dcs.ErrMalformed {
return fmt.Errorf("failed to get priority for host %s: %s", host, err)
if !errors.Is(err, dcs.ErrNotFound) && !errors.Is(err, dcs.ErrMalformed) {
return fmt.Errorf("failed to get priority for host %s: %w", host, err)
}
// Default value
nc = mysql.NodeConfiguration{Priority: 0}
Expand Down
Loading
Loading