
Mathematica Gets Big Data with HadoopLink

HadoopLink is a package that lets you write MapReduce programs in Mathematica and run them on your Hadoop cluster.

If that makes sense to you, feel free to skip this section and jump right to the code. For everyone else, let me unpack that for you.

A cluster is a bunch of Linux servers (or nodes) that are all connected to each other on the same network. (You probably have one at school or at work.)

Datasets are growing faster than hard disks. It’s pretty common now to encounter datasets of 100 terabytes or more. The Sloan Digital Sky Survey is 5 TB, the Common Crawl web corpus is 81 TB, and the 1000 Genomes Project is 200 TB, just to name a few. But disks are still just a few TB, so you can’t hold all that data on a single disk. It has to be split up between, say, 100 different machines with 1 TB per machine, to make 100 TB total. That means it’s a “big data” situation.

MapReduce is a way of writing programs that work on a huge dataset distributed over a cluster. It’s a big pain to send the data over the network to each node, so instead MapReduce sends the computation to the data. That’s what distributed means. We send a small package of code to each node, each node works independently on its own portion of the data, and the results are collected up at the end.
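To make that concrete before we bring in Hadoop at all, here is a toy, purely in-memory sketch of the pattern (the helper names toyMap and toyReduce are made up for illustration): the "map" stage emits {word, 1} pairs, and the "reduce" stage totals the values for each distinct word:

(* A toy, in-memory sketch of the MapReduce pattern; no cluster involved *)
toyMap[text_String] := {ToLowerCase@#, 1} & /@ StringSplit[text];
toyReduce[pairs_List] := {#[[1, 1]], Total[#[[All, 2]]]} & /@ GatherBy[pairs, First];
toyReduce[toyMap["the quick brown fox jumps over the lazy dog the end"]]

The real thing differs in one crucial way: the map and reduce stages run on many machines at once, with the framework shuffling the intermediate pairs between them.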

Hadoop is a popular open-source implementation of the MapReduce framework, written in Java. Lots of organizations now have Hadoop clusters for working with large datasets.

With HadoopLink, you can write your Hadoop jobs directly in Mathematica.

Here’s a simple example. The “Hello world!” analog for Hadoop is WordCount, which counts the number of times each word appears in a piece of text. Here’s how you’d write WordCount in Mathematica using HadoopLink.

Load the HadoopLink package (download it from GitHub, and see the code notebook for installation instructions):

<< HadoopLink`

Import some text from the web (Pride and Prejudice, by Jane Austen):

textRaw = Import["http://www.gutenberg.org/cache/epub/1342/pg1342.txt"];

StringTake[textRaw, 250]

text = StringSplit[textRaw,
    {"By Jane Austen\n\n\n\n",
     "End of the Project Gutenberg EBook of Pride and Prejudice, by Jane Austen"}][[2]];

We need a convenient unit of text for Hadoop to work with, so split the text into paragraphs:

paras = StringSplit[text, RegularExpression["\n{2,}"]];

Here are paragraph, word, and character counts for this text:

{Length@paras, Length[StringSplit[text, RegularExpression["[\\W_]+"]]], StringLength@text}

Here are the first few paragraphs:

Framed /@ Take[paras, 3] // Column

A MapReduce program works on key-value pairs. We’ll make each paragraph a key (colored green below), and use the integer 1 (in red) as the value for every paragraph key:

paraPairs = Transpose[{paras, Table[1, {Length@paras}]}];

Grid[{#}, Frame -> All, Background -> {{LightGreen, LightRed}}] & /@   paraPairs[[1 ;; 5]]

Now open a link to the Hadoop cluster and export the key-value pairs to the Hadoop Distributed File System (HDFS):

$$link = OpenHadoopLink[
   "fs.default.name" -> "hdfs://hadoopheadlx.wolfram.com:8020",
   "mapred.job.tracker" -> "hadoopheadlx.wolfram.com:8021"];

inputfile["pap"] = "/user/paul-jean/hadooplink/pap-paras.seq";

If[DFSFileExistsQ[$$link, inputfile["pap"]],  DFSDeleteFile[$$link, inputfile["pap"]]  ]

The DFSExport function writes our key-value pairs onto the cluster in Hadoop’s sequence file format:

DFSExport[$$link, inputfile["pap"], paraPairs, "SequenceFile"]

At this point, HDFS will divide the file up into blocks and distribute them with redundant copies across the cluster. Now we have the data we need to run our MapReduce job.
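As an optional sanity check (fine here, since the dataset is tiny and, we assume, DFSImport can read back the same sequence file DFSExport just wrote), we can pull the exported pairs straight back from HDFS and look at the first one:

(* Optional sanity check: read the exported sequence file back from HDFS *)
First@DFSImport[$$link, inputfile["pap"], "SequenceFile"]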

The next step is to write a mapper and a reducer.

Here’s a diagram that shows how the mapper and reducer exchange key-value pairs:

How the mapper and reducer exchange key-value pairs

In step 1, the mapper reads a key-value pair (k1, v1).

We write a HadoopLink mapper as a pure function. Here’s how we write our WordCount mapper:

Clear@WordCountMapper
WordCountMapper = Function[{k, v},
   With[{words = ToLowerCase /@ StringSplit[k, RegularExpression["[\\W_]+"]]},
    Yield[#, 1] & /@ words]];

The function arguments are the paragraph key and value 1:

{k1, v1} = {paragraph, 1}

In step 2, the mapper outputs (one or more) new key-value pairs (k2, v2). Our WordCount mapper splits the paragraph up into words and outputs each word as a key with the value 1 again:

{k2, v2} = {word, 1}
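To see concretely what the mapper will emit, we can mimic its splitting step locally on the first paragraph (illustrative only: Yield has meaning only inside a running Hadoop task, so here we just build the pairs directly):

(* Illustrative only: the {word, 1} pairs the mapper would yield for the first paragraph *)
{#, 1} & /@ (ToLowerCase /@ StringSplit[paras[[1]], RegularExpression["[\\W_]+"]])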

Notice we’re calling HadoopLink’s Yield function to output the key-value pairs from the mapper. With the package loaded, you can look up its description:

?Yield

In step 3, the pairs get collected by key (the “shuffle and sort” step), and the reducer reads each key with its list of values. Here’s the WordCount reducer:

Clear@SumReducer
SumReducer = Function[{k, vs},
   Module[{sum = 0},
    While[vs@hasNext[], sum += vs@next[]];
    Yield[k, sum]]];

The reducer’s arguments are the word key and a list of all the 1s that were yielded by the mapper for that word:

{k2, {v2 …} } = {word, {1,1,1,…,1} }

In step 4, the reducer outputs its own key-value pair (k3, v3). The WordCount reducer sums up the list of 1s for each of its word keys:

{k3, v3} = {word, Total[ {1,1,1,…,1} ] }

The total gives the number of times that word appeared in the original text.

Notice we didn’t use the Total function in the reducer. The values list isn’t actually a List expression, it’s a Java iterator object, so we have to iterate over the values and increment the sum one value at a time.
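For contrast, if vs arrived as an ordinary Mathematica list, the reducer could be written in one line (illustrative only; this version would not work against the iterator Hadoop actually supplies):

(* Illustrative only: how the reducer would look if vs were a plain list rather than a Java iterator *)
Function[{k, vs}, Yield[k, Total[vs]]]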

(Aside: An iterator lets you stream the data from a disk rather than load the whole data structure into memory. Why do we have to do this? Imagine this was all the text in the Common Crawl web corpus, with 100 trillion words. The list of values for the word “the” would have a length in the billions, which wouldn’t fit in the reducer’s memory.)

Now we clear our output directory and submit the MapReduce job to the Hadoop cluster, using the HadoopMapReduceJob function:

outputdir["pap"] = "/user/paul-jean/hadooplink/pap-wordcount";

If[DFSFileExistsQ[$$link, outputdir["pap"]], DFSDeleteDirectory[$$link, outputdir["pap"]]]

HadoopMapReduceJob[
  $$link,
  "pap wordcount",
  inputfile["pap"],
  outputdir["pap"],
  WordCountMapper,
  SumReducer
]

At this point, HadoopLink packages up the Mathematica code and submits it to the Hadoop master node.

Now Hadoop can distribute the job across the slave nodes and collect the results:

Distributing the job across the slave nodes

In steps 1 to 4, Mathematica exports key-value data to HDFS, packages up the code, and submits the job to Hadoop’s JobTracker (JT). Then in step 5, the JobTracker farms the job out to many TaskTrackers (TT) on the slave nodes. In steps 6 and 7, the slaves launch a Java Virtual Machine (JVM) for each Map or Reduce task. Mathematica exchanges key-value pairs with the JVM over a MathLink connection as it performs the necessary computations. In steps 8 and 9, the mapper or reducer yields key-value pairs, which are written to HDFS.

Notice that a Mathematica kernel is required on each slave node running MapReduce tasks. So your cluster can do double-duty as a lightweight grid for running parallel computations in addition to running distributed computations on Hadoop. However, HDFS operations like DFSExport and DFSImport don’t require kernels on the slaves.

Finally, in step 10, we import our WordCount results back into Mathematica:

outputfiles["pap"] = DFSFileNames[$$link, "part-*", outputdir["pap"]]

wordcounts = Join @@ (DFSImport[$$link, #, "SequenceFile"] & /@
      outputfiles["pap"]) /. {s_String, i_Integer} :> {StringSplit[s, "|"], i};
Length@wordcounts

6320

Now we can look at the 10 most common words in Pride and Prejudice, with the number of times each word occurs:

Take[Reverse@SortBy[wordcounts, Last], 10] // Column

And here are some of the least common, with a count of 1:

Take[Reverse@SortBy[wordcounts, Last], -10] // Column

For fun, we can compare the word frequencies to what you would expect from a perfect Zipf distribution:

zipfdist =   EstimatedDistribution[wordcounts[[All, 2]], ZipfDistribution[Length@wordcounts, \[Rho]]]

zipffreqs = Table[{x, PDF[zipfdist, x]}, {x, 1, Length@wordcounts}];

Total[zipffreqs[[All, 2]]]

zipffreqs[[1 ;; 10]]

papfreqs = Transpose[{Range@Length@wordcounts,
    N@HistogramList[wordcounts[[All, 2]], {1, Length@wordcounts + 1, 1}, "PDF"][[2]]}];

Total[papfreqs[[All, 2]]]

papfreqs[[1 ;; 10]]

Labeled[
 ListLogLogPlot[{papfreqs, zipffreqs}, AxesOrigin -> {0, 0},
  PlotLegends -> SwatchLegend[{"PAP", "Zipf"}]],
 {"rank", "word frequency"}, {Top, Left}, RotateLabel -> True,
 LabelStyle -> {FontFamily -> "Helvetica", FontSize -> 16}]

There is reasonable agreement with the Zipf distribution for the first 100 or so most common words in the text. (Zipf’s Law is known to break down for less common words.)

Okay, now we know how to write “Hello World!” in MapReduce using Mathematica and HadoopLink.

With a simple change to the WordCount mapper, we can compute n-gram counts instead of just word counts:

Clear@NGramMapper
NGramMapper[n_] := Function[{k, v},
   With[{ngrams = Partition[
       ToLowerCase /@ StringSplit[k, RegularExpression["[\\W_]+"]], n, 1]},
    Do[
     Yield[StringJoin[Riffle[ngrams[[i]], "|"]], 1],
     {i, Length@ngrams}]]];

Here the mapper takes an argument indicating how many consecutive words to use per n-gram. It outputs the n-gram as the key, with a value of 1 just like before.
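To see what that Partition call produces, here it is applied locally to a short sample phrase, with the words joined by "|" just as the mapper does (illustrative only):

(* Illustrative only: the pipe-joined 4-grams the mapper would yield for a sample phrase *)
StringJoin[Riffle[#, "|"]] & /@
 Partition[ToLowerCase /@ StringSplit["it is a truth universally acknowledged"], 4, 1]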

For the reducer, we can just reuse the SumReducer from before, since the key doesn’t matter.

Let’s run the job to count 4-grams:

outputdir["ngrams"] = "/user/paul-jean/hadooplink/pap-ngrams";

If[DFSFileExistsQ[$$link, outputdir["ngrams"]],DFSDeleteDirectory[$$link, outputdir["ngrams"]]]

HadoopMapReduceJob[
  $$link,
  "pap n-grams",
  inputfile["pap"],
  outputdir["ngrams"],
  NGramMapper[4],
  SumReducer
]

Here are the top 4-grams in Pride and Prejudice (in a previous post, Oleksandr Pavlyk showed that 4-grams carry the essential information for Alice in Wonderland):

outputfiles["ngrams"] = DFSFileNames[$$link, "part-*", outputdir["ngrams"]]

ngrams = Join @@ (DFSImport[$$link, #, "SequenceFile"] & /@
      outputfiles["ngrams"]) /. {s_String, i_Integer} :> {StringSplit[s, "|"], i};
Length@ngrams

113002

Take[Reverse@SortBy[ngrams, Last], 10] // Column

We have to sort the final key-value pairs ourselves because, for efficiency reasons, MapReduce only sorts its output by key, not by value. Getting count-sorted output from Hadoop itself takes extra work, such as a secondary sort or a second job that swaps keys and values (sketched below).
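Here is a minimal, hypothetical sketch of that second job’s mapper (not run in this post): it simply re-yields each pair with key and value swapped, so the shuffle-and-sort phase orders the output by count. An identity reducer that re-yields each pair would complete the job.

(* Hypothetical sketch, not run in this post: swap each {word, count} pair so Hadoop's
   shuffle-and-sort phase orders the output by count *)
Clear@CountSortMapper
CountSortMapper = Function[{k, v}, Yield[v, k]];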

Hopefully I’ve given you a good starting point for writing MapReduce algorithms using Mathematica and HadoopLink. Now we’re ready to go beyond these simple examples and solve some real problems. Stay tuned for part 2 of this blog, where we’ll use HadoopLink to search the human genome!
