From fb5fe6fd295e457008f24a32203f1fa5dec64ccd Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 28 Oct 2024 16:09:17 +0100 Subject: [PATCH] [-] fix non-Latin field name --- internal/sinks/postgres.go | 62 ++++++++++++++++----------------- internal/sinks/postgres_test.go | 6 ++-- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index 99460cefc..71d89262a 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -32,7 +32,7 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metri func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) { pgw = &PostgresWriter{ - сtx: ctx, + ctx: ctx, metricDefs: metricDefs, opts: opts, input: make(chan []metrics.MeasurementEnvelope, cacheLimit), @@ -100,7 +100,7 @@ var ( // However, one is able to use any Postgres-compatible database as a storage backend, // e.g. PGEE, Citus, Greenplum, CockroachDB, etc. type PostgresWriter struct { - сtx context.Context + ctx context.Context sinkDb db.PgxPoolIface metricSchema DbStorageSchemaType metricDefs *metrics.Metrics @@ -133,7 +133,7 @@ func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) { var isTs bool pgw.metricSchema = DbStorageSchemaPostgres sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type` - if err = pgw.sinkDb.QueryRow(pgw.сtx, sqlSchemaType).Scan(&isTs); err == nil && isTs { + if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs { pgw.metricSchema = DbStorageSchemaTimescale } return @@ -174,14 +174,14 @@ func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) { // EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) { - _, err = pgw.sinkDb.Exec(pgw.сtx, "select admin.ensure_dummy_metrics_table($1)", metric) + _, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric) return } // Write send the measurements to the cache channel func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error { - if pgw.сtx.Err() != nil { - return pgw.сtx.Err() + if pgw.ctx.Err() != nil { + return pgw.ctx.Err() } select { case pgw.input <- msgs: @@ -204,7 +204,7 @@ func (pgw *PostgresWriter) poll() { tick := time.NewTicker(cacheTimeout) for { select { - case <-pgw.сtx.Done(): //check context with high priority + case <-pgw.ctx.Done(): //check context with high priority return default: select { @@ -220,7 +220,7 @@ func (pgw *PostgresWriter) poll() { case <-tick.C: pgw.flush(cache) cache = cache[:0] - case <-pgw.сtx.Done(): + case <-pgw.ctx.Done(): return } } @@ -232,7 +232,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) { if len(msgs) == 0 { return } - logger := log.GetLogger(pgw.сtx). + logger := log.GetLogger(pgw.ctx). WithField("sink", "postgres"). WithField("db", pgw.sinkDb.Config().ConnConfig.Database) tsWarningPrinted := false @@ -402,7 +402,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) { // EnsureMetricTime creates special partitions if Timescale used for realtime metrics func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error { - logger := log.GetLogger(pgw.сtx) + logger := log.GetLogger(pgw.ctx) sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)` for metric, pb := range pgPartBounds { if !strings.HasSuffix(metric, "_realtime") { @@ -414,7 +414,7 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart partInfo, ok := partitionMapMetric[metric] if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force { - err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo) + err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo) if err != nil { logger.Error("Failed to create partition on 'metrics':", err) return err @@ -422,7 +422,7 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart partitionMapMetric[metric] = partInfo } if pb.EndTime.After(partInfo.EndTime) || force { - err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime) + err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime) if err != nil { logger.Error("Failed to create partition on 'metrics':", err) return err @@ -434,14 +434,14 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart } func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) { - logger := log.GetLogger(pgw.сtx) + logger := log.GetLogger(pgw.ctx) sqlEnsure := `select * from admin.ensure_partition_timescale($1)` for metric := range pgPartBounds { if strings.HasSuffix(metric, "_realtime") { continue } if _, ok := partitionMapMetric[metric]; !ok { - if _, err = pgw.sinkDb.Exec(pgw.сtx, sqlEnsure, metric); err != nil { + if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil { logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err) return err } @@ -466,7 +466,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str } partInfo, ok := partitionMapMetricDbname[metric][dbname] if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force { - if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil { + if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil { return } if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil { @@ -475,7 +475,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str partitionMapMetricDbname[metric][dbname] = partInfo } if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force { - if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil { + if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil { return } if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil { @@ -494,9 +494,9 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { if metricAgeDaysThreshold <= 0 { return } - logger := log.GetLogger(pgw.сtx) + logger := log.GetLogger(pgw.ctx) select { - case <-pgw.сtx.Done(): + case <-pgw.ctx.Done(): return case <-time.After(delay): // to reduce distracting log messages at startup @@ -523,7 +523,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop logger.Debugf("Dropping old metric data partition: %s", toDrop) - if _, err := pgw.sinkDb.Exec(pgw.сtx, sqlDropTable); err != nil { + if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil { logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err) time.Sleep(time.Second * 300) } else { @@ -535,7 +535,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { } } select { - case <-pgw.сtx.Done(): + case <-pgw.ctx.Done(): return case <-time.After(time.Hour * 12): } @@ -545,7 +545,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) { // maintainUniqueSources is a background task that maintains a listing of unique sources for each metric. // This is used to avoid listing the same source multiple times in Grafana dropdowns. func (pgw *PostgresWriter) maintainUniqueSources() { - logger := log.GetLogger(pgw.сtx) + logger := log.GetLogger(pgw.ctx) // due to metrics deletion the listing can go out of sync (a trigger not really wanted) sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()` @@ -564,13 +564,13 @@ func (pgw *PostgresWriter) maintainUniqueSources() { for { select { - case <-pgw.сtx.Done(): + case <-pgw.ctx.Done(): return case <-time.After(time.Hour * 24): } var lock bool logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly - if err := pgw.sinkDb.QueryRow(pgw.сtx, sqlGetAdvisoryLock).Scan(&lock); err != nil { + if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil { logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err) continue } @@ -580,7 +580,7 @@ func (pgw *PostgresWriter) maintainUniqueSources() { } logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...") - rows, _ := pgw.sinkDb.Query(pgw.сtx, sqlTopLevelMetrics) + rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics) allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string]) if err != nil { logger.Error(err) @@ -593,7 +593,7 @@ func (pgw *PostgresWriter) maintainUniqueSources() { metricName := strings.Replace(tableName, "public.", "", 1) logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName) - rows, _ := pgw.sinkDb.Query(pgw.сtx, fmt.Sprintf(sqlDistinct, tableName, tableName)) + rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName)) ret, err := pgx.CollectRows(rows, pgx.RowTo[string]) // ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName)) if err != nil { @@ -611,19 +611,19 @@ func (pgw *PostgresWriter) maintainUniqueSources() { if len(foundDbnamesArr) == 0 { // delete all entries for given metric logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName) - _, err = pgw.sinkDb.Exec(pgw.сtx, sqlDeleteAll, metricName) + _, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName) if err != nil { logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err) } continue } - cmdTag, err := pgw.sinkDb.Exec(pgw.сtx, sqlDelete, foundDbnamesArr, metricName) + cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName) if err != nil { logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err) } else if cmdTag.RowsAffected() > 0 { logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName) } - cmdTag, err = pgw.sinkDb.Exec(pgw.сtx, sqlAdd, foundDbnamesArr, metricName) + cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName) if err != nil { logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err) } else if cmdTag.RowsAffected() > 0 { @@ -637,13 +637,13 @@ func (pgw *PostgresWriter) maintainUniqueSources() { func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) { sqlOldPart := `select admin.drop_old_time_partitions($1, $2)` - err = pgw.sinkDb.QueryRow(pgw.сtx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res) + err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res) return } func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) { sqlGetOldParts := `select admin.get_old_time_partitions($1)` - rows, err := pgw.sinkDb.Query(pgw.сtx, sqlGetOldParts, metricAgeDaysThreshold) + rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold) if err == nil { return pgx.CollectRows(rows, pgx.RowTo[string]) } @@ -656,6 +656,6 @@ func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric stri where not exists ( select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2 )` - _, err := pgw.sinkDb.Exec(pgw.сtx, sql, dbUnique, metric) + _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric) return err } diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index d5660b719..6f69039d5 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -18,7 +18,7 @@ func TestReadMetricSchemaType(t *testing.T) { assert.NoError(t, err) pgw := PostgresWriter{ - сtx: ctx, + ctx: ctx, sinkDb: conn, } @@ -55,7 +55,7 @@ func TestSyncMetric(t *testing.T) { conn, err := pgxmock.NewPool() assert.NoError(t, err) pgw := PostgresWriter{ - сtx: ctx, + ctx: ctx, sinkDb: conn, } dbUnique := "mydb" @@ -77,7 +77,7 @@ func TestWrite(t *testing.T) { assert.NoError(t, err) ctx, cancel := context.WithCancel(ctx) pgw := PostgresWriter{ - сtx: ctx, + ctx: ctx, sinkDb: conn, } messages := []metrics.MeasurementEnvelope{