From 90f4dc65bdd46de0d93aa024ece9b022ec33629b Mon Sep 17 00:00:00 2001 From: Felipe <65371336+mrfelpa@users.noreply.github.com> Date: Wed, 13 Nov 2024 13:07:15 +0000 Subject: [PATCH] ImplementationofStandardMethodswithTypeannotationsandOtherimprovements I implemented the improvements to make the code more adjusted following the best programming practices. --- trailscraper/iam.py | 238 +++++++++++++++++++++----------------------- 1 file changed, 112 insertions(+), 126 deletions(-) diff --git a/trailscraper/iam.py b/trailscraper/iam.py index ddf58bb..af453fb 100644 --- a/trailscraper/iam.py +++ b/trailscraper/iam.py @@ -1,186 +1,172 @@ -"""Classes to deal with IAM Policies""" import json import os - import re +from typing import List, Dict, Union import six from toolz import pipe from toolz.curried import groupby as groupbyz from toolz.curried import map as mapz -BASE_ACTION_PREFIXES = ["Describe", "Create", "Delete", "Update", "Detach", "Attach", "List", "Put", "Get", ] +BASE_ACTION_PREFIXES = ["Describe", "Create", "Delete", "Update", "Detach", "Attach", "List", "Put", "Get"] -# pylint: disable=invalid-name class BaseElement: - """Base Class for all IAM Policy classes""" - - def json_repr(self): - """JSON representation of the class""" - raise NotImplementedError +    """Base Class for all IAM Policy classes""" - def __eq__(self, other): - if isinstance(other, self.__class__): - return self.json_repr() == other.json_repr() +    def json_repr(self) -> Union[str, dict]: +        """JSON representation of the class""" +        raise NotImplementedError - return False +    def __eq__(self, other) -> bool: +        return isinstance(other, self.__class__) and self.json_repr() == other.json_repr() - def __ne__(self, other): - return not self == other +    def __ne__(self, other) -> bool: +        return not self == other - def __hash__(self): - return hash(self.json_repr()) +    def __hash__(self) -> int: +        return hash(self.json_repr()) - def __repr__(self): - return str(self.json_repr()) +    def __repr__(self) -> str: +        return str(self.json_repr()) class Action(BaseElement): - """Action in an IAM Policy.""" - - def __init__(self, prefix, action): - self.action = action - self.prefix = prefix - - def json_repr(self): - return ':'.join([self.prefix, self.action]) +    """Action in an IAM Policy.""" - def _base_action(self): - without_prefix = self.action - for prefix in BASE_ACTION_PREFIXES: - without_prefix = re.sub(prefix, "", without_prefix) +    def __init__(self, prefix: str, action: str): +        self.prefix = prefix +        self.action = action - without_plural = re.sub(r"s$", "", without_prefix) +    def json_repr(self) -> str: +        return ':'.join([self.prefix, self.action]) - return without_plural +    def _base_action(self) -> str: +        without_prefix = self.action +        for prefix in BASE_ACTION_PREFIXES: +            without_prefix = re.sub(prefix, "", without_prefix) - def matching_actions(self, allowed_prefixes): - """Return a matching create action for this Action""" +        without_plural = re.sub(r"s$", "", without_prefix) +        return without_plural - if not allowed_prefixes: - allowed_prefixes = BASE_ACTION_PREFIXES +    def matching_actions(self, allowed_prefixes: List[str] = None) -> List['Action']: +        """Return matching actions for this Action.""" +        if allowed_prefixes is None: +            allowed_prefixes = BASE_ACTION_PREFIXES - potential_matches = [Action(prefix=self.prefix, action=action_prefix + self._base_action()) - for action_prefix in allowed_prefixes] - - potential_matches += [Action(prefix=self.prefix, action=action_prefix + self._base_action() + "s") - for action_prefix in allowed_prefixes] - - return [potential_match - for potential_match in potential_matches - if potential_match in known_iam_actions(self.prefix) and potential_match != self] +        base_action = self._base_action() +        potential_matches = [ +            Action(prefix=self.prefix, action=f"{action_prefix}{base_action}{'s' if plural else ''}") +            for action_prefix in allowed_prefixes +            for plural in [False, True] +        ] +        return [pm for pm in potential_matches if pm in known_iam_actions(self.prefix) and pm != self] class Statement(BaseElement): - """Statement in an IAM Policy.""" - - def __init__(self, Action, Effect, Resource): # pylint: disable=redefined-outer-name - self.Action = Action # pylint: disable=invalid-name - self.Effect = Effect # pylint: disable=invalid-name - self.Resource = Resource # pylint: disable=invalid-name - - def json_repr(self): - return { - 'Action': self.Action, - 'Effect': self.Effect, - 'Resource': self.Resource, - } +    """Statement in an IAM Policy.""" - def merge(self, other): - """Merge two statements into one.""" - if self.Effect != other.Effect: - raise ValueError(f"Trying to combine two statements with differing effects: {self.Effect} {other.Effect}") +    def __init__(self, actions: List[Action], effect: str, resources: List[str]): +        self.actions = actions +        self.effect = effect +        self.resources = resources - effect = self.Effect +    def json_repr(self) -> dict: +        return { +            'Action': [action.json_repr() for action in self.actions], +            'Effect': self.effect, +            'Resource': self.resources, +        } - actions = list(sorted(set(self.Action + other.Action), key=lambda action: action.json_repr())) - resources = list(sorted(set(self.Resource + other.Resource))) +    def merge(self, other: 'Statement') -> 'Statement': +        """Merge two statements into one.""" +        if self.effect != other.effect: +            raise StatementMergeError(f"Cannot merge statements with different effects: {self.effect} vs {other.effect}") - return Statement( - Effect=effect, - Action=actions, - Resource=resources, - ) +        actions = list(sorted(set(self.actions + other.actions), key=lambda action: action.json_repr())) +        resources = list(sorted(set(self.resources + other.resources))) - def __action_list_strings(self): - return "-".join([a.json_repr() for a in self.Action]) +        return Statement(actions=actions, effect=self.effect, resources=resources) - def __lt__(self, other): - if self.Effect != other.Effect: - return self.Effect < other.Effect - if self.Action != other.Action: - # pylint: disable=W0212 - return self.__action_list_strings() < other.__action_list_strings() +    def __lt__(self, other: 'Statement') -> bool: +        if self.effect != other.effect: +            return self.effect < other.effect +        if self.actions != other.actions: +            return "-".join(a.json_repr() for a in self.actions) < "-".join(a.json_repr() for a in other.actions) - return "".join(self.Resource) < "".join(other.Resource) +        return "".join(self.resources) < "".join(other.resources) class PolicyDocument(BaseElement): - """IAM Policy Doument.""" +    """IAM Policy Document.""" - def __init__(self, Statement, Version="2012-10-17"): # pylint: disable=redefined-outer-name - self.Version = Version # pylint: disable=invalid-name - self.Statement = Statement # pylint: disable=invalid-name +    def __init__(self, statements: List[Statement], version: str = "2012-10-17"): +        self.version = version +        self.statements = statements - def json_repr(self): - return { - 'Version': self.Version, - 'Statement': self.Statement - } +    def json_repr(self) -> dict: +        return { +            'Version': self.version, +            'Statement': [statement.json_repr() for statement in self.statements] +        } - def to_json(self): - """Render object into IAM Policy JSON""" - return json.dumps(self.json_repr(), cls=IAMJSONEncoder, indent=4, sort_keys=True) +    def to_json(self) -> str: +        """Render object into IAM Policy JSON""" +        return json.dumps(self.json_repr(), cls=IAMJSONEncoder, indent=4, sort_keys=True) class IAMJSONEncoder(json.JSONEncoder): - """JSON Encoder using the json_repr functions""" +    """JSON Encoder using the json_repr functions""" - def default(self, o): # pylint: disable=method-hidden - if hasattr(o, 'json_repr'): - return o.json_repr() - return json.JSONEncoder.default(self, o) +    def default(self, o):  # pylint: disable=method-hidden +        if hasattr(o, 'json_repr'): +            return o.json_repr() +        return super().default(o) -def _parse_action(action): - parts = action.split(":") - return Action(parts[0], parts[1]) +def _parse_action(action: str) -> Action: +    parts = action.split(":") +    return Action(parts[0], parts[1]) -def _parse_statement(statement): - return Statement(Action=[_parse_action(action) for action in statement['Action']], - Effect=statement['Effect'], - Resource=statement['Resource']) +def _parse_statement(statement: dict) -> Statement: +    return Statement( +        actions=[_parse_action(action) for action in statement['Action']], +        effect=statement['Effect'], +        resources=statement['Resource'] +    ) -def _parse_statements(json_data): - # TODO: jsonData could also be dict, aka one statement; similar things happen in the rest of the policy pylint: disable=fixme - # https://github.com/flosell/iam-policy-json-to-terraform/blob/fafc231/converter/decode.go#L12-L22 - return [_parse_statement(statement) for statement in json_data] +def _parse_statements(json_data: Union[List[dict], dict]) -> List[Statement]: +    if isinstance(json_data, dict): +    # jsonData could also be dict, aka one statement; similar things happen in the rest of the policy pylint: disable=fixme +    # https://github.com/flosell/iam-policy-json-to-terraform/blob/fafc231/converter/decode.go#L12-L22 +        return [_parse_statement(json_data)] +    +    # Handle multiple statements case +    return [_parse_statement(statement) for statement in json_data] -def parse_policy_document(stream): - """Parse a stream of JSON data to a PolicyDocument object""" - if isinstance(stream, six.string_types): - json_dict = json.loads(stream) - else: - json_dict = json.load(stream) +def parse_policy_document(stream: Union[str, bytes]) -> PolicyDocument: +    """Parse a stream of JSON data to a PolicyDocument object""" +    if isinstance(stream, six.string_types): +        json_dict = json.loads(stream) +    else: +        json_dict = json.load(stream) - return PolicyDocument(_parse_statements(json_dict['Statement']), Version=json_dict['Version']) +    return PolicyDocument(statements=_parse_statements(json_dict['Statement']), version=json_dict.get('Version')) -def all_known_iam_permissions(): - "Return a list of all known IAM actions" - with open(os.path.join(os.path.dirname(__file__), 'known-iam-actions.txt'), encoding="UTF-8") as iam_file: - return {line.rstrip('\n') for line in iam_file.readlines()} +def all_known_iam_permissions() -> set: +    """Return a set of all known IAM actions.""" +    with open(os.path.join(os.path.dirname(__file__), 'known-iam-actions.txt'), encoding="UTF-8") as iam_file: +        return {line.strip() for line in iam_file} -def known_iam_actions(prefix): - """Return known IAM actions for a prefix, e.g. all ec2 actions""" - # This could be memoized for performance improvements - knowledge = pipe(all_known_iam_permissions(), - mapz(_parse_action), - groupbyz(lambda x: x.prefix)) +def known_iam_actions(prefix: str) -> List[Action]: +    """Return known IAM actions for a prefix.""" +    knowledge = pipe(all_known_iam_permissions(), +                     mapz(_parse_action), +                     groupbyz(lambda x: x.prefix)) - return knowledge.get(prefix, []) +    return knowledge.get(prefix, [])