
2018-08-26

Code execution order in Luigi pipelines

Luigi is a popular framework for writing and managing batch-oriented data pipelines in Python. While the Luigi documentation and several tutorials on the web provide simple examples to get started, you soon discover that you have to understand Luigi's code execution order really well once your application becomes more complex. This post intends to provide that insight.

Reading the documentation


As a developer you provide Luigi with a configured pipeline: a Python module containing a number of interdependent task classes. Naturally, you want to know how Luigi instantiates these classes and in which order it calls their methods. After reading through the execution model in the Luigi docs and the source code of the Luigi worker, you know a lot about Luigi workers but still not how they call your code. That is when I decided to write a simple pipeline that logs every call Luigi makes, and to infer from the log what actually happens.

Rather explore


The figure above depicts a simple pipeline for running task F(), which depends on task C() and task E() instances, which in turn depend on a single task B() and two task D() instances, respectively. The corresponding Luigi pipeline, with all the logging statements, is listed below.

 import logging  
 import luigi  
   
 logger = logging.getLogger('abcdef_pipeline')  
 logger.setLevel(logging.INFO)  
 fh = logging.FileHandler('output/abcdef_pipeline.log', 'w')  
 ff = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')  
 fh.setFormatter(ff)  
 logger.addHandler(fh)  
   
 logger.info('Module abcdef_pipeline initialization')  
   
   
 class TaskA(luigi.Task):  
   
   logger.info('CLASS TaskA')  
   
   def output(self):  
     logger.info('OUTPUT TaskA')  
     return luigi.LocalTarget('output/TaskA.output')  
   
   def run(self):  
     logger.info('RUN TaskA')  
     with self.output().open('w') as f:  
       f.write('Task A output')  
   
   
 class TaskB(luigi.Task):  
   
   logger.info('CLASS TaskB')  
   
   def output(self):  
     logger.info('OUTPUT TaskB')  
     return luigi.LocalTarget('output/TaskB.output')  
   
   def run(self):  
     logger.info('RUN TaskB')  
     with self.output().open('w') as f:  
       f.write('Task B output')  
   
   
 class TaskC(luigi.Task):  
   
   logger.info('CLASS TaskC')  
   
   def requires(self):  
     logger.info('REQUIRES TaskC')  
     return TaskB()  
   
   def output(self):  
     logger.info('OUTPUT TaskC')  
     return luigi.LocalTarget('output/TaskC.output')  
   
   def run(self):  
     logger.info('RUN TaskC')  
     with self.output().open('w') as f:  
       f.write('Task C output')  
   
   
 class TaskD(luigi.Task):  
   
   param = luigi.Parameter()  
   logger.info('CLASS TaskD')  
   
   def output(self):  
    logger.info('OUTPUT Task{}'.format(self.param))  
     return luigi.LocalTarget('output/Task{}.output'.format(self.param))  
   
   def run(self):  
    logger.info('RUN Task{}'.format(self.param))  
     with self.output().open('w') as f:  
       f.write('Task {} output'.format(self.param))  
   
   
 class TaskE(luigi.Task):  
   
  logger.info('CLASS TaskE')  
   
   def requires(self):  
     logger.info('REQUIRES TaskE')  
     yield TaskD(param='D0')  
     yield TaskD(param='D1')  
   
   def output(self):  
     logger.info('OUTPUT TaskE')  
     return luigi.LocalTarget('output/TaskE.output')  
   
   def run(self):  
     logger.info('RUN TaskE')  
     with self.output().open('w') as f:  
       f.write('Task E output')  
   
   
 class TaskF(luigi.Task):  
   
   logger.info('CLASS TaskF')  
   
   def requires(self):  
     logger.info('REQUIRES TaskF')  
     yield TaskA()  
     yield TaskC()  
     yield TaskE()  
   
   def output(self):  
     logger.info('OUTPUT TaskF')  
     return luigi.LocalTarget('output/TaskF.output')  
   
   def run(self):  
     logger.info('RUN TaskF')  
     with self.output().open('w') as f:  
       f.write('Task F output')  
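Assuming the module above is saved as abcdef_pipeline.py on your PYTHONPATH, the pipeline can be kicked off with Luigi's standard command-line interface (shown here with the in-process scheduler, so no central scheduler daemon is needed):

```shell
# The targets and the log file are written to output/, so create it first.
mkdir -p output

# Run TaskF and everything it depends on.
python -m luigi --module abcdef_pipeline TaskF --local-scheduler
```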
   

This results in the logging output listed below.


# Initialization
2018-08-25 21:53:00,889 - INFO - Module abcdef_pipeline initialization
2018-08-25 21:53:00,889 - INFO - CLASS TaskA
2018-08-25 21:53:00,889 - INFO - CLASS TaskB
2018-08-25 21:53:00,889 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskD
2018-08-25 21:53:00,890 - INFO - CLASS TaskE
2018-08-25 21:53:00,890 - INFO - CLASS TaskF

# Depth-first search of task graph
2018-08-25 21:53:00,923 - INFO - OUTPUT TaskF
2018-08-25 21:53:00,924 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,924 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,925 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,928 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskA

# TaskA execution
2018-08-25 21:53:00,929 - INFO - REQUIRES TaskA
2018-08-25 21:53:00,929 - INFO - RUN TaskA
2018-08-25 21:53:00,929 - INFO - OUTPUT TaskA

# TaskD1 execution
2018-08-25 21:53:00,931 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,931 - INFO - RUN TaskD1
2018-08-25 21:53:00,931 - INFO - OUTPUT TaskD1

# TaskD0 execution
2018-08-25 21:53:00,933 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,933 - INFO - RUN TaskD0
2018-08-25 21:53:00,933 - INFO - OUTPUT TaskD0

# TaskE execution
2018-08-25 21:53:00,934 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,934 - INFO - RUN TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskE

# TaskB execution
2018-08-25 21:53:00,936 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,936 - INFO - RUN TaskB
2018-08-25 21:53:00,936 - INFO - OUTPUT TaskB

# TaskC execution
2018-08-25 21:53:00,937 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,937 - INFO - RUN TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskC

# TaskF execution
2018-08-25 21:53:00,938 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,939 - INFO - RUN TaskF
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskF

And then infer

Apparently, this is what Luigi does:

def depth_first_search(task):
    if not task.output().exists():
        yield task
        for t in task.requires():
            yield from depth_first_search(t)


def execute_task(task):
    if not all(t.output().exists() for t in task.requires()):
        return False
    task.run()
    return task.output().exists()


import your_module                                                  # 1.
tasks = {name: cls() for name, cls in vars(your_module).items()
         if isinstance(cls, type) and issubclass(cls, luigi.Task)}  # 2.
scheduled_tasks = list(depth_first_search(tasks['TaskF']))          # 3.
scheduled_tasks.reverse()
results = {t: execute_task(t) for t in scheduled_tasks}             # 3.
if results[tasks['TaskF']]:
    print(':-)')
else:
    print(':-(')
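The two phases can also be demonstrated without Luigi at all. The toy below (all names hypothetical: plain Python objects stand in for Luigi tasks and targets) schedules the A–F graph depth-first and then executes the reversed plan:

```python
class ToyTask:
    """Stand-in for a Luigi task; `done` plays the role of output().exists()."""

    def __init__(self, name, requires=()):
        self.name = name
        self._requires = list(requires)
        self.done = False

    def requires(self):
        return self._requires

    def run(self):
        self.done = True


def schedule(task):
    """Depth-first walk over incomplete tasks, most-downstream task first."""
    if not task.done:
        yield task
        for dep in task.requires():
            yield from schedule(dep)


# The graph from the post: F <- (A, C, E), C <- B, E <- (D0, D1).
b = ToyTask('B'); c = ToyTask('C', [b])
d0 = ToyTask('D0'); d1 = ToyTask('D1'); e = ToyTask('E', [d0, d1])
a = ToyTask('A'); f = ToyTask('F', [a, c, e])

plan = list(schedule(f))   # scheduling phase
plan.reverse()             # dependencies must run before their dependents
for t in plan:             # execution phase
    t.run()

print([t.name for t in plan])  # ['D1', 'D0', 'E', 'B', 'C', 'A', 'F']
print(f.done)                  # True
```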

Admittedly, this is more pseudocode than working code, as it hides details about composite targets and requirements and the distribution of task execution over workers. But it nicely shows that:
  1. Your pipeline module code is imported once 
  2. Task classes are instantiated once (so, under the hood, members of a task family use the same instance)
  3. Luigi has a scheduling phase and an execution phase
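Point 2 deserves a small demonstration: Luigi's Register metaclass caches task instances per parameter set, so constructing "another" TaskD(param='D0') hands you back the same object. A minimal sketch of that caching idea in plain Python (not Luigi's actual implementation) looks like this:

```python
class InstanceCache(type):
    """Metaclass that returns a cached instance for equal constructor arguments."""

    _cache = {}

    def __call__(cls, *args, **kwargs):
        key = (cls, args, tuple(sorted(kwargs.items())))
        if key not in InstanceCache._cache:
            # First construction with these arguments: build and remember it.
            InstanceCache._cache[key] = super().__call__(*args, **kwargs)
        return InstanceCache._cache[key]


class Task(metaclass=InstanceCache):
    def __init__(self, param=None):
        self.param = param


a = Task(param='D0')
b = Task(param='D0')   # same parameters, so the cached instance is returned
c = Task(param='D1')

print(a is b)  # True
print(a is c)  # False
```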
