Reading the documentation
As a developer you provide a configured pipeline to Luigi as a Python module with a number of interdependent task classes. As a developer you also want to know how Luigi instantiates these classes, which of their methods it calls, and in which order. After reading through the execution model in the Luigi docs and the source code of the Luigi worker, you know a lot about Luigi workers but still not how they call your code. That is when I decided to write a simple pipeline that just logs every call from Luigi, and to infer from the log what actually happens.
Rather explore
The figure above depicts a simple pipeline for running task F(), which depends on task C() and task E() instances, which in turn depend on a single task B() and two task D() instances, respectively. The corresponding Luigi pipeline with all the logging statements is listed below. (In the listing, F() additionally requires a stand-alone task A() that has no dependencies of its own.)
import logging
import os

import luigi
# Dedicated logger so that every call Luigi makes into this module ends up in
# one chronological trace file.
logger = logging.getLogger('abcdef_pipeline')
logger.setLevel(logging.INFO)

# FileHandler opens its file right away and raises if the directory is
# missing, so make sure 'output/' exists before the handler is created.
if not os.path.isdir('output'):
    os.makedirs('output')

# Mode 'w' truncates the file, so each import starts with an empty trace.
fh = logging.FileHandler('output/abcdef_pipeline.log', 'w')
ff = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')
fh.setFormatter(ff)
logger.addHandler(fh)

# Written once per import of this module.
logger.info('Module abcdef_pipeline initialization')
class TaskA(luigi.Task):
    """Leaf task without dependencies that logs every call made to it."""

    # Executed once, while the class body is evaluated at module import.
    logger.info('CLASS TaskA')

    def requires(self):
        """Log the call and declare no dependencies.

        Added so that the REQUIRES calls on this task show up in the trace
        like they do for the other tasks; an empty list means "no
        requirements".
        """
        logger.info('REQUIRES TaskA')
        return []

    def output(self):
        """Log the call and return the local file target this task produces."""
        logger.info('OUTPUT TaskA')
        return luigi.LocalTarget('output/TaskA.output')

    def run(self):
        """Log the call and write a marker text to the task's output target."""
        logger.info('RUN TaskA')
        with self.output().open('w') as f:
            f.write('Task A output')
class TaskB(luigi.Task):
    """Leaf task (required by TaskC) that logs every call made to it."""

    # Executed once, while the class body is evaluated at module import.
    logger.info('CLASS TaskB')

    def requires(self):
        """Log the call and declare no dependencies.

        Added so that the REQUIRES calls on this task show up in the trace
        like they do for the other tasks; an empty list means "no
        requirements".
        """
        logger.info('REQUIRES TaskB')
        return []

    def output(self):
        """Log the call and return the local file target this task produces."""
        logger.info('OUTPUT TaskB')
        return luigi.LocalTarget('output/TaskB.output')

    def run(self):
        """Log the call and write a marker text to the task's output target."""
        logger.info('RUN TaskB')
        with self.output().open('w') as f:
            f.write('Task B output')
class TaskC(luigi.Task):
    """Intermediate task that needs TaskB and traces each call made to it."""

    # Emitted a single time, when Python evaluates this class body on import.
    logger.info('CLASS TaskC')

    def requires(self):
        """Trace the call and hand back the single upstream TaskB instance."""
        logger.info('REQUIRES TaskC')
        upstream = TaskB()
        return upstream

    def output(self):
        """Trace the call and build the local file target for this task."""
        logger.info('OUTPUT TaskC')
        target = luigi.LocalTarget('output/TaskC.output')
        return target

    def run(self):
        """Trace the call, then write the marker text into the output target."""
        logger.info('RUN TaskC')
        # output() is deliberately called after the RUN message so the order
        # of the trace entries stays the same.
        target = self.output()
        with target.open('w') as sink:
            sink.write('Task C output')
class TaskD(luigi.Task):
    """Parameterised leaf task; `param` names the instance (e.g. 'D0', 'D1')."""

    # Used in the log messages and in the output file name of the instance.
    param = luigi.Parameter()

    # Executed once for the class, not once per parameter value.
    logger.info('CLASS TaskD')

    def requires(self):
        """Log the call and declare no dependencies.

        Added so that the REQUIRES calls on this task show up in the trace
        like they do for the other tasks; an empty list means "no
        requirements".
        """
        logger.info('REQUIRES Task{}'.format(self.param))
        return []

    def output(self):
        """Log the call and return the per-instance local file target."""
        # Message follows the 'OUTPUT Task<X>' shape used by the other tasks
        # (the former trailing ' output' was a leftover of the file content).
        logger.info('OUTPUT Task{}'.format(self.param))
        return luigi.LocalTarget('output/Task{}.output'.format(self.param))

    def run(self):
        """Log the call and write a marker text to the instance's target."""
        logger.info('RUN Task{}'.format(self.param))
        with self.output().open('w') as f:
            f.write('Task {} output'.format(self.param))
class TaskE(luigi.Task):
    """Intermediate task that needs two TaskD instances and logs every call."""

    # Executed once, while the class body is evaluated at module import.
    # (The label used to read 'CLASS TaskC', a copy-paste slip.)
    logger.info('CLASS TaskE')

    def requires(self):
        """Log the call and yield the two parameterised TaskD requirements."""
        logger.info('REQUIRES TaskE')
        yield TaskD(param='D0')
        yield TaskD(param='D1')

    def output(self):
        """Log the call and return the local file target this task produces."""
        logger.info('OUTPUT TaskE')
        return luigi.LocalTarget('output/TaskE.output')

    def run(self):
        """Log the call and write a marker text to the task's output target."""
        logger.info('RUN TaskE')
        with self.output().open('w') as f:
            f.write('Task E output')
class TaskF(luigi.Task):
    """Final task of the pipeline; needs TaskA, TaskC and TaskE."""

    # Emitted a single time, when Python evaluates this class body on import.
    logger.info('CLASS TaskF')

    def requires(self):
        """Trace the call, then lazily yield one instance per required class."""
        logger.info('REQUIRES TaskF')
        # Each class is instantiated only when the consumer asks for the next
        # item, in the fixed order A, C, E.
        for dependency_class in (TaskA, TaskC, TaskE):
            yield dependency_class()

    def output(self):
        """Trace the call and build the local file target for this task."""
        logger.info('OUTPUT TaskF')
        target = luigi.LocalTarget('output/TaskF.output')
        return target

    def run(self):
        """Trace the call, then write the marker text into the output target."""
        logger.info('RUN TaskF')
        # output() is deliberately called after the RUN message so the order
        # of the trace entries stays the same.
        target = self.output()
        with target.open('w') as sink:
            sink.write('Task F output')
This results in the logging output listed below.
# Initialization
2018-08-25 21:53:00,889 - INFO - Module abcd_pipeline initialization
2018-08-25 21:53:00,889 - INFO - CLASS TaskA
2018-08-25 21:53:00,889 - INFO - CLASS TaskB
2018-08-25 21:53:00,889 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskD
2018-08-25 21:53:00,890 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskF
# Depth-first search of task graph
2018-08-25 21:53:00,923 - INFO - OUTPUT TaskF
2018-08-25 21:53:00,924 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,924 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,925 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,928 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskA
# TaskA execution
2018-08-25 21:53:00,929 - INFO - REQUIRES TaskA
2018-08-25 21:53:00,929 - INFO - RUN TaskA
2018-08-25 21:53:00,929 - INFO - OUTPUT TaskA
# TaskD1 execution
2018-08-25 21:53:00,931 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,931 - INFO - RUN TaskD1
2018-08-25 21:53:00,931 - INFO - OUTPUT TaskD1
# TaskD0 execution
2018-08-25 21:53:00,933 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,933 - INFO - RUN TaskD0
2018-08-25 21:53:00,933 - INFO - OUTPUT TaskD0
# TaskE execution
2018-08-25 21:53:00,934 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,934 - INFO - RUN TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskE
# TaskB execution
2018-08-25 21:53:00,936 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,936 - INFO - RUN TaskB
2018-08-25 21:53:00,936 - INFO - OUTPUT TaskB
# TaskC execution
2018-08-25 21:53:00,937 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,937 - INFO - RUN TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskC
# TaskF execution
2018-08-25 21:53:00,938 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,939 - INFO - RUN TaskF
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskF
And then infer
Apparently, this is what Luigi does:
def depth_first_search(tasks, start):
    """Yield every task that still has to run, in depth-first pre-order.

    A task whose output already exists is treated as complete: neither the
    task itself nor its requirements are yielded.

    Args:
        tasks: mapping from task id to task instance.
        start: id of the task at which the walk starts.

    Yields:
        The incomplete tasks reachable from ``start``, each one before its
        own requirements. A task shared by several parents is yielded once
        per parent (this sketch does no de-duplication).
    """
    task = tasks[start]
    if not task.output().exists():
        yield task
        for t in task.requires():
            # requires() hands back task instances while `tasks` is keyed by
            # id, and the results of the recursive generator must be passed
            # on explicitly -- a bare call would silently discard them.
            for pending in depth_first_search(tasks, t.task_id):
                yield pending
def execute_task(task):
    """Run ``task`` if all of its requirements are fulfilled.

    Args:
        task: object offering ``requires()``, ``run()`` and ``output()``.

    Returns:
        False when the output of at least one requirement does not exist
        (the task is then not run); otherwise whether the task's own output
        exists after ``run()`` has returned.
    """
    # Check the output of each requirement individually; a task without
    # requirements passes trivially.
    if not all(t.output().exists() for t in task.requires()):
        return False
    task.run()
    return task.output().exists()
# Driver of the sketch; the numbered markers refer to the observations listed
# after this snippet.
import your_module  # 1.

# dir() yields attribute *names*, so each name is looked up on the module and
# the object found there is instantiated exactly once.
# NOTE(review): still pseudocode -- real code would have to skip the module's
# non-task attributes and supply parameters for parameterised tasks.
tasks = {t: getattr(your_module, t)() for t in dir(your_module)}  # 2.

# Scheduling phase: collect everything that still has to run.
scheduled_tasks = list(depth_first_search(tasks, 'TaskF'))  # 3.
# The search lists a task before its requirements; reversing the list puts
# the requirements first.
scheduled_tasks.reverse()

# Execution phase: run the scheduled tasks and remember each outcome.
results = {t.task_id: execute_task(t) for t in scheduled_tasks}  # 3.
if results['TaskF']:
    print(':-)')
else:
    print(':-(')
Actually, this is more like pseudocode, as it hides details about composite targets and requirements and about the distribution of task execution over workers. But it nicely shows that:
- Your pipeline module code is imported once
- Task classes are instantiated once (so, under the hood, members of a task family use the same instance)
- Luigi has a scheduling phase and an execution phase