Skip to content

Commit

Permalink
Updated backend to pySigma v0.10.1
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberphor committed Aug 6, 2023
1 parent 78f8f11 commit 72c0672
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 324 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
.vscode/
cov.xml
dist/
docs/_build
docs/_build
backup/
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ It currently supports the following output formats:
## Testing
```python
python -m pip install --user pytest
python -m pytest
python -m pytest # test all functions
python -m pytest tests/test_backend_powershell.py::test_powershell_and_expression # test a specific function
```

## Updating to the Latest Version of pySigma
```python
python -m poetry add pysigma@latest
```

## References
Expand Down
266 changes: 74 additions & 192 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 2 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pySigma-backend-powershell"
version = "0.1.0"
version = "0.1.1"
description = "pySigma PowerShell Backend"
authors = ["Victor Fernandez III <@cyberphor>"]
license = "MIT"
Expand All @@ -11,12 +11,7 @@ packages = [

[tool.poetry.dependencies]
python = "^3.8"
pysigma = "^0.9.1"

[tool.poetry.dev-dependencies]
pytest = "^7.2.1"
pytest-cov = "^4.0.0"
coverage = "^7.1.0"
pysigma = "^0.10.1"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
10 changes: 10 additions & 0 deletions rules/net_connection_win_wscript.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
logsource:
category: network_connection
product: windows
detection:
selection:
Image|endswith: '\wscript.exe'
Initiated: 'true'
DestinationPort:
- 25
condition: selection
179 changes: 83 additions & 96 deletions sigma/backends/powershell/powershell.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
from sigma.conditions import ConditionItem, ConditionAND, ConditionOR, ConditionNOT, ConditionFieldEqualsValueExpression
from sigma.conversion.base import TextQueryBackend, SpecialChars
from sigma.conversion.deferred import DeferredQueryExpression
from collections import defaultdict
from sigma.conditions import ConditionItem, ConditionAND, ConditionOR, ConditionNOT
from sigma.conversion.base import TextQueryBackend
from sigma.conversion.state import ConversionState
from sigma.pipelines.powershell import powershell_pipeline
from sigma.processing.pipeline import ProcessingPipeline
from sigma.rule import SigmaRule
from sigma.types import SigmaCompareExpression
from typing import ClassVar, Dict, Tuple, Pattern, List, Union
from sigma.types import SigmaCompareExpression, SigmaRegularExpressionFlag
from typing import ClassVar, Dict, Tuple, Pattern, List, Any, Optional
from re import compile

class PowerShellBackend(TextQueryBackend):
"""PowerShell backend."""
name: ClassVar[str] = "PowerShell backend"
formats: Dict[str, str] = {
"default": "plain PowerShell queries",
"script": "a PowerShell script",
"xml": "XML documents",
"xpath": "XML strings",
"subscription": "Windows event subscriptions"
"default": "PowerShell sentences",
"script": "PowerShell scripts",
}
requires_pipeline: bool = False
processing_pipeline: powershell_pipeline
last_processing_pipeline: powershell_pipeline
output_format_processing_pipeline: ClassVar[Dict[str, ProcessingPipeline]] = defaultdict(
ProcessingPipeline
)

# Operator precedence: tuple of Condition{AND,OR,NOT} in order of precedence.
# The backend generates grouping if required
precedence: ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (ConditionNOT, ConditionAND, ConditionOR)
Expand All @@ -36,125 +41,107 @@ class PowerShellBackend(TextQueryBackend):
### Quoting
field_quote: ClassVar[str] = ""
field_quote_pattern: ClassVar[Pattern] = compile("^\\w+$")
field_quote_pattern_negation : ClassVar[bool] = True # NEW
field_quote_pattern_negation: ClassVar[bool] = True

### Escaping
field_escape: ClassVar[str] = "\\"
field_escape_quote: ClassVar[bool] = False
field_escape_pattern: ClassVar[Pattern] = compile("\\s")
field_escape: ClassVar[str] = "\\" # Character to escape particular parts defined in field_escape_pattern.
field_escape_quote: ClassVar[bool] = False # Escape quote string defined in field_quote
field_escape_pattern: ClassVar[Pattern] = compile("\\s") # All matches of this pattern are prepended with the string contained in field_escape.

## Values
str_quote: ClassVar[str] = '"'
escape_char: ClassVar[str] = "\\"
wildcard_multi: ClassVar[str] = "*"
wildcard_single: ClassVar[str] = "*"
add_escaped: ClassVar[str] = "\\"
filter_chars: ClassVar[str] = ""
bool_values: ClassVar[Dict[bool, str]] = {True: "$true", False:"$false"}
str_quote: ClassVar[str] = '"' # string quoting character (added as escaping character)
escape_char: ClassVar[str] = "\\" # Escaping character for special characrers inside string
wildcard_multi: ClassVar[str] = "*" # Character used as multi-character wildcard
wildcard_single: ClassVar[str] = "*" # Character used as single-character wildcard
add_escaped: ClassVar[str] = "\\" # Characters quoted in addition to wildcards and string quote
filter_chars: ClassVar[str] = "" # Characters filtered
bool_values: ClassVar[Dict[bool, str]] = { # Values to which boolean values are mapped.
True: "$true",
False: "$false",
}

# String matching operators. if None is appropriate, eq_token is used.
startswith_expression: ClassVar[str] = "{field} -like {value}"
endswith_expression: ClassVar[str] = "{field} -like {value}"
contains_expression: ClassVar[str] = "{field} -contains {value}"
wildcard_match_expression: ClassVar[str] = "{field} -like {value}"
startswith_expression: ClassVar[str] = "{field}.StartsWith({value})"
endswith_expression: ClassVar[str] = "{field}.EndsWith({value})"
contains_expression: ClassVar[str] = "{field}.Contains({value})"
wildcard_match_expression: ClassVar[str] = "{field} -like {value}" # Special expression if wildcards can't be matched with the eq_token operator

# Regular expressions
re_expression: ClassVar[str] = '{field} -match "{regex}"'
re_escape_char: ClassVar[str] = "\\"
re_escape: ClassVar[Tuple[str]] = ()
re_escape_char: ClassVar[str] = "\\" # Character used for escaping in regular expressions
re_escape: ClassVar[Tuple[str]] = () # List of strings that are escaped
re_escape_escape_char: bool = True # If True, the escape character is also escaped
re_flag_prefix: bool = True # If True, the flags are prepended as (?x) group at the beginning of the regular expression, e.g. (?i). If this is not supported by the target, it should be set to False.
# Mapping from SigmaRegularExpressionFlag values to static string templates that are used in
# flag_x placeholders in re_expression template.
# By default, i, m and s are defined. If a flag is not supported by the target query language,
# remove it from re_flags or don't define it to ensure proper error handling in case of appearance.
re_flags: Dict[SigmaRegularExpressionFlag, str] = {
SigmaRegularExpressionFlag.IGNORECASE: "i",
SigmaRegularExpressionFlag.MULTILINE : "m",
SigmaRegularExpressionFlag.DOTALL : "s",
}

# Case sensitive string matching expression. String is quoted/escaped like a normal string.
# Placeholders {field} and {value} are replaced with field name and quoted/escaped string.
case_sensitive_match_expression: ClassVar[str] = "{field} casematch {value}"
# Case sensitive string matching operators similar to standard string matching. If not provided,
# case_sensitive_match_expression is used.
case_sensitive_startswith_expression: ClassVar[str] = "{field} casematch_startswith {value}"
case_sensitive_endswith_expression: ClassVar[str] = "{field} casematch_endswith {value}"
case_sensitive_contains_expression: ClassVar[str] = "{field} casematch_contains {value}"

# cidr expressions
cidr_wildcard: ClassVar[str] = "*" # Character used as single wildcard
cidr_wildcard: ClassVar[str] = "*" # Character used as single wildcard
# cidr_expression: ClassVar[str] = "cidrmatch({field}, {value})" # CIDR expression query as format string with placeholders {field} = {value}
# cidr_in_list_expression: ClassVar[str] = "{field} in ({value})" # CIDR expression query as format string with placeholders {field} = in({list})

# Numeric comparison operators
compare_op_expression: ClassVar[str] = "{field} {operator} {value}"
compare_op_expression: ClassVar[str] = "{field} {operator} {value}" # Compare operation query as format string with placeholders {field}, {operator} and {value}
# Mapping between CompareOperators elements and strings used as replacement for {operator} in compare_op_expression
compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = {
SigmaCompareExpression.CompareOperators.LT: "-lt",
SigmaCompareExpression.CompareOperators.LTE: "-le",
SigmaCompareExpression.CompareOperators.GT: "-gt",
SigmaCompareExpression.CompareOperators.GTE: "-ge",
}

# Expression for comparing two event fields
field_equals_field_expression: ClassVar[Optional[str]] = None # Field comparison expression with the placeholders {field1} and {field2} corresponding to left field and right value side of Sigma detection item
field_equals_field_escaping_quoting: Tuple[bool, bool] = (True, True) # If regular field-escaping/quoting is applied to field1 and field2. A custom escaping/quoting can be implemented in the convert_condition_field_eq_field_escape_and_quote method.

# Null/None expressions
field_null_expression : ClassVar[str] = "{field} is null" # Expression for field has null value as format string with {field} placeholder for field name
field_null_expression: ClassVar[str] = "{field} -is null" # Expression for field has null value as format string with {field} placeholder for field name

# Field existence condition expressions.
field_exists_expression: ClassVar[str] = "exists({field})" # Expression for field existence as format string with {field} placeholder for field name
field_not_exists_expression: ClassVar[str] = "notexists({field})" # Expression for field non-existence as format string with {field} placeholder for field name. If not set, field_exists_expression is negated with boolean NOT.

# Field value in list, e.g. "field in (value list)" or "field containsall (value list)"
convert_or_as_in: ClassVar[bool] = True # Convert OR as in-expression
convert_and_as_in: ClassVar[bool] = True # Convert AND as in-expression
in_expressions_allow_wildcards: ClassVar[bool] = False # Values in list can contain wildcards. If set to False (default) only plain values are converted into in-expressions.
field_in_list_expression : ClassVar[str] = "{field} {op} ({list})" # Expression for field in list of values as format string with placeholders {field}, {op} and {list}
or_in_operator: ClassVar[str] = "-in" # Operator used to convert OR into in-expressions. Must be set if convert_or_as_in is set
and_in_operator: ClassVar[str] = "contains-all" # Operator used to convert AND into in-expressions. Must be set if convert_and_as_in is set
list_separator: ClassVar[str] = ", " # List element separator
convert_or_as_in: ClassVar[bool] = True # Convert OR as in-expression
convert_and_as_in: ClassVar[bool] = True # Convert AND as in-expression
in_expressions_allow_wildcards: ClassVar[bool] = False # Values in list can contain wildcards. If set to False (default) only plain values are converted into in-expressions.
field_in_list_expression: ClassVar[str] = "{field} {op} ({list})" # Expression for field in list of values as format string with placeholders {field}, {op} and {list}
or_in_operator: ClassVar[str] = "in" # Operator used to convert OR into in-expressions. Must be set if convert_or_as_in is set
and_in_operator: ClassVar[str] = "contains-all" # Operator used to convert AND into in-expressions. Must be set if convert_and_as_in is set
list_separator: ClassVar[str] = ", " # List element separator

# Value not bound to a field
unbound_value_str_expression : ClassVar[str] = '"{value}"' # Expression for string value not bound to a field as format string with placeholder {value}
unbound_value_num_expression : ClassVar[str] = '{value}' # Expression for number value not bound to a field as format string with placeholder {value}
unbound_value_re_expression : ClassVar[str] = '_=~{value}' # Expression for regular expression not bound to a field as format string with placeholder {value}
unbound_value_str_expression: ClassVar[str] = '"{value}"' # Expression for string value not bound to a field as format string with placeholder {value}
unbound_value_num_expression: ClassVar[str] = '{value}' # Expression for number value not bound to a field as format string with placeholder {value}
unbound_value_re_expression: ClassVar[str] = '_=~{value}' # Expression for regular expression not bound to a field as format string with placeholder {value} and {flag_x} as described for re_expression

# Query finalization: appending and concatenating deferred query part
deferred_start: ClassVar[str] = "\n| "
deferred_separator: ClassVar[str] = "\n| "
deferred_only_query: ClassVar[str] = "*"

def convert_condition_field_eq_val_str(self, cond: ConditionFieldEqualsValueExpression, state: ConversionState) -> Union[str, DeferredQueryExpression]:
"""Conversion of field = string value expressions"""
try:
if (
self.startswith_expression is not None
and cond.value.endswith(SpecialChars.WILDCARD_MULTI)
and not cond.value[:-1].contains_special()
):
expr = self.startswith_expression
value = cond.value # this was originally "value = cond.value[:-1]"
elif (
self.endswith_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
and not cond.value[1:].contains_special()
):
expr = self.endswith_expression
value = cond.value[1:]
elif (
self.contains_expression is not None
and cond.value.startswith(SpecialChars.WILDCARD_MULTI)
and cond.value.endswith(SpecialChars.WILDCARD_MULTI)
and not cond.value[1:-1].contains_special()
):
expr = self.contains_expression
value = cond.value[1:-1]
elif (
self.wildcard_match_expression is not None
and cond.value.contains_special()
):
expr = self.wildcard_match_expression
value = cond.value
else:
expr = "{field}" + self.eq_token + "{value}"
value = cond.value
return expr.format(field=self.escape_and_quote_field(cond.field), value=self.convert_value_str(value, state))
except TypeError:
raise NotImplementedError("Field equals string value expressions with strings are not supported by the backend.")

def convert_condition_not(self, cond: ConditionNOT, state: ConversionState) -> Union[str, DeferredQueryExpression]:
"""Conversion of NOT conditions."""
arg = cond.args[0]
if arg.__class__ in self.precedence:
return self.not_token + self.token_separator + self.convert_condition_group(arg, state)
else:
expr = self.convert_condition(arg, state)
if isinstance(expr, DeferredQueryExpression):
return expr.negate()
else:
return f'{arg.field} -ne "{arg.value}"'
deferred_start: ClassVar[str] = "\n| " # String used as separator between main query and deferred parts
deferred_separator: ClassVar[str] = "\n| " # String used to join multiple deferred query parts
deferred_only_query: ClassVar[str] = "*" # String used as query if final query only contains deferred expression

def finalize_query_default(self, rule: SigmaRule, query: str, index: int, state: ConversionState) -> str:
def finalize_query_default(self, rule: SigmaRule, query: Any, index: int, state: ConversionState) -> Any:
if hasattr(rule, "eventid"):
filter = f'-FilterHashTable @{{LogName = "{rule.logsource.service}"; Id = {rule.eventid}}} | '
else:
filter = f'-LogName "{rule.logsource.service}" | '
return "Get-WinEvent " + filter + f"Read-WinEvent | Where-Object {{{query}}}"

def finalize_output_default(self, queries: List[str]) -> str:
return list(queries)
def finalize_output_default(self, queries: List[str]) -> Any:
return queries
6 changes: 5 additions & 1 deletion sigma/pipelines/powershell/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
from .powershell import powershell_pipeline
# TODO: add all pipelines that should be exposed to the user of your backend in the import statement above.
# TODO: add all pipelines that should be exposed to the user of your backend in the import statement above.

pipelines = {
"powershell_pipeline": powershell_pipeline
}
21 changes: 3 additions & 18 deletions sigma/pipelines/powershell/powershell.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from sigma.pipelines.common import logsource_windows, windows_logsource_mapping
from sigma.processing.conditions import IncludeFieldCondition, LogsourceCondition
from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem
from sigma.processing.transformations import AddFieldnamePrefixTransformation, ChangeLogsourceTransformation, DropDetectionItemTransformation, RuleFailureTransformation, Transformation
from sigma.processing.transformations import AddConditionTransformation, AddFieldnamePrefixTransformation, ChangeLogsourceTransformation, DetectionItemFailureTransformation, DropDetectionItemTransformation, FieldMappingTransformation, RuleFailureTransformation, SetStateTransformation, Transformation
from sigma.rule import SigmaRule
from re import compile

@dataclass
class PromoteDetectionItemTransformation(Transformation):
Expand All @@ -18,24 +17,14 @@ def apply(self, pipeline, rule: SigmaRule) -> None:
# TODO: address situations where the detection item has more than one value
setattr(rule, self.field.lower(), detection_item.value[0])

@dataclass
class RemoveWhiteSpaceTransformation(Transformation):
"""Remove white space characters from detection item field names."""
def apply(self, pipeline, rule: SigmaRule) -> None:
super().apply(pipeline, rule)
for detection in rule.detection.detections.values():
for detection_item in detection.detection_items:
if compile(pattern = "\\w+\\s+\\w+").match(detection_item.field):
detection_item.field = detection_item.field.replace(" ", "")

def powershell_pipeline() -> ProcessingPipeline:
return ProcessingPipeline(
name = "PowerShell pipeline",
items = [
ProcessingItem(
rule_condition_negation = True,
rule_conditions = [LogsourceCondition(product = "windows")],
transformation = RuleFailureTransformation("Product not supported.")
transformation = RuleFailureTransformation(message = "Product not supported.")
)
] + [
ProcessingItem(
Expand Down Expand Up @@ -64,11 +53,7 @@ def powershell_pipeline() -> ProcessingPipeline:
)
] + [
ProcessingItem(
transformation = RemoveWhiteSpaceTransformation()
)
] + [
ProcessingItem(
transformation = AddFieldnamePrefixTransformation("$_.")
transformation = AddFieldnamePrefixTransformation(prefix = "$_.")
)
]
)
Loading

0 comments on commit 72c0672

Please sign in to comment.