
2019-07-18

Running the latest Apache Spark version on an existing Hadoop cluster

The other day I heard that two colleagues had managed to run the latest Apache Spark release on an ageing HDP-2.6.x Hadoop cluster. I figured that was cool because I had tried to run Apache TinkerPop's OLAP queries on the same Hadoop cluster without success and knew that a Google search on this issue does not return any usable resources.

While replaying their experiment on my own machine, I hit a small configuration issue that led me to an earlier description of the central idea needed to run vanilla Apache Spark on a commercial Hadoop distribution. It seems, however, that the author used a potentially offensive word in his blog title (which I will not repeat here for obvious reasons) that prevents the blog from appearing in the top 10 results of any Google query on the subject. So, the main intention of this blog is to get this useful information higher up in the Google search results. Posting on blogger.com might also help with that. While I am at it, I will provide some additional details.

The central idea is to use the vanilla spark-2.4.3-bin-without-hadoop binary distribution. At first sight this seems counterintuitive: Spark provides binary distributions for the various Hadoop versions, and the distribution without Hadoop seems geared only towards a stand-alone Spark cluster. On second thought, however, it is only logical: the compatibility issues between vanilla Spark and commercial Hadoop distributions arise from the fact that commercial parties like Cloudera, (former) Hortonworks and MapR backport new Hadoop features into older Hadoop versions to satisfy their need for "stable" releases. The issue I ran into with HDP-2.6.x is that Hadoop services could raise HA-related exceptions that are not known to the vanilla Hadoop client, and since the Spark builds "with Hadoop" bundle exactly that vanilla client, they become unusable as well. By using spark-*-without-hadoop you can simply put your cluster's own Hadoop binaries on the application classpath and everything will be fine.
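
If you want to convince yourself that the driver really picks up the cluster's Hadoop binaries, a quick check is possible from PySpark. The snippet below is my own verification sketch, not part of the original recipe; it assumes the environment is set up as in the examples further down and prints the Hadoop version loaded by the driver JVM, which should be your cluster's Hadoop build rather than a vanilla Apache release.

# Minimal sketch: check which Hadoop binaries the Spark driver actually loads.
# Assumes spark-2.4.3-bin-without-hadoop with SPARK_DIST_CLASSPATH=$(hadoop classpath),
# as configured in the examples below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hadoop-version-check").getOrCreate()
# Reach the Hadoop VersionInfo class through the py4j gateway of the driver JVM.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
spark.stop()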

Of course, there are still some catches. Apparently, in the case I described, Hortonworks only modified the APIs of the Hadoop services and left the API of the Hadoop client untouched. But one day a provider could decide to "optimize" the interplay between the Hadoop client and Spark. Also, putting the complete Hadoop client binaries on your classpath carries the risk of additional dependency conflicts beyond the transitive dependencies you would already get from just using Spark.

After seeing the logic of using spark-*-without-hadoop, the actual job configuration is surprisingly simple. The examples below assume that you have the spark-2.4.3-bin-without-hadoop distribution available in /opt. You only need the distribution on your local machine, not on the cluster. For the particular case of HDP, the various configuration files in /etc/hadoop/conf require the hdp.version system property to be set on the JVMs of both the Spark driver and the YARN application master. Other commercial distributions may have similar requirements.

Happy sparking!


Spark Java:

export SPARK_HOME=/opt/spark-2.4.3-bin-without-hadoop
export SPARK_MAJOR_VERSION=2
# Put the cluster's own Hadoop jars on Spark's classpath
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
# Point Spark at the cluster's Hadoop/YARN configuration
export HADOOP_CONF_DIR=/etc/hadoop/conf/

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client \
            --class org.apache.spark.examples.SparkPi \
            --conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.2.0-205" \
            --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.2.0-205" \
            $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.3.jar


PySpark:

export SPARK_HOME=/opt/spark-2.4.3-bin-without-hadoop
export SPARK_MAJOR_VERSION=2
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export HADOOP_CONF_DIR=/etc/hadoop/conf/
# Python interpreter to use for PySpark (here a Red Hat SCL Python 3.6 build)
export PYSPARK_PYTHON=/opt/rh/rh-python36/root/usr/bin/python

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client \
            --conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.2.0-205" \
            --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.2.0-205" \
            $SPARK_HOME/examples/src/main/python/pi.py


2019-06-16

Transferring a subgraph from JanusGraph to Neo4j


This blog will not go into very much technical detail, but merely addresses the fact that a Google search on the blog title does not guide you to any immediately usable resource. Yet, I think this is a relevant use case. While the JanusGraph backends enable you to store and query huge datasets in a linearly scalable way, data science teams often prefer to work on smaller subsets of the graph data in Neo4j clients because of the better support for visual data exploration and for mixing in additional data.

Exporting a subgraph from JanusGraph

The Gremlin query language has a dedicated subgraph "step" to include edges and their attached vertices in a dataset that can be operated upon as a graph. The code below, for execution in the Gremlin Console, extracts some data from the Graph of the Gods sample graph and subsequently writes it to a file in the GraphML format.

graph = JanusGraphFactory.open("inmemory")
GraphOfTheGodsFactory.loadWithoutMixedIndex(graph, true)
g = graph.traversal()

// Collect jupiter's incident edges and their attached vertices into a side-effect subgraph
subGraph = g.V().has('name', 'jupiter').bothE().subgraph('jupiter').cap('jupiter').next()

// Serialize the subgraph as GraphML, using "labels" as the vertex label key (see below)
stream = new FileOutputStream("data/jupiter.xml")
GraphMLWriter.build().vertexLabelKey("labels").create().writeGraph(stream, subGraph)

 
This code uses the GraphMLWriter class directly. While the TinkerPop reference documentation prescribes the .io() method, the GraphML output produced that way uses the default "labelV" key to indicate the label of a vertex. That key is not recognized by the Neo4j APOC plugin, so you end up with a Neo4j graph without vertex labels. Instead, the "labels" key should be used for Neo4j to understand that the key refers to vertex labels, which is exactly what the vertexLabelKey("labels") call on GraphMLWriter achieves. The mismatch in label keys probably arose because TinkerPop supports only a single vertex label, while Neo4j supports multiple labels per node.
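
As a side note, if you already have a GraphML file that was written via the .io() route with the default "labelV" key, you do not necessarily have to re-export. The Python snippet below is a hypothetical post-processing step of my own (not taken from the TinkerPop or Neo4j documentation, and the file names are just examples) that renames the key so that the apoc importer picks up the vertex labels.

# Hedged sketch: rename TinkerPop's default "labelV" vertex label key to the
# "labels" key expected by apoc.import.graphml. File names are examples only.
def relabel_graphml(src_path, dst_path):
    with open(src_path, encoding='utf-8') as src:
        content = src.read()
    # Covers both the <key id="labelV" .../> declaration and the <data key="labelV"> entries.
    content = content.replace('"labelV"', '"labels"')
    with open(dst_path, 'w', encoding='utf-8') as dst:
        dst.write(content)

relabel_graphml('data/jupiter.xml', 'data/jupiter-neo4j.xml')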

Importing the GraphML file into Neo4j

call apoc.import.graphml('../janusgraph-0.3.1-hadoop2/data/jupiter.xml', {batchSize: 10000, readLabels: true, storeNodeIds: false, defaultRelationshipType:"RELATED"})

MATCH (n) RETURN n

This is straight from the Neo4j documentation and works once you know about the required "labels" key in the GraphML data.

References

https://tinkerpop.apache.org/javadocs/3.3.4/full/org/apache/tinkerpop/gremlin/structure/io/graphml/GraphMLWriter.html
https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/721

2019-03-31

Circular references in Plotly/Dash

Plotly Dash is a simple Python web framework for quickly building interactive data visualizations. While Dash has a lot of power, it also has its limitations. One of the current limitations is that the dependencies between the Dash web components cannot be circular. If you create a callback for an output that is also part of the inputs, Dash will raise an exception saying that "this is bad". If you try to circumvent this check by creating two components with callbacks that have interchanged inputs and outputs, the browser just shows "Error loading dependencies".

So, are circular references really "bad"? I would rather say that forbidding them is a consequence of the Dash principle that each callback makes a change in the resulting web page. But in fact, Dash itself is not entirely consistent about this principle:
  • Dash defines a PreventUpdate exception that one can raise when the application's state does not require an update of the web page. This would be the Pythonic way to handle intended circular references and is suggested in the Dash issue on this topic (a minimal sketch follows right after this list).
  • From the Dash gotchas: "If you have disabled callback validation in order to support dynamic layouts, then you won't be automatically alerted to the situation where a component within a callback is not found within a layout. In this situation, where a component registered with a callback is missing from the layout, the callback will fail to fire. For example, if you define a callback with only a subset of the specified Inputs present in the current page layout, the callback will simply not fire at all."
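
To make the first bullet concrete, the snippet below is a minimal, self-contained sketch of raising PreventUpdate inside a callback when there is nothing to update. It only illustrates the mechanism; the workaround further down does not use it, and the component ids are made up for the example.

# Minimal illustration of dash.exceptions.PreventUpdate (not the workaround used below).
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
from dash.exceptions import PreventUpdate

app = dash.Dash(__name__)
app.layout = html.Div([
    dcc.Input(id='value', type='number', value=0),
    html.Div(id='result')
])

@app.callback(Output('result', 'children'),
              [Input('value', 'value')])
def show_value(value):
    if value is None:
        # The application state does not require a page update, so skip it.
        raise PreventUpdate
    return 'Current value: {}'.format(value)

if __name__ == '__main__':
    app.run_server(debug=True)
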
It turns out that this second "feature" can be exploited as a temporary workaround to allow circular references, but before doing that, let us consider our use case. I simply wanted the style of checkboxes that you find in the column filters of Microsoft Excel and LibreOffice Calc, because I figured that my users would be accustomed to these. The list of checkboxes consists of one "select all" checkbox and the list of remaining checkboxes. Checking the "select all" box affects the state of one or more of the remaining checkboxes, while checking any of the remaining boxes can affect the state of the "select all" box. So, if we model our list of checkboxes with two Dash dcc.Checklist components, we certainly have circular dependencies.

Exploiting the Dash gotcha works by including an extra Dash component in the loop; in the code example below this is the html.Div with id='loop_breaker'. This html.Div is dynamically generated inside the static html.Div with id='loop_breaker_container'. The 'loop_breaker' component is only generated when:
  • the user has deselected all options while the "all" checkbox is still checked
  • the user has selected all options while the "all" checkbox is still unchecked
In this way, we get our intended circular reference while the initial state of the application layout passes the Dash validation criteria on circular references.


# -*- coding: utf-8 -*-
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
# 'loop_breaker' is not part of the initial layout, so callback validation must be suppressed
app.config['suppress_callback_exceptions'] = True

app.layout = html.Div(children=[
    html.H4(children='Excel-like checkboxes'),
    dcc.Checklist(
        id='all',
        options=[{'label': 'all', 'value': 'all'}],
        values=[]
    ),
    dcc.Checklist(
        id='cities',
        options=[
            {'label': 'New York City', 'value': 'NYC'},
            {'label': 'Montréal', 'value': 'MTL'},
            {'label': 'San Francisco', 'value': 'SF'}
        ],
        values=['MTL', 'SF']
    ),
    html.Div(id='loop_breaker_container', children=[])
])


# Checking or unchecking 'all' selects or clears all city checkboxes.
@app.callback(Output('cities', 'values'),
              [Input('all', 'values')])
def update_cities(inputs):
    if len(inputs) == 0:
        return []
    else:
        return ['NYC', 'MTL', 'SF']


# Changes to the city checkboxes (re)generate the hidden 'loop_breaker' div,
# but only when the 'all' checkbox is out of sync with the current selection.
@app.callback(Output('loop_breaker_container', 'children'),
              [Input('cities', 'values')],
              [State('all', 'values')])
def update_all(inputs, _):
    states = dash.callback_context.states
    if len(inputs) == 3 and states['all.values'] == []:
        return [html.Div(id='loop_breaker', children=True)]
    elif len(inputs) == 0 and states['all.values'] == ['all']:
        return [html.Div(id='loop_breaker', children=False)]
    else:
        return []


# The generated 'loop_breaker' div closes the circle by updating the 'all' checkbox.
@app.callback(Output('all', 'values'),
              [Input('loop_breaker', 'children')])
def update_loop(all_true):
    if all_true:
        return ['all']
    else:
        return []


if __name__ == '__main__':
    app.run_server(debug=True)