
2019-03-31

Circular references in Plotly/Dash

Plotly Dash is a simple python web framework for quickly building interactive data visualizations. While Dash has a lot of power, it also has its limitations. One of the current limitations is that the dependencies between the Dash web components cannot be circular. If you create a callback for an output that is also part of the inputs, Dash raises an exception saying that "this is bad". If you try to circumvent this check by creating two callbacks with interchanged inputs and outputs, the browser just shows "Error loading dependencies".

So, are circular references really "bad"? I would rather say that forbidding them is a consequence of the Dash principle that each callback makes a change to the resulting web page. But in fact, Dash already does not apply this principle consistently:
  • Dash defines a PreventUpdate exception that one can raise when the application's state does not require an update of the web page. This would be the pythonic way to handle intended circular references and is suggested in the previously linked Dash issue (see the sketch after this list).
  • From the Dash gotchas: "If you have disabled callback validation in order to support dynamic layouts, then you won't be automatically alerted to the situation where a component within a callback is not found within a layout. In this situation, where a component registered with a callback is missing from the layout, the callback will fail to fire. For example, if you define a callback with only a subset of the specified Inputs present in the current page layout, the callback will simply not fire at all."
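
As an illustration of the first point, here is a minimal sketch of the PreventUpdate approach (the component ids are hypothetical and not part of the example further below):

from dash.exceptions import PreventUpdate

@app.callback(Output('target', 'children'),
              [Input('source', 'value')])
def update_target(value):
    if value is None:
        # The application state requires no page update,
        # so end this branch of the callback chain here.
        raise PreventUpdate
    return value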
It turns out that this second "feature" can be exploited as a temporary workaround to allow circular references, but before doing that, let us consider our use case. I simply wanted the style of checkboxes that you find in the column filters of Microsoft Excel and LibreOffice Calc, because I figured that my users would be accustomed to these. The list of checkboxes consists of one "select all" checkbox and the list of remaining checkboxes. Checking the "select all" box affects the state of one or more of the remaining checkboxes, while checking any of the remaining boxes can affect the state of the "select all" box. So, if we model our list of checkboxes with two Dash dcc.Checklist components, we certainly have circular dependencies.

Exploitation of the Dash gotcha works by including a Dash component in the loop; in the code example below this is the html.Div with id='loop_breaker'. This html.Div is dynamically generated inside the static html.Div with id='loop_breaker_container'. The 'loop_breaker' component is only generated when either:
  • the user has deselected all options while the "all" checkbox is still checked, or
  • the user has selected all options while the "all" checkbox is still unchecked.
In this way, we get our intended circular reference while the initial state of the application layout passes the Dash validation criteria on circular references.


# -*- coding: utf-8 -*-
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.config['suppress_callback_exceptions'] = True

app.layout = html.Div(children=[
    html.H4(children='Excel-like checkboxes'),
    dcc.Checklist(
        id='all',
        options=[{'label': 'all', 'value': 'all'}],
        values=[]
    ),
    dcc.Checklist(
        id='cities',
        options=[
            {'label': 'New York City', 'value': 'NYC'},
            {'label': 'Montréal', 'value': 'MTL'},
            {'label': 'San Francisco', 'value': 'SF'}
        ],
        values=['MTL', 'SF']
    ),
    html.Div(id='loop_breaker_container', children=[])
])


@app.callback(Output('cities', 'values'),
              [Input('all', 'values')])
def update_cities(inputs):
    # Check or uncheck all cities when the "all" checkbox changes.
    if len(inputs) == 0:
        return []
    else:
        return ['NYC', 'MTL', 'SF']


@app.callback(Output('loop_breaker_container', 'children'),
              [Input('cities', 'values')],
              [State('all', 'values')])
def update_all(inputs, _):
    # Generate the 'loop_breaker' div only when the "all" checkbox is out of
    # sync with the cities; in all other cases the callback chain ends here.
    states = dash.callback_context.states
    if len(inputs) == 3 and states['all.values'] == []:
        return [html.Div(id='loop_breaker', children=True)]
    elif len(inputs) == 0 and states['all.values'] == ['all']:
        return [html.Div(id='loop_breaker', children=False)]
    else:
        return []


@app.callback(Output('all', 'values'),
              [Input('loop_breaker', 'children')])
def update_loop(all_true):
    # Fires only when a 'loop_breaker' div was just generated above.
    if all_true:
        return ['all']
    else:
        return []


if __name__ == '__main__':
    app.run_server(debug=True)

2018-08-26

Code execution order in Luigi pipelines

Luigi is a popular framework for writing and managing batch-oriented data pipelines in python. While the Luigi documentation and several places on the web provide simple examples to get started, you soon discover that you have to understand Luigi's code execution order really well once your application becomes more complex. This blog intends to provide that insight.

Reading the documentation


As a developer you provide a configured pipeline to Luigi as a python module with a number of interdependent task classes, and you want to know how, and in which order, Luigi instantiates these classes and calls their methods. After reading through the execution model in the Luigi docs and the source code of the Luigi worker, you know a lot about Luigi workers but still not how they call your code. That is when I decided to write a simple pipeline that just logs every call from Luigi and to infer what actually happens.

Rather explore


The figure above depicts a simple pipeline for running task F(), which depends on task C() and task E() instances, which in turn depend on a single task B() and two task D() instances, respectively. The corresponding Luigi pipeline with all the logging statements is listed below.

import logging
import luigi

logger = logging.getLogger('abcdef_pipeline')
logger.setLevel(logging.INFO)
fh = logging.FileHandler('output/abcdef_pipeline.log', 'w')
ff = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')
fh.setFormatter(ff)
logger.addHandler(fh)

logger.info('Module abcdef_pipeline initialization')


class TaskA(luigi.Task):

  logger.info('CLASS TaskA')

  def output(self):
    logger.info('OUTPUT TaskA')
    return luigi.LocalTarget('output/TaskA.output')

  def run(self):
    logger.info('RUN TaskA')
    with self.output().open('w') as f:
      f.write('Task A output')


class TaskB(luigi.Task):

  logger.info('CLASS TaskB')

  def output(self):
    logger.info('OUTPUT TaskB')
    return luigi.LocalTarget('output/TaskB.output')

  def run(self):
    logger.info('RUN TaskB')
    with self.output().open('w') as f:
      f.write('Task B output')


class TaskC(luigi.Task):

  logger.info('CLASS TaskC')

  def requires(self):
    logger.info('REQUIRES TaskC')
    return TaskB()

  def output(self):
    logger.info('OUTPUT TaskC')
    return luigi.LocalTarget('output/TaskC.output')

  def run(self):
    logger.info('RUN TaskC')
    with self.output().open('w') as f:
      f.write('Task C output')


class TaskD(luigi.Task):

  param = luigi.Parameter()
  logger.info('CLASS TaskD')

  def output(self):
    logger.info('OUTPUT Task{}'.format(self.param))
    return luigi.LocalTarget('output/Task{}.output'.format(self.param))

  def run(self):
    logger.info('RUN Task{}'.format(self.param))
    with self.output().open('w') as f:
      f.write('Task {} output'.format(self.param))


class TaskE(luigi.Task):

  logger.info('CLASS TaskE')

  def requires(self):
    logger.info('REQUIRES TaskE')
    yield TaskD(param='D0')
    yield TaskD(param='D1')

  def output(self):
    logger.info('OUTPUT TaskE')
    return luigi.LocalTarget('output/TaskE.output')

  def run(self):
    logger.info('RUN TaskE')
    with self.output().open('w') as f:
      f.write('Task E output')


class TaskF(luigi.Task):

  logger.info('CLASS TaskF')

  def requires(self):
    logger.info('REQUIRES TaskF')
    yield TaskA()
    yield TaskC()
    yield TaskE()

  def output(self):
    logger.info('OUTPUT TaskF')
    return luigi.LocalTarget('output/TaskF.output')

  def run(self):
    logger.info('RUN TaskF')
    with self.output().open('w') as f:
      f.write('Task F output')

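The listing does not show how a run is started; the post infers behaviour from the log alone. One way to trigger a run like the one logged below would be luigi.build (a sketch on my part, assuming the module is importable as abcdef_pipeline):

import luigi
from abcdef_pipeline import TaskF

# Run the pipeline in-process with the local scheduler.
luigi.build([TaskF()], local_scheduler=True)
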
This results in the logging output listed below.


# Initialization
2018-08-25 21:53:00,889 - INFO - Module abcdef_pipeline initialization
2018-08-25 21:53:00,889 - INFO - CLASS TaskA
2018-08-25 21:53:00,889 - INFO - CLASS TaskB
2018-08-25 21:53:00,889 - INFO - CLASS TaskC
2018-08-25 21:53:00,890 - INFO - CLASS TaskD
2018-08-25 21:53:00,890 - INFO - CLASS TaskE
2018-08-25 21:53:00,890 - INFO - CLASS TaskF

# Depth-first search of task graph
2018-08-25 21:53:00,923 - INFO - OUTPUT TaskF
2018-08-25 21:53:00,924 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,924 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,925 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,925 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,926 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,927 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,928 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,928 - INFO - REQUIRES TaskA

# TaskA execution
2018-08-25 21:53:00,929 - INFO - REQUIRES TaskA
2018-08-25 21:53:00,929 - INFO - RUN TaskA
2018-08-25 21:53:00,929 - INFO - OUTPUT TaskA

# TaskD1 execution
2018-08-25 21:53:00,931 - INFO - REQUIRES TaskD1
2018-08-25 21:53:00,931 - INFO - RUN TaskD1
2018-08-25 21:53:00,931 - INFO - OUTPUT TaskD1

# TaskD0 execution
2018-08-25 21:53:00,933 - INFO - REQUIRES TaskD0
2018-08-25 21:53:00,933 - INFO - RUN TaskD0
2018-08-25 21:53:00,933 - INFO - OUTPUT TaskD0

# TaskE execution
2018-08-25 21:53:00,934 - INFO - REQUIRES TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD0
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskD1
2018-08-25 21:53:00,934 - INFO - RUN TaskE
2018-08-25 21:53:00,934 - INFO - OUTPUT TaskE

# TaskB execution
2018-08-25 21:53:00,936 - INFO - REQUIRES TaskB
2018-08-25 21:53:00,936 - INFO - RUN TaskB
2018-08-25 21:53:00,936 - INFO - OUTPUT TaskB

# TaskC execution
2018-08-25 21:53:00,937 - INFO - REQUIRES TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskB
2018-08-25 21:53:00,937 - INFO - RUN TaskC
2018-08-25 21:53:00,937 - INFO - OUTPUT TaskC

# TaskF execution
2018-08-25 21:53:00,938 - INFO - REQUIRES TaskF
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskA
2018-08-25 21:53:00,938 - INFO - OUTPUT TaskC
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskE
2018-08-25 21:53:00,939 - INFO - RUN TaskF
2018-08-25 21:53:00,939 - INFO - OUTPUT TaskF

And then infer

Apparently, this is what Luigi does:

def depth_first_search(tasks, start):
    # Schedule a task if its output is missing, then visit its requirements.
    task = tasks[start]
    if not task.output().exists():
        yield task
        for t in task.requires():
            yield from depth_first_search(tasks, t)


def execute_task(task):
    # Only run a task when all of its requirements have been satisfied.
    if not all(t.output().exists() for t in task.requires()):
        return False
    task.run()
    return task.output().exists()


import your_module                                                   # 1.
tasks = {name: getattr(your_module, name)() for name in dir(your_module)}  # 2.
scheduled_tasks = list(depth_first_search(tasks, 'TaskF'))           # 3.
scheduled_tasks.reverse()
results = {t.task_id: execute_task(t) for t in scheduled_tasks}      # 3.
if results['TaskF']:
    print(':-)')
else:
    print(':-(')

Actually, this is more pseudocode than Luigi's actual code, as it hides details about composite targets and requirements and the distribution of task execution over workers. But it nicely shows that:
  1. Your pipeline module code is imported once
  2. Task classes are instantiated once (so, under the hood, members of a task family use the same instance; see the snippet below)
  3. Luigi has a scheduling phase and an execution phase
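
Point 2 can be checked directly: Luigi caches task instances per parameter combination, so "two" tasks created with equal parameters are one and the same object. A minimal sketch (the class name is hypothetical):

import luigi

class SomeTask(luigi.Task):
  param = luigi.Parameter()

# Luigi's instance caching returns the same object for equal parameters.
a = SomeTask(param='x')
b = SomeTask(param='x')
assert a is b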

2018-08-12

Connecting to TinkerPop's gremlin server from RStudio with a java wrapper

Apache TinkerPop supports a number of gremlin language variants to provide native language support for connecting to gremlin server. The variants include java, python, .NET and javascript, but not R. So, when making graph data available in a TinkerPop-compatible graph database, you are likely to receive user requests to access these data from RStudio. Yet, a Google search on "gremlin R" only results in a suggestion to use rJava to connect to gremlin server and a cumbersome way to read some graph data into a TinkerGraph and query it successively. Therefore, this post documents how to connect to gremlin server from RStudio using rJava.

Building the java wrapper

After experimenting a few hours with the minimally documented rJava package in RStudio, I decided this was not the way I wanted to duct-tape RStudio to gremlin server. Instead, I wrote a simple java class that does all the interfacing to gremlin server, but is easy to access from rJava. The source code is available from https://gist.github.com/vtslab/676419e6f205672aa935bb3dbfe2d1d8.

Assuming you have Apache Maven installed on your system, you can build the jar by placing the RGremlin.java file in the src/main/java directory relative to the rgremlin_pom.xml and issuing:

$ mvn install -f rgremlin_pom.xml

This gives you a target directory with the rgremlin-0.0.1.jar and a target/lib folder with dependencies of the RGremlin java class.

In this particular example, the JanusGraphIoRegistry is added to the connection. Of course, you can easily replace it with an IoRegistry for other TinkerPop implementations.

Running a gremlin query

With the java wrapper, connecting to gremlin server and submitting a gremlin query string is now as simple as:

# install.packages("rJava")
# install.packages("rjson")
library(rJava)
library(rjson)

params <- c("-Dgremlin.log4j.level=INFO")
.jinit(parameters=params)
.jaddClassPath(
     dir("/some/path/lib/janusgraph-0.2.1-hadoop2/lib", full.names=TRUE))
.jclassPath()

client <- .jnew("rgremlin.RGremlin", "localhost")
jsonStr <- .jcall(
     client, "Ljava/lang/String;",
     "submit", "g.V().id()")

result <- fromJSON(jsonStr)
print(result)

<- c="" gremlin.log4j.level="INFO" p=""><- .jnew="" p="" rgremlin.rgremlin="" some.gremlin.server=""><- .jcall="" p=""><- fromjson="" jsonstr="" p="">Note that query results are serialized as json between the java wrapper and R. Since rjson only recognizes standard types and collections, the java wrapper is not suitable for queries that return TinkerPop objects such as vertices, edges and TinkerGraphs. In the example above, vertices are returned as maps using the gremlin valueMap() step.

In the example above, the rgremlin-0.0.1.jar was simply copied to the JanusGraph lib directory, which in turn was added to the rJava classpath. As an alternative, you can add the target and target/lib directories from the previous section to the rJava classpath.

Closing remark


The solution presented is only a quick and dirty way to get your users going and thus gather additional user requirements. Ideally, R would become available as a gremlin language variant, with complete TinkerPop support and the possibility to write gremlin queries natively in R instead of submitting query strings.

2018-04-27

Understanding response times for single vertex queries in JanusGraph-HBase

When building an application on JanusGraph, insight into and control over the query response times is important. But when measuring these response times, one finds they can vary wildly for even the simplest query, from submillisecond to seconds. This blog analyses the various ways in which JanusGraph-HBase serves vertices required for answering a gremlin query. In addition, it provides some hints on how to control the way a vertex is retrieved during measurements.

Digging into the JanusGraph-HBase stack

The figure shows which steps JanusGraph-HBase takes to retrieve a vertex. JanusGraph instances maintain a memory-based cache and first try to use this to serve vertex requests. To hide uncommitted changes from other transactions, mutated vertices and edges are cached per transaction, while unchanged vertices are stored in a so-called database cache. Retrieving a vertex with its properties by a gremlin query from the JanusGraph cache typically takes 0.5 ms.

When a vertex request does not hit the JanusGraph cache, a connection is set up to the HBase regionserver that serves the vertex id, that is, the row key. For a large graph, separate connections are established for all regionservers involved. Connections will time out after a while and have to be re-established. Connection setup typically takes 15 ms on a kerberized cluster.

HBase regionservers maintain a so-called blockcache, consisting of small blocks of data of typically 64 kB. This way, JanusGraph can keep a very large graph in the distributed memory of an HBase cluster. However, the improved scalability comes at a price: response times for single-vertex gremlin queries now amount to about 7 ms.

The HBase blockcache will not be able to serve all vertex requests (unless you configure the JanusGraph table to be memory-resident): the cache needs to be warmed and blocks can be evicted due to the use of the blockcache by other applications. In that case, the HBase regionserver needs to retrieve a particular row from its storefile. Typically, regionservers have their storefiles on Hadoop HDFS with a local replica. However, after a while local replicas of HDFS blocks may have been lost, and major compactions to restore them may not have been run, resulting in a remote HDFS block access to retrieve a particular row. Response times for single-vertex gremlin queries served from local and remote storage by HBase typically amount to 30 ms and 100 ms, respectively.

Measuring response times

When measuring response times of single-vertex gremlin queries on JanusGraph-HBase, it is useful to set the logging level of the JanusGraph client to DEBUG, because the debug messages show:
  • when a JanusGraph transaction cache is instantiated
  • when connections with HBase are established
  • when rows are retrieved from HBase.

In addition, it is useful to trace class loading with:

export JAVA_OPTIONS='-XX:+TraceClassLoading'

because the first query of a repeated set of queries can suffer delay from class loading.
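
As an aside, here is a minimal timing sketch using gremlinpython (an assumption on my part — the post itself uses the gremlin console; the server address and vertex id are hypothetical):

import time
from gremlin_python.driver import client

c = client.Client('ws://localhost:8182/gremlin', 'g')
for i in range(10):
    t0 = time.perf_counter()
    c.submit('g.V(12345L).valueMap().toList()').all().result()
    # The first iterations typically include connection setup,
    # class loading and cache warming.
    print('query {}: {:.1f} ms'.format(i, (time.perf_counter() - t0) * 1000))
c.close()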

Controlling the way in which vertex retrievals are served goes as follows:

  • From the JanusGraph transaction cache
    Set cache.db-cache=false in the JanusGraph properties file and simply repeat a single-vertex gremlin query, like: g.V(12345L).valueMap().toList()
  • From the JanusGraph database cache
    Set cache.db-cache=true in the JanusGraph properties file and issue a g.tx().close() before repeating the query (setting storage.transactions=false or cache.tx-cache-size = 0 and cache.tx-dirty-size = 0 does not work)
  • Set up HBase regionserver connection
    Select the relevant debug log messages during the first round of queries
  • From the HBase blockcache
    Set cache.db-cache=false in the JanusGraph properties file and issue a g.tx().close() before repeating the query. See the results in the figures below.
[Figure: Response times of queries served by the HBase blockcache]
[Figure: Response times of queries for a vertex not in the HBase blockcache]

Tweaking HBase major compactions

A warning first: I am not an HBase admin, neither did I completely study any "definitive guide" on HBase. But I am not afraid to try things and I have problems with the impossible.

After some haphazard reading I figured I could get a JanusGraph-HBase table with an empty blockcache by taking a snapshot of my table and creating a clone from the snapshot. However, this clone is initially still based on the store files from the original table, and the blockcache will serve requests for both the original and cloned tables from the same blockcache items.

The next trial was to run a major compaction on the cloned table. This should rewrite the store files if they can be compacted, resulting in the blockcache no longer recognizing that rows from the original and cloned tables are identical. However, this move did not do anything to my response time measurements. Apparently, the major compaction process decided that my storefiles were fine already, and it challenged me to take more drastic measures. This brought me to lower the MAX_FILESIZE attribute of the cloned table. The complete procedure in the hbase shell looks as follows:
> snapshot 'some_table', 'some_table_shot'
> clone_snapshot 'some_table_shot', 'cloned_table'
> alter 'cloned_table', MAX_FILESIZE => '536870912'
> major_compact 'cloned_table'

Now, I finally got the response time measurements presented earlier. While writing this, I realize that these figures still look as if the storefile measurements are somewhat contaminated by blockcache hits, although they do not conflict with figures mentioned elsewhere.

Unexpectedly, the major compaction with a new, smaller MAX_FILESIZE gave me a bonus. Apparently, this forced rewrite also restored data locality at the region servers as it should have done in the first place. Measurements before the compaction looked as in the figure below.
[Figure: Response times for rows not in the HBase blockcache, before major compaction]

Conclusions

Understanding response times for single vertex queries in JanusGraph-HBase requires careful scrutiny.

Major compactions on HBase tables seem insufficiently documented and can be tweaked with some perseverance. Of course, lowering MAX_FILESIZE on a regular basis is not a winning procedure for production tables (although you could precede it with a number of forced region merges), but it is practical for the baseline measurements presented in this blog.

2017-07-06

Configuring JanusGraph for Spark-Yarn

JanusGraph is an Apache TinkerPop implementation of a graph database; it is backed by a number of popular storage and search backends (e.g. Apache HBase, Apache Cassandra, Apache Solr, Elasticsearch). While the provisioning of a storage backend seems to point to mainly transactional use (OLTP), the storage backend connectors also come with InputFormat classes for analytical use with Hadoop/Spark (OLAP), in particular with the TinkerPop GraphComputer implementations for the TinkerPop HadoopGraph.
In this post, I describe how to configure the JanusGraph-0.1.1 binary distribution for use with an existing Hadoop/HBase/Spark-Yarn cluster. This post is a follow-up on an earlier post in which I did the same for the bare Apache TinkerPop-3.2.3 distribution.

Some background

The configurations for running OLAP queries on JanusGraph turn out to be more complicated than for Tinkerpop. While the JanusGraph team did a great job in releasing the Titan fork so soon after the project's foundation, the janusgraph-hbase module still has some particular quirks:
  • proper working depends on the order of items on the classpath (e.g. because janusgraph-hbase contains a literal copy of the guava-12 StopWatch class while the classpath also needs the guava-18 dependency)
  • the hbase cluster configs need to be present in both the hbase-site.xml file on the classpath and in the HadoopGraph properties file
Running Apache Spark as an Apache Hadoop yarn-client application results in a distributed setup of java containers (JVMs), each with its own classpath:
  • the client application (e.g. the gremlin console) runs the yarn client and contains the spark driver and the SparkContext
  • Yarn runs the spark cluster manager in a separate container on the cluster, the yarn ApplicationMaster
  • Yarn runs a separate container for each requested spark executor

Prerequisites

The prerequisites for following the recipe for configuring JanusGraph are the same as for Tinkerpop in my earlier post, so I will not repeat them. Where the current recipe makes JanusGraph perform an OLAP job on a graph persisted in HBase, the earlier Tinkerpop recipe allows you to perform an OLAP job on a graph persisted as a kryo file on hdfs.
Note that:
  • now you need the JanusGraph-0.1.1 binary distribution instead of the Tinkerpop-3.2.3 distribution;
  • the JanusGraph distribution already includes the hadoop-gremlin and spark-gremlin plugins. The plugin libs are simply present in the lib folder and the jars to be added are assumed to be in the lib2 folder with respect to the root of the JanusGraph distribution;
  • in fact, you can also follow the Tinkerpop for Spark-Yarn recipe using the JanusGraph-0.1.1 distribution, provided that you replace the single occurrence of '/ext/spark-gremlin' in the configs by '/'.

Configuration

Create a shell script (e.g. bin/jg011.sh) with the following contents:

#!/bin/bash

GREMLIN_HOME=/home/biko/JG011binary
cd $GREMLIN_HOME

# Have janusgraph find the hadoop and hbase cluster configs and spark-yarn dependencies
export CLASSPATH=/usr/hdp/current/hadoop-client/conf:/usr/hdp/current/hbase-client/conf:$GREMLIN_HOME/lib/*:$GREMLIN_HOME/lib2/*

# Have hadoop find its native libraries
export JAVA_OPTIONS="-Djava.library.path=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64"

# Does not work for spark-yarn, see spark.yarn.appMasterEnv.CLASSPATH and
# spark.executor.extraClassPath. Set nevertheless to get rid of the warning.
export HADOOP_GREMLIN_LIBS=$GREMLIN_HOME/empty

bin/gremlin.sh

Create the file conf/hadoop-graph/hadoop-gryo-yarn.properties:

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

#
# JanusGraph HBase InputFormat configuration
#
janusgraphmr.ioformat.conf.storage.backend=hbase
#janusgraphmr.ioformat.conf.storage.hostname=fqdn1,fqdn2,fqdn3
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.hbase.table=janusgraph
zookeeper.znode.parent=/hbase-unsecure
# Security configs are needed in case of a secure cluster
#zookeeper.znode.parent=/hbase-secure
#hbase.rpc.protection=privacy
#hbase.security.authentication=kerberos

#
# SparkGraphComputer with Yarn Configuration
#
spark.master=yarn-client
spark.executor.memory=512m
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
spark.yarn.dist.archives=/home/biko/JG011binary/lib.zip
spark.yarn.dist.files=/home/biko/JG011binary/janusgraph-hbase-0.1.1.jar
spark.yarn.appMasterEnv.CLASSPATH=/usr/hdp/current/hadoop-client/conf:./lib.zip/*:
spark.executor.extraClassPath=/usr/hdp/current/hadoop-client/conf:/usr/hdp/current/hbase-client/conf:janusgraph-hbase-0.1.1.jar:./lib.zip/*
spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64

#
# Relevant configs from spark-defaults.conf
#
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.yarn.historyServer.address sandbox.hortonworks.com:18080
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.history.kerberos.enabled false
spark.history.kerberos.keytab none
spark.history.kerberos.principal none

spark.yarn.am.waitTime 10
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.executor.memoryOverhead 384
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3


Demonstration

If you followed the recipe this far, you are ready to run your own demo:

[root@sandbox ~]# . /home/yourdir/bin/jg011.sh

         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/yourdir/JG011binary/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/yourdir/JG011binary/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19:07:43,534  INFO HadoopGraph:87 - HADOOP_GREMLIN_LIBS is set to: /home/yourdir/JG011binary/empty
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph

// Loading a graph into the default janusgraph table
gremlin> graph = JanusGraphFactory.open('conf/janusgraph-hbase.properties')
==>standardjanusgraph[hbase:[127.0.0.1]]
gremlin> GraphOfTheGodsFactory.loadWithoutMixedIndex(graph,true)
==>null
gremlin> g = graph.traversal()
==>graphtraversalsource[standardjanusgraph[hbase:[127.0.0.1]], standard]
gremlin> g.V().count()
19:10:45,921  WARN StandardJanusGraphTx:1273 - Query requires iterating over all vertices [()]. For better performance, use indexes
==>12

// Loading of a HadoopGraph from janusgraph's hbase table
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-gryo-yarn.properties')
==>hadoopgraph[hbaseinputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[hbaseinputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
==>12
gremlin>

Final remarks

  • As for the Tinkerpop recipe, the current recipe was also tested on a real, secure spark-yarn cluster (HDP-2.5.3.0-37). Some configs are superfluous on the sandbox but are needed on the real cluster (e.g. the spark.yarn.dist.files property);
  • as for the Tinkerpop recipe, the current recipe should work for other client applications than the gremlin console, as long as you do not use spark-submit or spark-shell. Getting your dependencies right turns out to be tedious though (maybe some other post);
  • critical readers may note that the CLASSPATH in the run script contains a lot of superfluous items, because the gremlin.sh script already puts most items on the classpath. Leaving $GREMLIN_HOME/lib/* out, however, interferes with the logging configuration in ways I still do not understand.

2017-06-29

Configuring Apache Tinkerpop for Spark-Yarn

Apache TinkerPop is a graph computing framework for both graph databases (OLTP) and graph analytic systems (OLAP). Today, we focus on the OLAP part using Tinkerpop's own HadoopGraph and SparkGraphComputer implementations. While all OLAP tests and examples in the reference documentation run fine on a standalone machine, many people, including myself, have spent days and days trying to get OLAP working on an actual Spark-Yarn cluster:
https://groups.google.com/forum/#!searchin/gremlin-users/yarn%7Csort:relevance
https://groups.google.com/forum/#!searchin/aureliusgraphs/yarn%7Csort:relevance
https://groups.google.com/forum/#!searchin/janusgraph-users/yarn%7Csort:relevance

With more experience in jar hell than a few years back, and with the recent Tinkerpop-based JanusGraph initiative as a stimulus, I decided to give the configuration of Apache TinkerPop for Spark-Yarn another try.

Prerequisites

The recipes below were tested on a Hortonworks Data Platform sandbox (version 2.5.0.0-1245) and on an actual multi-node HDP cluster (version 2.5.3.0-37), but I expect the recipes to work on other hadoop distributions or a vanilla hadoop cluster as well.
I assume that you have downloaded the Hortonworks VM, imported it into Virtualbox, can ssh into the sandbox and upload files to it (e.g. using an sftp:// location in Nautilus). Beware: set the sandbox configs from NAT to "bridged" and use port 2222 to ssh into the actual Docker container in the sandbox. Of course, you can also use putty and winscp to the same effect.

You will also need the following jars (commands from within the sandbox):
wget http://central.maven.org/maven2/org/apache/spark/spark-yarn_2.10/1.6.1/spark-yarn_2.10-1.6.1.jar 
wget http://central.maven.org/maven2/com/google/inject/extensions/guice-servlet/3.0/guice-servlet-3.0.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/2.2.0/hadoop-yarn-server-web-proxy-2.2.0.jar
wget http://central.maven.org/maven2/org/scala-lang/scala-reflect/2.10.5/scala-reflect-2.10.5.jar

Ideally, the Tinkerpop spark-gremlin plugin would have the spark-yarn dependency, so that the corresponding jars are included in the binary distribution.
If your hadoop cluster is configured with lzo compression, you will also need the hadoop-lzo jar. In case of the Hortonworks sandbox you will find it at:
cp /usr/hdp/current/hadoop-client/lib/hadoop-lzo-0.6.0.2.5.0.0-1245.jar .

Finally, you will need the actual Tinkerpop binaries for the gremlin-console. I used version 3.2.3 because of its compatibility with JanusGraph-0.1.1. Unpack this archive and install the spark-gremlin plugin (you do not need the hadoop-gremlin plugin). Copy the four (or, with lzo, five) jars gathered above into an ext/spark-gremlin/lib2 folder with respect to the root of the Tinkerpop distribution. Create a lib.zip archive in the root of the Tinkerpop distribution with all jars from the lib folder, the ext/spark-gremlin/lib folder and the ext/spark-gremlin/lib2 folder.

Configuration

Most configuration problems of Tinkerpop on Spark-Yarn have three root causes:
  1. SparkGraphComputer creates its own SparkContext, so it does not get any configs from the usual spark-submit command.
  2. The Tinkerpop spark-gremlin plugin does not include the spark-yarn dependencies.
  3. Adding the cluster's spark-assembly to the classpath creates a host of version conflicts, because Spark 1.x dependency versions have remained frozen since 2014.
In the recipe below I follow a minimalist approach in which I only add the absolutely necessary dependencies in addition to the dependencies included in the Tinkerpop binary distribution. The sandbox/cluster spark installation is almost completely ignored, with the exception of the history service. This approach minimizes the chance of jar version conflicts. 
Note that I also ignore the instructions from the reference documentation regarding the use of HADOOP_GREMLIN_LIBS and the bin/hadoop/init-tp-spark.sh script. On a yarn cluster this approach runs into a bootstrap problem where the Spark application master does not have access to the file cache. Rather, I exploit the spark.yarn.dist.archives feature that does not have this problem and does not need write access to the datanodes either.

Create a shell script (e.g. bin/tp323.sh) with the following contents:
#!/bin/bash
GREMLIN_HOME=/home/yourdir/TP323binary
cd $GREMLIN_HOME

# Have tinkerpop find the hadoop cluster configs and spark-yarn dependencies
export CLASSPATH=/etc/hadoop/conf:/home/yourname/TP323binary/ext/spark-gremlin/lib2/*

# Have hadoop find its native libraries
export JAVA_OPTIONS="-Djava.library.path=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64"

# Does not work for spark-yarn, see spark.yarn.appMasterEnv.CLASSPATH and 
# spark.executor.extraClassPath. Set nevertheless to get rid of the warning.
export HADOOP_GREMLIN_LIBS=$GREMLIN_HOME/empty

bin/gremlin.sh


Create the file conf/hadoop/hadoop-gryo-yarn.properties:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.defaultGraphComputer=org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer

gremlin.hadoop.inputLocation=tinkerpop-modern.kryo
gremlin.hadoop.outputLocation=output

##############################################
# SparkGraphComputer with Yarn Configuration #
##############################################
spark.master=yarn-client
spark.executor.memory=512m
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
spark.yarn.dist.archives=/home/yourdir/TP323binary/lib.zip
spark.yarn.appMasterEnv.CLASSPATH=./lib.zip/*:/usr/hdp/current/hadoop-client/conf
spark.executor.extraClassPath=./lib.zip/*:/usr/hdp/current/hadoop-client/conf
spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
 
#############################################
# Relevant configs from spark-defaults.conf #
#############################################
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.yarn.historyServer.address sandbox.hortonworks.com:18080
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.history.kerberos.keytab none
spark.history.kerberos.principal none

spark.yarn.am.waitTime 10
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.executor.memoryOverhead 384
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3


Note that:
  1. the spark.yarn.dist.archives property points to the archive with jars on the local file system and is loaded into the various Yarn containers. As a result, the lib.zip archive becomes available as the directory lib.zip in the Yarn containers;
  2. spark.executor.extraClassPath gives the classpath to the jar files inside the Yarn container. This is why it contains ./lib.zip/*. Just because a spark executor got the jars loaded into its container does not mean it knows how to access them automatically.

Demonstration

Now, you are ready for the demo.
[root@sandbox TP323binary]# hdfs dfs -put data/tinkerpop-modern.kryo .
[root@sandbox TP323binary]# . bin/tp323.sh

         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-gryo-yarn.properties')
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/yourdir/TP323binary/ext/spark-gremlin/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/yourdir/TP323binary/lib/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
==>hadoopgraph[gryoinputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[gryoinputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
==>6
gremlin>


If you still run into exceptions, the best way to see what is going on is to look into the yarn resource manager (http://192.168.178.75:8088/cluster) for your applicationId and get the logs using "yarn logs -applicationId application_1498627870374_0008" from the command shell.

Final remarks

A few final remarks:

  • I have not tested the recipe yet for a java/scala application. However, things should not be very different compared to the gremlin/groovy console, as long as you do not use spark-submit or spark-shell.
  • You may not like the idea that the hadoop and spark versions from the Tinkerpop binaries differ from the versions in your cluster. If so, just build Tinkerpop from source with the corresponding dependencies changed in the various pom files (e.g. spark-core_2.10-1.6.1.2.5.0.0-1245.jar instead of spark-core_2.10-1.6.1.jar).
  • While this recipe may save you a lot of time if you are new to TinkerPop, actually using TinkerPop OLAP will still require a fair amount of time unless you have deep knowledge of configuring JVM projects and Apache Spark.

2016-01-02

Extracting ego networks with Apache Spark GraphX

Suppose you want to do graph computations on a social network with tens of millions of vertices, but all your data are stored in a Hortonworks secure Hadoop cluster and are supposed to stay there: what are your options? Installing Neo4j on the cluster is a no-go: the devops team is having a hard enough time getting the cluster and its associated services into production, and distractions cannot be allowed. The first thing that comes to mind is Apache Spark GraphX, which is part of the Hortonworks Data Platform. It provides a scalable solution to graph computing based on Google's Pregel model. Is Spark GraphX a suitable tool for devops teams to develop and maintain graph-based information services? Let's see.