Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ImplementationofStandardMethodswithTypeannotationsandOtherimprovements #695

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 112 additions & 126 deletions trailscraper/iam.py
Original file line number Diff line number Diff line change
@@ -1,186 +1,172 @@
"""Classes to deal with IAM Policies"""
import json
import os

import re
from typing import List, Dict, Union

import six
from toolz import pipe
from toolz.curried import groupby as groupbyz
from toolz.curried import map as mapz

BASE_ACTION_PREFIXES = ["Describe", "Create", "Delete", "Update", "Detach", "Attach", "List", "Put", "Get", ]
BASE_ACTION_PREFIXES = ["Describe", "Create", "Delete", "Update", "Detach", "Attach", "List", "Put", "Get"]

# pylint: disable=invalid-name
class BaseElement:
"""Base Class for all IAM Policy classes"""

def json_repr(self):
"""JSON representation of the class"""
raise NotImplementedError
    """Base Class for all IAM Policy classes"""

def __eq__(self, other):
if isinstance(other, self.__class__):
return self.json_repr() == other.json_repr()
    def json_repr(self) -> Union[str, dict]:
        """JSON representation of the class"""
        raise NotImplementedError

return False
    def __eq__(self, other) -> bool:
        return isinstance(other, self.__class__) and self.json_repr() == other.json_repr()

def __ne__(self, other):
return not self == other
    def __ne__(self, other) -> bool:
        return not self == other

def __hash__(self):
return hash(self.json_repr())
    def __hash__(self) -> int:
        return hash(self.json_repr())

def __repr__(self):
return str(self.json_repr())
    def __repr__(self) -> str:
        return str(self.json_repr())


class Action(BaseElement):
"""Action in an IAM Policy."""

def __init__(self, prefix, action):
self.action = action
self.prefix = prefix

def json_repr(self):
return ':'.join([self.prefix, self.action])
    """Action in an IAM Policy."""

def _base_action(self):
without_prefix = self.action
for prefix in BASE_ACTION_PREFIXES:
without_prefix = re.sub(prefix, "", without_prefix)
    def __init__(self, prefix: str, action: str):
        self.prefix = prefix
        self.action = action

without_plural = re.sub(r"s$", "", without_prefix)
    def json_repr(self) -> str:
        return ':'.join([self.prefix, self.action])

return without_plural
    def _base_action(self) -> str:
        without_prefix = self.action
        for prefix in BASE_ACTION_PREFIXES:
            without_prefix = re.sub(prefix, "", without_prefix)

def matching_actions(self, allowed_prefixes):
"""Return a matching create action for this Action"""
        without_plural = re.sub(r"s$", "", without_prefix)
        return without_plural

if not allowed_prefixes:
allowed_prefixes = BASE_ACTION_PREFIXES
    def matching_actions(self, allowed_prefixes: List[str] = None) -> List['Action']:
        """Return matching actions for this Action."""
        if allowed_prefixes is None:
            allowed_prefixes = BASE_ACTION_PREFIXES

potential_matches = [Action(prefix=self.prefix, action=action_prefix + self._base_action())
for action_prefix in allowed_prefixes]

potential_matches += [Action(prefix=self.prefix, action=action_prefix + self._base_action() + "s")
for action_prefix in allowed_prefixes]

return [potential_match
for potential_match in potential_matches
if potential_match in known_iam_actions(self.prefix) and potential_match != self]
        base_action = self._base_action()
        potential_matches = [
            Action(prefix=self.prefix, action=f"{action_prefix}{base_action}{'s' if plural else ''}")
            for action_prefix in allowed_prefixes
            for plural in [False, True]
        ]

        return [pm for pm in potential_matches if pm in known_iam_actions(self.prefix) and pm != self]


class Statement(BaseElement):
"""Statement in an IAM Policy."""

def __init__(self, Action, Effect, Resource): # pylint: disable=redefined-outer-name
self.Action = Action # pylint: disable=invalid-name
self.Effect = Effect # pylint: disable=invalid-name
self.Resource = Resource # pylint: disable=invalid-name

def json_repr(self):
return {
'Action': self.Action,
'Effect': self.Effect,
'Resource': self.Resource,
}
    """Statement in an IAM Policy."""

def merge(self, other):
"""Merge two statements into one."""
if self.Effect != other.Effect:
raise ValueError(f"Trying to combine two statements with differing effects: {self.Effect} {other.Effect}")
    def __init__(self, actions: List[Action], effect: str, resources: List[str]):
        self.actions = actions
        self.effect = effect
        self.resources = resources

effect = self.Effect
    def json_repr(self) -> dict:
        return {
            'Action': [action.json_repr() for action in self.actions],
            'Effect': self.effect,
            'Resource': self.resources,
        }

actions = list(sorted(set(self.Action + other.Action), key=lambda action: action.json_repr()))
resources = list(sorted(set(self.Resource + other.Resource)))
    def merge(self, other: 'Statement') -> 'Statement':
        """Merge two statements into one."""
        if self.effect != other.effect:
            raise StatementMergeError(f"Cannot merge statements with different effects: {self.effect} vs {other.effect}")

return Statement(
Effect=effect,
Action=actions,
Resource=resources,
)
        actions = list(sorted(set(self.actions + other.actions), key=lambda action: action.json_repr()))
        resources = list(sorted(set(self.resources + other.resources)))

def __action_list_strings(self):
return "-".join([a.json_repr() for a in self.Action])
        return Statement(actions=actions, effect=self.effect, resources=resources)

def __lt__(self, other):
if self.Effect != other.Effect:
return self.Effect < other.Effect
if self.Action != other.Action:
# pylint: disable=W0212
return self.__action_list_strings() < other.__action_list_strings()
    def __lt__(self, other: 'Statement') -> bool:
        if self.effect != other.effect:
            return self.effect < other.effect
        if self.actions != other.actions:
            return "-".join(a.json_repr() for a in self.actions) < "-".join(a.json_repr() for a in other.actions)

return "".join(self.Resource) < "".join(other.Resource)
        return "".join(self.resources) < "".join(other.resources)


class PolicyDocument(BaseElement):
"""IAM Policy Doument."""
    """IAM Policy Document."""

def __init__(self, Statement, Version="2012-10-17"): # pylint: disable=redefined-outer-name
self.Version = Version # pylint: disable=invalid-name
self.Statement = Statement # pylint: disable=invalid-name
    def __init__(self, statements: List[Statement], version: str = "2012-10-17"):
        self.version = version
        self.statements = statements

def json_repr(self):
return {
'Version': self.Version,
'Statement': self.Statement
}
    def json_repr(self) -> dict:
        return {
            'Version': self.version,
            'Statement': [statement.json_repr() for statement in self.statements]
        }

def to_json(self):
"""Render object into IAM Policy JSON"""
return json.dumps(self.json_repr(), cls=IAMJSONEncoder, indent=4, sort_keys=True)
    def to_json(self) -> str:
        """Render object into IAM Policy JSON"""
        return json.dumps(self.json_repr(), cls=IAMJSONEncoder, indent=4, sort_keys=True)


class IAMJSONEncoder(json.JSONEncoder):
"""JSON Encoder using the json_repr functions"""
    """JSON Encoder using the json_repr functions"""

def default(self, o): # pylint: disable=method-hidden
if hasattr(o, 'json_repr'):
return o.json_repr()
return json.JSONEncoder.default(self, o)
    def default(self, o):  # pylint: disable=method-hidden
        if hasattr(o, 'json_repr'):
            return o.json_repr()
        return super().default(o)


def _parse_action(action):
parts = action.split(":")
return Action(parts[0], parts[1])
def _parse_action(action: str) -> Action:
    parts = action.split(":")
    return Action(parts[0], parts[1])


def _parse_statement(statement):
return Statement(Action=[_parse_action(action) for action in statement['Action']],
Effect=statement['Effect'],
Resource=statement['Resource'])
def _parse_statement(statement: dict) -> Statement:
    return Statement(
        actions=[_parse_action(action) for action in statement['Action']],
        effect=statement['Effect'],
        resources=statement['Resource']
    )


def _parse_statements(json_data):
# TODO: jsonData could also be dict, aka one statement; similar things happen in the rest of the policy pylint: disable=fixme
# https://github.com/flosell/iam-policy-json-to-terraform/blob/fafc231/converter/decode.go#L12-L22
return [_parse_statement(statement) for statement in json_data]
def _parse_statements(json_data: Union[List[dict], dict]) -> List[Statement]:
    if isinstance(json_data, dict):
    # jsonData could also be dict, aka one statement; similar things happen in the rest of the policy pylint: disable=fixme
    # https://github.com/flosell/iam-policy-json-to-terraform/blob/fafc231/converter/decode.go#L12-L22
        return [_parse_statement(json_data)]

    # Handle multiple statements case
    return [_parse_statement(statement) for statement in json_data]


def parse_policy_document(stream):
"""Parse a stream of JSON data to a PolicyDocument object"""
if isinstance(stream, six.string_types):
json_dict = json.loads(stream)
else:
json_dict = json.load(stream)
def parse_policy_document(stream: Union[str, bytes]) -> PolicyDocument:
    """Parse a stream of JSON data to a PolicyDocument object"""
    if isinstance(stream, six.string_types):
        json_dict = json.loads(stream)
    else:
        json_dict = json.load(stream)

return PolicyDocument(_parse_statements(json_dict['Statement']), Version=json_dict['Version'])
    return PolicyDocument(statements=_parse_statements(json_dict['Statement']), version=json_dict.get('Version'))


def all_known_iam_permissions():
"Return a list of all known IAM actions"
with open(os.path.join(os.path.dirname(__file__), 'known-iam-actions.txt'), encoding="UTF-8") as iam_file:
return {line.rstrip('\n') for line in iam_file.readlines()}
def all_known_iam_permissions() -> set:
    """Return a set of all known IAM actions."""
    with open(os.path.join(os.path.dirname(__file__), 'known-iam-actions.txt'), encoding="UTF-8") as iam_file:
        return {line.strip() for line in iam_file}


def known_iam_actions(prefix):
"""Return known IAM actions for a prefix, e.g. all ec2 actions"""
# This could be memoized for performance improvements
knowledge = pipe(all_known_iam_permissions(),
mapz(_parse_action),
groupbyz(lambda x: x.prefix))
def known_iam_actions(prefix: str) -> List[Action]:
    """Return known IAM actions for a prefix."""
    knowledge = pipe(all_known_iam_permissions(),
                     mapz(_parse_action),
                     groupbyz(lambda x: x.prefix))

return knowledge.get(prefix, [])
    return knowledge.get(prefix, [])