From 023af04c7772558eb7b9563fff0a6696f8e57258 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 25 Apr 2024 14:37:53 -0500 Subject: [PATCH] Add reporting of lag as a metric --- daemon.go | 25 +++++++++++++++++++++---- indexers/base.go | 11 +++++++++-- indexers/contacts.go | 14 ++++++++++++++ 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/daemon.go b/daemon.go index 1a1097b..e475054 100644 --- a/daemon.go +++ b/daemon.go @@ -1,6 +1,7 @@ package indexer import ( + "context" "database/sql" "log/slog" "sync" @@ -78,6 +79,9 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) { func (d *Daemon) startStatsReporter(interval time.Duration) { d.wg.Add(1) // add ourselves to the wait group + // calculating lag is more expensive than reading indexer stats so we only do it every 5th iteration + var iterations int64 + go func() { defer func() { slog.Info("analytics exiting") @@ -89,13 +93,19 @@ func (d *Daemon) startStatsReporter(interval time.Duration) { case <-d.quit: return case <-time.After(interval): - d.reportStats() + d.reportStats(iterations%5 == 0) } + + iterations++ } }() } -func (d *Daemon) reportStats() { +func (d *Daemon) reportStats(includeLag bool) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + log := slog.New(slog.Default().Handler()) metrics := make(map[string]float64, len(d.indexers)*2) for _, ix := range d.indexers { @@ -115,9 +125,16 @@ func (d *Daemon) reportStats() { metrics[ix.Name()+"_rate"] = rateInPeriod d.prevStats[ix] = stats - } - log := slog.New(slog.Default().Handler()) + if includeLag { + lag, err := ix.Lag(ctx, d.db) + if err != nil { + log.Error("error calculating lag", "index", ix.Name(), "error", err) + } else { + metrics[ix.Name()+"_lag"] = lag.Seconds() + } + } + } for k, v := range metrics { analytics.Gauge("indexer."+k, v) diff --git a/indexers/base.go b/indexers/base.go index a12e690..c3419d5 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -1,6 +1,7 @@ package indexers import ( + "context" "database/sql" "encoding/json" "fmt" @@ -31,6 +32,7 @@ type Indexer interface { Name() string Index(db *sql.DB, rebuild, cleanup bool) (string, error) Stats() Stats + Lag(ctx context.Context, db *sql.DB) (time.Duration, error) } // IndexDefinition is what we pass to elastic to create an index, @@ -318,8 +320,13 @@ func (i *baseIndexer) GetLastModified(index string) (time.Time, error) { lastModified := time.Time{} // get the newest document on our index - queryResponse := queryResponse{} - _, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse) + queryResponse := &queryResponse{} + _, err := utils.MakeJSONRequest( + http.MethodPost, + fmt.Sprintf("%s/%s/_search", i.elasticURL, index), + []byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`), + queryResponse, + ) if err != nil { return lastModified, err } diff --git a/indexers/contacts.go b/indexers/contacts.go index cf7c88b..09349e0 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -274,3 +274,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st return totalCreated, totalDeleted, nil } + +func (i *ContactIndexer) Lag(ctx context.Context, db *sql.DB) (time.Duration, error) { + esLastModified, err := i.GetLastModified(i.name) + if err != nil { + return 0, errors.Wrap(err, "error getting ES last modified") + } + + var dbLastModified time.Time + if err := db.QueryRowContext(ctx, "SELECT MAX(modified_on) FROM contacts_contact").Scan(&dbLastModified); err != nil { + return 0, errors.Wrap(err, "error getting ES last modified") + } + + return dbLastModified.Sub(esLastModified), nil +}