Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic tests for mdatp #3511

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions microsoft/testsuites/mdatp/mdatp_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) Microsoft Corporation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put them under the vm_extensions folder, so we can manage vm extension cases easier. You can have a subfolder there.

# Licensed under the MIT license.
import time
from typing import Any

from assertpy import assert_that

from lisa import (
Logger,
Node,
TestCaseMetadata,
TestSuite,
TestSuiteMetadata,
simple_requirement,
)
from lisa.operating_system import BSD
from lisa.testsuite import TestResult
from lisa.tools import MDE, Curl
from lisa.util import LisaException, SkippedException

from microsoft.testsuites.mdatp.mdatp_tools import Mdatp


@TestSuiteMetadata(
area="vm_extension",
category="functional",
description="""
Verify MDE installation
Microsoft Defender for Endpoint(MDE) for Linux includes
antimalware and endpoint detection and response (EDR) capabilities.

This test suites validates if MDE can be installed, onboarded
and detect an EICAR file.

The test requires the onboarding script to be kept in Azure Storage Account
and provide the SAS url for downloading under the
secret variable `onboarding_script_sas_uri`.

The suite runs the following tests:
1. Installation test
2. Onboarding test
3. Health test
4. EICAR detection test
""",
)
class MDETest(TestSuite):
def before_case(self, log: Logger, **kwargs: Any) -> None:
variables = kwargs["variables"]
self.onboarding_script_sas_uri = variables.get("onboarding_script_sas_uri", "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it support non-sas uri? The extra requirement makes the test case skipped in most pipelines.

if not self.onboarding_script_sas_uri:
raise SkippedException("Onboarding script SAS URI is not provided.")

@TestCaseMetadata(
description="""
Verify MDE installation, onboarding, health and EICAR detection.
""",
priority=1,
requirement=simple_requirement(
min_core_count=2, min_memory_mb=1024, unsupported_os=[BSD]
),
)
def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None:
# Invoking tools first time, intalls the tool.
try:
output = node.tools[Mdatp]._check_exists()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Don't call the private method like _check_exists, call exists.
  2. But when you refer a tool like this, the check/install happens automatically. So, the code can be like _ = node.tools[Mdatp], and capture the exception if there is.

except LisaException as e:
log.error(e)
output = False

assert_that(output).described_as("Unable to install MDE").is_equal_to(True)

self.verify_onboard(node, log, result)

self.verify_health(node, log, result)

self.verify_eicar_detection(node, log, result)

def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None:
onboarding_result = node.tools[Mdatp].onboard(self.onboarding_script_sas_uri)

assert_that(onboarding_result).described_as(
"Unable to onboard MDE"
).is_equal_to(True)

output = node.tools[Mdatp].get_result("health --field licensed")

assert_that(output).described_as("MDE is not licensed").is_equal_to(["true"])

def verify_health(self, node: Node, log: Logger, result: TestResult) -> None:
output = node.tools[Mdatp].get_result("health", json_out=True)

log.info(output)

assert_that(output["healthy"]).described_as("MDE is not healthy").is_equal_to(
True
)

def verify_eicar_detection(
self, node: Node, log: Logger, result: TestResult
) -> None:
log.info("Running EICAR test")

output = node.tools[Mdatp].get_result(
"health --field real_time_protection_enabled"
)
if output == ["false"]:
output = node.tools[Mdatp].get_result(
"config real-time-protection --value enabled", sudo=True
)
assert_that(" ".join(output)).described_as(
"Unable to enable RTP for MDE"
).is_equal_to("Configuration property updated.")

current_threat_list = node.tools[Mdatp].get_result("threat list")
log.info(current_threat_list)

node.tools[Curl].fetch(
arg="-o /tmp/eicar.com.txt",
execute_arg="",
url="https://secure.eicar.org/eicar.com.txt",
)

time.sleep(5) # Wait for remediation

new_threat_list = node.tools[Mdatp].get_result("threat list")
log.info(new_threat_list)

eicar_detect = " ".join(new_threat_list).replace(
" ".join(current_threat_list), ""
)

log.info(eicar_detect)
assert_that("Name: Virus:DOS/EICAR_Test_File" in eicar_detect).described_as(
"MDE is not able to detect EICAR file"
).is_equal_to(True)
93 changes: 93 additions & 0 deletions microsoft/testsuites/mdatp/mdatp_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
from typing import Any

from lisa.tools import Chmod
from lisa.base_tools import Wget
from lisa.executable import Tool

class Mdatp(Tool):
@property
def command(self) -> str:
return "mdatp"

@property
def can_install(self) -> bool:
return True

def get_mde_installer(self) -> bool:
if not hasattr(self, "mde_installer"):
wget = self.node.tools[Wget]

download_path = wget.get(
url="https://raw.githubusercontent.com/microsoft/mdatp-xplat/"
"master/linux/installation/mde_installer.sh",
filename="mde_installer.sh",
)
self.mde_installer = download_path
self.node.tools[Chmod].update_folder(self.mde_installer, "777", sudo=True)
return True

def _install(self) -> bool:
if not self.get_mde_installer():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks the get_mde_installer return true always. The code logic won't be hit.

self._log.error(
"Unable to download mde_installer.sh script. MDE can't be installed"
)

self._log.info("Installing MDE")
result1 = self.node.execute(
f"{self.mde_installer} --install", shell=True, sudo=True
)
self._log.info(result1)

return self._check_exists()

def onboard(self, onboarding_script_sas_uri: str) -> bool:
if not self._check_exists():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to check, when you have an instance of the tool, it means the check exists already passed.

self._log.error("MDE is not installed, onboarding not possible")
return False

wget = self.node.tools[Wget]

download_path = wget.get(
url=onboarding_script_sas_uri,
filename="MicrosoftDefenderATPOnboardingLinuxServer.py",
)

if not self.get_mde_installer():
self._log.error(
"Unable to download mde_installer.sh script. MDE can't be onboarded"
)

self._log.info("Onboarding MDE")
result1 = self.node.execute(
f"{self.mde_installer} --onboard {download_path}", shell=True, sudo=True
)
self._log.info(result1)

output = self.get_result("health --field licensed")

self._log.info(output)

return bool(output == ["true"])

def get_result(
self,
arg: str,
json_out: bool = False,
sudo: bool = False,
) -> Any:
if json_out:
arg += " --output json"
result = self.run(
arg,
sudo=sudo,
shell=True,
force_run=True,
)

result.assert_exit_code(include_output=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add error message for assertion, so the test case can bring more information, when it's failed.

if json_out:
return json.loads(result.stdout)
return result.stdout.split()
Loading