Wednesday, April 10, 2013

Reading & Writing Hadoop Sequence Files in Java

In this blog I'll show you how to write a simple Hadoop client application in Java. The app handles reading, writing, and copying Hadoop sequence files on local or remote Hadoop file systems (HDFS).

1. HadoopClient.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This class handles interactions with Hadoop.
 * 
 * @author nbashir
 * 
 */
@Component
public class HadoopClient {

   private static Configuration conf = new Configuration();
   private final static Logger logger = Logger.getLogger(HadoopClient.class);

   /**
    * Convert the lines of text in a file to binary and write to a Hadoop
    * sequence file.
    * 
    * @param dataFile File containing lines of text
    * @param sequenceFileName Name of the sequence file to create
    * @param hadoopFS Hadoop file system
    * 
    * @throws IOException
    */
   public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {

      IntWritable key = null;
      BytesWritable value = null;

      conf.set("fs.defaultFS", hadoopFS);
      Path path = new Path(sequenceFileName);

      if ((conf != null) && (dataFile != null) && (dataFile.exists())) {
         SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
               SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
               SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));

         List<String> lines = FileUtils.readLines(dataFile);

         for (int i = 0; i < lines.size(); i++) {
            value = new BytesWritable(lines.get(i).getBytes());
            key = new IntWritable(i);
            writer.append(key, value);
         }
         IOUtils.closeStream(writer);
      }
   }

   /**
    * Read a Hadoop sequence file on HDFS.
    * 
    * @param sequenceFileName Name of the sequence file to read
    * @param hadoopFS Hadoop file system
    * 
    * @throws IOException
    */
   public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
      conf.set("fs.defaultFS", hadoopFS);
      Path path = new Path(sequenceFileName);
      SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
         logger.info("key : " + key + " - value : " + new String(value.getBytes()));
      }
      IOUtils.closeStream(reader);
   }

   /**
    * Copy a local sequence file to a remote file on HDFS.
    * 
    * @param from Name of the sequence file to copy
    * @param to Name of the sequence file to copy to
    * @param remoteHadoopFS HDFS host URI
    * 
    * @throws IOException
    */
   public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
      conf.set("fs.defaultFS", remoteHadoopFS);
      FileSystem fs = FileSystem.get(conf);

      Path localPath = new Path(from);
      Path hdfsPath = new Path(to);
      boolean deleteSource = true;
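      // deleteSource is true, so the local source file is removed after a successful copy.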

      fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
      logger.info("Copied SequenceFile from: " + from + " to: " + to);
   }

   /**
    * Print all the values in Hadoop HDFS configuration object.
    * 
    * @param conf
    */
   public static void listHadoopConfiguration(Configuration conf) {
      int i = 0;
      logger.info("------------------------------------------------------------------------------------------");
      Iterator iterator = conf.iterator();
      while (iterator.hasNext()) {
         i++;
         // Call next() only once per iteration; calling it twice skips every other
         // entry and can throw NoSuchElementException on the last element.
         logger.info(i + " - " + iterator.next());
      }
      logger.info("------------------------------------------------------------------------------------------");
   }
}

2. HadoopClientTest.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {

   @Autowired
   HadoopClient hadoopClient;

   String sequenceFileName = "/tmp/nb.sgz";
   String hadoopLocalFS = "file:///";
   String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";


   @Test
   public void testConfig() {
      Configuration conf = new Configuration();
      HadoopClient.listHadoopConfiguration(conf);
   }

   @Test
   public void testWriteSequenceFile() {
      String dataFileName = "/tmp/test.txt";

      try {
         int numOfLines = 20;
         String baseStr = "....Test...";
         List<String> lines = new ArrayList<String>();
         for (int i = 0; i < numOfLines; i++)
            lines.add(i + baseStr + UUID.randomUUID());

         File dataFile = new File(dataFileName);
         FileUtils.writeLines(dataFile, lines, true);
         Thread.sleep(2000);
         HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
      }
      catch (IOException e) {
          e.printStackTrace();
      }
      catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   @Test
   public void testReadSequenceFile() {

      try {       
         HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
      }
      catch (IOException e) {
         e.printStackTrace();
      }
   }

   @Test
   public void testCopySequenceFileToRemoteHDFS() {
      String tempFileName = "/tmp/local-test.txt";
      String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
      String hadoopLocalFS = "file:///";
      String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

      try {
         int numOfLines = 5;
         String baseStr = "....Test...";
         List<String> lines = new ArrayList<String>();
         for (int i = 0; i < numOfLines; i++)
            lines.add(i + baseStr + UUID.randomUUID());

         File dataFile = new File(tempFileName);
         FileUtils.writeLines(dataFile, lines, true);
         Thread.sleep(2000);
         HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
         HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
         HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
         HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
      }
      catch (IOException e) {
          e.printStackTrace();
      }
      catch (InterruptedException e) {
         e.printStackTrace();
      }
   }   
}

3. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.noushin.hadoop</groupId>
    <artifactId>client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hdpc</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
        <junit.version>4.10</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <spring.version>3.2.0.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-client.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
If you are using Eclipse for development and testing, as I do, you need the following additional step so that your sequence files can be compressed with GZip.

As you may have noticed, I am using Cloudera's Hadoop distribution in my pom file. To use GZip compression, I need to install the Hadoop native libraries on my development machine, which runs Ubuntu 12.10:
sudo apt-get update; sudo apt-get install hadoop

This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. In Eclipse, open the project's Properties -> Java Build Path -> Libraries -> Maven Dependencies -> Native library location, and set "Location path" to /usr/lib/hadoop/lib/native.
Please make sure the version of the hadoop-client dependency in your pom file matches the version of Hadoop you installed on your system, otherwise you will get a runtime error:
ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries
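If you want to confirm from Java that the native libraries were actually picked up, you can ask Hadoop's NativeCodeLoader and ZlibFactory before writing any files. This is a minimal standalone sketch (the class name NativeLibCheck is just for illustration, it is not part of HadoopClient):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibCheck {
   public static void main(String[] args) {
      Configuration conf = new Configuration();
      // True if libhadoop was found on java.library.path.
      System.out.println("Native hadoop library loaded: " + NativeCodeLoader.isNativeCodeLoaded());
      // True if the native zlib implementation (used by GzipCodec) is available.
      System.out.println("Native zlib loaded: " + ZlibFactory.isNativeZlibLoaded(conf));
   }
}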
To verify the sequence file was created on HDFS, log into one of your Hadoop nodes and run this command:
hadoop fs -ls /tmp/nb.sgz
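You can also check for the file programmatically. The hypothetical helper below is a minimal sketch that assumes the same HDFS URI and file name used in the tests above:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifySequenceFile {
   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Connect to the remote HDFS namenode used in the tests.
      FileSystem fs = FileSystem.get(URI.create("hdfs://stage-hadoop01:8020"), conf);
      Path path = new Path("/tmp/nb.sgz");
      if (fs.exists(path)) {
         System.out.println(path + " exists, " + fs.getFileStatus(path).getLen() + " bytes");
      } else {
         System.out.println(path + " was not found on HDFS");
      }
   }
}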
And, if you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entries to your log4j.properties:

# Turn on Hadoop logging
log4j.logger.org.apache.hadoop=DEBUG
To run Hive, log in as a Hadoop user such as oozie_job, so that the environment is set up:

$ sudo su - oozie_job
Then start Hive:

$ hive
Now you can query data using SQL-like commands:

DESCRIBE my_transactions;

SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14; 
To see where a partition points:

DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);
To add a partition so Hive can find the data for its queries:

ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';
To drop a partition (for example, before re-adding it with a new location):

ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);