Skip to content

Commit

Permalink
Changes for handling multiple system configurations #2286 (#4612)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz authored Apr 3, 2023
1 parent 6ab89e9 commit d571579
Show file tree
Hide file tree
Showing 28 changed files with 242 additions and 298 deletions.
11 changes: 6 additions & 5 deletions plaso/analysis/mediator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@ class AnalysisMediator(object):
number_of_produced_event_tags (int): number of produced event tags.
"""

def __init__(self, data_location=None):
def __init__(self, data_location=None, user_accounts=None):
"""Initializes an analysis plugin mediator.
Args:
data_location (Optional[str]): location of data files used during
analysis.
user_accounts (Optional[list[UserAccountArtifact]]): user accounts.
"""
super(AnalysisMediator, self).__init__()
self._abort = False
self._data_location = data_location
self._event_filter_expression = None
self._number_of_warnings = 0
self._storage_writer = None
self._user_accounts = user_accounts
self._username_by_user_directory = {}

self.analysis_reports_counter = collections.Counter()
Expand Down Expand Up @@ -85,9 +87,8 @@ def GetUsernameForPath(self, path):
path = path.lower()

username = self._username_by_user_directory.get(path, None)
if not username and self._storage_writer:
for user_account in self._storage_writer.GetAttributeContainers(
'user_account'):
if not username and self._user_accounts:
for user_account in self._user_accounts:
if user_account.user_directory:
user_directory = user_account.user_directory.lower()
if path.startswith(user_directory):
Expand Down Expand Up @@ -139,7 +140,7 @@ def ProduceAnalysisWarning(self, message, plugin_name):
"""
if self._storage_writer:
warning = warnings.AnalysisWarning(
message=message, plugin_name=plugin_name)
message=message, plugin_name=plugin_name)
self._storage_writer.AddAttributeContainer(warning)

self._number_of_warnings += 1
Expand Down
4 changes: 1 addition & 3 deletions plaso/cli/analysis_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from plaso.cli import tools
from plaso.cli import views
from plaso.containers import reports
from plaso.engine import knowledge_base
from plaso.multi_process import analysis_engine as multi_analysis_engine
from plaso.storage import factory as storage_factory

Expand Down Expand Up @@ -46,7 +45,6 @@ def __init__(self, input_reader=None, output_writer=None):
self._command_line_arguments = None
self._event_filter_expression = None
self._event_filter = None
self._knowledge_base = knowledge_base.KnowledgeBase()
self._number_of_stored_analysis_reports = 0
self._status_view_interval = 0.5
self._storage_file_path = None
Expand Down Expand Up @@ -94,7 +92,7 @@ def _AnalyzeEvents(self, session, configuration, status_update_callback=None):

try:
processing_status = analysis_engine.AnalyzeEvents(
session, self._knowledge_base, storage_writer, self._data_location,
session, storage_writer, self._data_location,
self._analysis_plugins, configuration,
event_filter=self._event_filter,
event_filter_expression=self._event_filter_expression,
Expand Down
18 changes: 1 addition & 17 deletions plaso/cli/psort_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,12 @@ def _CreateOutputAndFormattingProcessingConfiguration(self):
Returns:
ProcessingConfiguration: output and formatting processing configuration.
"""
if self._preferred_language:
preferred_language = self._preferred_language
else:
# TODO: remove after refactor
preferred_language = self._knowledge_base.language

configuration = configurations.ProcessingConfiguration()
configuration.data_location = self._data_location
configuration.debug_output = self._debug_mode
configuration.dynamic_time = self._output_dynamic_time
configuration.log_filename = self._log_file
configuration.preferred_language = preferred_language
configuration.preferred_language = self._preferred_language
configuration.preferred_time_zone = self._output_time_zone
configuration.profiling.directory = self._profiling_directory
configuration.profiling.profilers = self._profilers
Expand Down Expand Up @@ -482,19 +476,9 @@ def ProcessStorage(self):
raise RuntimeError('Unable to create storage reader.')

try:
# TODO: remove after refactor
for session_index, session in enumerate(storage_reader.GetSessions()):
self._knowledge_base.SetActiveSession(session.identifier)

system_configuration = storage_reader.GetAttributeContainerByIndex(
'system_configuration', session_index)
self._knowledge_base.ReadSystemConfigurationArtifact(
system_configuration)

self._number_of_stored_analysis_reports = (
storage_reader.GetNumberOfAttributeContainers(
self._CONTAINER_TYPE_ANALYSIS_REPORT))

finally:
storage_reader.Close()

Expand Down
9 changes: 1 addition & 8 deletions plaso/cli/psteal_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from plaso.containers import reports
from plaso.engine import configurations
from plaso.engine import engine
from plaso.engine import knowledge_base
from plaso.lib import errors
from plaso.lib import loggers
from plaso.multi_process import output_engine as multi_output_engine
Expand Down Expand Up @@ -87,7 +86,6 @@ def __init__(self, input_reader=None, output_writer=None):
input_reader=input_reader, output_writer=output_writer)
self._artifacts_registry = None
self._deduplicate_events = True
self._knowledge_base = knowledge_base.KnowledgeBase()
self._number_of_analysis_reports = 0
self._output_format = None
self._parsers_manager = parsers_manager.ParsersManager
Expand All @@ -107,17 +105,12 @@ def _CreateOutputAndFormattingProcessingConfiguration(self):
Returns:
ProcessingConfiguration: output and formatting processing configuration.
"""
if self._preferred_language:
preferred_language = self._preferred_language
else:
preferred_language = self._knowledge_base.language

configuration = configurations.ProcessingConfiguration()
configuration.data_location = self._data_location
configuration.debug_output = self._debug_mode
configuration.dynamic_time = self._output_dynamic_time
configuration.log_filename = self._log_file
configuration.preferred_language = preferred_language
configuration.preferred_language = self._preferred_language
configuration.preferred_time_zone = self._output_time_zone
configuration.profiling.directory = self._profiling_directory
configuration.profiling.profilers = self._profilers
Expand Down
23 changes: 19 additions & 4 deletions plaso/engine/timeliner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,27 @@ class EventDataTimeliner(object):

_TIMELINER_CONFIGURATION_FILENAME = 'timeliner.yaml'

def __init__(self, data_location=None, preferred_year=None):
def __init__(
self, data_location=None, preferred_year=None,
system_configurations=None):
"""Initializes an event data timeliner.
Args:
data_location (Optional[str]): path of the timeliner configuration file.
preferred_year (Optional[int]): preferred initial year value for year-less
date and time values.
system_configurations (Optional[list[SystemConfigurationArtifact]]):
system configurations.
"""
super(EventDataTimeliner, self).__init__()
self._attribute_mappings = {}
self._base_years = {}
self._current_year = self._GetCurrentYear()
self._data_location = data_location
self._place_holder_event = set()
self._preferred_time_zone = None
self._preferred_year = preferred_year
self._time_zone = None
self._system_configurations = system_configurations

self.number_of_produced_events = 0
self.parsers_counter = collections.Counter()
Expand Down Expand Up @@ -172,7 +177,7 @@ def _GetEvent(
self._ProduceTimeliningWarning(storage_writer, event_data, message)

if not time_zone:
time_zone = self._time_zone
time_zone = self._GetTimeZone()

if not time_zone:
message = 'date and time is in local time and no time zone is defined'
Expand Down Expand Up @@ -205,6 +210,16 @@ def _GetEvent(

return event

def _GetTimeZone(self):
"""Retrieves the time zone related to the event data.
Returns:
datetime.tzinfo: time zone.
"""
# TODO: determine time zone from system_configurations.

return self._preferred_time_zone or self._DEFAULT_TIME_ZONE

def _ProduceTimeliningWarning(self, storage_writer, event_data, message):
"""Produces a timelining warning.
Expand Down Expand Up @@ -332,4 +347,4 @@ def SetPreferredTimeZone(self, time_zone_string):
raise ValueError('Unsupported time zone: {0!s}'.format(
time_zone_string))

self._time_zone = time_zone or self._DEFAULT_TIME_ZONE
self._preferred_time_zone = time_zone
18 changes: 9 additions & 9 deletions plaso/multi_process/analysis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):
self._event_labels_counter = None
self._event_queues = {}
self._events_status = processing_status.EventsStatus()
self._knowledge_base = None
self._memory_profiler = None
self._merge_task = None
self._number_of_consumed_analysis_reports = 0
Expand All @@ -77,6 +76,7 @@ def __init__(self, worker_memory_limit=None, worker_timeout=None):
self._session = None
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_callback = None
self._user_accounts = None
self._worker_memory_limit = worker_memory_limit
self._worker_timeout = worker_timeout or definitions.DEFAULT_WORKER_TIMEOUT

Expand Down Expand Up @@ -376,8 +376,8 @@ def _StartWorkerProcess(self, process_name):
timeout_seconds=self._QUEUE_TIMEOUT)

process = analysis_process.AnalysisProcess(
input_event_queue, self._knowledge_base, self._session, analysis_plugin,
self._processing_configuration, data_location=self._data_location,
input_event_queue, analysis_plugin, self._processing_configuration,
self._user_accounts, data_location=self._data_location,
event_filter_expression=self._event_filter_expression,
name=process_name)

Expand Down Expand Up @@ -538,16 +538,14 @@ def _UpdateStatus(self):

# pylint: disable=too-many-arguments
def AnalyzeEvents(
self, session, knowledge_base_object, storage_writer, data_location,
analysis_plugins, processing_configuration, event_filter=None,
self, session, storage_writer, data_location, analysis_plugins,
processing_configuration, event_filter=None,
event_filter_expression=None, status_update_callback=None,
storage_file_path=None):
"""Analyzes events in a Plaso storage.
Args:
session (Session): session in which the events are analyzed.
knowledge_base_object (KnowledgeBase): contains information from
the source data needed for processing.
storage_writer (StorageWriter): storage writer.
data_location (str): path to the location that data files should
be loaded from.
Expand Down Expand Up @@ -579,12 +577,14 @@ def AnalyzeEvents(
self._data_location = data_location
self._event_filter_expression = event_filter_expression
self._events_status = processing_status.EventsStatus()
self._knowledge_base = knowledge_base_object
self._processing_configuration = processing_configuration
self._session = session
self._status_update_callback = status_update_callback
self._storage_file_path = storage_file_path

self._user_accounts = list(
storage_writer.GetAttributeContainers('user_account'))

stored_event_labels_counter = {}
if storage_writer.HasAttributeContainers('event_label_count'):
stored_event_labels_counter = {
Expand Down Expand Up @@ -696,11 +696,11 @@ def AnalyzeEvents(
self._analysis_plugins = {}
self._data_location = None
self._event_filter_expression = None
self._knowledge_base = None
self._processing_configuration = None
self._session = None
self._status_update_callback = None
self._storage_file_path = None
self._user_accounts = None

if keyboard_interrupt:
raise KeyboardInterrupt
Expand Down
17 changes: 8 additions & 9 deletions plaso/multi_process/analysis_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@ class AnalysisProcess(task_process.MultiProcessTaskProcess):
_FOREMAN_STATUS_WAIT = 5 * 60

def __init__(
self, event_queue, knowledge_base, session, analysis_plugin,
processing_configuration, data_location=None,
event_filter_expression=None, **kwargs):
self, event_queue, analysis_plugin, processing_configuration,
user_accounts, data_location=None, event_filter_expression=None,
**kwargs):
"""Initializes an analysis worker process.
Non-specified keyword arguments (kwargs) are directly passed to
multiprocessing.Process.
Args:
event_queue (plaso_queue.Queue): event queue.
knowledge_base (KnowledgeBase): contains information from the source
data needed for analysis.
session (Session): session.
analysis_plugin (AnalysisPlugin): plugin running in the process.
processing_configuration (ProcessingConfiguration): processing
configuration.
user_accounts (list[UserAccountArtifact]): user accounts.
data_location (Optional[str]): path to the location that data files
should be loaded from.
event_filter_expression (Optional[str]): event filter expression.
Expand All @@ -48,11 +46,10 @@ def __init__(
self._event_filter_expression = event_filter_expression
self._event_queue = event_queue
self._foreman_status_wait_event = None
self._knowledge_base = knowledge_base
self._number_of_consumed_events = 0
self._session = session
self._status = definitions.STATUS_INDICATOR_INITIALIZED
self._task = None
self._user_accounts = user_accounts

def _GetStatus(self):
"""Retrieves status information.
Expand Down Expand Up @@ -141,7 +138,9 @@ def _Main(self):
task_storage_writer.Open(path=storage_file_path)

self._analysis_mediator = analysis_mediator.AnalysisMediator(
data_location=self._data_location)
data_location=self._data_location, user_accounts=self._user_accounts)

# TODO: move into analysis process.
self._analysis_mediator.SetStorageWriter(task_storage_writer)

# TODO: set event_filter_expression in mediator.
Expand Down
17 changes: 10 additions & 7 deletions plaso/multi_process/extraction_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,9 +743,14 @@ def _StartWorkerProcess(self, process_name):
port=self._task_queue_port,
timeout_seconds=self._TASK_QUEUE_TIMEOUT_SECONDS)

# Ensure environment_variables is a list otherwise pickle will fail
# on Windows when creating a new process.
environment_variables = list(self.knowledge_base.GetEnvironmentVariables())

process = extraction_process.ExtractionWorkerProcess(
task_queue, self.collection_filters_helper, self.knowledge_base,
task_queue, self.collection_filters_helper,
self._processing_configuration, self._system_configurations,
environment_variables,
enable_sigsegv_handler=self._enable_sigsegv_handler, name=process_name)

# Remove all possible log handlers to prevent a child process from logging
Expand Down Expand Up @@ -958,14 +963,12 @@ def ProcessSource(

self._event_data_timeliner = timeliner.EventDataTimeliner(
data_location=processing_configuration.data_location,
preferred_year=processing_configuration.preferred_year)

preferred_time_zone = processing_configuration.preferred_time_zone
if not preferred_time_zone and self.knowledge_base:
preferred_time_zone = self.knowledge_base.timezone.zone
preferred_year=processing_configuration.preferred_year,
system_configurations=system_configurations)

try:
self._event_data_timeliner.SetPreferredTimeZone(preferred_time_zone)
self._event_data_timeliner.SetPreferredTimeZone(
processing_configuration.preferred_time_zone)
except ValueError as exception:
raise errors.BadConfigOption(exception)

Expand Down
Loading

0 comments on commit d571579

Please sign in to comment.