Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions go/db/generate_patches.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,4 +622,22 @@ var generateSQLPatches = []string{
database_instance
ADD COLUMN provider_type varchar(20) CHARACTER SET ascii NOT NULL DEFAULT 'mysql' AFTER replication_group_primary_port
`,
// Multi-source replication (named channels) support
`
CREATE TABLE IF NOT EXISTS database_instance_channels (
hostname varchar(128) NOT NULL,
port smallint(5) unsigned NOT NULL,
channel_name varchar(128) NOT NULL,
master_host varchar(128) NOT NULL,
master_port smallint(5) unsigned NOT NULL,
master_uuid varchar(64) NOT NULL DEFAULT '',
replication_io_running tinyint NOT NULL DEFAULT 0,
replication_sql_running tinyint NOT NULL DEFAULT 0,
seconds_behind_master bigint DEFAULT NULL,
last_io_error text NOT NULL,
last_sql_error text NOT NULL,
last_seen timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (hostname, port, channel_name)
)
`,
}
33 changes: 33 additions & 0 deletions go/inst/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,35 @@ import (

const ReasonableDiscoveryLatency = 500 * time.Millisecond

// ChannelStatus represents the replication status of a single named channel.
// In multi-source replication (MySQL 5.7+), each channel has its own IO/SQL thread,
// coordinates, and lag information.
type ChannelStatus struct {
ChannelName string
MasterKey InstanceKey
MasterUUID string
ReplicationIOThreadRunning bool
ReplicationSQLThreadRunning bool
ReplicationIOThreadState ReplicationThreadState
ReplicationSQLThreadState ReplicationThreadState
ReadBinlogCoordinates BinlogCoordinates
ExecBinlogCoordinates BinlogCoordinates
RelaylogCoordinates BinlogCoordinates
SecondsBehindMaster sql.NullInt64
SQLDelay uint
LastSQLError string
LastIOError string
UsingOracleGTID bool
UsingMariaDBGTID bool
HasReplicationFilters bool
HasReplicationCredentials bool
}

// IsGRInternalChannel returns true if this channel is a Group Replication internal channel.
func (cs *ChannelStatus) IsGRInternalChannel() bool {
return cs.ChannelName == "group_replication_applier" || cs.ChannelName == "group_replication_recovery"
}

// Instance represents a database instance, including its current configuration & status.
// It presents important replication configuration and detailed replication status.
type Instance struct {
Expand Down Expand Up @@ -131,6 +160,10 @@ type Instance struct {

LastDiscoveryLatency time.Duration

// Multi-source replication (named channels)
ReplicationChannels []ChannelStatus // All replication channels (empty for single-source)
ManagedChannelName string // The channel orchestrator manages for this instance (empty = default)

seed bool // Means we force this instance to be written to backend, even if it's invalid, empty or forgotten

/* All things Group Replication below */
Expand Down
223 changes: 196 additions & 27 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
masterHostnameTmp := ""
var masterPortTmp int
var masterKey *InstanceKey
var replicationChannels []ChannelStatus

if !instanceKey.IsValid() {
latency.Start("backend")
Expand Down Expand Up @@ -525,43 +526,74 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
return err
}

instance.HasReplicationCredentials = (user != "")
instance.ReplicationIOThreadState = ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_io_running()))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_sql_running()))
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning()
// Extract the channel name. MySQL 5.7+ includes Channel_Name in SHOW SLAVE STATUS.
// For older versions or single-source replication, this will be empty string.
channelName := m.GetStringD("Channel_Name", "")

// Build a ChannelStatus for this row
channelStatus := ChannelStatus{
ChannelName: channelName,
MasterUUID: m.GetStringD(instance.QSP.master_uuid(), "No"),
HasReplicationCredentials: (user != ""),
ReplicationIOThreadState: ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_io_running())),
ReplicationSQLThreadState: ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_sql_running())),
SQLDelay: m.GetUintD("SQL_Delay", 0),
UsingOracleGTID: (m.GetIntD("Auto_Position", 0) == 1),
UsingMariaDBGTID: (m.GetStringD("Using_Gtid", "No") != "No"),
HasReplicationFilters: ((m.GetStringD("Replicate_Do_DB", "") != "") || (m.GetStringD("Replicate_Ignore_DB", "") != "") || (m.GetStringD("Replicate_Do_Table", "") != "") || (m.GetStringD("Replicate_Ignore_Table", "") != "") || (m.GetStringD("Replicate_Wild_Do_Table", "") != "") || (m.GetStringD("Replicate_Wild_Ignore_Table", "") != "")),
LastSQLError: emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_SQL_Error")), ""),
LastIOError: emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_IO_Error")), ""),
SecondsBehindMaster: m.GetNullInt64(instance.QSP.seconds_behind_master()),
}
channelStatus.ReplicationIOThreadRunning = channelStatus.ReplicationIOThreadState.IsRunning()
channelStatus.ReplicationSQLThreadRunning = channelStatus.ReplicationSQLThreadState.IsRunning()
channelStatus.ReadBinlogCoordinates.LogFile = m.GetString(instance.QSP.master_log_file())
channelStatus.ReadBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.read_master_log_pos())
channelStatus.ExecBinlogCoordinates.LogFile = m.GetString(instance.QSP.relay_master_log_file())
channelStatus.ExecBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.relay_master_log_position())
channelStatus.RelaylogCoordinates.LogFile = m.GetString("Relay_Log_File")
channelStatus.RelaylogCoordinates.LogPos = m.GetInt64("Relay_Log_Pos")
channelStatus.RelaylogCoordinates.Type = RelayLog

chMasterHost := m.GetString(instance.QSP.master_host())
if isMaxScale110 {
chMasterHost = maxScaleMasterHostname
}
channelStatus.MasterKey = InstanceKey{Hostname: chMasterHost, Port: m.GetInt(instance.QSP.master_port())}

replicationChannels = append(replicationChannels, channelStatus)

// Populate the canonical Instance fields from this row (same as before).
// For single-source, this is the only row. For multi-source, each row overwrites;
// we select the canonical channel below after the loop.
instance.HasReplicationCredentials = channelStatus.HasReplicationCredentials
instance.ReplicationIOThreadState = channelStatus.ReplicationIOThreadState
instance.ReplicationSQLThreadState = channelStatus.ReplicationSQLThreadState
instance.ReplicationIOThreadRuning = channelStatus.ReplicationIOThreadRunning
if isMaxScale110 {
// Covering buggy MaxScale 1.1.0
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadRuning && (m.GetString(instance.QSP.slave_io_state()) == "Binlog Dump")
}
instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning()
instance.ReadBinlogCoordinates.LogFile = m.GetString(instance.QSP.master_log_file())
instance.ReadBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.read_master_log_pos())
instance.ExecBinlogCoordinates.LogFile = m.GetString(instance.QSP.relay_master_log_file())
instance.ExecBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.relay_master_log_position())
instance.ReplicationSQLThreadRuning = channelStatus.ReplicationSQLThreadRunning
instance.ReadBinlogCoordinates = channelStatus.ReadBinlogCoordinates
instance.ExecBinlogCoordinates = channelStatus.ExecBinlogCoordinates
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates.LogFile = m.GetString("Relay_Log_File")
instance.RelaylogCoordinates.LogPos = m.GetInt64("Relay_Log_Pos")
instance.RelaylogCoordinates.Type = RelayLog
instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_SQL_Error")), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_IO_Error")), "")
instance.SQLDelay = m.GetUintD("SQL_Delay", 0)
instance.UsingOracleGTID = (m.GetIntD("Auto_Position", 0) == 1)
instance.UsingMariaDBGTID = (m.GetStringD("Using_Gtid", "No") != "No")
instance.MasterUUID = m.GetStringD(instance.QSP.master_uuid(), "No")
instance.HasReplicationFilters = ((m.GetStringD("Replicate_Do_DB", "") != "") || (m.GetStringD("Replicate_Ignore_DB", "") != "") || (m.GetStringD("Replicate_Do_Table", "") != "") || (m.GetStringD("Replicate_Ignore_Table", "") != "") || (m.GetStringD("Replicate_Wild_Do_Table", "") != "") || (m.GetStringD("Replicate_Wild_Ignore_Table", "") != ""))
instance.RelaylogCoordinates = channelStatus.RelaylogCoordinates
instance.LastSQLError = channelStatus.LastSQLError
instance.LastIOError = channelStatus.LastIOError
instance.SQLDelay = channelStatus.SQLDelay
instance.UsingOracleGTID = channelStatus.UsingOracleGTID
instance.UsingMariaDBGTID = channelStatus.UsingMariaDBGTID
instance.MasterUUID = channelStatus.MasterUUID
instance.HasReplicationFilters = channelStatus.HasReplicationFilters

// Remember master hostname:port. Once we update resolve cache below
// we will use it to set instance's members
masterHostnameTmp = m.GetString(instance.QSP.master_host())
if isMaxScale110 {
// Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted.
// Therefore we (currently) take @@hostname (which is masquarading as master host anyhow)
masterHostnameTmp = maxScaleMasterHostname
}
masterPortTmp = m.GetInt(instance.QSP.master_port())
masterHostnameTmp = channelStatus.MasterKey.Hostname
masterPortTmp = channelStatus.MasterKey.Port

instance.IsDetachedMaster = instance.MasterKey.IsDetached()
instance.SecondsBehindMaster = m.GetNullInt64(instance.QSP.seconds_behind_master())
instance.SecondsBehindMaster = channelStatus.SecondsBehindMaster
if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 {
_ = log.Warningf("Host: %+v, instance.SecondsBehindMaster < 0 [%+v], correcting to 0", instanceKey, instance.SecondsBehindMaster.Int64)
instance.SecondsBehindMaster.Int64 = 0
Expand All @@ -578,6 +610,43 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
goto Cleanup
}

// Store all discovered channels on the instance
instance.ReplicationChannels = replicationChannels

// For multi-source replication, select the canonical channel to represent
// the Instance's master. This ensures backward compatibility: for single-source
// instances, the behavior is identical to before.
if len(replicationChannels) > 1 {
canonicalIdx := selectCanonicalChannelIndex(replicationChannels)
if canonicalIdx >= 0 {
ch := replicationChannels[canonicalIdx]
instance.HasReplicationCredentials = ch.HasReplicationCredentials
instance.ReplicationIOThreadState = ch.ReplicationIOThreadState
instance.ReplicationSQLThreadState = ch.ReplicationSQLThreadState
instance.ReplicationIOThreadRuning = ch.ReplicationIOThreadRunning
instance.ReplicationSQLThreadRuning = ch.ReplicationSQLThreadRunning
instance.ReadBinlogCoordinates = ch.ReadBinlogCoordinates
instance.ExecBinlogCoordinates = ch.ExecBinlogCoordinates
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates = ch.RelaylogCoordinates
instance.LastSQLError = ch.LastSQLError
instance.LastIOError = ch.LastIOError
instance.SQLDelay = ch.SQLDelay
instance.UsingOracleGTID = ch.UsingOracleGTID
instance.UsingMariaDBGTID = ch.UsingMariaDBGTID
instance.MasterUUID = ch.MasterUUID
instance.HasReplicationFilters = ch.HasReplicationFilters
instance.SecondsBehindMaster = ch.SecondsBehindMaster
if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 {
instance.SecondsBehindMaster.Int64 = 0
}
instance.ReplicationLagSeconds = instance.SecondsBehindMaster
masterHostnameTmp = ch.MasterKey.Hostname
masterPortTmp = ch.MasterKey.Port
instance.ManagedChannelName = ch.ChannelName
}
}

if !isMaxScale {
// We begin with a few operations we can run concurrently, and which do not depend on anything
{
Expand Down Expand Up @@ -1172,6 +1241,10 @@ Cleanup:
} else {
WriteInstance(instance, instanceFound, err)
}
// Persist multi-source replication channel data
if len(instance.ReplicationChannels) > 0 {
_ = writeInstanceChannels(instance)
}
lastAttemptedCheckTimer.Stop()
latency.Stop("backend")
return instance, instanceDiscoverySkipped, nil
Expand Down Expand Up @@ -1514,6 +1587,9 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {

instance.QSP = GetQueryStringProvider(instance.Version)

// Load multi-source replication channels from the backend
_ = readInstanceChannels(instance)

return instance
}

Expand Down Expand Up @@ -3615,3 +3691,96 @@ func isInjectedPseudoGTID(clusterName string) (injected bool, err error) {
clusterInjectedPseudoGTIDCache.Set(clusterName, injected, cache.DefaultExpiration)
return injected, log.Errore(err)
}

// selectCanonicalChannelIndex picks the canonical replication channel from a list of channels.
// For multi-source replication, this determines which channel orchestrator considers the
// "primary" channel for topology purposes.
// Selection priority:
// 1. The default channel (empty name "")
// 2. The first non-GR-internal channel
// 3. Index 0 as fallback
func selectCanonicalChannelIndex(channels []ChannelStatus) int {
if len(channels) == 0 {
return -1
}
// Prefer the default channel (empty string name)
for i, ch := range channels {
if ch.ChannelName == "" {
return i
}
}
// Skip GR internal channels, pick first non-GR channel
for i, ch := range channels {
if !ch.IsGRInternalChannel() {
return i
}
}
// Fallback
return 0
}

// writeInstanceChannels persists all replication channel data for an instance.
// It deletes existing rows and inserts current channel data.
func writeInstanceChannels(instance *Instance) error {
if len(instance.ReplicationChannels) == 0 {
return nil
}
writeFunc := func() error {
// Delete existing channel rows for this instance
_, err := db.ExecOrchestrator(`
delete from database_instance_channels
where hostname = ? and port = ?`,
instance.Key.Hostname, instance.Key.Port,
)
if err != nil {
return log.Errore(err)
}
// Insert one row per channel
for _, ch := range instance.ReplicationChannels {
_, err := db.ExecOrchestrator(`
insert into database_instance_channels (
hostname, port, channel_name, master_host, master_port,
master_uuid, replication_io_running, replication_sql_running,
seconds_behind_master, last_io_error, last_sql_error, last_seen
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())`,
instance.Key.Hostname, instance.Key.Port, ch.ChannelName,
ch.MasterKey.Hostname, ch.MasterKey.Port, ch.MasterUUID,
ch.ReplicationIOThreadRunning, ch.ReplicationSQLThreadRunning,
ch.SecondsBehindMaster, ch.LastIOError, ch.LastSQLError,
)
if err != nil {
return log.Errore(err)
}
}
return nil
}
return ExecDBWriteFunc(writeFunc)
}

// readInstanceChannels populates instance.ReplicationChannels from the backend database.
func readInstanceChannels(instance *Instance) error {
query := `
select
channel_name, master_host, master_port, master_uuid,
replication_io_running, replication_sql_running,
seconds_behind_master, last_io_error, last_sql_error
from database_instance_channels
where hostname = ? and port = ?
order by channel_name`
err := db.QueryOrchestrator(query, sqlutils.Args(instance.Key.Hostname, instance.Key.Port), func(m sqlutils.RowMap) error {
ch := ChannelStatus{
ChannelName: m.GetString("channel_name"),
MasterUUID: m.GetString("master_uuid"),
ReplicationIOThreadRunning: m.GetBool("replication_io_running"),
ReplicationSQLThreadRunning: m.GetBool("replication_sql_running"),
SecondsBehindMaster: m.GetNullInt64("seconds_behind_master"),
LastIOError: m.GetString("last_io_error"),
LastSQLError: m.GetString("last_sql_error"),
}
ch.MasterKey.Hostname = m.GetString("master_host")
ch.MasterKey.Port = m.GetInt("master_port")
instance.ReplicationChannels = append(instance.ReplicationChannels, ch)
return nil
})
return err
}
Loading
Loading