33"""dftimewolf main entrypoint."""
44
55import argparse
6- import datetime
76import logging
87import os
98import signal
109import sys
11- import time
10+ import typing
1211import uuid
1312
1413from typing import TYPE_CHECKING , List , Optional , Dict , Any , cast
2221from dftimewolf .lib import logging_utils
2322from dftimewolf .lib import telemetry
2423from dftimewolf import config
25-
24+ from dftimewolf . lib . modules import module_runner
2625from dftimewolf .lib import errors
2726from dftimewolf .lib import utils
2827
2928if TYPE_CHECKING :
30- from dftimewolf .lib import state as dftw_state
3129 from dftimewolf .lib import resources
3230
3331TELEMETRY = telemetry
8987# pylint: enable=line-too-long
9088
9189from dftimewolf .lib .recipes import manager as recipes_manager
92- from dftimewolf . lib . state import DFTimewolfState
90+
9391
9492logger = cast (logging_utils .WolfLogger , logging .getLogger ('dftimewolf' ))
9593
94+
9695class DFTimewolfTool (object ):
9796 """DFTimewolf tool."""
9897
@@ -102,29 +101,25 @@ class DFTimewolfTool(object):
102101
103102 def __init__ (
104103 self ,
105- workflow_uuid : Optional [str ] = None ) -> None :
104+ workflow_uuid : Optional [str ] = None ,
105+ telemetry_ : Optional [telemetry .BaseTelemetry ] = None ) -> None :
106106 """Initializes a DFTimewolf tool."""
107107 super (DFTimewolfTool , self ).__init__ ()
108- self ._command_line_options : Optional [argparse .Namespace ]
109108 self ._data_files_path = ''
110109 self ._recipes_manager = recipes_manager .RecipesManager ()
111110 self ._recipe = {} # type: Dict[str, Any]
112- self ._state = None # type: Optional[DFTimewolfState]
113111 self ._command_line_options = argparse .Namespace ()
112+ self ._running_args : dict [str , typing .Any ] = {}
114113 self .dry_run = False
115- if not workflow_uuid :
116- workflow_uuid = str (uuid .uuid4 ())
117- self .uuid = workflow_uuid
118114
119- self .telemetry = self .InitializeTelemetry ()
120- self ._DetermineDataFilesPath ()
115+ self ._uuid = workflow_uuid or str (uuid .uuid4 ())
121116
122- @ property
123- def state ( self ) -> "dftw_state.DFTimewolfState" :
124- """Returns the internal state object."""
125- if not self ._state :
126- raise errors . CriticalError ( 'State not initialized' )
127- return self ._state
117+ logger . success ( f'dfTimewolf tool initialized with UUID: { self . _uuid } ' )
118+
119+ self . _telemetry = telemetry_ or self . InitializeTelemetry ()
120+ self ._DetermineDataFilesPath ()
121+ self . _module_runner = module_runner . ModuleRunner (
122+ logger , self ._telemetry , self . PublishMessage )
128123
129124 def _AddRecipeOptions (self , argument_parser : argparse .ArgumentParser ) -> None :
130125 """Adds the recipe options to the argument group.
@@ -285,20 +280,18 @@ def ParseArguments(self, arguments: List[str]) -> None:
285280 self ._recipe = self ._command_line_options .recipe
286281 self .dry_run = self ._command_line_options .dry_run
287282
288- state = DFTimewolfState (config .Config )
289- state .telemetry = self .telemetry
290- self ._state = state
283+ self ._telemetry .SetRecipeName (self ._recipe ['name' ])
291284
292285 logger .info ('Loading recipe {0:s}...' .format (self ._recipe ['name' ]))
293286 # Raises errors.RecipeParseError on error.
294- self ._state . LoadRecipe (self ._recipe , MODULES )
287+ self ._module_runner . Initialise (self ._recipe , MODULES )
295288
296289 module_cnt = len (self ._recipe .get ('modules' , [])) + \
297290 len (self ._recipe .get ('preflights' , []))
298291 logger .info ('Loaded recipe {0:s} with {1:d} modules' .format (
299292 self ._recipe ['name' ], module_cnt ))
300293
301- self ._state . command_line_options = vars (self ._command_line_options )
294+ self ._running_args = vars (self ._command_line_options )
302295
303296 def ValidateArguments (self , dry_run : bool = False ) -> None :
304297 """Validate the arguments.
@@ -317,13 +310,13 @@ def ValidateArguments(self, dry_run: bool=False) -> None:
317310
318311 switch = expanded_argument .switch .replace ('--' , '' )
319312 argument_mandatory = switch == arg .switch
320- argument_value = self .state . command_line_options .get (switch )
313+ argument_value = self ._running_args .get (switch )
321314
322315 if argument_mandatory or argument_value is not None :
323316 try :
324317 valid_value = validators_manager .ValidatorsManager .Validate (
325318 argument_value , arg , dry_run )
326- self .state . command_line_options [switch ] = valid_value
319+ self ._running_args [switch ] = valid_value
327320 except errors .RecipeArgsValidationFailure as exception :
328321 error_messages .append (
329322 f'Invalid argument: "{ arg .switch } " with value "{ argument_value } ".'
@@ -340,6 +333,14 @@ def ValidateArguments(self, dry_run: bool=False) -> None:
340333 raise errors .CriticalError (
341334 'At least one argument failed validation' )
342335
336+ def InterpolateArgs (self ) -> None :
337+ """Interpolate config values and CLI args into the recipe args."""
338+ for module in (self ._recipe .get ('preflights' , []) +
339+ self ._recipe .get ('modules' , [])):
340+ module ['args' ] = utils .ImportArgsFromDict (module ['args' ],
341+ self ._running_args ,
342+ config .Config )
343+
343344 def _SubstituteValidationParameters (
344345 self , arg : "resources.RecipeArgument" ) -> "resources.RecipeArgument" :
345346 """Replaces parameters in the format specification of an argument validator.
@@ -354,52 +355,58 @@ def _SubstituteValidationParameters(
354355 for key , value in arg .validation_params .items ():
355356 if isinstance (value , str ) and '@' in value :
356357 to_substitute = value .replace ('@' , '' )
357- if to_substitute in self .state . command_line_options :
358+ if to_substitute in self ._running_args :
358359 arg .validation_params [key ] = (
359- self .state . command_line_options [to_substitute ])
360+ self ._running_args [to_substitute ])
360361 return arg
361362
362- def RunPreflights (self ) -> None :
363- """Runs preflight modules."""
364- logger .info ('Running preflights...' )
365- self .state .RunPreflights ()
366-
367363 def ReadRecipes (self ) -> None :
368364 """Reads the recipe files."""
369365 if os .path .isdir (self ._data_files_path ):
370366 recipes_path = os .path .join (self ._data_files_path , 'recipes' )
371367 if os .path .isdir (recipes_path ):
372368 self ._recipes_manager .ReadRecipesFromDirectory (recipes_path )
373369
374- def RunModules (self ) -> None :
370+ def ReadAdditionalRecipes (self , directory : str ) -> None :
371+ """Reads additional recipes from a given directory."""
372+ self ._recipes_manager .ReadRecipesFromDirectory (directory )
373+
374+ def RunAllModules (self ) -> None :
375375 """Runs the modules."""
376376 logger .info ('Running modules...' )
377- self .state . RunModules ( )
377+ self ._module_runner . Run ( self . _running_args )
378378 logger .info ('Modules run successfully!' )
379379
380- def SetupModules (self ) -> None :
381- """Sets up the modules."""
382- # TODO: refactor to only load modules that are used by the recipe.
383-
384- logger .info ('Setting up modules...' )
385- self .state .SetupModules ()
386- logger .info ('Modules successfully set up!' )
387-
388- def CleanUpPreflights (self ) -> None :
389- """Calls the preflight's CleanUp functions."""
390- self .state .CleanUpPreflights ()
391-
392- def FormatTelemetry (self ) -> str :
380+ def LogTelemetry (self ) -> None :
393381 """Logs collected telemetry, if any, at debug level."""
394- return self .telemetry .FormatTelemetry ()
382+
383+ for line in self ._telemetry .FormatTelemetry ().split ('\n ' ):
384+ logger .debug (line )
395385
396386 def RecipesManager (self ) -> recipes_manager .RecipesManager :
397387 """Returns the recipes manager."""
398388 return self ._recipes_manager
399389
400390 def InitializeTelemetry (self ) -> telemetry .BaseTelemetry :
401391 """Initializes the telemetry object."""
402- return telemetry .GetTelemetry (uuid = self .uuid )
392+ return telemetry .GetTelemetry (uuid = self ._uuid )
393+
394+ def PublishMessage (
395+ self , source : str , message : str , is_error : bool = False ) -> None :
396+ """Receives a message for publishing.
397+
398+ The base class does nothing with this (as the method in module also logs the
399+ message). This method exists to be overridden for other UIs.
400+
401+ Args:
402+ source: The source of the message.
403+ message: The message content.
404+ is_error: True if the message is an error message, False otherwise.
405+ """
406+
407+ def LogExecutionPlan (self ) -> None :
408+ """Logs the execution plan."""
409+ self ._module_runner .LogExecutionPlan ()
403410
404411
405412def SignalHandler (* unused_argvs : Any ) -> None :
@@ -463,12 +470,10 @@ def RunTool() -> int:
463470 Returns:
464471 int: 0 if DFTimewolf could be run successfully, 1 otherwise.
465472 """
466- time_start = time .time ()* 1000
467473 tool = DFTimewolfTool ()
468474
469475 # TODO: log errors if this fails.
470476 tool .LoadConfiguration ()
471- logger .success (f'dfTimewolf tool initialized with UUID: { tool .uuid } ' )
472477
473478 try :
474479 tool .ReadRecipes ()
@@ -484,75 +489,22 @@ def RunTool() -> int:
484489 logger .critical (str (exception ))
485490 return 1
486491
487- modules = [
488- module ['name' ] for module in tool .state .recipe .get ('modules' , [])
489- ]
490- modules .extend ([
491- module ['name' ] for module in tool .state .recipe .get ('preflights' , [])
492- ])
493- recipe_name = tool .state .recipe ['name' ]
494-
495- for module in sorted (modules ):
496- tool .telemetry .LogTelemetry ('module' , module , 'core' , recipe_name )
497-
498- tool .telemetry .LogTelemetry (
499- 'workflow_start' ,
500- datetime .datetime .now ().strftime ('%Y-%m-%dT%H:%M:%S.%fZ' ),
501- 'core' ,
502- recipe_name )
503-
504492 try :
505493 tool .ValidateArguments (tool .dry_run )
506494 except errors .CriticalError as exception :
507495 logger .critical (str (exception ))
508496 return 1
509497
510- # Interpolate arguments into recipe
511- recipe = tool .state .recipe
512- for module in recipe .get ('preflights' , []) + recipe .get ('modules' , []):
513- module ['args' ] = utils .ImportArgsFromDict (module ['args' ],
514- tool .state .command_line_options ,
515- tool .state .config )
516-
517- tool .state .LogExecutionPlan ()
498+ tool .InterpolateArgs ()
499+ tool .LogExecutionPlan ()
518500
519501 if tool .dry_run :
520502 logger .info ("Exiting as --dry_run flag is set." )
521503 return 0
522504
523- time_ready = time .time ()* 1000
524- tool .RunPreflights ()
525- time_preflights = time .time ()* 1000
526- tool .telemetry .LogTelemetry (
527- 'preflights_delta' , str (time_preflights - time_ready ), 'core' , recipe_name )
528-
529- try :
530- tool .SetupModules ()
531- except errors .CriticalError as exception :
532- logger .critical (str (exception ))
533- return 1
534-
535- time_setup = time .time ()* 1000
536- tool .telemetry .LogTelemetry (
537- 'setup_delta' , str (time_setup - time_preflights ), 'core' , recipe_name )
505+ tool .RunAllModules ()
538506
539- try :
540- tool .RunModules ()
541- except errors .CriticalError as exception :
542- logger .critical (str (exception ))
543- return 1
544- finally :
545- time_run = time .time ()* 1000
546- tool .telemetry .LogTelemetry (
547- 'run_delta' , str (time_run - time_setup ), 'core' , recipe_name )
548-
549- tool .CleanUpPreflights ()
550-
551- total_time = time .time ()* 1000 - time_start
552- tool .telemetry .LogTelemetry (
553- 'total_time' , str (total_time ), 'core' , recipe_name )
554- for telemetry_row in tool .FormatTelemetry ().split ('\n ' ):
555- logger .debug (telemetry_row )
507+ tool .LogTelemetry ()
556508
557509 return 0
558510
0 commit comments