From 926424432d744761bd113638da2c872952e530ee Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 26 Feb 2024 20:59:21 -0800 Subject: [PATCH 1/3] add service URL for MySQL backend --- pkg/config/config.go | 4 ++++ pkg/group/mysql.go | 29 +++++++++++++++++++++-------- pkg/throttle/throttler.go | 6 +++--- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 8a54fff..ed15a52 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -97,6 +97,7 @@ type ConfigurationSettings struct { Environment string Domain string ShareDomain string + ServiceURL string RaftBind string RaftDataDir string DefaultRaftPort int // if a RaftNodes entry does not specify port, use this one @@ -158,6 +159,9 @@ func (settings *ConfigurationSettings) postReadAdjustments() error { if submatch := envVariableRegexp.FindStringSubmatch(settings.ShareDomain); len(submatch) > 1 { settings.ShareDomain = os.Getenv(submatch[1]) } + if submatch := envVariableRegexp.FindStringSubmatch(settings.ServiceURL); len(submatch) > 1 { + settings.ServiceURL = os.Getenv(submatch[1]) + } if settings.RaftDataDir == "" && settings.BackendMySQLHost == "" { return fmt.Errorf("Either RaftDataDir or BackendMySQLHost must be set") } diff --git a/pkg/group/mysql.go b/pkg/group/mysql.go index a2fab61..4388aa0 100644 --- a/pkg/group/mysql.go +++ b/pkg/group/mysql.go @@ -6,6 +6,7 @@ CREATE TABLE service_election ( domain varchar(32) NOT NULL, share_domain varchar(32) NOT NULL, + service_url varchar(64) NOT NULL DEFAULT '', service_id varchar(128) NOT NULL, last_seen_active timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (domain), @@ -61,6 +62,7 @@ type MySQLBackend struct { domain string shareDomain string serviceId string + serviceUrl string leaderState int64 healthState int64 throttler *throttle.Throttler @@ -94,12 +96,14 @@ func NewMySQLBackend(throttler *throttle.Throttler) (*MySQLBackend, error) { domain = fmt.Sprintf("%s:%s", config.Settings().DataCenter, config.Settings().Environment) } shareDomain := config.Settings().ShareDomain + serviceUrl := config.Settings().ServiceURL serviceId := fmt.Sprintf("%s:%d", hostname, config.Settings().ListenPort) backend := &MySQLBackend{ db: db, domain: domain, shareDomain: shareDomain, serviceId: serviceId, + serviceUrl: serviceUrl, throttler: throttler, } go backend.continuousOperations() @@ -261,15 +265,16 @@ func (backend *MySQLBackend) GetHealthyDomainServices() (services []string, err func (backend *MySQLBackend) AttemptLeadership() error { query := ` insert ignore into service_election ( - domain, share_domain, service_id, last_seen_active + domain, share_domain, service_id, service_url, last_seen_active ) values ( - ?, ?, ?, now() + ?, ?, ?, ?, now() ) on duplicate key update service_id = if(last_seen_active < now() - interval ? second, values(service_id), service_id), + service_url = if(service_id = values(service_id), values(service_url), service_url), share_domain = if(service_id = values(service_id), values(share_domain), share_domain), last_seen_active = if(service_id = values(service_id), values(last_seen_active), last_seen_active) ` - args := sqlutils.Args(backend.domain, backend.shareDomain, backend.serviceId, electionExpireSeconds) + args := sqlutils.Args(backend.domain, backend.shareDomain, backend.serviceId, backend.serviceUrl, electionExpireSeconds) _, err := sqlutils.ExecNoPrepare(backend.db, query, args...) return err } @@ -277,12 +282,12 @@ func (backend *MySQLBackend) AttemptLeadership() error { func (backend *MySQLBackend) ForceLeadership() error { query := ` replace into service_election ( - domain, service_id, last_seen_active + domain, service_id, service_url, last_seen_active ) values ( - ?, ?, now() + ?, ?, ?, now() ) ` - args := sqlutils.Args(backend.domain, backend.serviceId) + args := sqlutils.Args(backend.domain, backend.serviceId, backend.serviceUrl) _, err := sqlutils.ExecNoPrepare(backend.db, query, args...) return err } @@ -325,7 +330,8 @@ func (backend *MySQLBackend) GetSharedDomainServices() (services map[string]stri query := ` select domain, - service_id + service_id, + service_url from service_election where @@ -335,7 +341,14 @@ func (backend *MySQLBackend) GetSharedDomainServices() (services map[string]stri ` args := sqlutils.Args(backend.shareDomain, electionExpireSeconds, backend.serviceId) err = sqlutils.QueryRowsMap(backend.db, query, func(m sqlutils.RowMap) error { - services[m.GetString("domain")] = m.GetString("service_id") + serviceUrl := m.GetString("service_url") + domain := m.GetString("domain") + if serviceUrl != "" { + services[domain] = serviceUrl + } else { + // fall back to using service_id + services[domain] = m.GetString("service_id") + } return nil }, args...) diff --git a/pkg/throttle/throttler.go b/pkg/throttle/throttler.go index 7da7ca6..1a4f17f 100644 --- a/pkg/throttle/throttler.go +++ b/pkg/throttle/throttler.go @@ -3,7 +3,6 @@ package throttle import ( "encoding/json" "fmt" - "io/ioutil" "math/rand" "net/http" "strings" @@ -23,6 +22,7 @@ import ( "github.com/bradfitz/gomemcache/memcache" metrics "github.com/rcrowley/go-metrics" + "io" ) const leaderCheckInterval = 1 * time.Second @@ -671,7 +671,7 @@ func (throttler *Throttler) collectShareDomainMetricHealth() error { aggregatedMetricHealth := make(base.MetricHealthMap) for _, service := range services { err := func() error { - uri := fmt.Sprintf("http://%s/metrics-health", service) + uri := fmt.Sprintf("https://%s/metrics-health", service) resp, err := throttler.httpClient.Get(uri) if err != nil { @@ -679,7 +679,7 @@ func (throttler *Throttler) collectShareDomainMetricHealth() error { } defer resp.Body.Close() - b, err := ioutil.ReadAll(resp.Body) + b, err := io.ReadAll(resp.Body) if err != nil { return err } From ad46de02ba0923874997b06a37ad8a7e9059848b Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 8 Apr 2024 17:55:43 -0700 Subject: [PATCH 2/3] set IgnoreHostsThreshold in ClusterProbe --- pkg/mysql/mysql_http_check.go | 1 - pkg/throttle/mysql.go | 2 ++ pkg/throttle/throttler.go | 14 ++++++++------ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/mysql/mysql_http_check.go b/pkg/mysql/mysql_http_check.go index 12cd2dc..e36d50d 100644 --- a/pkg/mysql/mysql_http_check.go +++ b/pkg/mysql/mysql_http_check.go @@ -40,7 +40,6 @@ func MySQLHttpCheckHashKey(clusterName string, key *InstanceKey) string { } func CheckHttp(clusterName string, probe *Probe) (httpCheckResult *MySQLHttpCheck) { - if probe.HttpCheckPort <= 0 { go func() { metrics.GetOrRegisterCounter("httpcheck.skip", nil).Inc(1) }() return NewMySQLHttpCheck(clusterName, &probe.Key, http.StatusOK) diff --git a/pkg/throttle/mysql.go b/pkg/throttle/mysql.go index 2489b34..82be691 100644 --- a/pkg/throttle/mysql.go +++ b/pkg/throttle/mysql.go @@ -6,6 +6,7 @@ import ( "github.com/github/freno/pkg/base" "github.com/github/freno/pkg/mysql" + "github.com/outbrain/golib/log" ) func aggregateMySQLProbes( @@ -31,6 +32,7 @@ func aggregateMySQLProbes( value, err := instanceMetricResult.Get() if err != nil { + log.Errorf("error getting metric from %s: %v", probe.Key.Hostname, err) if ignoreDialTcpErrors && base.IsDialTcpError(err) { continue } diff --git a/pkg/throttle/throttler.go b/pkg/throttle/throttler.go index 1a4f17f..307356a 100644 --- a/pkg/throttle/throttler.go +++ b/pkg/throttle/throttler.go @@ -365,9 +365,10 @@ func (throttler *Throttler) refreshMySQLInventory() error { } log.Debugf("Read %+v hosts from ProxySQL %s, hostgroup id: %d (%s)", len(servers), dsn, clusterSettings.ProxySQLSettings.HostgroupID, clusterName) clusterProbes := &mysql.ClusterProbes{ - ClusterName: clusterName, - IgnoreHostsCount: clusterSettings.IgnoreHostsCount, - InstanceProbes: mysql.NewProbes(), + ClusterName: clusterName, + IgnoreHostsCount: clusterSettings.IgnoreHostsCount, + IgnoreHostsThreshold: clusterSettings.IgnoreHostsThreshold, + InstanceProbes: mysql.NewProbes(), } for _, server := range servers { key := mysql.InstanceKey{Hostname: server.Host, Port: int(server.Port)} @@ -389,9 +390,10 @@ func (throttler *Throttler) refreshMySQLInventory() error { keyspace, shard, strings.Join(vitess.ParseCells(clusterSettings.VitessSettings), ","), ) clusterProbes := &mysql.ClusterProbes{ - ClusterName: clusterName, - IgnoreHostsCount: clusterSettings.IgnoreHostsCount, - InstanceProbes: mysql.NewProbes(), + ClusterName: clusterName, + IgnoreHostsCount: clusterSettings.IgnoreHostsCount, + IgnoreHostsThreshold: clusterSettings.IgnoreHostsThreshold, + InstanceProbes: mysql.NewProbes(), } for _, tablet := range tablets { key := mysql.InstanceKey{Hostname: tablet.MysqlHostname, Port: int(tablet.MysqlPort)} From a3cc2c07cff882ecd6da162f43e00c0cf0e073ed Mon Sep 17 00:00:00 2001 From: meiji163 Date: Tue, 9 Apr 2024 08:11:26 -0700 Subject: [PATCH 3/3] remove log Error --- pkg/throttle/mysql.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/throttle/mysql.go b/pkg/throttle/mysql.go index 82be691..2489b34 100644 --- a/pkg/throttle/mysql.go +++ b/pkg/throttle/mysql.go @@ -6,7 +6,6 @@ import ( "github.com/github/freno/pkg/base" "github.com/github/freno/pkg/mysql" - "github.com/outbrain/golib/log" ) func aggregateMySQLProbes( @@ -32,7 +31,6 @@ func aggregateMySQLProbes( value, err := instanceMetricResult.Get() if err != nil { - log.Errorf("error getting metric from %s: %v", probe.Key.Hostname, err) if ignoreDialTcpErrors && base.IsDialTcpError(err) { continue }