Wednesday, December 30, 2015

Hbase, Spark and HDFS - Setup and a Sample Application

Apache Spark is a framework where the hype is largely justified. It is both innovative as a model for computation and well done as a product. People may be tempted to compare it with another framework for distributed computing that has become popular recently, Apache Storm, e.g. with statements like "Spark is for batch processing while Storm is for streaming". But those are entirely different beasts. Storm is a dataflow framework, very similar to the HyperGraph DataFlow framework for example, and there are others like it; it's based on a model that has existed at least since the 1970s, even though its authors seem to be claiming credit for it. Spark, on the other hand, is a novel approach to dealing with large quantities of data and complex, arbitrary computations on it. Note the "arbitrary" - unlike Map/Reduce, Spark will let you do anything with the data. I hope to post more about what's fundamentally different between something like Storm and Spark, because it is theoretically interesting. In the meantime, I highly recommend reading the original paper describing RDDs (Resilient Distributed Datasets), the abstraction at the foundation of Spark.

This post is just a quick HOWTO if you want to start programming against the popular tech stack trio made up of HDFS (the Hadoop Distributed File System), HBase and Apache Spark. I might follow up with the intricacies of cluster setup some time in the future, but the idea here is to give you just a list of minimal steps that will work (fingers crossed). Whenever I mention relative file paths, they are to be resolved within the main installation directory of the component I'm talking about: if I'm talking about Hadoop and I write 'etc/hadoop', I mean 'etc/hadoop' under your Hadoop installation, not '/etc/hadoop' in your root file system!

All code, including sample data file and Maven POM can be found at the following Github GIST: https://gist.github.com/bolerio/f70c374c3e96495f5a69

HBase Install

We start with HBase because in fact you don't even need Hadoop to develop locally with it. There is of course a standard package for HBase and the latest stable release (at the time of this writing) is 1.1.2. But that's not going to do it for us because we want Spark. There is an integration of Spark with HBase that is being included as an official module in HBase, but only as of the upcoming 2.0.0 version, which is still in an unstable SNAPSHOT state. If you are just starting with these technologies, you don't care about having a production-blessed version. So, to install HBase, you need to build it first. Here are the steps:
  1. Clone the whole project from GIT with:
         git clone https://github.com/apache/hbase.git
  2. Make sure you have a recent Apache Maven (3.0.5 won't work); get 3.3+ if you don't already have it.
  3. Go to the HBase project directory and build it with:
         mvn -DskipTests=true install
     That will put all HBase modules in your local Maven repo, which you'll need for a local Maven-based Spark project.
  4. Then create an hbase distribution with:
        mvn -DskipTests=true package assembly:single
    You will find the tarball under hbase-assembly/target/hbase-2.0.0-SNAPSHOT-bin.tar.gz.
  5. This is what you are going to unpack in your installation directory of choice, say /opt/hbase.
  6. Great. Now you have to configure your new HBase installation (the untarred build you just created). Edit the file /opt/hbase/conf/hbase-site.xml to specify where HBase should store its data. Use this template and replace the path inside:
    <configuration>
      <property>
        <name>hbase.rootdir</name>
        <value>file:///home/boris/temp/hbase</value>
      </property>
    </configuration>
    
  7. Make sure you can start HBase by running:
         bin/start-hbase.sh (or .cmd)
  8. Then start the HBase command line shell with:
       bin/hbase shell
    and type the command list to view the list of all available tables - there will be none of course.
Spark Development Setup

Spark itself is really the framework you use to do your data processing. It can run in a cluster environment, with a server to which you submit jobs from a client. But to start, you don't actually need any special downloads or deployments. Just including Maven dependencies in your project will bring the framework in, and you can call it from a main program and run it in a single process.

So assuming you built HBase as explained above, let's set up a Maven project with HBase and Spark. Then we'll get some sample data to play with and go over a sample application that makes use of two different approaches in Spark: the plain API and the Spark SQL module, which essentially gives you a scalable, distributed SQL query engine. We won't talk much about the latter here, in part because its HBase integration is a sort of undocumented work in progress. However, you can see an example in the sample code provided in the GIST!

No point repeating the code in its entirety here, but I will show the relevant parts. The Maven pom only needs to contain the hbase-spark dependency:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>${hbase.version}</version>
</dependency>

That will pull in everything you need to load data into a local HBase instance and process it with Spark. The Spark SQL module I mentioned is a separate dependency, only necessary for the SQL portion of the example:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.1</version>
</dependency>

There are two other auxiliary dependencies in the sample project: one for CSV parsing and one for the mJson library; you can see them in the pom.xml file from the Github GIST.

Playing with Some Data

We will now do some processing with some Open Government data, specifically Miami-Dade County's list of recently arrested individuals. You can get it from here: https://opendata.miamidade.gov/Corrections/Jail-Bookings-May-29-2015-to-current/7nhc-4yqn - export it in Excel CSV format as a file named jaildata.csv. I've already made the export and placed it in the GIST, even though it's a bit of a large file. The data set is simple: it contains arrests for a big part of the year 2015. Each record/line has the date the arrest was made, where it was made, the name of the offender, their date of birth and a list of up to 3 charges. We could, for example, find out how popular the "battery" charge is for offenders under 30 compared to offenders over 30.

The first step is to import the data from the CSV file into HBase (note that Spark could work perfectly well directly from a CSV file). This is done in the ImportData.java program, where in a short main program we create a connection to HBase, create a brand new table called jaildata, then loop through all the rows in the CSV file to import the non-empty ones. I've annotated the source code directly. The connection assumes a local HBase server running on the default port and that the table doesn't exist yet. Note that data is inserted as a batch of put operations, one per column value. Each put operation specifies the column family, column qualifier and the value, while the version is automatically assigned by the system. Perhaps the most important part of that uploading of data is how the row keys are constructed. HBase gives you complete freedom to create a unique key for each row inserted, and it's sort of an artform to pick a good one. In our case, we chose the offender's name combined with the date of the arrest, the assumption being of course that the same person cannot be arrested twice in the same day (not a very solid assumption, of course).
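
To make the shape of that import concrete, here is a minimal sketch of the relevant HBase client calls, using the classic HTableDescriptor/Put API, assuming a local server and the 'arrest' and 'charge' column families used later in the post. The row key delimiter and the hard-coded record values are purely illustrative; the full, annotated program is ImportData.java in the GIST.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;

public class ImportSketch
{
    public static void main(String[] args) throws Exception
    {
        // Default configuration: connect to HBase on localhost, default ports.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin())
        {
            // Create the 'jaildata' table with its two column families.
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("jaildata"));
            desc.addFamily(new HColumnDescriptor("arrest"));
            desc.addFamily(new HColumnDescriptor("charge"));
            admin.createTable(desc);

            try (Table table = connection.getTable(TableName.valueOf("jaildata")))
            {
                List<Put> batch = new ArrayList<>();
                // In the real program these values come from each CSV line;
                // they are hard-coded here only for illustration.
                String name = "DOE, JOHN", bookdate = "6/1/2015", dob = "1/1/1980";
                // Row key = offender name + arrest date (the delimiter is illustrative).
                Put put = new Put(Bytes.toBytes(name + "_" + bookdate));
                put.addColumn(Bytes.toBytes("arrest"), Bytes.toBytes("name"), Bytes.toBytes(name));
                put.addColumn(Bytes.toBytes("arrest"), Bytes.toBytes("bookdate"), Bytes.toBytes(bookdate));
                put.addColumn(Bytes.toBytes("arrest"), Bytes.toBytes("dob"), Bytes.toBytes(dob));
                put.addColumn(Bytes.toBytes("charge"), Bytes.toBytes("charge1"), Bytes.toBytes("BATTERY"));
                batch.add(put);
                // One batch of puts, one put per row, one column value per addColumn call.
                table.put(batch);
            }
        }
    }
}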

Now we can show off some Spark in action. The file is ProcessData.java, which is also annotated, but I'll go over some parts here.

We have a few context objects and configuration objects that need to be initialized:

  private void init()
  {
      // Default HBase configuration for connecting to localhost on default port.
      conf = HBaseConfiguration.create();
      // Simple Spark configuration where everything runs in process using 2 worker threads.
      sparkconf = new SparkConf().setAppName("Jail With Spark").setMaster("local[2]");
      // The Java Spark context provides a Java-friendly interface for working with Spark RDDs
      jsc = new JavaSparkContext(sparkconf);
      // The HBase context for Spark offers general purpose methods for bulk read/write
      hbaseContext = new JavaHBaseContext(jsc, conf);
      // The entry point interface for the Spark SQL processing module. SQL-like data frames
      // can be created from it.
      sqlContext = new org.apache.spark.sql.SQLContext(jsc);
  }


A configuration object for HBase will tell the client where the server is, etc.; in our case the default values for a local server work. On the next line, the Spark configuration gives the application a name and then tells it where the main driver of the computation is - in our case, we have a local in-process driver that is allowed to use two concurrent threads. The Spark architecture comprises a main driver and then workers spread around a cluster. The location of a remote main driver might look something like this: spark://masterhost:7077 (instead of "local[2]"). Then we create a Spark context, specifically a JavaSparkContext, because the original Scala API is not so easy to work with from Java - possible, just not very user-friendly. Then we create something called a JavaHBaseContext, which comes from the HBase-Spark module and knows how to talk to an HBase instance using the Spark data model - it can do bulk inserts and deletes, it can scan an HBase table as a Spark RDD, and more. Finally, we create a context object representing an SQL layer on top of Spark data sets. Note that the SQL context object does not depend on HBase. In fact, different data sources can be brought under the same SQL processing API as long as there is some way to "relationalize" them. For example, there is a module that lets you process data coming from MongoDB as Spark SQL data as well. So in fact, you can have a federated data environment using a Spark cluster to perform relational joins between MongoDB collections and HBase tables (and flat files and ...).

Now, reading data from HBase is commonly done by scanning. You can perform some filtering operations, but there's no general purpose query language for it. That's the role of Spark and of other frameworks like Apache Phoenix. Also, scanning HBase rows will give you binary values which need to be converted to the appropriate runtime Java type. So for each column value, you have to keep track of its type yourself and perform the conversion. The HBase API has a convenience class named Bytes that handles all basic Java types. To represent whole records, we use JSON as a data structure, so individual column values are first converted to JSON values with this utility method:

  static final Json value(Result result, String family, String qualifier,
                          BiFunction<byte[], Integer, Object> converter)
  {
      Cell cell = result.getColumnLatestCell(Bytes.toBytes(family), Bytes.toBytes(qualifier));
      if (cell == null)
          return Json.nil();
      else
          return Json.make(converter.apply(cell.getValueArray(), cell.getValueOffset()));
  }

Given an HBase result row, we create a JSON object for our jail arrests records like this:

static final Function<Result, Json> tojsonMap = (result) -> {
    Json data = Json.object()
        .set("name", value(result, "arrest", "name", Bytes::toString))
        .set("bookdate", value(result, "arrest", "bookdate", Bytes::toString))
        .set("dob", value(result, "arrest", "dob", Bytes::toString))
        .set("charge1", value(result, "charge", "charge1", Bytes::toString))
        .set("charge2", value(result, "charge", "charge2", Bytes::toString))
        .set("charge3", value(result, "charge", "charge3", Bytes::toString));
    return data;
};

With this mapping from binary HBase rows to a runtime JSON structure, we can construct a Spark RDD over the whole table as JSON records:

JavaRDD<Json> data = hbaseContext.hbaseRDD(TableName.valueOf("jaildata"), new Scan())
                                       .map(tuple -> tojsonMap.apply(tuple._2()));

We can then filter or transform that data any way we want. For example:

data = data.filter(j -> j.at("name").asString().contains("John"));

would give us a new data set which contains only offenders named John. An instance of JavaRDD is really an abstract representation of a data set. When you invoke filtering or transformation methods on it, it will just produce an abstract representation of a new data set, but it won't actually compute the result. Only when you invoke what is called an action method - one that returns something other than an RDD as its value - is the lazy computation triggered and an actual data set produced. For instance, collect and count are such action methods.
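
To illustrate the distinction, here is a minimal sketch of an action over the JSON records built above. The exact filtering logic in ProcessData.java (which charge columns it inspects, the letter case of the values) may differ; the point is only that count() is what actually runs the pipeline:

long thefts = data.filter(j -> j.at("charge1").isString()
                            && j.at("charge1").asString().toLowerCase().contains("theft"))
                  .count(); // count() is an action: the whole RDD pipeline executes here
System.out.println("Thefts: " + thefts);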

Ok, good. Running ProcessData.main should output something like this:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/12/29 00:41:00 INFO Remoting: Starting remoting
15/12/29 00:41:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.0.13:55405]
Older 'battery' offenders: 2548, younger 'battery' offenders: 1709, ratio(older/younger)=1.4909303686366295
Thefts: 34368
Contempts of court: 32
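
As an aside, even without touching the work-in-progress HBase DataFrame integration, the sqlContext we initialized earlier can be pointed at the same JSON records. The following is just an illustrative sketch of that generic route, reusing the data RDD from above; the SQL example in the GIST goes through the hbase-spark module instead, so this is not the code you'll find there.

// Requires: import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row;
DataFrame arrests = sqlContext.read().json(data.map(Json::toString));
arrests.registerTempTable("arrests");
// Field names come from the JSON records we built; upper-case charge values are assumed here.
DataFrame thefts = sqlContext.sql(
    "SELECT name, bookdate FROM arrests WHERE charge1 LIKE '%THEFT%'");
for (Row row : thefts.collectAsList())
    System.out.println(row.getString(0) + " booked on " + row.getString(1));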

Hadoop

To conclude, I will just show you how to use Hadoop/HDFS to store HBase data instead of the normal OS filesystem. First, download Hadoop from https://hadoop.apache.org/releases.html. I used version 2.7.1. You can unzip the tarball/zip file in a standard location for your OS (e.g. /opt/hadoop on Linux).  Two configuration steps are important before you can actually start it:
  1. Point it to the JVM. It needs at least Java 7. Edit etc/hadoop/hadoop-env.sh (or hadoop-env.cmd) and change the line export JAVA_HOME=${JAVA_HOME} to point to the Java home you want, unless your OS/shell environment already sets it.
  2. Next you need to configure where Hadoop will store its data and what will be the URL for clients to access it. The URL for clients is configured in the etc/hadoop/core-site.xml file:
    <configuration>
     <property>
      <name>fs.defaultFS</name>
      <value>hdfs://localhost:9000</value>
     </property>
    </configuration>
     
    The location for data is in the etc/hadoop/hdfs-site.xml file. And there are in fact two locations: one for the Hadoop Namenode and one for the Hadoop Datanode:
    
    <configuration>
      <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///var/dfs/name</value>
      </property>
      <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///var/dfs/data</value>
      </property>
    </configuration>
  3. Before starting HDFS, you need to format the namenode (that's the piece that handles file and directory names and knows what is stored where) by doing
          bin/hadoop namenode -format
  4. When starting Hadoop, you only need to start the HDFS file system by running
          sbin/start-dfs.sh
     Make sure it has permissions to write to the directories you have configured.
  5. If, after Hadoop has started, you don't see the admin web UI at http://localhost:50070, check the logs.
  6. To connect HBase to hadoop you must change the hbase root directory to be an HDFS one:
      <property>
        <name>hbase.rootdir</name>
        <value>hdfs://localhost:9000/hbase</value>
      </property>
    
  7. Restarting HBase now will bring you back to an empty database, as you can verify in the HBase shell with the 'list' command. So try running the import program and then the "data crunching" program again to see what happens.

Wednesday, November 11, 2015

Announcing Seco 0.6 - Collaborative Scripting For Java

This is a short post to announce an incremental 0.6 release of Seco. The release comes with important bug fixes and a simplified first-time user experience.


Seco is a collaborative scripting development environment for the Java platform. You can write code in many JVM scripting languages. The code editor in Seco is based on the Mathematica notebook UI, but the full GUI is richer and much more ambitious. In a notebook, you can mix rich text with code and output, including interactive components created by your code. This makes Seco a live environment, because you can evaluate an expression and immediately see the changes to your program.


Monday, August 31, 2015

Scheduling Tasks and Drawing Graphs — The Coffman-Graham Algorithm

When an algorithm developed for one problem domain applies almost verbatim to another, completely unrelated domain, that is the type of insight, beauty and depth that makes computer science a science in its own right, and not a branch of something else, namely mathematics, as many professionals educated in the field mistakenly believe. For example, one of the common algorithmic problems during the 60s was the scheduling of tasks on multiprocessor machines. The problem is: you are given a large set of tasks, some of which depend on others, that have to be scheduled for processing on N processors in such a way as to maximize processor use. A well-known algorithm for this problem is the Coffman-Graham algorithm. It assumes that there are no circular dependencies between the tasks, as is usually the case when it comes to real world tasks, except in catch-22 situations at some bureaucracies run amok! To do that, the tasks and their dependencies are modeled as a DAG (a directed acyclic graph). In mathematics, this is also known as a partial order: if a task T1 depends on T2, we say that T2 precedes T1, and we write T2 < T1. The ordering is called partial because not all tasks are related in this precedence relation; some are simply independent of each other and can be safely carried out in parallel.

The Coffman-Graham algorithm works by creating a sequence of execution rounds where at each round at most N tasks execute simultaneously. The algorithm also has to make sure that all dependencies of the current round have been executed in previous rounds. Those two constraints are what makes the problem non-trivial: we want exactly N tasks at each round of execution if possible, so that all processors get used, and we also have to complete all tasks that precede a given task T before scheduling it. There are 3 basic steps to achieving the objective:


  1. Clean up the graph so that only direct dependencies are represented. So if there is a task A that depends on B, and B depends on another task C, we already know that A depends “indirectly” on C (transitivity is one of the defining features of partial orders), so that dependency does not need to be stated explicitly. Sometimes the input of a problem will contain such superfluous information, but in fact it could only confuse the algorithm! Removing the indirect dependencies is called transitive reduction, as opposed to the more common operation of transitive closure, which explicitly computes all indirect dependencies.
  2. Order the tasks in a single list so that the dependent ones come after their dependencies and they are sort of evenly spread apart. This is the crucial and most interesting part of the algorithm. So how are we to compare two tasks and decide which one should be run first? The trick is to proceed from the starting tasks, the roots of the graph that don’t have any dependencies whatsoever, and then progressively add tasks that depend only on them, then tasks that depend only on those, etc. This is called a topological ordering of the dependency graph. There are usually many possible such orderings, and some of them will lead to a good, balanced distribution of tasks for the purpose of CPU scheduling while others will leave lots of CPUs unused. In step (3) of the algorithm, we are just going to take the tasks one by one from this ordering and assign them to execution rounds as they come. Therefore, to make it so that at each round the number of CPUs used is maximized, the ordering must somehow space the dependencies apart as much as possible. That is, if the order is written as [T1, T2, T3, …, Tn] and if Tj depends on Ti, we want j-i to be as big as possible. Intuitively, this is desirable because the closer they are, the sooner we’d have to schedule Tj for execution after Ti, and since they can’t be executed on the same parallel round, we’d end up with unused CPUs. To space the tasks apart, here is what we do. Suppose we have tasks A and B, with all their dependencies already placed in our list, and we have to pick which one is going to come next. From A’s dependencies, we take the one most recently placed in the ordering and we check whether it comes before or after the most recently placed task from B’s dependencies. If it comes before, then we choose A; if it comes after, then we choose B. If it turns out A and B’s most recently placed dependency is actually the same task that both depend on, we look at the next most recent dependency, etc. This way, by picking the next task as the one whose closest dependency is the furthest away, at every step we space out dependencies in our ordering as much as possible.
  3. Assign tasks to rounds so as to maximize the number of tasks executed on each round. This is the easiest step - we just reap the rewards from doing all the hard work of the previous steps. Going through our ordering [T1, T2, T3, …, Tn], we fill up available CPUs by assigning the tasks one by one. When all CPUs are occupied, we move to the next round and we start filling CPUs again. If, while at a particular round, the next task to be scheduled has a dependency that’s also scheduled for that same round, we have no choice but to leave the remaining CPUs unused and start the next round. The algorithm does not take into account how long each task can potentially take. (A short sketch of this round-assignment step follows the list.)
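
Here is a minimal sketch of that last step, assuming tasks are identified by strings, a dependsOn map gives each task's direct dependencies (after the transitive reduction of step 1), and the list passed in is the ordering produced by step 2. The names are illustrative, not taken from any particular library.

// (uses java.util.List, Map, Set, ArrayList, Collections)
static List<List<String>> assignRounds(List<String> ordering,
                                       Map<String, Set<String>> dependsOn,
                                       int nCpus)
{
    List<List<String>> rounds = new ArrayList<>();
    List<String> current = new ArrayList<>(); // the round currently being filled
    for (String task : ordering)
    {
        // A task cannot run in the same round as any of its direct dependencies.
        boolean conflict = false;
        for (String dep : dependsOn.getOrDefault(task, Collections.emptySet()))
            if (current.contains(dep))
                conflict = true;
        // Start a new round when this one is full or a dependency sits in it.
        if (current.size() == nCpus || conflict)
        {
            rounds.add(current);
            current = new ArrayList<>();
        }
        current.add(task);
    }
    if (!current.isEmpty())
        rounds.add(current);
    return rounds;
}
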
Now, I said that this algorithm is also used to solve a completely different problem. The problem I was referring to is drawing networks in a visually appealing way. This is a pretty difficult problem and there are many different approaches, whose effectiveness often depends on the structure of the network. When a network is devoid of cycles (paths from one node back to itself), the Coffman-Graham algorithm just described can be applied!
The idea is to think of the network nodes as the tasks and of the network connections as the dependencies between the tasks, and then build a list of consecutive layers analogous to the task execution rounds. Instead of specifying a number of available CPUs, one specifies how many nodes per layer are allowed, which is generally convenient because the drawing is done on a fixed-width computer screen. Because the algorithm does not like circular dependencies, there is an extra step here to remove a select set of connections so that the network becomes a DAG. This is in addition to the transitive reduction, where we only keep direct connections and drop all the rest. Once the algorithm is complete, the drawing of those provisionally removed connections can be performed on top of the nice layering produced. Thus, the Coffman-Graham algorithm is (also!) one of the hierarchical drawing algorithms, a general framework for graph drawing developed by Kozo Sugiyama.

Tuesday, July 28, 2015

HyperGraphDB 1.3 Released

Kobrix Software is pleased to announce the release of HyperGraphDB 1.3.

This is a maintenance release containing many bug fixes and small improvements. Most of the efforts in this round have gone towards the various application modules built upon the core database facility.

Go directly to the download page.

HyperGraphDB is a general purpose, free open-source data storage mechanism. Geared toward modern applications with complex and evolving domain models, it is suitable for semantic web, artificial intelligence, social networking or regular object-oriented business applications.
This release contains numerous bug fixes and improvements over the previous 1.2 release. A fairly complete list of changes can be found at the Changes for HyperGraphDB, Release 1.3 wiki page.

HyperGraphDB is a Java based product built on top of the Berkeley DB storage library.

Key Features of HyperGraphDB include:
  • Powerful data modeling and knowledge representation.
  • Graph-oriented storage.
  • N-ary, higher order relationships (edges) between graph nodes.
  • Graph traversals and relational-style queries.
  • Customizable indexing.
  • Customizable storage management.
  • Extensible, dynamic DB schema through custom typing.
  • Out of the box Java OO database.
  • Fully transactional and multi-threaded, MVCC/STM.
  • P2P framework for data distribution.
In addition, the project includes several practical domain specific components for semantic web, reasoning and natural language processing. For more information, documentation and downloads, please visit the HyperGraphDB Home Page.

Many thanks to all who supported the project and actively participated in testing and development!