Skip to content

Commit

Permalink
code: fix CWD directory bugs, add oper debug w/ color formatter
Browse files Browse the repository at this point in the history
- all include paths are relative to the directory of the include
containing file.
  • Loading branch information
choppsv1 committed Feb 15, 2023
1 parent 22037b7 commit fbb8cbb
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 23 deletions.
14 changes: 14 additions & 0 deletions munet/logconf-mutest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ version: 1
formatters:
brief:
format: '%(levelname)5s: %(message)s'
operfmt:
class: munet.logging.ColorFormatter
format: ' ------| %(message)s'
exec:
format: '%(asctime)s %(levelname)5s: %(name)s: %(message)s'
output:
Expand All @@ -21,6 +24,11 @@ handlers:
class: logging.StreamHandler
formatter: brief
stream: ext://sys.stderr
oper_console:
level: DEBUG
class: logging.StreamHandler
formatter: operfmt
stream: ext://sys.stderr
exec:
level: DEBUG
class: logging.FileHandler
Expand Down Expand Up @@ -68,3 +76,9 @@ loggers:
# We don't propagate this b/c we want a lower level accept on the console
# Instead we use info_console and exec to cover what root would log to.
propagate: false
# This is used to debug the operation of mutest
mutest.oper:
# Records are emitted at DEBUG so this will normally filter everything
level: INFO
handlers: [ "oper_console" ]
propagate: false
26 changes: 26 additions & 0 deletions munet/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,29 @@ def emit(self, record):
logging.getLogger(record.name).addHandler(h)
h.emit(record)
super().emit(record)


class ColorFormatter(logging.Formatter):
"""A formatter that adds color sequences based on level."""

def __init__(self, fmt=None, datefmt=None, style="%", **kwargs):
grey = "\x1b[90m"
yellow = "\x1b[33m"
red = "\x1b[31m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
# basefmt = " ------| %(message)s "

self.formatters = {
logging.DEBUG: logging.Formatter(grey + fmt + reset),
logging.INFO: logging.Formatter(grey + fmt + reset),
logging.WARNING: logging.Formatter(yellow + fmt + reset),
logging.ERROR: logging.Formatter(red + fmt + reset),
logging.CRITICAL: logging.Formatter(bold_red + fmt + reset),
}
# Why are we even bothering?
super().__init__(fmt, datefmt, style, **kwargs)

def format(self, record):
formatter = self.formatters.get(record.levelno)
return formatter.format(record)
116 changes: 94 additions & 22 deletions munet/mutest/userapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import functools
import json
import logging
import pprint
import re
import time

Expand All @@ -93,9 +94,6 @@ class TestCaseInfo:

def __init__(self, tag: str, name: str, path: Path):
self.path = path.absolute()
self.script_dir = self.path.parent
assert self.script_dir.is_dir()

self.tag = tag
self.name = name
self.steps = 0
Expand All @@ -105,6 +103,12 @@ def __init__(self, tag: str, name: str, path: Path):
self.step_start_time = self.start_time
self.run_time = None

def __repr__(self):
return (
f"TestCaseInfo({self.tag} {self.name} steps {self.steps} "
f"p {self.passed} f {self.failed} path {self.path})"
)


class TestCase:
"""A mutest testcase.
Expand Down Expand Up @@ -175,6 +179,10 @@ def __init__(
self.olog = output_logger
self.logf = functools.partial(self.olog.log, logging.INFO)

oplog = logging.getLogger("mutest.oper")
self.oplogf = oplog.debug
self.oplogf("new TestCase: tag: %s name: %s path: %s", tag, name, path)

# find the longerst target name and make target field that wide
nmax = max(len(x) for x in targets)
nmax = max(nmax, len("TARGET"))
Expand Down Expand Up @@ -210,6 +218,7 @@ def execute(self):
:meta private:
"""
assert TestCase.g_tc is None
self.oplogf("execute")
try:
TestCase.g_tc = self
e = self.__exec_script(self.info.path, True, False)
Expand All @@ -224,19 +233,27 @@ def __del__(self):
TestCase.g_tc = None

def __push_execinfo(self, path: Path):
self.oplogf(
"__push_execinfo: path: %s current top is %s",
path,
pprint.pformat(self.info),
)
newname = self.name + path.stem
self.info.steps += 1
self.__saved_info.append(self.info)
tag = f"{self.info.tag}.{self.info.steps}"
self.info = TestCaseInfo(tag, newname, path)
self.oplogf("__push_execinfo: now on top: %s", pprint.pformat(self.info))

def __pop_execinfo(self):
# do something with tag?
finised_info = self.info
finished_info = self.info
self.info = self.__saved_info.pop()
return finised_info
self.oplogf(" __pop_execinfo: poppped: %s", pprint.pformat(finished_info))
self.oplogf(" __pop_execinfo: now on top: %s", pprint.pformat(self.info))
return finished_info

def __print_header(self, tag, header, add_newline=True):
def __print_header(self, tag, header, add_newline=False):
# self.olog.info(self.sum_fmt, tag, "", "", "", header)
self.olog.info("== %s ==", f"TEST: {tag}. {header}")
if add_newline:
Expand Down Expand Up @@ -266,6 +283,7 @@ def __exec_script(self, path, print_header, add_newline):

_ok_result = "marker"
try:
self.oplogf("__exec_script: path %s", path)
script = open(path, "r", encoding="utf-8").read()

# Load the script into a function.
Expand Down Expand Up @@ -304,9 +322,14 @@ def __exec_script(self, path, print_header, add_newline):
else:
if result is not _ok_result:
logging.info("%s returned early, result: %s", name, result)
else:
self.oplogf("__exec_script: name %s completed normally", name)
return None

def __post_result(self, target, success, rstr, logstr=None):
self.oplogf(
"__post_result: target: %s success %s rstr %s", target, success, rstr
)
if success:
self.info.passed += 1
status = "PASS"
Expand Down Expand Up @@ -342,6 +365,7 @@ def __end_test(self) -> (int, int):
Returns:
number of steps, number passed, number failed, run time.
"""
self.oplogf("__end_test: __in_section: %s", self.__in_section)
if self.__in_section:
self.__end_section()

Expand Down Expand Up @@ -512,45 +536,93 @@ def include(self, pathname: str, new_section: bool = False):
:meta private:
"""
path = Path(pathname)
path = self.info.script_dir.joinpath(path)
path = self.info.path.parent.joinpath(path)

self.oplogf(
"include: new path: %s create section: %s currently __in_section: %s",
path,
new_section,
self.__in_section,
)

if new_section:
if self.__in_section:
self.__end_section()
self.__push_execinfo(path)
self.oplogf("include: starting new exec section")
self.__start_exec_section(path)
our_info = self.info
# Note we do *not* mark __in_section True
else:
# swap the current path inside the top info
old_path = self.info.path
self.info.path = path
self.oplogf("include: swapped info path: new %s old %s", path, old_path)

self.__exec_script(path, new_section, True)
self.__exec_script(path, print_header=new_section, add_newline=new_section)

if new_section:
info = self.__pop_execinfo()
passed, failed = info.passed, info.failed
self.info.passed += passed
self.info.failed += failed
# Something within the section creating include has also created a section
# end it, sections do not cross section creating file boundaries
if self.__in_section:
self.oplogf(
"include done: path: %s __in_section calling __end_section", path
)
self.__end_section()

# This is to add a space after an include completes
self.__space_before_result = True
# We should now be back to the info we started with, b/c we don't actually
# start a new section (__in_section) that then could have been ended inside
# the included file.
assert our_info == self.info

self.oplogf(
"include done: path: %s new_section calling __end_section", path
)
self.__end_section()
else:
# The current top path could be anything due to multiple inline includes as
# well as section swap in and out. Forcibly return the top path to the file
# we are returning to
self.info.path = old_path
self.oplogf("include: restored info path: %s", old_path)

def __end_section(self):
self.oplogf("__end_section: __in_section: %s", self.__in_section)
info = self.__pop_execinfo()
passed, failed = info.passed, info.failed
self.info.passed += passed
self.info.failed += failed
self.__space_before_result = True
self.oplogf("__end_section setting __in_section to False")
self.__in_section = False

def __start_exec_section(self, path):
self.oplogf("__start_exec_section: __in_section: %s", self.__in_section)
if self.__in_section:
self.__end_section()

self.__push_execinfo(path)
self.__space_before_result = False
self.oplogf("NOT setting __in_section to True")
assert not self.__in_section

def section(self, desc: str):
"""See :py:func:`~munet.mutest.userapi.section`.
:meta private:
"""
self.oplogf("section: __in_section: %s", self.__in_section)
# Grab path before we pop the current info off the top
path = self.info.path
old_steps = self.info.steps

if self.__in_section:
self.__end_section()

add_nl = True if self.info.steps > 0 else False
self.__push_execinfo(self.info.path)
self.__print_header(self.info.tag, desc, add_nl)
self.__push_execinfo(path)
add_nl = self.info.steps <= old_steps

self.__space_before_result = False
self.__in_section = True
self.oplogf(" section setting __in_section to True")
self.__print_header(self.info.tag, desc, add_nl)

def step(self, target: str, cmd: str) -> str:
"""See :py:func:`~munet.mutest.userapi.step`.
Expand Down Expand Up @@ -784,10 +856,10 @@ def script_dir() -> Path:
When an include() is called the script_dir is updated to be current with the
includeded file, and is reverted to the previous value when the include completes.
"""
return TestCase.g_tc.info.script_dir
return TestCase.g_tc.info.path.parent


def target(name: str) -> Commander:
def get_target(name: str) -> Commander:
"""Get the target object with the given ``name``."""
return TestCase.g_tc.targets[name]

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "munet"
version = "0.12.6"
version = "0.12.7"
description = "A package to facilitate network simulations"
authors = ["Christian Hopps <[email protected]>"]
license = "GPL-2.0-or-later"
Expand Down

0 comments on commit fbb8cbb

Please sign in to comment.