
2018-08-26

Code execution order in Luigi pipelines

Luigi is a popular framework for writing and managing batch-oriented data pipelines in Python. While the Luigi documentation and several places on the web provide simple examples to get started, you soon discover that you have to understand Luigi's code execution order really well once your application becomes more complex. This blog post intends to provide that insight.

Reading the documentation


As a developer you provide a configured pipeline to Luigi as a Python module with a number of interdependent task classes. And as a developer you want to know how Luigi instantiates these classes and calls their methods, and in which order. After reading through the execution model in the Luigi docs and the source code of the Luigi worker, you know a lot about Luigi workers but still not how they call your code. That is when I decided to write a simple pipeline that just logs every call from Luigi, and to infer from the log what actually happens.

Rather explore


The figure above depicts a simple pipeline for running task F(), which depends on task A(), task C() and task E() instances. Tasks C() and E() in turn depend on a single task B() and two task D() instances, respectively. The corresponding Luigi pipeline with all the logging statements is listed below.

 import logging  
 import luigi  
   
 logger = logging.getLogger('abcdef_pipeline')  
 logger.setLevel(logging.INFO)  
 fh = logging.FileHandler('output/abcdef_pipeline.log', 'w')  
 ff = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')  
 fh.setFormatter(ff)  
 logger.addHandler(fh)  
   
 logger.info('Module abcdef_pipeline initialization')  
   
   
 class TaskA(luigi.Task):  
   
   logger.info('CLASS TaskA')  
   
   def output(self):  
     logger.info('OUTPUT TaskA')  
     return luigi.LocalTarget('output/TaskA.output')  
   
   def run(self):  
     logger.info('RUN TaskA')  
     with self.output().open('w') as f:  
       f.write('Task A output')  
   
   
 class TaskB(luigi.Task):  
   
   logger.info('CLASS TaskB')  
   
   def output(self):  
     logger.info('OUTPUT TaskB')  
     return luigi.LocalTarget('output/TaskB.output')  
   
   def run(self):  
     logger.info('RUN TaskB')  
     with self.output().open('w') as f:  
       f.write('Task B output')  
   
   
 class TaskC(luigi.Task):  
   
   logger.info('CLASS TaskC')  
   
   def requires(self):  
     logger.info('REQUIRES TaskC')  
     return TaskB()  
   
   def output(self):  
     logger.info('OUTPUT TaskC')  
     return luigi.LocalTarget('output/TaskC.output')  
   
   def run(self):  
     logger.info('RUN TaskC')  
     with self.output().open('w') as f:  
       f.write('Task C output')  
   
   
 class TaskD(luigi.Task):  
   
   param = luigi.Parameter()  
   logger.info('CLASS TaskD')  
   
   def output(self):  
     logger.info('OUTPUT Task{} output'.format(self.param))  
     return luigi.LocalTarget('output/Task{}.output'.format(self.param))  
   
   def run(self):  
     logger.info('RUN Task{} output'.format(self.param))  
     with self.output().open('w') as f:  
       f.write('Task {} output'.format(self.param))  
   
   
 class TaskE(luigi.Task):  
   
   logger.info('CLASS TaskC')  
   
   def requires(self):  
     logger.info('REQUIRES TaskE')  
     yield TaskD(param='D0')  
     yield TaskD(param='D1')  
   
   def output(self):  
     logger.info('OUTPUT TaskE')  
     return luigi.LocalTarget('output/TaskE.output')  
   
   def run(self):  
     logger.info('RUN TaskE')  
     with self.output().open('w') as f:  
       f.write('Task E output')  
   
   
 class TaskF(luigi.Task):  
   
   logger.info('CLASS TaskF')  
   
   def requires(self):  
     logger.info('REQUIRES TaskF')  
     yield TaskA()  
     yield TaskC()  
     yield TaskE()  
   
   def output(self):  
     logger.info('OUTPUT TaskF')  
     return luigi.LocalTarget('output/TaskF.output')  
   
   def run(self):  
     logger.info('RUN TaskF')  
     with self.output().open('w') as f:  
       f.write('Task F output')  
   

This results in the logging output listed below.


# Initialization
2018-08-25 21:53:00,889 - INFO - Module abcd_pipeline initialization
2018-08-25 21:53:00,889 - INFO - CLASS TaskA
2018-08-25 21:53:00,889 - INFO - CLASS TaskB
2018-08-25 21:53:00,889 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskD
2018-08-25 21:53:00,890 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskF

# Depth-first search of task graph
2018-08-25 21:53:00,923 - INFO - OUTPUT TaskF
2018-08-25 21:53:00,924 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,924 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,925 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,928 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskA

# TaskA execution
2018-08-25 21:53:00,929 - INFO - REQUIRES TaskA
2018-08-25 21:53:00,929 - INFO - RUN TaskA
2018-08-25 21:53:00,929 - INFO - OUTPUT TaskA

# TaskD1 execution
2018-08-25 21:53:00,931 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,931 - INFO - RUN TaskD1
2018-08-25 21:53:00,931 - INFO - OUTPUT TaskD1

# TaskD0 execution
2018-08-25 21:53:00,933 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,933 - INFO - RUN TaskD0
2018-08-25 21:53:00,933 - INFO - OUTPUT TaskD0

# TaskE execution
2018-08-25 21:53:00,934 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,934 - INFO - RUN TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskE

# TaskB execution
2018-08-25 21:53:00,936 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,936 - INFO - RUN TaskB
2018-08-25 21:53:00,936 - INFO - OUTPUT TaskB

# TaskC execution
2018-08-25 21:53:00,937 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,937 - INFO - RUN TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskC

# TaskF execution
2018-08-25 21:53:00,938 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,939 - INFO - RUN TaskF
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskF

And then infer

Apparently, this is what Luigi does:

from luigi.task import flatten


def depth_first_search(task):
    # Scheduling: collect every task whose output does not exist yet
    if not task.output().exists():
        yield task
        for t in flatten(task.requires()):
            yield from depth_first_search(t)


def execute_task(task):
    # Execution: run a task only if all its requirements are complete
    if not all(t.output().exists() for t in flatten(task.requires())):
        return False
    task.run()
    return task.output().exists()


import your_module                                                  # 1.
# Luigi returns the same cached instance for identical class and parameters
final_task = your_module.TaskF()                                    # 2.
scheduled_tasks = list(depth_first_search(final_task))              # 3.
scheduled_tasks.reverse()
results = {t.task_id: execute_task(t) for t in scheduled_tasks}     # 3.
if results[final_task.task_id]:
    print(':-)')
else:
    print(':-(')

Actually, this is more like pseudocode, as it hides details about composite targets and requirements and the distribution of task execution over workers. But it nicely shows that:
  1. Your pipeline module code is imported once 
  2. Task classes are instantiated once (so, under the hood, members of a task family use the same instance)
  3. Luigi has a scheduling phase and an execution phase
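
The scheduling part of the log above can be reproduced with a small toy model that does not need Luigi at all. The sketch below is only a guess at the mechanism: it assumes that a requirement's output is checked as soon as the requirement is discovered, and that discovered tasks are handled last in, first out. The names GRAPH, output_exists, requires and schedule are made up for this illustration and are not part of Luigi.

```python
# Toy model of the scheduling phase (no Luigi required). The LIFO stack and
# the eager output check are assumptions chosen to match the log, not a
# description of Luigi's actual implementation.
calls = []  # records the order in which the "task methods" are called

# Dependency graph of the example pipeline: task name -> required task names
GRAPH = {
    'TaskF': ['TaskA', 'TaskC', 'TaskE'],
    'TaskA': [],
    'TaskC': ['TaskB'],
    'TaskB': [],
    'TaskE': ['TaskD0', 'TaskD1'],
    'TaskD0': [],
    'TaskD1': [],
}


def output_exists(name):
    calls.append('OUTPUT ' + name)
    return False  # nothing has run yet, so no output exists


def requires(name):
    calls.append('REQUIRES ' + name)
    return GRAPH[name]


def schedule(start):
    """Return task names in the order they are taken from the stack."""
    scheduled = []
    seen = {start}
    stack = [] if output_exists(start) else [start]
    while stack:
        name = stack.pop()  # last in, first out
        scheduled.append(name)
        for dep in requires(name):
            if dep not in seen:
                seen.add(dep)
                if not output_exists(dep):  # checked as soon as discovered
                    stack.append(dep)
    return scheduled


order = schedule('TaskF')
print('\n'.join(calls))
```

The printed lines follow the same OUTPUT and REQUIRES sequence as the "Depth-first search of task graph" part of the log. The order of the execution phase is not modelled here.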

2018-08-12

Connecting to TinkerPop's gremlin server from R Studio with a java wrapper

Apache TinkerPop supports a number of gremlin language variants to provide native language support for connecting to gremlin server. The variants include Java, Python, .NET and JavaScript, but not R. So, when you make graph data available in a TinkerPop-compatible graph database, you are likely to receive user requests to access these data from R Studio. Yet, a Google search on "gremlin R" only results in a suggestion to use rJava to connect to gremlin server and a cumbersome way to read some graph data into a TinkerGraph and query it afterwards. Therefore, this post documents how to connect to gremlin server from R Studio using rJava.

Building the java wrapper

After experimenting for a few hours with the minimally documented rJava package in R Studio, I decided this was not the way I wanted to duct-tape R Studio to gremlin server. Instead, I wrote a simple Java class that does all the interfacing with gremlin server, but is easy to access from rJava. The source code is available from https://gist.github.com/vtslab/676419e6f205672aa935bb3dbfe2d1d8.

Assuming you have Apache Maven installed on your system, you can build the jar by placing the RGremlin.java file in the src/main/java directory relative to the rgremlin_pom.xml and issuing:

$ mvn install -f rgremlin_pom.xml

This gives you a target directory with the rgremlin-0.0.1.jar and a target/lib folder with dependencies of the RGremlin java class.

In this particular example, the JanusGraphIoRegistry is added to the connection. Of course, you can easily replace it with an IoRegistry for other TinkerPop implementations.

Running a gremlin query

With the java wrapper, connecting to gremlin server and submitting a gremlin query string is now as simple as:

# install.packages("rJava")
# install.packages("rjson")
library(rJava)
library(rjson)

params <- c("-Dgremlin.log4j.level=INFO")
.jinit(parameters=params)
.jaddClassPath(
     dir("/some/path/lib/janusgraph-0.2.1-hadoop2/lib",  full.names=TRUE))
.jclassPath()

client <- .jnew("rgremlin.RGremlin", "localhost")
jsonStr <- .jcall(
     client, "Ljava/lang/String;",
     "submit", "g.V().id()")

result <- fromJSON(jsonStr)
print(result)

Note that query results are serialized as json between the java wrapper and R. Since rjson only recognizes standard types and collections, the java wrapper is not suitable for queries that return TinkerPop objects such as vertices, edges and TinkerGraphs. Vertices can be returned as maps instead by using the gremlin valueMap() step.

In the example above, the rgremlin-0.0.1.jar was simply copied to the JanusGraph lib directory, which in turn was added to the rJava classpath. As an alternative, you can add the target and target/lib directories from the previous section to the rJava classpath.

Closing remark


The solution presented is only a quick and dirty way to get your users going and to gather additional user requirements. Ideally, R would become available as a gremlin language variant for complete TinkerPop support and the possibility to write gremlin queries natively in R instead of submitting query strings.

2018-04-27

Understanding response times for single vertex queries in JanusGraph-HBase

When building an application on JanusGraph, insight into and control over the query response times is important. But when measuring these response times, one finds they can vary wildly for even the simplest query, from sub-millisecond to seconds. This blog post analyses the various ways in which JanusGraph-HBase serves the vertices required for answering a gremlin query. In addition, it provides some hints on how to control the way a vertex is retrieved during measurements.

Digging into the JanusGraph-HBase stack

The figure shows which steps JanusGraph-HBase takes to retrieve a vertex. JanusGraph instances maintain a memory-based cache and first try to use this to serve vertex requests. To hide uncommitted changes from other transactions, mutated vertices and edges are cached per transaction while unchanged vertices are stored in a so-called database cache. Retrieving a vertex with its properties by a gremlin query from the JanusGraph cache typically takes 0.5 ms.

When a vertex request does not hit the JanusGraph cache, a connection is set up to the HBase regionserver that serves the vertex id, that is, the row key. For a large graph, separate connections are established for all regionservers involved. Connections time out after a while and have to be re-established. Connection setup typically takes 15 ms on a kerberized cluster.

HBase regionservers maintain a so-called blockcache, consisting of small blocks of data of typically 64 kB. This way, JanusGraph can keep a very large graph in the distributed memory of an HBase cluster. However, the improved scalability comes at a price: response times for single-vertex gremlin queries served from the blockcache amount to about 7 ms.

The HBase blockcache will not be able to serve all vertex requests (unless you configure the JanusGraph table to be memory-resident): the cache needs to be warmed and blocks can be evicted due to the use of the blockcache by other applications. Then, the HBase regionserver needs to retrieve a particular row from its storefile. Typically, regionservers have their storefiles on Hadoop HDFS with a local replica. However, after a while local replicas of HDFS blocks may have got lost and major compactions to restore this may not have been run, resulting in a remote HDFS block access to retrieve a particular row. Response times for single-vertex gremlin queries served from local and remote storage by HBase typically amount to 30 ms and 100 ms, respectively. 
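
To get a feeling for what these numbers mean for an average request, the typical response times quoted above can be combined into a weighted mean. The sketch below is only an illustration: the function name expected_response_ms and the request mix are hypothetical, and the 15 ms connection setup time is left out because it is only paid when a connection has to be established.

```python
# Typical single-vertex response times quoted in the text, in milliseconds
TYPICAL_MS = {
    'janusgraph_cache': 0.5,
    'hbase_blockcache': 7.0,
    'local_storefile': 30.0,
    'remote_storefile': 100.0,
}


def expected_response_ms(fractions):
    """Weighted mean response time for a mix of retrieval paths.

    `fractions` maps a path name to the share of requests served that way;
    the shares must add up to 1.
    """
    total = sum(fractions.values())
    if abs(total - 1.0) > 1e-9:
        raise ValueError('fractions must sum to 1, got {}'.format(total))
    return sum(TYPICAL_MS[path] * share for path, share in fractions.items())


# Hypothetical request mix, for illustration only (not measured)
mix = {
    'janusgraph_cache': 0.5,
    'hbase_blockcache': 0.3,
    'local_storefile': 0.15,
    'remote_storefile': 0.05,
}
print(expected_response_ms(mix))
```

With this made-up mix the mean comes out at roughly 11.85 ms, and 5 of those milliseconds are contributed by the 5% of requests that need remote storage.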

Measuring response times

When measuring response times of single-vertex gremlin queries on JanusGraph-HBase, it is useful to set the logging level of the JanusGraph client to DEBUG, because the debug messages show:

  • when a JanusGraph transaction cache is instantiated

  • when connections with HBase are established

  • when rows are retrieved from HBase.

In addition, it is useful to trace class loading with:
export JAVA_OPTIONS='-XX:+TraceClassLoading'
because the first query of a repeated set of queries can suffer a delay from class loading.
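
Because the first query can be slowed down by class loading, it helps to report its duration separately from the repeats. The sketch below shows one way to do that in Python. It is not the measurement setup used for this post: time_calls, summarize and fake_query are hypothetical names, and fake_query merely stands in for submitting a real query.

```python
import statistics
import time


def time_calls(query, repeats=10):
    """Call `query` repeatedly and return each call's duration in ms."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        query()
        durations.append((time.perf_counter() - start) * 1000.0)
    return durations


def summarize(durations_ms):
    """Report the first call separately from the remaining calls."""
    first, rest = durations_ms[0], durations_ms[1:]
    return {
        'first_ms': first,
        'median_rest_ms': statistics.median(rest),
        'max_rest_ms': max(rest),
    }


def fake_query():
    # Hypothetical stand-in for submitting a single-vertex query
    time.sleep(0.001)


print(summarize(time_calls(fake_query, repeats=5)))
```

Using the median rather than the mean for the repeated calls keeps a single slow outlier, such as a re-established connection, from dominating the summary.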

Controlling the way in which vertex retrievals are served goes as follows:

  • From the JanusGraph transaction cache
    Set cache.db-cache=false in the JanusGraph properties file and simply repeat a single-vertex gremlin query, like: g.V(12345L).valueMap().toList()
  • From the JanusGraph database cache
    Set cache.db-cache=true in the JanusGraph properties file and issue a g.tx().close() before repeating the query (setting storage.transactions=false or cache.tx-cache-size = 0 and cache.tx-dirty-size = 0 does not work)
  • Set up HBase regionserver connection
    Select relevant debug log messages during the first round of queries
  • From the HBase blockcache
    Set cache.db-cache=false in the JanusGraph properties file and issue a g.tx().close() before repeating the query. See results in the figure below.
Response times of queries served by the HBase blockcache
Response times of queries for a vertex not in the HBase blockcache

Tweaking HBase major compactions

A warning first: I am not an HBase admin, nor did I completely study any "definitive guide" on HBase. But I am not afraid to try things and I have problems with the impossible.

After some haphazard reading I figured I could get a JanusGraph-HBase table with an empty blockcache by taking a snapshot from my table and creating a clone from the snapshot. However, this clone is initially still based on the store files from the original table and the blockcache will serve requests for both the original and cloned tables from the same blockcache items.

The next trial was to run a major compaction on the cloned table. This should rewrite the store files if they can be compacted, resulting in the blockcache no longer recognizing that rows from the original and cloned tables are identical. However, this move did not do anything to my response time measurements. Apparently, the major compaction process decided that my storefiles were fine already and it challenged me to take more drastic measures. This brought me to lower the MAX_FILESIZE attribute of the cloned table (to 512 MB in the procedure below). The complete procedure in the hbase shell looks as follows:
> snapshot 'some_table', 'some_table_shot'
> clone_snapshot 'some_table_shot', 'cloned_table'
> alter 'cloned_table', MAX_FILESIZE => '536870912'
> major_compact 'cloned_table'

Now, I finally got the response time measurements presented earlier. While writing this, I realize that these figures still look as if the storefile measurements are somewhat contaminated by blockcache hits, although they do not conflict with figures mentioned elsewhere.

Unexpectedly, the major compaction with a new, smaller MAX_FILESIZE gave me a bonus. Apparently, this forced rewrite also restored data locality at the region servers as it should have done in the first place. Measurements before the compaction looked as in the figure below.
Response times for rows not in the HBase blockcache, before major compaction

Conclusions

Understanding response times for single vertex queries in JanusGraph-HBase requires careful scrutiny.

Major compactions on HBase tables seem insufficiently documented and can be tweaked with some perseverance. Of course, lowering MAX_FILESIZE on a regular basis is not a winning procedure for production tables (although you could precede it with a number of forced region merges), but it is practical for the baseline measurements presented in this blog.