Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[-] fix non-Latin field name #575

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions internal/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metri

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
pgw = &PostgresWriter{
сtx: ctx,
ctx: ctx,
metricDefs: metricDefs,
opts: opts,
input: make(chan []metrics.MeasurementEnvelope, cacheLimit),
Expand Down Expand Up @@ -100,7 +100,7 @@ var (
// However, one is able to use any Postgres-compatible database as a storage backend,
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
сtx context.Context
ctx context.Context
sinkDb db.PgxPoolIface
metricSchema DbStorageSchemaType
metricDefs *metrics.Metrics
Expand Down Expand Up @@ -133,7 +133,7 @@ func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
var isTs bool
pgw.metricSchema = DbStorageSchemaPostgres
sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
if err = pgw.sinkDb.QueryRow(pgw.сtx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
pgw.metricSchema = DbStorageSchemaTimescale
}
return
Expand Down Expand Up @@ -174,14 +174,14 @@ func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {

// EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
_, err = pgw.sinkDb.Exec(pgw.сtx, "select admin.ensure_dummy_metrics_table($1)", metric)
_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
return
}

// Write send the measurements to the cache channel
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error {
if pgw.сtx.Err() != nil {
return pgw.сtx.Err()
if pgw.ctx.Err() != nil {
return pgw.ctx.Err()
}
select {
case pgw.input <- msgs:
Expand All @@ -204,7 +204,7 @@ func (pgw *PostgresWriter) poll() {
tick := time.NewTicker(cacheTimeout)
for {
select {
case <-pgw.сtx.Done(): //check context with high priority
case <-pgw.ctx.Done(): //check context with high priority
return
default:
select {
Expand All @@ -220,7 +220,7 @@ func (pgw *PostgresWriter) poll() {
case <-tick.C:
pgw.flush(cache)
cache = cache[:0]
case <-pgw.сtx.Done():
case <-pgw.ctx.Done():
return
}
}
Expand All @@ -232,7 +232,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
if len(msgs) == 0 {
return
}
logger := log.GetLogger(pgw.сtx).
logger := log.GetLogger(pgw.ctx).
WithField("sink", "postgres").
WithField("db", pgw.sinkDb.Config().ConnConfig.Database)
tsWarningPrinted := false
Expand Down Expand Up @@ -402,7 +402,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {

// EnsureMetricTime creates special partitions if Timescale used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
logger := log.GetLogger(pgw.сtx)
logger := log.GetLogger(pgw.ctx)
sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
for metric, pb := range pgPartBounds {
if !strings.HasSuffix(metric, "_realtime") {
Expand All @@ -414,15 +414,15 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart

partInfo, ok := partitionMapMetric[metric]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
if err != nil {
logger.Error("Failed to create partition on 'metrics':", err)
return err
}
partitionMapMetric[metric] = partInfo
}
if pb.EndTime.After(partInfo.EndTime) || force {
err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
if err != nil {
logger.Error("Failed to create partition on 'metrics':", err)
return err
Expand All @@ -434,14 +434,14 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart
}

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
logger := log.GetLogger(pgw.сtx)
logger := log.GetLogger(pgw.ctx)
sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
for metric := range pgPartBounds {
if strings.HasSuffix(metric, "_realtime") {
continue
}
if _, ok := partitionMapMetric[metric]; !ok {
if _, err = pgw.sinkDb.Exec(pgw.сtx, sqlEnsure, metric); err != nil {
if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
return err
}
Expand All @@ -466,7 +466,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
}
partInfo, ok := partitionMapMetricDbname[metric][dbname]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
Expand All @@ -475,7 +475,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
partitionMapMetricDbname[metric][dbname] = partInfo
}
if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
Expand All @@ -494,9 +494,9 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
if metricAgeDaysThreshold <= 0 {
return
}
logger := log.GetLogger(pgw.сtx)
logger := log.GetLogger(pgw.ctx)
select {
case <-pgw.сtx.Done():
case <-pgw.ctx.Done():
return
case <-time.After(delay):
// to reduce distracting log messages at startup
Expand All @@ -523,7 +523,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
logger.Debugf("Dropping old metric data partition: %s", toDrop)

if _, err := pgw.sinkDb.Exec(pgw.сtx, sqlDropTable); err != nil {
if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err)
time.Sleep(time.Second * 300)
} else {
Expand All @@ -535,7 +535,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
}
}
select {
case <-pgw.сtx.Done():
case <-pgw.ctx.Done():
return
case <-time.After(time.Hour * 12):
}
Expand All @@ -545,7 +545,7 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
logger := log.GetLogger(pgw.сtx)
logger := log.GetLogger(pgw.ctx)
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
Expand All @@ -564,13 +564,13 @@ func (pgw *PostgresWriter) maintainUniqueSources() {

for {
select {
case <-pgw.сtx.Done():
case <-pgw.ctx.Done():
return
case <-time.After(time.Hour * 24):
}
var lock bool
logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.сtx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
continue
}
Expand All @@ -580,7 +580,7 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
}

logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.сtx, sqlTopLevelMetrics)
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
Expand All @@ -593,7 +593,7 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
metricName := strings.Replace(tableName, "public.", "", 1)

logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.сtx, fmt.Sprintf(sqlDistinct, tableName, tableName))
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
if err != nil {
Expand All @@ -611,19 +611,19 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

_, err = pgw.sinkDb.Exec(pgw.сtx, sqlDeleteAll, metricName)
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
continue
}
cmdTag, err := pgw.sinkDb.Exec(pgw.сtx, sqlDelete, foundDbnamesArr, metricName)
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
cmdTag, err = pgw.sinkDb.Exec(pgw.сtx, sqlAdd, foundDbnamesArr, metricName)
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
Expand All @@ -637,13 +637,13 @@ func (pgw *PostgresWriter) maintainUniqueSources() {

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
err = pgw.sinkDb.QueryRow(pgw.сtx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
return
}

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
sqlGetOldParts := `select admin.get_old_time_partitions($1)`
rows, err := pgw.sinkDb.Query(pgw.сtx, sqlGetOldParts, metricAgeDaysThreshold)
rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
if err == nil {
return pgx.CollectRows(rows, pgx.RowTo[string])
}
Expand All @@ -656,6 +656,6 @@ func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric stri
where not exists (
select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
)`
_, err := pgw.sinkDb.Exec(pgw.сtx, sql, dbUnique, metric)
_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
return err
}
6 changes: 3 additions & 3 deletions internal/sinks/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestReadMetricSchemaType(t *testing.T) {
assert.NoError(t, err)

pgw := PostgresWriter{
сtx: ctx,
ctx: ctx,
sinkDb: conn,
}

Expand Down Expand Up @@ -55,7 +55,7 @@ func TestSyncMetric(t *testing.T) {
conn, err := pgxmock.NewPool()
assert.NoError(t, err)
pgw := PostgresWriter{
сtx: ctx,
ctx: ctx,
sinkDb: conn,
}
dbUnique := "mydb"
Expand All @@ -77,7 +77,7 @@ func TestWrite(t *testing.T) {
assert.NoError(t, err)
ctx, cancel := context.WithCancel(ctx)
pgw := PostgresWriter{
сtx: ctx,
ctx: ctx,
sinkDb: conn,
}
messages := []metrics.MeasurementEnvelope{
Expand Down
Loading