From 68274951ba29d55a117c7a2993897c1fcf5db701 Mon Sep 17 00:00:00 2001 From: Mathieu LE CLEACH Date: Tue, 24 Sep 2024 15:09:42 +0200 Subject: [PATCH] add: MSSP mode in Microsoft XDR --- src/droid/convert.py | 8 +- src/droid/export.py | 12 +- src/droid/integrity.py | 65 ++++++++-- src/droid/platforms/ms_xdr.py | 222 +++++++++++++++++++++----------- src/droid/platforms/sentinel.py | 8 +- src/droid/search.py | 60 +++++++-- 6 files changed, 273 insertions(+), 102 deletions(-) diff --git a/src/droid/convert.py b/src/droid/convert.py index 1c934a3..4899eca 100644 --- a/src/droid/convert.py +++ b/src/droid/convert.py @@ -189,10 +189,14 @@ def convert_rules(parameters, droid_config, base_config, logger_param): platform = SentinelPlatform(droid_config, logger_param, export_mssp=True) elif "microsoft_sentinel" in platform_name: platform = SentinelPlatform(droid_config, logger_param, export_mssp=False) + elif "microsoft_xdr" in platform_name and parameters.sentinel_xdr and parameters.mssp: + platform = SentinelPlatform(droid_config, logger_param, export_mssp=True) elif "microsoft_xdr" in platform_name and parameters.sentinel_xdr: - platform = SentinelPlatform(droid_config, logger_param) + platform = SentinelPlatform(droid_config, logger_param, export_mssp=False) + elif "microsoft_xdr" in platform_name and parameters.mssp: + platform = MicrosoftXDRPlatform(droid_config, logger_param, export_mssp=True) elif "microsoft_xdr" in platform_name: - platform = MicrosoftXDRPlatform(droid_config, logger_param) + platform = MicrosoftXDRPlatform(droid_config, logger_param, export_mssp=False) if path.is_dir(): error_i = False diff --git a/src/droid/export.py b/src/droid/export.py index b2f3eb6..30ce7a8 100644 --- a/src/droid/export.py +++ b/src/droid/export.py @@ -70,14 +70,20 @@ def export_rule_raw(parameters: dict, export_config: dict, logger_param: dict): if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) + elif parameters.platform == "esql" or parameters.platform == "eql": + platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) elif parameters.platform == "microsoft_sentinel" and parameters.mssp: platform = SentinelPlatform(export_config, logger_param, export_mssp=True) elif parameters.platform == "microsoft_sentinel": platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr and parameters.mssp: + platform = SentinelPlatform(export_config, logger_param, export_mssp=True) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr: + platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif parameters.platform == "microsoft_xdr" and parameters.mssp: + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=True) elif parameters.platform == "microsoft_xdr": - platform = MicrosoftXDRPlatform(export_config, logger_param) - elif parameters.platform == "esql" or parameters.platform == "eql": - platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=False) if path.is_dir(): error_i = False diff --git a/src/droid/integrity.py b/src/droid/integrity.py index 70ae369..9d8d50b 100644 --- a/src/droid/integrity.py +++ b/src/droid/integrity.py @@ -86,7 +86,7 @@ def integrity_rule_splunk(rule_converted, rule_content, platform: SplunkPlatform def integrity_rule_sentinel_mssp(rule_converted, rule_content, platform: SentinelPlatform, rule_file, parameters, logger, error): try: - export_list = platform.get_integrity_export_mssp() + export_list = platform.get_export_list_mssp() except Exception as e: logger.error(f"Couldn't get the export list for the designated customers - error {e}") return error @@ -184,10 +184,42 @@ def integrity_rule_sentinel( return error -def integrity_rule_ms_xdr(rule_converted, rule_content, platform: MicrosoftXDRPlatform, rule_file, parameters, logger, error): +def integrity_rule_ms_xdr_mssp(rule_converted, rule_content, platform: MicrosoftXDRPlatform, rule_file, parameters, logger, error): try: - saved_search: dict = platform.get_rule(rule_content["id"]) + export_list = platform.get_export_list_mssp() + except Exception as e: + logger.error(f"Couldn't get the export list for the designated customers - error {e}") + return error + + logger.info("Integrity check for designated customers") + + error_occured = False + + for group, info in export_list.items(): + + tenant_id = info['tenant_id'] + + logger.debug(f"Processing rule on tenant {tenant_id} from group id {group}") + try: + saved_search: dict = platform.get_rule(rule_content["id"], tenant_id) + except Exception as e: + logger.error(f"Couldn't check the integrity for the rule {rule_file} on tenant {tenant_id} from {group} - error {e}") + return error + + error = integrity_rule_ms_xdr(rule_converted, rule_content, platform, rule_file, parameters, logger, error, saved_search=saved_search) + + if error: + error_occured = True + + if error_occured: + return error + +def integrity_rule_ms_xdr(rule_converted, rule_content, platform: MicrosoftXDRPlatform, rule_file, parameters, logger, error, saved_search=None): + + try: + if not saved_search: + saved_search: dict = platform.get_rule(rule_content["id"]) except Exception as e: logger.error(f"Couldn't check the integrity for the rule {rule_file} - error {e}") return error @@ -314,15 +346,24 @@ def integrity_rule(parameters, rule_converted, rule_content, platform, rule_file elif parameters.platform in ["esql", "eql"]: error = integrity_rule_elastic(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error - elif parameters.platform == "microsoft_xdr": - error = integrity_rule_ms_xdr(rule_converted, rule_content, platform, rule_file, parameters, logger, error) - return error elif "microsoft_sentinel" in parameters.platform and parameters.mssp: error = integrity_rule_sentinel_mssp(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error elif "microsoft_sentinel" in parameters.platform: error = integrity_rule_sentinel(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr and parameters.mssp: + error = integrity_rule_sentinel_mssp(rule_converted, rule_content, platform, rule_file, parameters, logger, error) + return error + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr: + error = integrity_rule_sentinel(rule_converted, rule_content, platform, rule_file, parameters, logger, error) + return error + elif parameters.platform == "microsoft_xdr" and parameters.mssp: + error = integrity_rule_ms_xdr_mssp(rule_converted, rule_content, platform, rule_file, parameters, logger, error) + return error + elif parameters.platform == "microsoft_xdr": + error = integrity_rule_ms_xdr(rule_converted, rule_content, platform, rule_file, parameters, logger, error) + return error def integrity_rule_raw(parameters: dict, export_config: dict, logger_param: dict, raw_rule=False): @@ -332,14 +373,20 @@ def integrity_rule_raw(parameters: dict, export_config: dict, logger_param: dict if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) + elif parameters.platform == "esql" or parameters.platform == "eql": + platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) elif parameters.platform == "microsoft_sentinel" and parameters.mssp: platform = SentinelPlatform(export_config, logger_param, export_mssp=True) elif parameters.platform == "microsoft_sentinel": platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr and parameters.mssp: + platform = SentinelPlatform(export_config, logger_param, export_mssp=True) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr: + platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif parameters.platform == "microsoft_xdr" and parameters.mssp: + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=True) elif parameters.platform == "microsoft_xdr": - platform = MicrosoftXDRPlatform(export_config, logger_param) - elif parameters.platform == "esql" or parameters.platform == "eql": - platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=False) if path.is_dir(): error_i = False diff --git a/src/droid/platforms/ms_xdr.py b/src/droid/platforms/ms_xdr.py index 7cec198..559daa2 100644 --- a/src/droid/platforms/ms_xdr.py +++ b/src/droid/platforms/ms_xdr.py @@ -15,13 +15,14 @@ class MicrosoftXDRPlatform(AbstractPlatform): - def __init__(self, parameters: dict, logger_param: dict) -> None: + def __init__(self, parameters: dict, logger_param: dict, export_mssp: bool=False) -> None: super().__init__(name="Microsoft XDR") self.logger = ColorLogger(__name__, **logger_param) self._parameters = parameters + self._export_mssp = export_mssp if "query_period" not in self._parameters: raise Exception( @@ -65,7 +66,9 @@ def __init__(self, parameters: dict, logger_param: dict) -> None: ) self._api_base_url = "https://graph.microsoft.com/beta" + self._token = self.acquire_token() + self._headers = { "Authorization": f"Bearer {self._token}", "Content-Type": "application/json", @@ -75,35 +78,30 @@ def __init__(self, parameters: dict, logger_param: dict) -> None: else: self._alert_prefix = None - def mssp_run_xdr_search( - self, client, rule_converted, start_time, current_time, customer_info - ): - # TODO: Provide a list of tenant ids and process - return None - - customer = customer_info["customer"] - workspace_id = customer_info["workspace_id"] - - results = client.query_resource( - workspace_id, - rule_converted, - timespan=(start_time, current_time), - server_timeout=self._timeout, - ) + if 'export_list_mssp' in self._parameters: + self._export_list_mssp = self._parameters["export_list_mssp"] - result = 0 + def get_export_list_mssp(self) -> list: - for table in results.tables: - result += len(table.rows) - - return customer, result + if self._export_list_mssp: + self.logger.info("Integrity check for designated customers") + return self._export_list_mssp + else: + self.logger.error("No export_list_mssp found") + raise - def run_xdr_search(self, rule_converted, rule_file): + def run_xdr_search(self, rule_converted, rule_file, tenant_id=None): payload = {"Query": rule_converted, "Timespan": "P1D"} try: + if tenant_id: + self.logger.info(f"Searching for rule {rule_file} on tenant {tenant_id}") + else: + self.logger.info(f"Searching for rule {rule_file} on tenant {self._tenant_id}") + results, status_code = self._post( - url="/security/runHuntingQuery", payload=payload + url="/security/runHuntingQuery", payload=payload, tenant_id=tenant_id ) + if "error" in results: self.logger.error( f"Error while running the query {results['error']['message']}" @@ -115,14 +113,13 @@ def run_xdr_search(self, rule_converted, rule_file): self.logger.error(f"Error while running the query {e}") raise - def get_rule(self, rule_id): + def get_rule(self, rule_id, tenant_id=None): """Retrieve a scheduled alert rule in Microsoft XDR - Remove a scheduled alert rule in Microsoft XDR """ try: params = {"$filter": f"contains(displayName, '{rule_id}')"} rule, status_code = self._get( - url="/security/rules/detectionRules", params=params + url="/security/rules/detectionRules", params=params, tenant_id=tenant_id ) if len(rule["value"]) > 0: return rule["value"][0] @@ -172,10 +169,13 @@ def remove_rule(self, rule_content, rule_converted, rule_file): }, ) - def acquire_token(self): + def acquire_token(self, tenant_id=None): # MSAL configuration scope = ["https://graph.microsoft.com/.default"] + if not tenant_id: + tenant_id = self._tenant_id + if self._parameters["search_auth"] == "default": self.logger.debug("Default credential selected") @@ -185,7 +185,7 @@ def acquire_token(self): return token else: - authority = f"https://login.microsoftonline.com/{self._tenant_id}" + authority = f"https://login.microsoftonline.com/{tenant_id}" # Create a confidential client application app = ConfidentialClientApplication( self._client_id, @@ -318,26 +318,57 @@ def create_rule(self, rule_content, rule_converted, rule_file): alert_rule["detectionAction"]["alertTemplate"][ "impactedAssets" ] = impactedAssets - try: - self.push_detection_rule( - alert_rule=alert_rule, - rule_content=rule_content, - rule_file=rule_file, - rule_converted=rule_converted, - ) - # Send the JSON payload to Microsoft Graph Security API - except Exception as e: - self.logger.error( - f"Could not export the rule {rule_file}", - extra={ - "rule_file": rule_file, - "rule_converted": rule_converted, - "rule_content": rule_content, - "error": e, - }, - ) - raise + if self._export_mssp: + if self._export_list_mssp: + self.logger.info("Exporting to designated customers") + for group, info in self._export_list_mssp.items(): + + tenant_id = info['tenant_id'] + self.logger.debug(f"Exporting to tenant {tenant_id} from group id {group}") + + try: + self.push_detection_rule( + alert_rule=alert_rule, + rule_content=rule_content, + rule_file=rule_file, + rule_converted=rule_converted, + tenant_id=tenant_id + ) + except Exception as e: + self.logger.error( + f"Could not export the rule {rule_file} to tenant {tenant_id}", + extra={ + "rule_file": rule_file, + "rule_converted": rule_converted, + "rule_content": rule_content, + "error": e, + }, + ) + raise + else: + self.logger.error("Export list not found. Please provide the list of designated customers") + raise + else: + try: + self.push_detection_rule( + alert_rule=alert_rule, + rule_content=rule_content, + rule_file=rule_file, + rule_converted=rule_converted, + ) + + except Exception as e: + self.logger.error( + f"Could not export the rule {rule_file}", + extra={ + "rule_file": rule_file, + "rule_converted": rule_converted, + "rule_content": rule_content, + "error": e, + }, + ) + raise def check_rule_changes(self, existing_rule, new_rule): try: @@ -390,7 +421,8 @@ def check_rule_changes(self, existing_rule, new_rule): return False def push_detection_rule( - self, alert_rule=None, rule_content=None, rule_file=None, rule_converted=None + self, alert_rule=None, rule_content=None, rule_file=None, + rule_converted=None, tenant_id=None ): existing_rule = self.get_rule(rule_content["id"]) if existing_rule: @@ -399,10 +431,10 @@ def push_detection_rule( return True else: api_url = f"/security/rules/detectionRules/{existing_rule['id']}" - response, status_code = self._patch(url=api_url, payload=alert_rule) + response, status_code = self._patch(url=api_url, payload=alert_rule, tenant_id=tenant_id) else: api_url = "/security/rules/detectionRules" - response, status_code = self._post(url=api_url, payload=alert_rule) + response, status_code = self._post(url=api_url, payload=alert_rule, tenant_id=tenant_id) if status_code == 400: self.logger.error( @@ -607,57 +639,97 @@ def parse_impactedAssets(self, impactedAssets, rule_file=None): raise return impactedAssetsList - def _get(self, url=None, headers=None, params=None): - # Send the JSON payload to Microsoft Graph Security API + def _get(self, url=None, headers=None, params=None, tenant_id=None, timeout=120): + # Send the GET request to Microsoft Graph Security API api_url = self._api_base_url + url + + if not tenant_id: + token = self._token + else: + token = self.acquire_token(tenant_id=tenant_id) + headers = { - "Authorization": f"Bearer {self._token}", + "Authorization": f"Bearer {token}", "Content-Type": "application/json", } + if headers: headers.update(headers) + while True: - response = requests.get(api_url, headers=headers, params=params) - if response.status_code == 429: - self.logger.debug("Rate limit reached, waiting 60 seconds") - time.sleep(60) - else: - break + try: + response = requests.get(api_url, headers=headers, params=params, timeout=timeout) + if response.status_code == 429: + self.logger.debug("Rate limit reached, waiting 60 seconds") + time.sleep(60) + else: + break + except requests.exceptions.Timeout: + self.logger.error(f"GET request timed out after {timeout} seconds") + return None, 408 return response.json(), response.status_code - def _post(self, url=None, payload=None, headers=None, params=None): + def _post(self, url=None, payload=None, headers=None, params=None, tenant_id=None, timeout=120): # Send the JSON payload to Microsoft Graph Security API api_url = self._api_base_url + url + + if not tenant_id: + token = self._token + else: + token = self.acquire_token(tenant_id=tenant_id) + headers = { - "Authorization": f"Bearer {self._token}", + "Authorization": f"Bearer {token}", "Content-Type": "application/json", } + if headers: headers.update(headers) + while True: - response = requests.post(api_url, headers=headers, json=payload) - if response.status_code == 429: - self.logger.debug("Rate limit reached, waiting 60 seconds") - time.sleep(60) - else: - break + try: + response = requests.post(api_url, headers=headers, json=payload, timeout=timeout) + if response.status_code == 429: + self.logger.debug("Rate limit reached, waiting 60 seconds") + time.sleep(60) + else: + break + except requests.exceptions.Timeout: + self.logger.error(f"Request timed out after {timeout} seconds") + return None, 408 + return response.json(), response.status_code - def _patch(self, url=None, payload=None, headers=None, params=None): + def _patch(self, url=None, payload=None, headers=None, params=None, tenant_id=None, timeout=120): # Send the JSON payload to Microsoft Graph Security API api_url = self._api_base_url + url + + # Use tenant_id to acquire token, if provided + if not tenant_id: + token = self._token + else: + token = self.acquire_token(tenant_id=tenant_id) + headers = { - "Authorization": f"Bearer {self._token}", + "Authorization": f"Bearer {token}", "Content-Type": "application/json", } + if headers: headers.update(headers) + while True: - response = requests.patch(api_url, headers=headers, json=payload) - if response.status_code == 429: - self.logger.debug("Rate limit reached, waiting 60 seconds") - time.sleep(60) - else: - break + try: + response = requests.patch(api_url, headers=headers, json=payload, timeout=timeout) + if response.status_code == 429: + self.logger.debug("Rate limit reached, waiting 60 seconds") + time.sleep(60) + else: + break + except requests.exceptions.Timeout: + self.logger.error(f"PATCH request timed out after {timeout} seconds") + return None, 408 # Return a timeout error status code + return response.json(), response.status_code + diff --git a/src/droid/platforms/sentinel.py b/src/droid/platforms/sentinel.py index aab9988..a746007 100644 --- a/src/droid/platforms/sentinel.py +++ b/src/droid/platforms/sentinel.py @@ -199,7 +199,7 @@ def get_workspaces(self, credential, export_mode=False): return workspace_list - def get_integrity_export_mssp(self) -> list: + def get_export_list_mssp(self) -> list: if self._export_list_mssp: self.logger.info("Integrity check for designated customers") @@ -207,8 +207,6 @@ def get_integrity_export_mssp(self) -> list: else: self.logger.error("No export_list_mssp found") raise - # TODO: Integrity check to all customers - def mssp_run_sentinel_search(self, client, @@ -505,8 +503,8 @@ def create_rule(self, rule_content, rule_converted, rule_file): self.logger.error(f"Failed to export the rule {rule_file} to {workspace_name} - error: {e}") raise else: - client_workspaces = self.get_workspaces(credential, export_mode=True) - # TODO: Export to all customers + self.logger.error("Export list not found. Please provide the list of designated customers") + raise else: credential = self.get_credentials() client = SecurityInsights(credential, self._subscription_id) diff --git a/src/droid/search.py b/src/droid/search.py index 287a818..ed8f36e 100644 --- a/src/droid/search.py +++ b/src/droid/search.py @@ -61,6 +61,40 @@ def search_rule_sentinel(rule_converted, platform: SentinelPlatform, rule_file, error = True return error, search_warning +def search_rule_ms_xdr_mssp(rule_converted, platform: MicrosoftXDRPlatform, rule_file, parameters, logger, error, search_warning): + + try: + export_list = platform.get_export_list_mssp() + except Exception as e: + logger.error(f"Couldn't get the export list for the designated customers - error {e}") + return error + + logger.info("Searching for designated customers") + + for group, info in export_list.items(): + + tenant_id = info['tenant_id'] + + logger.debug(f"Processing rule on {tenant_id} from group id {group}") + + try: + result: int = platform.run_xdr_search(rule_converted, rule_file, tenant_id=tenant_id) + + logger.info(f"Successfully searched the rule {rule_file}") + + if result > 0: # If the rule has match + logger.warning(f"{result} Matches found for {rule_file}") + search_warning = True + return error, search_warning + else: + logger.info(f"No hits for {rule_file}") + return error, search_warning + + except Exception as e: + logger.error(f"Couldn't search for the rule {rule_file} for tenant {tenant_id} - error {e}") + error = True + return error, search_warning + def search_rule_ms_xdr(rule_converted, platform: MicrosoftXDRPlatform, rule_file, parameters, logger, error, search_warning): try: @@ -128,8 +162,12 @@ def search_rule(parameters, rule_content, rule_converted, platform, rule_file, e error, search_warning = search_rule_sentinel(rule_converted, platform, rule_file, parameters, logger, error, search_warning, mssp_mode=False) return error, search_warning elif parameters.platform == "microsoft_xdr": - error, search_warning = search_rule_ms_xdr(rule_converted, platform, rule_file, parameters, logger, error, search_warning) - return error, search_warning + if parameters.mssp: + error, search_warning = search_rule_ms_xdr_mssp(rule_converted, platform, rule_file, parameters, logger, error, search_warning) + return error, search_warning + else: + error, search_warning = search_rule_ms_xdr(rule_converted, platform, rule_file, parameters, logger, error, search_warning) + return error, search_warning def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): @@ -142,14 +180,20 @@ def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) - elif parameters.platform == "microsoft_sentinel": - platform = SentinelPlatform(export_config, logger_param) - elif parameters.platform == "microsoft_sentinel" and parameters.sentinel_xdr: - platform = SentinelPlatform(export_config, logger_param) - elif parameters.platform == "microsoft_xdr": - platform = MicrosoftXDRPlatform(export_config, logger_param) elif parameters.platform == "esql" or parameters.platform == "eql": platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) + elif parameters.platform == "microsoft_sentinel" and parameters.mssp: + platform = SentinelPlatform(export_config, logger_param, export_mssp=True) + elif parameters.platform == "microsoft_sentinel": + platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr and parameters.mssp: + platform = SentinelPlatform(export_config, logger_param, export_mssp=True) + elif "microsoft_xdr" in parameters.platform and parameters.sentinel_xdr: + platform = SentinelPlatform(export_config, logger_param, export_mssp=False) + elif parameters.platform == "microsoft_xdr" and parameters.mssp: + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=True) + elif parameters.platform == "microsoft_xdr": + platform = MicrosoftXDRPlatform(export_config, logger_param, export_mssp=False) if path.is_dir(): error_i = False