From 078834623953071d6ef1ca9cdef5dcbc500da0b7 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 30 Jun 2017 18:30:31 +0200 Subject: [PATCH 1/3] Code refactoring around postgres versions. Use PG version represented as an integer internally instead of the float (float comparisons are imprecise). Instead of passing one version of the disk stat collector pass one version per each cluster (since wal directories are per cluster). Minor refactoring. --- pg_view/__init__.py | 30 ++++-------- pg_view/collectors/partition_collector.py | 57 ++++++++++++----------- pg_view/collectors/pg_collector.py | 47 ++++++++++--------- pg_view/models/db_client.py | 23 ++++----- pg_view/utils.py | 49 +++++++++++++++++-- 5 files changed, 117 insertions(+), 89 deletions(-) diff --git a/pg_view/__init__.py b/pg_view/__init__.py index babef24..a394387 100644 --- a/pg_view/__init__.py +++ b/pg_view/__init__.py @@ -55,9 +55,6 @@ def parse_args(): action='store', dest='tick', type='int', default=1) parser.add_option('-o', '--output-method', help='send output to the following source', action='store', default=OUTPUT_METHOD.curses, dest='output_method') - parser.add_option('-V', '--use-version', - help='version of the instance to monitor (in case it can\'t be autodetected)', - action='store', dest='version', type='float') parser.add_option('-l', '--log-file', help='direct log output to the file', action='store', dest='log_file') parser.add_option('-R', '--reset-output', help='clear screen after each tick', action='store_true', default=False, @@ -190,7 +187,6 @@ def main(): clusters = [] config = read_configuration(options.config_file) if options.config_file else None - dbversion = None # configuration file takes priority over the rest of database connection information sources. 
if config: for instance in config: @@ -221,29 +217,24 @@ def main(): # get all PostgreSQL instances for result_work_dir, data in postmasters.items(): - (ppid, dbversion, dbname) = data - # if user requested a specific database name and version - don't try to connect to others + (ppid, version, dbname) = data + # if user requested a specific database don't try to connect to others if options.instance: if dbname != options.instance or not result_work_dir or not ppid: continue - if options.version is not None and dbversion != options.version: - continue try: conndata = detect_db_connection_arguments( - result_work_dir, ppid, dbversion, options.username, options.dbname) + result_work_dir, ppid, version, options.username, options.dbname) if conndata is None: continue host = conndata['host'] port = conndata['port'] conn = build_connection(host, port, options.username, options.dbname) - pgcon = psycopg2.connect(**conn) + psycopg2.connect(**conn).close() # test if we can connect + desc = make_cluster_desc(name=dbname, version=version, workdir=result_work_dir, conn=conn) + clusters.append(desc) except Exception as e: logger.error('PostgreSQL exception {0}'.format(e)) - pgcon = None - if pgcon: - desc = make_cluster_desc(name=dbname, version=dbversion, workdir=result_work_dir, - pid=ppid, pgcon=pgcon, conn=conn) - clusters.append(desc) collectors = [] groups = {} try: @@ -255,10 +246,7 @@ def main(): # initialize the disks stat collector process and create an exchange queue q = JoinableQueue(1) - work_directories = [cl['wd'] for cl in clusters if 'wd' in cl] - dbversion = dbversion or clusters[0]['ver'] - - collector = DetachedDiskStatCollector(q, work_directories, dbversion) + collector = DetachedDiskStatCollector(q, clusters) collector.start() consumer = DiskCollectorConsumer(q) @@ -266,8 +254,8 @@ def main(): collectors.append(SystemStatCollector()) collectors.append(MemoryStatCollector()) for cl in clusters: - part = PartitionStatCollector(cl['name'], cl['ver'], 
cl['wd'], consumer) - pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid) + part = PartitionStatCollector(cl['name'], cl['version'], cl['wd'], consumer) + pg = PgstatCollector(cl['name'], cl['reconnect'], options.pid) groupname = cl['wd'] groups[groupname] = {'pg': pg, 'partitions': part} collectors.append(part) diff --git a/pg_view/collectors/partition_collector.py b/pg_view/collectors/partition_collector.py index c0802be..851de79 100644 --- a/pg_view/collectors/partition_collector.py +++ b/pg_view/collectors/partition_collector.py @@ -22,10 +22,10 @@ class PartitionStatCollector(StatCollector): XLOG_NAME = 'xlog' BLOCK_SIZE = 1024 - def __init__(self, dbname, dbversion, work_directory, consumer): + def __init__(self, dbname, version, work_directory, consumer): super(PartitionStatCollector, self).__init__(ticks_per_refresh=1) self.dbname = dbname - self.dbver = dbversion + self.version = version self.queue_consumer = consumer self.work_directory = work_directory self.df_list_transformation = [{'out': 'dev', 'in': 0, 'fn': self._dereference_dev_name}, @@ -130,7 +130,7 @@ def __init__(self, dbname, dbversion, work_directory, consumer): self.postinit() def ident(self): - return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.dbver) + return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.version) @staticmethod def _dereference_dev_name(devname): @@ -163,7 +163,7 @@ def refresh(self): self._do_refresh([result[PartitionStatCollector.DATA_NAME], result[PartitionStatCollector.XLOG_NAME]]) @staticmethod - def calculate_time_until_full(colname, prev, cur): + def calculate_time_until_full(_, prev, cur): # both should be expressed in common units, guaranteed by BLOCK_SIZE if (cur.get('path_size', 0) > 0 and prev.get('path_size', 0) > 0 and @@ -178,8 +178,8 @@ def get_io_data(pnames): result = {} found = 0 # stop if we found records for all partitions 
total = len(pnames) + fp = None try: - fp = None fp = open(PartitionStatCollector.DISK_STAT_FILE, 'rU') for l in fp: elements = l.split() @@ -208,22 +208,21 @@ class DetachedDiskStatCollector(Process): OLD_WAL_SUBDIR = '/pg_xlog/' WAL_SUBDIR = '/pg_wal/' - NEW_WAL_SINCE = 10.0 + NEW_WAL_SINCE = 100000 - def __init__(self, q, work_directories, db_version): + def __init__(self, q, clusters): super(DetachedDiskStatCollector, self).__init__() - self.work_directories = work_directories self.q = q self.daemon = True - self.db_version = db_version + self.clusters = clusters self.df_cache = {} - @property - def wal_directory(self): + @staticmethod + def wal_directory(version): """ Since Postgresql 10.0 wal directory was renamed, so we need to choose actual wal directory based on a db_version. """ - if self.db_version < DetachedDiskStatCollector.NEW_WAL_SINCE: + if version < DetachedDiskStatCollector.NEW_WAL_SINCE: return DetachedDiskStatCollector.OLD_WAL_SUBDIR else: return DetachedDiskStatCollector.WAL_SUBDIR @@ -234,32 +233,36 @@ def run(self): self.q.join() result = {} self.df_cache = {} - for wd in self.work_directories: - du_data = self.get_du_data(wd) - df_data = self.get_df_data(wd) - result[wd] = [du_data, df_data] + for cluster in self.clusters: + work_directory = cluster['wd'] + wal_directory = self.wal_directory(cluster['version']) + du_data = self.get_du_data(work_directory, wal_directory) + df_data = self.get_df_data(work_directory, wal_directory) + result[work_directory] = [du_data, df_data] self.q.put(result) time.sleep(consts.TICK_LENGTH) - def get_du_data(self, wd): + def get_du_data(self, work_directory, wal_directory): data_size = 0 xlog_size = 0 result = {'data': [], 'xlog': []} try: - data_size = self.run_du(wd, BLOCK_SIZE) - xlog_size = self.run_du(wd + self.wal_directory, BLOCK_SIZE) + data_size = self.run_du(work_directory, BLOCK_SIZE) + xlog_size = self.run_du(work_directory + wal_directory, BLOCK_SIZE) except Exception as e: logger.error('Unable 
to read free space information for the pg_xlog and data directories for the directory\ - {0}: {1}'.format(wd, e)) + {0}: {1}'.format(work_directory, e)) else: # XXX: why do we pass the block size there? - result['data'] = str(data_size), wd - result['xlog'] = str(xlog_size), wd + self.wal_directory + result['data'] = str(data_size), work_directory + result['xlog'] = str(xlog_size), work_directory + wal_directory return result @staticmethod - def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']): + def run_du(pathname, block_size=BLOCK_SIZE, exclude=None): + if exclude == None: + exclude = ["lost+found"] size = 0 folders = [pathname] root_dev = os.lstat(pathname).st_dev @@ -285,13 +288,13 @@ def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']): size += st.st_size return long(size / block_size) - def get_df_data(self, work_directory): + def get_df_data(self, work_directory, wal_directory): """ Retrive raw data from df (transformations are performed via df_list_transformation) """ result = {'data': [], 'xlog': []} # obtain the device names data_dev = self.get_mounted_device(self.get_mount_point(work_directory)) - xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + self.wal_directory)) + xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + wal_directory)) if data_dev not in self.df_cache: data_vfs = os.statvfs(work_directory) self.df_cache[data_dev] = data_vfs @@ -299,7 +302,7 @@ def get_df_data(self, work_directory): data_vfs = self.df_cache[data_dev] if xlog_dev not in self.df_cache: - xlog_vfs = os.statvfs(work_directory + self.wal_directory) + xlog_vfs = os.statvfs(work_directory + wal_directory) self.df_cache[xlog_dev] = xlog_vfs else: xlog_vfs = self.df_cache[xlog_dev] @@ -353,7 +356,7 @@ def get_mounted_device(pathname): @staticmethod def get_mount_point(pathname): """Get the mounlst point of the filesystem containing pathname""" - + mount_point = None pathname = 
os.path.normcase(os.path.realpath(pathname)) parent_device = path_device = os.stat(pathname).st_dev while parent_device == path_device: diff --git a/pg_view/collectors/pg_collector.py b/pg_view/collectors/pg_collector.py index 70941d4..7043af1 100644 --- a/pg_view/collectors/pg_collector.py +++ b/pg_view/collectors/pg_collector.py @@ -6,7 +6,7 @@ from pg_view.collectors.base_collector import StatCollector from pg_view.loggers import logger from pg_view.models.outputs import COLSTATUS, COLALIGN -from pg_view.utils import MEM_PAGE_SIZE, dbversion_as_float +from pg_view.utils import MEM_PAGE_SIZE if sys.hexversion >= 0x03000000: long = int @@ -20,22 +20,17 @@ class PgstatCollector(StatCollector): STATM_FILENAME = '/proc/{0}/statm' - def __init__(self, pgcon, reconnect, pid, dbname, dbver, always_track_pids): + def __init__(self, dbname, reconnect, always_track_pids): super(PgstatCollector, self).__init__() - self.postmaster_pid = pid - self.pgcon = pgcon - self.reconnect = reconnect + self.reconnect_fn = reconnect self.pids = [] self.rows_diff = [] self.rows_diff_output = [] - # figure out our backend pid - self.connection_pid = pgcon.get_backend_pid() - self.max_connections = self._get_max_connections() + # figure out connection invariants + self.connect_to_postgres() self.recovery_status = self._get_recovery_status() self.always_track_pids = always_track_pids self.dbname = dbname - self.dbver = dbver - self.server_version = pgcon.get_parameter_status('server_version') self.filter_aux_processes = True self.total_connections = 0 self.active_connections = 0 @@ -240,7 +235,7 @@ def idle_format_fn(self, text): if not r: return text else: - if self.dbver >= 9.2: + if self.pgversion >= 90200: return 'idle in transaction for ' + StatCollector.time_pretty_print(int(r.group(1))) else: return 'idle in transaction ' + StatCollector.time_pretty_print(int(r.group(1))) \ @@ -256,7 +251,7 @@ def query_status_fn(self, row, col): return {-1: COLSTATUS.cs_ok} def ident(self): - return 
'{0} ({1}/{2})'.format('postgres', self.dbname, self.dbver) + return '{0} ({1}/{2})'.format('postgres', self.dbname, self.pgversion) @staticmethod def _get_psinfo(cmdline): @@ -293,6 +288,16 @@ def ncurses_filter_row(self, row): else: return False + def connect_to_postgres(self): + self.pgcon, self.postmaster_pid = self.reconnect_fn() + self._read_connection_invariants(self.pgcon) + + def _read_connection_invariants(self, pgcon): + self.connection_pid = self.pgcon.get_backend_pid() + self.max_connections = self._get_max_connections(pgcon) + self.pgversion = int(self.pgcon.server_version) + self.server_version_as_string = self.pgcon.get_parameter_status('server_version') + def refresh(self): """ Reads data from /proc and PostgreSQL stats """ result = [] @@ -302,11 +307,7 @@ def refresh(self): if not self.pgcon: # if we've lost the connection, try to reconnect and # re-initialize all connection invariants - self.pgcon, self.postmaster_pid = self.reconnect() - self.connection_pid = self.pgcon.get_backend_pid() - self.max_connections = self._get_max_connections() - self.dbver = dbversion_as_float(self.pgcon) - self.server_version = self.pgcon.get_parameter_status('server_version') + self.connect_to_postgres() stat_data = self._read_pg_stat_activity() except psycopg2.OperationalError as e: logger.info("failed to query the server: {}".format(e)) @@ -408,10 +409,10 @@ def _get_memory_usage(self, pid): uss = (long(statm[1]) - long(statm[2])) * MEM_PAGE_SIZE return uss - def _get_max_connections(self): + def _get_max_connections(self, pgcon): """ Read max connections from the database """ - cur = self.pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + cur = pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute('show max_connections') result = cur.fetchone() cur.close() @@ -434,7 +435,7 @@ def _read_pg_stat_activity(self): # the pg_stat_activity format has been changed to 9.2, avoiding ambigiuous meanings for some columns. 
# since it makes more sense then the previous layout, we 'cast' the former versions to 9.2 - if self.dbver < 9.2: + if self.pgversion < 90200: cur.execute(""" SELECT datname, procpid as pid, @@ -486,7 +487,7 @@ def _read_pg_stat_activity(self): WHERE procpid != pg_backend_pid() GROUP BY 1,2,3,4,5,6,7,9 """) - elif self.dbver < 9.6: + elif self.pgversion < 90600: cur.execute(""" SELECT datname, a.pid as pid, @@ -607,7 +608,7 @@ def ncurses_produce_prefix(self): if self.pgcon: return "{dbname} {version} {role} connections: {conns} of {max_conns} allocated, {active_conns} active\n". \ format(dbname=self.dbname, - version=self.server_version, + version=self.server_version_as_string, role=self.recovery_status, conns=self.total_connections, max_conns=self.max_connections, @@ -615,7 +616,7 @@ def ncurses_produce_prefix(self): else: return "{dbname} {version} (offline)\n". \ format(dbname=self.dbname, - version=self.server_version) + version=self.server_version_as_string) @staticmethod def process_sort_key(process): diff --git a/pg_view/models/db_client.py b/pg_view/models/db_client.py index a779e16..0c28c8a 100644 --- a/pg_view/models/db_client.py +++ b/pg_view/models/db_client.py @@ -6,7 +6,7 @@ from pg_view.loggers import logger from pg_view.models.parsers import ProcNetParser -from pg_view.utils import STAT_FIELD, dbversion_as_float +from pg_view.utils import STAT_FIELD, postgres_major_version_to_int def read_postmaster_pid(work_directory, dbname): @@ -81,10 +81,11 @@ def detect_db_connection_arguments(work_directory, pid, version, username, dbnam next reading the postgresql.conf if necessary and, at last, """ conn_args = detect_with_proc_net(pid) - if not conn_args: + if not conn_args and version >= 90100: # if we failed to detect the arguments via the /proc/net/ readings, # perhaps we'll get better luck with just peeking into postmaster.pid. 
- conn_args = detect_with_postmaster_pid(work_directory, version) + # Only try for PostgreSQL 9.1 and above, earlier version doesn't contain enough data. + conn_args = detect_with_postmaster_pid(work_directory) if not conn_args: logger.error('unable to detect connection parameters for the PostgreSQL cluster at {0}'.format( work_directory)) @@ -138,7 +139,7 @@ def establish_user_defined_connection(instance, conn, clusters): return True -def make_cluster_desc(name, version, workdir, pid, pgcon, conn): +def make_cluster_desc(name, version, workdir, conn): """Create cluster descriptor, complete with the reconnect function.""" def reconnect(): @@ -148,10 +149,8 @@ def reconnect(): return { 'name': name, - 'ver': version, + 'version': version, 'wd': workdir, - 'pid': pid, - 'pgcon': pgcon, 'reconnect': reconnect } @@ -231,8 +230,7 @@ def get_postmasters_directories(): try: fp = open(PG_VERSION_FILENAME, 'rU') val = fp.read().strip() - if val is not None and len(val) >= 2: - version = float(val) + version = postgres_major_version_to_int(val) except os.error as e: logger.error( 'unable to read version number from PG_VERSION directory {0}, have to skip it'.format(pg_dir)) @@ -240,6 +238,8 @@ def get_postmasters_directories(): except ValueError: logger.error('PG_VERSION doesn\'t contain a valid version number: {0}'.format(val)) continue + except Exception as e: + logger.error("could not parse postgres version number: {0}".format(e)) else: dbname = get_dbname_from_path(pg_dir) postmasters[pg_dir] = [pid, version, dbname] @@ -269,12 +269,9 @@ def fetch_socket_inodes_for_process(pid): return inodes -def detect_with_postmaster_pid(work_directory, version): +def detect_with_postmaster_pid(work_directory): - # PostgreSQL 9.0 doesn't have enough data result = {} - if version is None or version == 9.0: - return None PID_FILE = '{0}/postmaster.pid'.format(work_directory) # try to access the socket directory diff --git a/pg_view/utils.py b/pg_view/utils.py index 6d7f3d5..7b6819b 
100644 --- a/pg_view/utils.py +++ b/pg_view/utils.py @@ -20,7 +20,6 @@ def enum(**enums): MEM_PAGE_SIZE = resource.getpagesize() OUTPUT_METHOD = enum(console='console', json='json', curses='curses') - def get_valid_output_methods(): result = [] for key in OUTPUT_METHOD.__dict__.keys(): @@ -96,7 +95,47 @@ def process_groups(groups): part.ncurses_set_prefix(pg.ncurses_produce_prefix()) -def dbversion_as_float(pgcon): - version_num = pgcon.server_version - version_num /= 100 - return float('{0}.{1}'.format(version_num / 100, version_num % 100)) + +# The version parsing is shamelessly stolen from https://github.com/zalando/patroni/blob/master/patroni/postgresql.py +def postgres_version_to_int(pg_version): + """ Convert the server_version to integer + + >>> postgres_version_to_int('9.5.3') + 90503 + >>> postgres_version_to_int('9.3.13') + 90313 + >>> postgres_version_to_int('10.1') + 100001 + >>> postgres_version_to_int('10') + Traceback (most recent call last): + ... + Exception: Invalid PostgreSQL format: X.Y or X.Y.Z is accepted: 10 + >>> postgres_version_to_int('a.b.c') + Traceback (most recent call last): + ... + Exception: Invalid PostgreSQL version: a.b.c + """ + components = pg_version.split('.') + + result = [] + if len(components) < 2 or len(components) > 3: + raise Exception("Invalid PostgreSQL format: X.Y or X.Y.Z is accepted: {0}".format(pg_version)) + if len(components) == 2: + # new style version numbers, i.e. 
10.1 becomes 100001 + components.insert(1, '0') + try: + result = [c if int(c) > 10 else '0{0}'.format(c) for c in components] + result = int(''.join(result)) + except ValueError: + raise Exception("Invalid PostgreSQL version: {0}".format(pg_version)) + return result + +def postgres_major_version_to_int(pg_version): + """ + >>> postgres_major_version_to_int('10') + 100000 + >>> postgres_major_version_to_int('9.6') + 90600 + """ + return postgres_version_to_int(pg_version + '.0') + From f4a404e5b65ff5abe8fdbada3b65b039bd0ad6e9 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 25 May 2018 16:05:01 +0200 Subject: [PATCH 2/3] Remove last call to dbversion_as_float. Missed in the previous commit. --- pg_view/models/db_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pg_view/models/db_client.py b/pg_view/models/db_client.py index 0c28c8a..617de27 100644 --- a/pg_view/models/db_client.py +++ b/pg_view/models/db_client.py @@ -111,7 +111,7 @@ def establish_user_defined_connection(instance, conn, clusters): logger.error('PostgreSQL exception: {0}'.format(e)) return None # get the database version from the pgcon properties - dbver = dbversion_as_float(pgcon) + pg_version = int(pgcon.server_version) cur = pgcon.cursor() cur.execute('show data_directory') work_directory = cur.fetchone()[0] @@ -133,7 +133,7 @@ def establish_user_defined_connection(instance, conn, clusters): pgcon.close() return True # now we have all components to create a cluster descriptor - desc = make_cluster_desc(name=instance, version=dbver, workdir=work_directory, + desc = make_cluster_desc(name=instance, version=pg_version, workdir=work_directory, pid=pid, pgcon=pgcon, conn=conn) clusters.append(desc) return True From 768656b9f2a60af86375afc5b11d81255a20853f Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 25 May 2018 16:52:53 +0200 Subject: [PATCH 3/3] Remove extra parameters from make_cluster_desc. 
--- pg_view/models/db_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pg_view/models/db_client.py b/pg_view/models/db_client.py index 617de27..cba2638 100644 --- a/pg_view/models/db_client.py +++ b/pg_view/models/db_client.py @@ -133,8 +133,7 @@ def establish_user_defined_connection(instance, conn, clusters): pgcon.close() return True # now we have all components to create a cluster descriptor - desc = make_cluster_desc(name=instance, version=pg_version, workdir=work_directory, - pid=pid, pgcon=pgcon, conn=conn) + desc = make_cluster_desc(name=instance, version=pg_version, workdir=work_directory, conn=conn) clusters.append(desc) return True