
2018-08-26

Code execution order in Luigi pipelines

Luigi is a popular framework for writing and managing batch-oriented data pipelines in Python. While the Luigi documentation and several places on the web provide simple examples to get you started, you soon discover that you have to understand Luigi's code execution order really well once your application becomes more complex. This post intends to provide that insight.

Reading the documentation


As a developer you provide Luigi with a configured pipeline: a Python module containing a number of interdependent task classes. Naturally, you want to know how Luigi instantiates these classes and in which order it calls their methods. After reading through the execution model in the Luigi docs and the source code of the Luigi worker, you know a lot about Luigi workers, but still not how they call your code. That is when I decided to write a simple pipeline that logs every call Luigi makes, and to infer from the log what actually happens.

Rather explore


The figure above depicts a simple pipeline for running task F(), which depends on task C() and task E() instances, which in turn depend on a single task B() and two task D() instances, respectively. The corresponding Luigi pipeline, with all the logging statements added, is listed below.

 import logging  
 import luigi  
   
 logger = logging.getLogger('abcdef_pipeline')  
 logger.setLevel(logging.INFO)  
 fh = logging.FileHandler('output/abcdef_pipeline.log', 'w')  
 ff = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')  
 fh.setFormatter(ff)  
 logger.addHandler(fh)  
   
 logger.info('Module abcdef_pipeline initialization')  
   
   
 class TaskA(luigi.Task):  
   
   logger.info('CLASS TaskA')  
   
   def output(self):  
     logger.info('OUTPUT TaskA')  
     return luigi.LocalTarget('output/TaskA.output')  
   
   def run(self):  
     logger.info('RUN TaskA')  
     with self.output().open('w') as f:  
       f.write('Task A output')  
   
   
 class TaskB(luigi.Task):  
   
   logger.info('CLASS TaskB')  
   
   def output(self):  
     logger.info('OUTPUT TaskB')  
     return luigi.LocalTarget('output/TaskB.output')  
   
   def run(self):  
     logger.info('RUN TaskB')  
     with self.output().open('w') as f:  
       f.write('Task B output')  
   
   
 class TaskC(luigi.Task):  
   
   logger.info('CLASS TaskC')  
   
   def requires(self):  
     logger.info('REQUIRES TaskC')  
     return TaskB()  
   
   def output(self):  
     logger.info('OUTPUT TaskC')  
     return luigi.LocalTarget('output/TaskC.output')  
   
   def run(self):  
     logger.info('RUN TaskC')  
     with self.output().open('w') as f:  
       f.write('Task C output')  
   
   
 class TaskD(luigi.Task):  
   
   param = luigi.Parameter()  
   logger.info('CLASS TaskD')  
   
   def output(self):  
    logger.info('OUTPUT Task{}'.format(self.param))
     return luigi.LocalTarget('output/Task{}.output'.format(self.param))  
   
   def run(self):  
    logger.info('RUN Task{}'.format(self.param))
     with self.output().open('w') as f:  
       f.write('Task {} output'.format(self.param))  
   
   
 class TaskE(luigi.Task):  
   
  logger.info('CLASS TaskE')
   
   def requires(self):  
     logger.info('REQUIRES TaskE')  
     yield TaskD(param='D0')  
     yield TaskD(param='D1')  
   
   def output(self):  
     logger.info('OUTPUT TaskE')  
     return luigi.LocalTarget('output/TaskE.output')  
   
   def run(self):  
     logger.info('RUN TaskE')  
     with self.output().open('w') as f:  
       f.write('Task E output')  
   
   
 class TaskF(luigi.Task):  
   
   logger.info('CLASS TaskF')  
   
   def requires(self):  
     logger.info('REQUIRES TaskF')  
     yield TaskA()  
     yield TaskC()  
     yield TaskE()  
   
   def output(self):  
     logger.info('OUTPUT TaskF')  
     return luigi.LocalTarget('output/TaskF.output')  
   
   def run(self):  
     logger.info('RUN TaskF')  
     with self.output().open('w') as f:  
       f.write('Task F output')  
   

This results in the logging output listed below.


# Initialization
2018-08-25 21:53:00,889 - INFO - Module abcdef_pipeline initialization
2018-08-25 21:53:00,889 - INFO - CLASS TaskA
2018-08-25 21:53:00,889 - INFO - CLASS TaskB
2018-08-25 21:53:00,889 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskD
2018-08-25 21:53:00,890 - INFO - CLASS TaskE
2018-08-25 21:53:00,890 - INFO - CLASS TaskF

# Depth-first search of task graph
2018-08-25 21:53:00,923 - INFO - OUTPUT TaskF
2018-08-25 21:53:00,924 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,924 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,925 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,928 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskA

# TaskA execution
2018-08-25 21:53:00,929 - INFO - REQUIRES TaskA
2018-08-25 21:53:00,929 - INFO - RUN TaskA
2018-08-25 21:53:00,929 - INFO - OUTPUT TaskA

# TaskD1 execution
2018-08-25 21:53:00,931 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,931 - INFO - RUN TaskD1
2018-08-25 21:53:00,931 - INFO - OUTPUT TaskD1

# TaskD0 execution
2018-08-25 21:53:00,933 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,933 - INFO - RUN TaskD0
2018-08-25 21:53:00,933 - INFO - OUTPUT TaskD0

# TaskE execution
2018-08-25 21:53:00,934 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,934 - INFO - RUN TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskE

# TaskB execution
2018-08-25 21:53:00,936 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,936 - INFO - RUN TaskB
2018-08-25 21:53:00,936 - INFO - OUTPUT TaskB

# TaskC execution
2018-08-25 21:53:00,937 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,937 - INFO - RUN TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskC

# TaskF execution
2018-08-25 21:53:00,938 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,939 - INFO - RUN TaskF
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskF

And then infer

Apparently, this is what Luigi does:

def depth_first_search(tasks, start):
    task = tasks[start]
    if not task.output().exists():
        yield task
        for t in task.requires():
            yield from depth_first_search(tasks, t)


def execute_task(task):
    if not all(t.output().exists() for t in task.requires()):
        return False
    task.run()
    return task.output().exists()
   
   
import your_module                                                  # 1.
tasks = {name: getattr(your_module, name)()                         # 2.
         for name in dir(your_module)}
scheduled_tasks = list(depth_first_search(tasks, 'TaskF'))          # 3.
scheduled_tasks.reverse()
results = {t.task_id: execute_task(t) for t in scheduled_tasks}     # 3.
if results['TaskF']:
    print(':-)')
else:
    print(':-(')

Admittedly, this is more pseudocode than working code, as it hides details about composite targets and requirements and the distribution of task execution over workers. But it nicely shows that:
  1. Your pipeline module code is imported once.
  2. Task classes are instantiated once (so, under the hood, members of a task family use the same instance).
  3. Luigi has a scheduling phase and an execution phase.
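These two phases can be mimicked without Luigi at all. The sketch below is a plain-Python stand-in (the Task class is a hypothetical mock, not luigi.Task) for the same A-F dependency graph: a depth-first pass collects incomplete tasks starting from TaskF, and the reversed schedule is then executed, so every task runs after its requirements. Note that the exact order within one level may differ from what a real Luigi worker produces.

```python
# Hypothetical stand-in for luigi.Task: completion is tracked with a
# boolean instead of an output target on disk.
class Task:
    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)
        self.done = False

    def run(self):
        self.done = True


# The A-F dependency graph from the pipeline above.
a, b = Task('TaskA'), Task('TaskB')
c = Task('TaskC', [b])
d0, d1 = Task('TaskD0'), Task('TaskD1')
e = Task('TaskE', [d0, d1])
f = Task('TaskF', [a, c, e])


def schedule(task):
    """Scheduling phase: yield every incomplete task reachable from task."""
    if not task.done:
        yield task
        for t in task.requires:
            yield from schedule(t)


# Execution phase: run the reversed schedule, checking that every
# requirement has already completed.
order = []
for t in reversed(list(schedule(f))):
    assert all(r.done for r in t.requires)
    t.run()
    order.append(t.name)

print(order)
```

Running this prints a valid topological execution order ending in TaskF, which is exactly the property Luigi's scheduler guarantees.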

2018-08-12

Connecting to TinkerPop's gremlin server from R Studio with a java wrapper

Apache TinkerPop supports a number of gremlin language variants to provide native language support for connecting to gremlin server. The variants include Java, Python, .NET and JavaScript, but not R. So, when making graph data available in a TinkerPop-compatible graph database, you are likely to receive user requests to access these data from R Studio. Yet, a Google search on "gremlin R" only results in a suggestion to use rJava to connect to gremlin server, and a cumbersome way to read some graph data into a TinkerGraph and query it afterwards. Therefore, this post documents how to connect to gremlin server from R Studio using rJava.

Building the java wrapper

After experimenting for a few hours with the minimally documented rJava package in R Studio, I decided this was not the way I wanted to duct-tape R Studio to gremlin server. Instead, I wrote a simple java class that does all the interfacing to gremlin server but is easy to access from rJava. The source code is available from https://gist.github.com/vtslab/676419e6f205672aa935bb3dbfe2d1d8.

Assuming you have Apache Maven installed on your system, you can build the jar by placing the RGremlin.java file in the src/main/java directory relative to the rgremlin_pom.xml and issuing:

$ mvn install -f rgremlin_pom.xml

This gives you a target directory with the rgremlin-0.0.1.jar and a target/lib folder with the dependencies of the RGremlin java class.

In this particular example, the JanusGraphIoRegistry is added to the connection. Of course, you can easily replace it with an IoRegistry for other TinkerPop implementations.

Running a gremlin query

With the java wrapper, connecting to gremlin server and submitting a gremlin query string is now as simple as:

# install.packages("rJava")
# install.packages("rjson")
library(rJava)
library(rjson)

params <- c("-Dgremlin.log4j.level=INFO")
.jinit(parameters=params)
.jaddClassPath(
     dir("/some/path/lib/janusgraph-0.2.1-hadoop2/lib", full.names=TRUE))
.jclassPath()

client <- .jnew("rgremlin.RGremlin", "localhost")
jsonStr <- .jcall(
     client, "Ljava/lang/String;",
     "submit", "g.V().valueMap()")

result <- fromJSON(jsonStr)
print(result)

Note that query results are serialized as json between the java wrapper and R. Since rjson only recognizes standard types and collections, the java wrapper is not suitable for queries that return TinkerPop objects such as vertices, edges and TinkerGraphs. In the example above, vertices are returned as maps using the gremlin valueMap() step.
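To illustrate what crosses the wrapper boundary: valueMap() serializes each vertex as a map of property key to a list of values (TinkerPop properties are multi-valued), which any JSON parser can consume. The snippet below is an illustrative Python equivalent of the fromJSON call, with made-up property data in the style of TinkerPop's modern toy graph:

```python
import json

# Illustrative JSON as the wrapper might return it for g.V().valueMap():
# one map per vertex, each property key mapped to a list of values.
json_str = '[{"name": ["marko"], "age": [29]}, {"name": ["vadas"], "age": [27]}]'

result = json.loads(json_str)
names = [v["name"][0] for v in result]
print(names)  # -> ['marko', 'vadas']
```

The rjson::fromJSON call in the R snippet produces the analogous nested list structure.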

In the example above, the rgremlin-0.0.1.jar was simply copied to the JanusGraph lib directory, which in turn was added to the rJava classpath. Alternatively, you can add the target and target/lib directories from the previous section to the rJava classpath.

Closing remark


The solution presented here is only a quick-and-dirty way to get your users going and thus gather additional user requirements. Ideally, R would become available as a gremlin language variant, with complete TinkerPop support and the possibility to write gremlin queries natively in R instead of submitting query strings.