Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to psutil #2, added tests #95

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,8 @@ htmlcov
.mr.developer.cfg
.project
.pydevproject
.idea/
.vagrant/
Vagrantfile

pg_view.log
172 changes: 98 additions & 74 deletions pg_view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
from pg_view import flags
from pg_view.collectors.host_collector import HostStatCollector
from pg_view.collectors.memory_collector import MemoryStatCollector
from pg_view.collectors.partition_collector import PartitionStatCollector, DetachedDiskStatCollector
from pg_view.collectors.pg_collector import PgstatCollector
from pg_view.collectors.partition_collector import PartitionStatCollector, DetachedDiskStatCollector, \
DiskCollectorConsumer
from pg_view.collectors.pg_collector import PgStatCollector
from pg_view.collectors.system_collector import SystemStatCollector
from pg_view.exceptions import NoPidConnectionError, InvalidConnectionParamError, NotConnectedError, \
DuplicatedConnectionError
from pg_view.loggers import logger, enable_logging_to_stderr, disable_logging_to_stderr
from pg_view.models.consumers import DiskCollectorConsumer
from pg_view.models.db_client import build_connection, detect_db_connection_arguments, \
establish_user_defined_connection, make_cluster_desc, get_postmasters_directories
from pg_view.models.outputs import CommonOutput, CursesOutput
from pg_view.utils import get_valid_output_methods, OUTPUT_METHOD, \
output_method_is_valid, read_configuration, process_single_collector, process_groups
from pg_view.models.db_client import make_cluster_desc, DBClient
from pg_view.models.outputs import CommonOutput, CursesOutput, get_displayer_by_class
from pg_view.models.parsers import ProcWorker
from pg_view.utils import get_valid_output_methods, OUTPUT_METHOD, output_method_is_valid, \
read_configuration, process_single_collector, process_groups, validate_autodetected_conn_param

try:
import psycopg2
import psycopg2.extras

psycopg2_available = True
except ImportError:
psycopg2_available = False
print('Unable to import psycopg2 module, please, install it (python-psycopg2). Can not continue')
sys.exit(254)

try:
import curses

Expand All @@ -40,6 +42,10 @@
print('Unable to import ncurses, curses output will be unavailable')
curses_available = False

# setup system constants
output_method = OUTPUT_METHOD.curses
options = None


def parse_args():
"""parse command-line options"""
Expand All @@ -56,7 +62,7 @@ def parse_args():
parser.add_option('-o', '--output-method', help='send output to the following source', action='store',
default=OUTPUT_METHOD.curses, dest='output_method')
parser.add_option('-V', '--use-version',
help='version of the instance to monitor (in case it can\'t be autodetected)',
help="version of the instance to monitor (in case it can't be autodetected)",
action='store', dest='version', type='float')
parser.add_option('-l', '--log-file', help='direct log output to the file', action='store',
dest='log_file')
Expand All @@ -79,11 +85,6 @@ def parse_args():
return options, args


# setup system constants
output_method = OUTPUT_METHOD.curses
options = None


# execution starts here
def loop(collectors, consumer, groups, output_method):
if output_method == OUTPUT_METHOD.curses:
Expand Down Expand Up @@ -114,10 +115,8 @@ def poll_keys(screen, output):
return True


def do_loop(screen, groups, output_method, collectors, consumer):
""" Display output (or pass it through to ncurses) """

if output_method == OUTPUT_METHOD.curses:
def get_output(method, screen):
if method == OUTPUT_METHOD.curses:
if screen is None:
logger.error('No parent screen is passed to the curses application')
sys.exit(1)
Expand All @@ -129,28 +128,42 @@ def do_loop(screen, groups, output_method, collectors, consumer):
sys.exit(1)
else:
output = CommonOutput()
return output


def do_loop(screen, groups, output_method, collectors, consumer):
""" Display output (or pass it through to ncurses) """

output = get_output(output_method, screen)
while 1:
# process input:
consumer.consume()
for st in collectors:
for collector in collectors:
if output_method == OUTPUT_METHOD.curses and not poll_keys(screen, output):
# bail out immediately
return
st.set_units_display(flags.display_units)
st.set_ignore_autohide(not flags.autohide_fields)
st.set_notrim(flags.notrim)
process_single_collector(st)
if not poll_keys(screen, output):
# bail out immediately
return

process_single_collector(collector, flags.filter_aux)
if output_method == OUTPUT_METHOD.curses and not poll_keys(screen, output):
return
if not poll_keys(screen, output):
return

if output_method == OUTPUT_METHOD.curses:
process_groups(groups)
# in the non-curses cases display actually shows the data and refresh
# clears the screen, so we need to refresh before display to clear the old data.
if options.clear_screen and output_method != OUTPUT_METHOD.curses:
output.refresh()
for st in collectors:
output.display(st.output(output_method))
for collector in collectors:
displayer = get_displayer_by_class(
output_method, collector,
show_units=flags.display_units,
ignore_autohide=not flags.autohide_fields,
notrim=flags.notrim
)
formatted_data = collector.output(displayer)
output.display(formatted_data)
# in the curses case, refresh shows the data queued by display
if output_method == OUTPUT_METHOD.curses:
output.refresh()
Expand All @@ -166,13 +179,8 @@ def main():
print('Non Linux database hosts are not supported at the moment. Can not continue')
sys.exit(243)

if not psycopg2_available:
print('Unable to import psycopg2 module, please, install it (python-psycopg2). Can not continue')
sys.exit(254)

options, args = parse_args()
consts.TICK_LENGTH = options.tick

output_method = options.output_method

if not output_method_is_valid(output_method):
Expand All @@ -197,60 +205,75 @@ def main():
if options.instance and instance != options.instance:
continue
# pass already aquired connections to make sure we only list unique clusters.
host = config[instance].get('host')
port = config[instance].get('port')
conn = build_connection(host, port,
config[instance].get('user'), config[instance].get('dbname'))

if not establish_user_defined_connection(instance, conn, clusters):
logger.error('failed to acquire details about ' +
'the database cluster {0}, the server will be skipped'.format(instance))
db_client = DBClient.from_config(config[instance])
try:
cluster = db_client.establish_user_defined_connection(instance, clusters)
except (NotConnectedError, NoPidConnectionError):
logger.error('failed to acquire details about the database cluster {0}, the server '
'will be skipped'.format(instance))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker, but this is really not an improvement over the old version. Error messages should not be broken into multiple lines for grep-ability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

except DuplicatedConnectionError:
pass
else:
clusters.append(cluster)

elif options.host:
# connect to the database using the connection string supplied from command-line
conn = build_connection(options.host, options.port, options.username, options.dbname)
db_client = DBClient.from_options(options)
instance = options.instance or "default"
if not establish_user_defined_connection(instance, conn, clusters):
try:
cluster = db_client.establish_user_defined_connection(instance, clusters)
except (NotConnectedError, NoPidConnectionError):
logger.error("unable to continue with cluster {0}".format(instance))
except DuplicatedConnectionError:
pass
else:
clusters.append(cluster)
elif options.use_service and options.instance:
db_client = DBClient({'service': options.instance})
# connect to the database using the service name
if not establish_user_defined_connection(options.instance, {'service': options.instance}, clusters):
if not db_client.establish_user_defined_connection(options.instance, clusters):
logger.error("unable to continue with cluster {0}".format(options.instance))
else:
# do autodetection
postmasters = get_postmasters_directories()

postmasters = ProcWorker().get_postmasters_directories()
# get all PostgreSQL instances
for result_work_dir, data in postmasters.items():
(ppid, dbversion, dbname) = data
# if user requested a specific database name and version - don't try to connect to others
for result_work_dir, connection_params in postmasters.items():
(ppid, dbversion, dbname) = connection_params
try:
validate_autodetected_conn_param(dbname, dbversion, result_work_dir, connection_params)
except InvalidConnectionParamError:
continue

if options.instance:
if dbname != options.instance or not result_work_dir or not ppid:
continue
if options.version is not None and dbversion != options.version:
continue
db_client = DBClient.from_postmasters(result_work_dir, ppid, dbversion, options)
if db_client is None:
continue
try:
conndata = detect_db_connection_arguments(
result_work_dir, ppid, dbversion, options.username, options.dbname)
if conndata is None:
continue
host = conndata['host']
port = conndata['port']
conn = build_connection(host, port, options.username, options.dbname)
pgcon = psycopg2.connect(**conn)
pgcon = psycopg2.connect(**db_client.connection_params)
except Exception as e:
logger.error('PostgreSQL exception {0}'.format(e))
pgcon = None
if pgcon:
desc = make_cluster_desc(name=dbname, version=dbversion, workdir=result_work_dir,
pid=ppid, pgcon=pgcon, conn=conn)
else:
desc = make_cluster_desc(
name=dbname,
version=dbversion,
workdir=result_work_dir,
pid=ppid,
pgcon=pgcon,
conn=db_client.connection_params
)
clusters.append(desc)

collectors = []
groups = {}
try:
if len(clusters) == 0:
if not clusters:
logger.error('No suitable PostgreSQL instances detected, exiting...')
logger.error('hint: use -v for details, ' +
'or specify connection parameters manually in the configuration file (-c)')
logger.error('hint: use -v for details, or specify connection parameters '
'manually in the configuration file (-c)')
sys.exit(1)

# initialize the disks stat collector process and create an exchange queue
Expand All @@ -265,13 +288,14 @@ def main():
collectors.append(HostStatCollector())
collectors.append(SystemStatCollector())
collectors.append(MemoryStatCollector())
for cl in clusters:
part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer)
pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid)
groupname = cl['wd']
groups[groupname] = {'pg': pg, 'partitions': part}
collectors.append(part)
collectors.append(pg)

for cluster in clusters:
partition_collector = PartitionStatCollector.from_cluster(cluster, consumer)
pg_collector = PgStatCollector.from_cluster(cluster, options.pid)

groups[cluster['wd']] = {'pg': pg_collector, 'partitions': partition_collector}
collectors.append(partition_collector)
collectors.append(pg_collector)

# we don't want to mix diagnostics messages with useful output, so we log the former into a file.
disable_logging_to_stderr()
Expand Down
Loading