Using Hadoop Streaming API to perform a word count job in R and C++

by Marek Gagolewski, Maciej Bartoszuk, Anna Cena, and Jan Lasek (Rexamine).


In a recent blog post we explained how we managed to set up a working Hadoop environment on a few CentOS 7 machines. To test the installation, let’s play with a simple example.

The Hadoop Streaming API makes it possible to run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

Files are processed line by line. Mappers get appropriate chunks of the input file. Each line is assumed to store a key-value pair. By default, the following form is used:

key1 \t val1 \n
key2 \t val2 \n

If there is no TAB character, then the value is assumed to be NULL.

Consider a job that uses /bin/cat as both the mapper and the reducer. In fact, this is a Hadoop version of a program that rearranges the lines of the input file so that duplicated lines appear one after another (the output is always sorted by key):

hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
   -input /input/test.txt \
   -output /output \
   -mapper /bin/cat \
   -reducer /bin/cat
hdfs dfs -cat /output/part-00000

This is roughly equivalent to:

cat input | mapper | sort | reducer > output

More specifically, in our case that was:

cat input | cat | sort | cat > output

A sample Map/Reduce job

Let’s run a simple Map/Reduce job written in R and C++ (just for fun – we assume that all the nodes run the same operating system and they use the same CPU architecture).

  1. As we are in the CentOS 7 environment, we will need a newer version of R on all the nodes.
$ su
# yum install readline-devel
# cd
# wget
# tar -zxf R-3.1.2.tar.gz
# cd R-3.1.2
# ./configure --with-x=no --with-recommended-packages=no
# make
# make install
# R
R> install.packages('stringi')
R> q()
  2. Edit yarn-site.xml (on all nodes):

Without that, Hadoop may complain about excessive virtual memory consumption by R.

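  3. Create script wc_mapper.R:
#!/usr/bin/env Rscript

library('stringi')  # provides stri_extract_all_words() and stri_paste()
stdin <- file('stdin', open='r')

# process the input in chunks of up to 1024 lines
while(length(x <- readLines(con=stdin, n=1024L))>0) {
   x <- unlist(stri_extract_all_words(x))
   xt <- table(x)  # word counts within the current chunk
   words <- names(xt)
   counts <- as.integer(xt)
   # emit one "word<TAB>count" record per distinct word
   cat(stri_paste(words, counts, sep='\t'), sep='\n')
}
close(stdin)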
  4. Create a source file wc_reducer.cpp:
#include <iostream>
#include <string>
#include <cstdlib>

using namespace std;

int main()
{
  string line;
  string last_word = "";
  int last_count = 0;

  // the input arrives sorted by key, so equal words are adjacent
  while(getline(cin, line))
  {
    size_t found = line.find_first_of("\t");
    if(found != string::npos)
    {
      string key = line.substr(0,found);
      string value = line.substr(found+1);  // skip the TAB itself
      int valuei = atoi(value.c_str());
      //cerr << "key=" << key << " value=" << value <<endl;
      if(key != last_word)
      {
        // a new word starts: emit the total for the previous one
        if(last_word != "") cout << last_word << "\t" << last_count << endl;

        last_word = key;
        last_count = valuei;
      }
      else
        last_count += valuei;
    }
  }
  // emit the total for the final word
  if(last_word != "") cout << last_word << "\t" << last_count << endl;

  return 0;
}

Now it’s time to compile the above C++ source file:

$ g++ -O3 wc_reducer.cpp -o wc_reducer
  5. Let’s submit a Map/Reduce job via the Hadoop Streaming API:
$ chmod 755 wc_mapper.R
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
   -input /input/test.txt \
   -output /output \
   -mapper wc_mapper.R \
   -reducer wc_reducer \
   -file wc_mapper.R \
   -file wc_reducer

By the way, the Fedora 20 RPM Hadoop distribution provides the Hadoop Streaming API jar file at /usr/share/hadoop/mapreduce/hadoop-streaming.jar.


In this tutorial we showed how to submit a simple Map/Reduce job via the Hadoop Streaming API. Interestingly, we used an R script as the mapper and a C++ program as the reducer. In an upcoming blog post we’ll explain how to run a job using the rmr2 package.
