Skip to content

Commit e5e833c

Browse files
committed
Add channel-aware replication operations for multi-source support
Phase 3 of issue #77: Make stop/start replication and change master operations channel-aware for named replication channels. - Add FOR CHANNEL clause helpers to QueryStringProvider for all replication commands (stop, start, reset, IO/SQL thread control) - Add StopReplicationForChannel and StartReplicationForChannel that target a specific channel or fall back to the managed channel - Add ChangeMasterToForChannel with FOR CHANNEL clause on all CHANGE MASTER TO variants (GTID, binlog pos, MariaDB, Oracle) - Original StopReplication/StartReplication/ChangeMasterTo delegate to the new functions with empty channel for backward compatibility
1 parent f998dcc commit e5e833c

2 files changed

Lines changed: 99 additions & 13 deletions

File tree

go/inst/instance_topology_dao.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,13 @@ func StopReplicas(replicas [](*Instance), stopReplicationMethod StopReplicationM
398398

399399
// StopReplication stops replication on a given instance by delegating to StopReplicationForChannel with an empty channel name.
400400
func StopReplication(instanceKey *InstanceKey) (*Instance, error) {
401+
return StopReplicationForChannel(instanceKey, "")
402+
}
403+
404+
// StopReplicationForChannel stops replication on a given instance, optionally for a specific channel.
405+
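// When channelName is empty, it stops all replication (backward compatible behavior), unless the instance has more than one replication channel, in which case only the managed channel (instance.ManagedChannelName) is stopped.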
406+
// When channelName is specified, it stops only that channel.
407+
func StopReplicationForChannel(instanceKey *InstanceKey, channelName string) (*Instance, error) {
401408
instance, err := ReadTopologyInstance(instanceKey)
402409
if err != nil {
403410
return instance, log.Errore(err)
@@ -407,7 +414,13 @@ func StopReplication(instanceKey *InstanceKey) (*Instance, error) {
407414
return instance, fmt.Errorf("instance is not a replica: %+v", instanceKey)
408415
}
409416

410-
_, err = ExecInstance(instanceKey, instance.QSP.stop_slave())
417+
// For multi-source instances with no explicit channel, use the managed channel
418+
effectiveChannel := channelName
419+
if effectiveChannel == "" && len(instance.ReplicationChannels) > 1 {
420+
effectiveChannel = instance.ManagedChannelName
421+
}
422+
423+
_, err = ExecInstance(instanceKey, instance.QSP.StopReplicaForChannel(effectiveChannel))
411424
if err != nil {
412425
// Patch; current MaxScale behavior for STOP SLAVE is to throw an error if replica already stopped.
413426
if instance.isMaxScale() && err.Error() == "Error 1199: Slave connection is not running" {
@@ -419,7 +432,7 @@ func StopReplication(instanceKey *InstanceKey) (*Instance, error) {
419432
}
420433
instance, err = ReadTopologyInstance(instanceKey)
421434

422-
log.Infof("Stopped replication on %+v, Self:%+v, Exec:%+v", *instanceKey, instance.SelfBinlogCoordinates, instance.ExecBinlogCoordinates)
435+
log.Infof("Stopped replication on %+v (channel %q), Self:%+v, Exec:%+v", *instanceKey, effectiveChannel, instance.SelfBinlogCoordinates, instance.ExecBinlogCoordinates)
423436
return instance, err
424437
}
425438

@@ -448,6 +461,13 @@ func waitForReplicationState(instance *Instance, instanceKey *InstanceKey, expec
448461

449462
// StartReplication starts replication on a given instance by delegating to StartReplicationForChannel with an empty channel name.
450463
func StartReplication(instanceKey *InstanceKey) (*Instance, error) {
464+
return StartReplicationForChannel(instanceKey, "")
465+
}
466+
467+
// StartReplicationForChannel starts replication on a given instance, optionally for a specific channel.
468+
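// When channelName is empty, it starts all replication (backward compatible behavior), unless the instance has more than one replication channel, in which case only the managed channel (instance.ManagedChannelName) is started.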
469+
// When channelName is specified, it starts only that channel.
470+
func StartReplicationForChannel(instanceKey *InstanceKey, channelName string) (*Instance, error) {
451471
instance, err := ReadTopologyInstance(instanceKey)
452472
if err != nil {
453473
return instance, log.Errore(err)
@@ -472,11 +492,17 @@ func StartReplication(instanceKey *InstanceKey) (*Instance, error) {
472492
return instance, log.Errore(err)
473493
}
474494

475-
_, err = ExecInstance(instanceKey, instance.QSP.start_slave())
495+
// For multi-source instances with no explicit channel, use the managed channel
496+
effectiveChannel := channelName
497+
if effectiveChannel == "" && len(instance.ReplicationChannels) > 1 {
498+
effectiveChannel = instance.ManagedChannelName
499+
}
500+
501+
_, err = ExecInstance(instanceKey, instance.QSP.StartReplicaForChannel(effectiveChannel))
476502
if err != nil {
477503
return instance, log.Errore(err)
478504
}
479-
log.Infof("Started replication on %+v", instanceKey)
505+
log.Infof("Started replication on %+v (channel %q)", instanceKey, effectiveChannel)
480506

481507
waitForReplicationState(instance, instanceKey, ReplicationThreadStateRunning)
482508

@@ -942,6 +968,13 @@ func workaroundBug83713(instance *Instance, instanceKey *InstanceKey) {
942968

943969
// ChangeMasterTo changes the given instance's master according to given input, delegating to ChangeMasterToForChannel with an empty channel name.
944970
func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinlogCoordinates *BinlogCoordinates, skipUnresolve bool, gtidHint OperationGTIDHint) (*Instance, error) {
971+
return ChangeMasterToForChannel(instanceKey, masterKey, masterBinlogCoordinates, skipUnresolve, gtidHint, "")
972+
}
973+
974+
// ChangeMasterToForChannel changes the given instance's master for a specific replication channel.
975+
// When channelName is empty, this behaves identically to the original ChangeMasterTo for
976+
// backward compatibility with single-source replication.
977+
func ChangeMasterToForChannel(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinlogCoordinates *BinlogCoordinates, skipUnresolve bool, gtidHint OperationGTIDHint, channelName string) (*Instance, error) {
945978
instance, err := ReadTopologyInstance(instanceKey)
946979
if err != nil {
947980
return instance, log.Errore(err)
@@ -950,7 +983,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl
950983
if instance.ReplicationThreadsExist() && !instance.ReplicationThreadsStopped() {
951984
return instance, fmt.Errorf("ChangeMasterTo: Cannot change master on: %+v because replication threads are not stopped", *instanceKey)
952985
}
953-
log.Debugf("ChangeMasterTo: will attempt changing master on %+v to %+v, %+v", *instanceKey, *masterKey, *masterBinlogCoordinates)
986+
log.Debugf("ChangeMasterTo: will attempt changing master on %+v to %+v, %+v (channel %q)", *instanceKey, *masterKey, *masterBinlogCoordinates, channelName)
954987
changeToMasterKey := masterKey
955988
if !skipUnresolve {
956989
unresolvedMasterKey, nameUnresolved, err := UnresolveHostname(masterKey)
@@ -971,20 +1004,23 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl
9711004
originalMasterKey := instance.MasterKey
9721005
originalExecBinlogCoordinates := instance.ExecBinlogCoordinates
9731006

1007+
// Build the FOR CHANNEL suffix for multi-source replication
1008+
channelClause := forChannelClause(channelName)
1009+
9741010
var changeMasterFunc func() error
9751011
changedViaGTID := false
9761012
if instance.UsingMariaDBGTID && gtidHint != GTIDHintDeny {
9771013
// Keep on using GTID
9781014
changeMasterFunc = func() error {
979-
_, err := ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port(),
1015+
_, err := ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port()+channelClause,
9801016
changeToMasterKey.Hostname, changeToMasterKey.Port)
9811017
return err
9821018
}
9831019
changedViaGTID = true
9841020
} else if instance.UsingMariaDBGTID && gtidHint == GTIDHintDeny {
9851021
// Make sure to not use GTID
9861022
changeMasterFunc = func() error {
987-
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_gtid_no(),
1023+
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_gtid_no()+channelClause,
9881024
changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos)
9891025
return err
9901026
}
@@ -1001,38 +1037,38 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl
10011037
mariadbGTIDHint = "current_pos"
10021038
}
10031039
changeMasterFunc = func() error {
1004-
_, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint),
1040+
_, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint)+channelClause,
10051041
changeToMasterKey.Hostname, changeToMasterKey.Port)
10061042
return err
10071043
}
10081044
changedViaGTID = true
10091045
} else if instance.UsingOracleGTID && gtidHint != GTIDHintDeny {
10101046
// Is Oracle; already uses GTID; keep using it.
10111047
changeMasterFunc = func() error {
1012-
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port(),
1048+
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port()+channelClause,
10131049
changeToMasterKey.Hostname, changeToMasterKey.Port)
10141050
return err
10151051
}
10161052
changedViaGTID = true
10171053
} else if instance.UsingOracleGTID && gtidHint == GTIDHintDeny {
10181054
// Is Oracle; already uses GTID
10191055
changeMasterFunc = func() error {
1020-
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_autoposition_no(),
1056+
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_autoposition_no()+channelClause,
10211057
changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos)
10221058
return err
10231059
}
10241060
} else if instance.SupportsOracleGTID && gtidHint == GTIDHintForce {
10251061
// Is Oracle; not using GTID right now; turn into GTID
10261062
changeMasterFunc = func() error {
1027-
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_autoposition_yes(),
1063+
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_autoposition_yes()+channelClause,
10281064
changeToMasterKey.Hostname, changeToMasterKey.Port)
10291065
return err
10301066
}
10311067
changedViaGTID = true
10321068
} else {
10331069
// Normal binlog file:pos
10341070
changeMasterFunc = func() error {
1035-
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log(),
1071+
_, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log()+channelClause,
10361072
changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos)
10371073
return err
10381074
}
@@ -1049,7 +1085,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl
10491085
WriteMasterPositionEquivalence(&originalMasterKey, &originalExecBinlogCoordinates, changeToMasterKey, masterBinlogCoordinates)
10501086
ResetInstanceRelaylogCoordinatesHistory(instanceKey)
10511087

1052-
log.Infof("ChangeMasterTo: Changed master on %+v to: %+v, %+v. GTID: %+v", *instanceKey, masterKey, masterBinlogCoordinates, changedViaGTID)
1088+
log.Infof("ChangeMasterTo: Changed master on %+v to: %+v, %+v. GTID: %+v, Channel: %q", *instanceKey, masterKey, masterBinlogCoordinates, changedViaGTID, channelName)
10531089

10541090
instance, err = ReadTopologyInstance(instanceKey)
10551091
return instance, err

go/inst/query_string_provider.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,3 +391,53 @@ func GetQueryStringProvider(version string) QueryStringProvider {
391391

392392
return queryStringProvider80
393393
}
394+
395+
// forChannelClause returns the " FOR CHANNEL 'name'" SQL suffix for channel-aware commands.
// When channelName is empty (the default channel), it returns an empty string for
// backward compatibility with single-source replication.
//
// The name is embedded in a single-quoted SQL string literal, so single quotes are
// doubled and backslashes are doubled to keep the literal well-formed and to prevent
// a crafted channel name from terminating the literal early.
// NOTE(review): backslash doubling assumes the server does not run with the
// NO_BACKSLASH_ESCAPES SQL mode — confirm for the managed topologies.
func forChannelClause(channelName string) string {
	if channelName == "" {
		return ""
	}
	// Both escaped characters are ASCII, so a byte-wise scan is safe for UTF-8 names.
	escaped := make([]byte, 0, len(channelName)+2)
	for i := 0; i < len(channelName); i++ {
		switch c := channelName[i]; c {
		case '\'':
			escaped = append(escaped, '\'', '\'')
		case '\\':
			escaped = append(escaped, '\\', '\\')
		default:
			escaped = append(escaped, c)
		}
	}
	return " FOR CHANNEL '" + string(escaped) + "'"
}
404+
405+
// StopReplicaForChannel returns the statement from qps.stop_slave() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
406+
func (qps *QueryStringProvider) StopReplicaForChannel(channelName string) string {
407+
return qps.stop_slave() + forChannelClause(channelName)
408+
}
409+
410+
// StartReplicaForChannel returns the statement from qps.start_slave() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
411+
func (qps *QueryStringProvider) StartReplicaForChannel(channelName string) string {
412+
return qps.start_slave() + forChannelClause(channelName)
413+
}
414+
415+
// ResetReplicaForChannel returns the statement from qps.reset_slave() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
416+
func (qps *QueryStringProvider) ResetReplicaForChannel(channelName string) string {
417+
return qps.reset_slave() + forChannelClause(channelName)
418+
}
419+
420+
// ResetReplicaAllForChannel returns the statement from qps.reset_slave_50603_all() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
421+
func (qps *QueryStringProvider) ResetReplicaAllForChannel(channelName string) string {
422+
return qps.reset_slave_50603_all() + forChannelClause(channelName)
423+
}
424+
425+
// StopReplicaIOThreadForChannel returns the statement from qps.stop_slave_io_thread() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
426+
func (qps *QueryStringProvider) StopReplicaIOThreadForChannel(channelName string) string {
427+
return qps.stop_slave_io_thread() + forChannelClause(channelName)
428+
}
429+
430+
// StopReplicaSQLThreadForChannel returns the statement from qps.stop_slave_sql_thread() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
431+
func (qps *QueryStringProvider) StopReplicaSQLThreadForChannel(channelName string) string {
432+
return qps.stop_slave_sql_thread() + forChannelClause(channelName)
433+
}
434+
435+
// StartReplicaSQLThreadForChannel returns the statement from qps.start_slave_sql_thread() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
436+
func (qps *QueryStringProvider) StartReplicaSQLThreadForChannel(channelName string) string {
437+
return qps.start_slave_sql_thread() + forChannelClause(channelName)
438+
}
439+
440+
// StartReplicaIOThreadForChannel returns the statement from qps.start_slave_io_thread() with the forChannelClause(channelName) suffix appended; an empty channelName adds no suffix.
441+
func (qps *QueryStringProvider) StartReplicaIOThreadForChannel(channelName string) string {
442+
return qps.start_slave_io_thread() + forChannelClause(channelName)
443+
}

0 commit comments

Comments
 (0)