Removed storage writer session write methods (log2timeline#3880)
joachimmetz committed Oct 24, 2021
1 parent 705dd89, commit eedc2ae
Showing 8 changed files with 64 additions and 137 deletions.
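
In short, this change removes the WriteSessionStart, WriteSessionConfiguration and WriteSessionCompletion methods from the storage writers; callers now create the corresponding attribute containers from the session and pass them to the generic AddAttributeContainer method. A minimal sketch of the before/after pattern, assuming `session` and `storage_writer` objects as created by the Plaso tools in this diff (error handling omitted):

```python
# Sketch only: assumes `session` and `storage_writer` as created elsewhere by
# the Plaso tools (engine.BaseEngine.CreateSession and
# storage_factory.StorageFactory.CreateStorageWriter).

# Old API, removed by this commit:
#   storage_writer.WriteSessionStart(session)
#   storage_writer.WriteSessionConfiguration(session)
#   storage_writer.WriteSessionCompletion(session)

# New pattern: create the containers on the session and add them directly.
session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

session_configuration = session.CreateSessionConfiguration()
storage_writer.AddAttributeContainer(session_configuration)

# ... extraction or analysis runs here ...

session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)
```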
21 changes: 14 additions & 7 deletions plaso/cli/extraction_tool.py
@@ -438,9 +438,15 @@ def _ProcessSources(self, session, storage_writer):
configuration = self._CreateProcessingConfiguration(
extraction_engine.knowledge_base)

# TODO: move the following session values into separate processing/session
# configuration container.
session.enabled_parser_names = (
configuration.parser_filter_expression.split(','))
session.parser_filter_expression = self._parser_filter_expression
session.preferred_language = self._preferred_language or 'en-US'
session.preferred_time_zone = self._preferred_time_zone
session.preferred_year = self._preferred_year
session.text_prepend = self._text_prepend

number_of_enabled_parsers = len(session.enabled_parser_names)

@@ -481,7 +487,8 @@ def _ProcessSources(self, session, storage_writer):
session.source_configurations = (
extraction_engine.knowledge_base.GetSourceConfigurationArtifacts())

storage_writer.WriteSessionConfiguration(session)
session_configuration = session.CreateSessionConfiguration()
storage_writer.AddAttributeContainer(session_configuration)

status_update_callback = (
self._status_view.GetExtractionStatusUpdateCallback())
@@ -627,10 +634,7 @@ def ExtractEventsFromSources(self):
command_line_arguments=self._command_line_arguments,
debug_mode=self._debug_mode,
filter_file_path=self._filter_file,
preferred_encoding=self.preferred_encoding,
preferred_time_zone=self._preferred_time_zone,
preferred_year=self._preferred_year,
text_prepend=self._text_prepend)
preferred_encoding=self.preferred_encoding)

storage_writer = storage_factory.StorageFactory.CreateStorageWriter(
self._storage_format)
@@ -647,14 +651,17 @@ def ExtractEventsFromSources(self):
processing_status = None

try:
storage_writer.WriteSessionStart(session)
session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

try:
processing_status = self._ProcessSources(session, storage_writer)

finally:
session.aborted = getattr(processing_status, 'aborted', True)
storage_writer.WriteSessionCompletion(session)

session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

except IOError as exception:
raise IOError('Unable to write to storage with error: {0!s}'.format(
1 change: 1 addition & 0 deletions plaso/cli/psort_tool.py
@@ -436,6 +436,7 @@ def ProcessStorage(self):
session = engine.BaseEngine.CreateSession(
command_line_arguments=self._command_line_arguments,
preferred_encoding=self.preferred_encoding)
session.preferred_language = self._preferred_language or 'en-US'

storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile(
self._storage_file_path)
1 change: 1 addition & 0 deletions plaso/cli/psteal_tool.py
@@ -165,6 +165,7 @@ def AnalyzeEvents(self):
session = engine.BaseEngine.CreateSession(
command_line_arguments=self._command_line_arguments,
preferred_encoding=self.preferred_encoding)
session.preferred_language = self._preferred_language or 'en-US'

storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile(
self._storage_file_path)
12 changes: 1 addition & 11 deletions plaso/engine/engine.py
@@ -186,9 +186,7 @@ def _StopProfiling(self):
@classmethod
def CreateSession(
cls, artifact_filter_names=None, command_line_arguments=None,
debug_mode=False, filter_file_path=None, preferred_encoding='utf-8',
preferred_language='en-US', preferred_time_zone=None,
preferred_year=None, text_prepend=None):
debug_mode=False, filter_file_path=None, preferred_encoding='utf-8'):
"""Creates a session attribute container.
Args:
@@ -199,10 +197,6 @@ def CreateSession(
debug_mode (Option[bool]): True if debug mode was enabled.
filter_file_path (Optional[str]): path to a file with find specifications.
preferred_encoding (Optional[str]): preferred encoding.
preferred_language (Optional[str]): preferred language.
preferred_time_zone (Optional[str]): preferred time zone.
preferred_year (Optional[int]): preferred year.
text_prepend (Optional[str]): text to prepend to every display name.
Returns:
Session: session attribute container.
@@ -214,10 +208,6 @@ def CreateSession(
session.debug_mode = debug_mode
session.filter_file = filter_file_path
session.preferred_encoding = preferred_encoding
session.preferred_language = preferred_language
session.preferred_time_zone = preferred_time_zone
session.preferred_year = preferred_year
session.text_prepend = text_prepend

return session
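
Since preferred_language, preferred_time_zone, preferred_year and text_prepend are no longer CreateSession arguments, callers set them as attributes on the returned session, as the tool changes in this commit do. A small sketch of the new calling pattern (argument values are placeholders; the import path is assumed from the module layout shown in this diff):

```python
from plaso.engine import engine

session = engine.BaseEngine.CreateSession(
    command_line_arguments='log2timeline.py --source image.raw',  # placeholder
    preferred_encoding='utf-8')

# Formerly CreateSession keyword arguments, now set directly on the session.
session.preferred_language = 'en-US'
session.preferred_time_zone = 'UTC'  # placeholder value
session.preferred_year = 2021        # placeholder value
session.text_prepend = 'C:'          # placeholder value
```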

10 changes: 7 additions & 3 deletions plaso/multi_process/analysis_engine.py
@@ -552,10 +552,12 @@ def AnalyzeEvents(
self._StartStatusUpdateThread()

try:
storage_writer.WriteSessionStart(session)
session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

try:
storage_writer.WriteSessionConfiguration(self._session)
session_configuration = self._session.CreateSessionConfiguration()
storage_writer.AddAttributeContainer(session_configuration)

self._AnalyzeEvents(
storage_writer, analysis_plugins, event_filter=event_filter)
@@ -573,7 +575,9 @@ def AnalyzeEvents(
finally:
self._processing_status.aborted = self._abort
self._session.aborted = self._abort
storage_writer.WriteSessionCompletion(self._session)

session_completion = self._session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

finally:
# Stop the status update thread after close of the storage writer
42 changes: 0 additions & 42 deletions plaso/storage/redis/writer.py
@@ -89,45 +89,3 @@ def WritePreprocessingInformation(self, knowledge_base):
"""
raise IOError(
'Preprocessing information is not supported by the redis store.')

def WriteSessionCompletion(self, session):
"""Writes session completion information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: always, as the Redis store does not support writing a session
completion.
OSError: always, as the Redis store does not support writing a session
completion.
"""
raise IOError('Session completion is not supported by the redis store.')

def WriteSessionConfiguration(self, session):
"""Writes session configuration information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: always, as the Redis store does not support writing a session
configuration.
OSError: always, as the Redis store does not support writing a session
configuration.
"""
raise IOError('Session configuration is not supported by the redis store.')

def WriteSessionStart(self, session):
"""Writes session start information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: always, as the Redis store does not support writing a session
start.
OSError: always, as the Redis store does not support writing a session
start.
"""
raise IOError('Session start is not supported by the redis store.')
60 changes: 0 additions & 60 deletions plaso/storage/writer.py
@@ -302,63 +302,3 @@ def UpdateAttributeContainer(self, container):
self._RaiseIfNotWritable()

self._store.UpdateAttributeContainer(container)

def WriteSessionCompletion(self, session):
"""Writes session completion information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: when the storage writer is closed or if the storage type is
not supported.
OSError: when the storage writer is closed or if the storage type is
not supported.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Unsupported storage type.')

session_completion = session.CreateSessionCompletion()
self._store.AddAttributeContainer(session_completion)

def WriteSessionConfiguration(self, session):
"""Writes session configuration information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: when the storage writer is closed or if the storage type is
not supported.
OSError: when the storage writer is closed or if the storage type is
not supported.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Unsupported storage type.')

session_configuration = session.CreateSessionConfiguration()
self._store.AddAttributeContainer(session_configuration)

def WriteSessionStart(self, session):
"""Writes session start information.
Args:
session (Session): session the storage changes are part of.
Raises:
IOError: when the storage writer is closed or if the storage type is
not supported.
OSError: when the storage writer is closed or if the storage type is
not supported.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Unsupported storage type.')

session_start = session.CreateSessionStart()
self._store.AddAttributeContainer(session_start)
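
Note that the removed base-class wrappers also enforced that the writer uses session storage, raising IOError('Unsupported storage type.') otherwise, a check the plain AddAttributeContainer call sites in this commit do not show. A caller that wants to keep that guard could use a small hypothetical helper along these lines (the helper and its storage_type parameter are illustrative, not part of this change; the definitions import path is assumed):

```python
from plaso.lib import definitions


def AddSessionContainer(storage_writer, container, storage_type):
  """Hypothetical helper that keeps the removed storage type guard.

  Args:
    storage_writer (StorageWriter): storage writer to add the container to.
    container (AttributeContainer): session start, configuration or
        completion container created via the session.
    storage_type (str): storage type of the writer.

  Raises:
    IOError: if the storage type is not a session store.
  """
  if storage_type != definitions.STORAGE_TYPE_SESSION:
    raise IOError('Unsupported storage type.')

  storage_writer.AddAttributeContainer(container)
```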
54 changes: 40 additions & 14 deletions tests/engine/worker.py
@@ -113,7 +113,8 @@ def _TestProcessPathSpec(
storage_writer.Open()

try:
storage_writer.WriteSessionStart(session)
session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

extraction_worker.ProcessPathSpec(parser_mediator, path_spec)
event_source = storage_writer.GetFirstWrittenEventSource()
@@ -122,7 +123,8 @@ def _TestProcessPathSpec(
parser_mediator, event_source.path_spec)
event_source = storage_writer.GetNextWrittenEventSource()

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

if expected_event_counters:
self.CheckEventCounters(storage_writer, expected_event_counters)
@@ -186,7 +188,9 @@ def testAnalyzeDataStream(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

file_entry = self._GetTestFileEntry(['syslog.tgz'])
parser_mediator.SetFileEntry(file_entry)
Expand All @@ -197,7 +201,9 @@ def testAnalyzeDataStream(self):
extraction_worker._AnalyzeDataStream(
file_entry, '', display_name, event_data_stream)

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

self.assertIsNotNone(event_data_stream)
@@ -232,7 +238,9 @@ def testAnalyzeFileObject(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

file_entry = self._GetTestFileEntry(['syslog.tgz'])
parser_mediator.SetFileEntry(file_entry)
Expand All @@ -244,7 +252,9 @@ def testAnalyzeFileObject(self):
extraction_worker._AnalyzeFileObject(
file_object, display_name, event_data_stream)

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

self.assertIsNotNone(event_data_stream)
@@ -297,15 +307,19 @@ def testExtractContentFromDataStream(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

file_entry = self._GetTestFileEntry(['syslog.tgz'])
parser_mediator.SetFileEntry(file_entry)

extraction_worker._ExtractContentFromDataStream(
parser_mediator, file_entry, '')

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

# TODO: check results in storage writer
@@ -337,15 +351,19 @@ def testExtractMetadataFromFileEntry(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

file_entry = self._GetTestFileEntry(['syslog.tgz'])
parser_mediator.SetFileEntry(file_entry)

extraction_worker._ExtractMetadataFromFileEntry(
parser_mediator, file_entry, '')

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

# TODO: check results in storage writer
@@ -377,7 +395,9 @@ def testGetArchiveTypes(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

extraction_worker = worker.EventExtractionWorker()

Expand All @@ -387,7 +407,9 @@ def testGetArchiveTypes(self):
parser_mediator, path_spec)
self.assertEqual(type_indicators, [dfvfs_definitions.TYPE_INDICATOR_TAR])

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

def testGetCompressedStreamTypes(self):
@@ -417,7 +439,9 @@ def testGetCompressedStreamTypes(self):
extraction_worker._analyzers = [test_analyzer]

storage_writer.Open()
storage_writer.WriteSessionStart(session)

session_start = session.CreateSessionStart()
storage_writer.AddAttributeContainer(session_start)

extraction_worker = worker.EventExtractionWorker()

Expand All @@ -427,7 +451,9 @@ def testGetCompressedStreamTypes(self):
parser_mediator, path_spec)
self.assertEqual(type_indicators, [dfvfs_definitions.TYPE_INDICATOR_GZIP])

storage_writer.WriteSessionCompletion(session)
session_completion = session.CreateSessionCompletion()
storage_writer.AddAttributeContainer(session_completion)

storage_writer.Close()

def testIsMetadataFile(self):
