Wednesday, April 10, 2013

Reading & Writing Hadoop Sequence Files in Java

In this blog I'll show you how to write a simple Hadoop client application in Java. The app handles reading, writing, and copying Hadoop sequence files on a local file system or a remote Hadoop file system (HDFS).

1. HadoopClient.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This class handles interactions with Hadoop.
 * 
 * @author nbashir
 * 
 */
@Component
public class HadoopClient {

   private static Configuration conf = new Configuration();
   private final static Logger logger = Logger.getLogger(HadoopClient.class);

   /**
    * Convert the lines of text in a file to binary and write to a Hadoop
    * sequence file.
    * 
    * @param dataFile File containing lines of text
    * @param sequenceFileName Name of the sequence file to create
    * @param hadoopFS Hadoop file system
    * 
    * @throws IOException
    */
   public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {

      IntWritable key = null;
      BytesWritable value = null;

      conf.set("fs.defaultFS", hadoopFS);
      Path path = new Path(sequenceFileName);

      if ((conf != null) && (dataFile != null) && (dataFile.exists())) {
         SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
               SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
               SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));

         List<String> lines = FileUtils.readLines(dataFile);

         for (int i = 0; i < lines.size(); i++) {
            value = new BytesWritable(lines.get(i).getBytes());
            key = new IntWritable(i);
            writer.append(key, value);
         }
         IOUtils.closeStream(writer);
      }
   }

   /**
    * Read a Hadoop sequence file on HDFS.
    * 
    * @param sequenceFileName Name of the sequence file to read
    * @param hadoopFS Hadoop file system
    * 
    * @throws IOException
    */
   public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
      conf.set("fs.defaultFS", hadoopFS);
      Path path = new Path(sequenceFileName);
      SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
         logger.info("key : " + key + " - value : " + new String(value.getBytes()));
      }
      IOUtils.closeStream(reader);
   }

   /**
    * Copy a local sequence file to a remote file on HDFS.
    * 
    * @param from Name of the sequence file to copy
    * @param to Name of the sequence file to copy to
    * @param remoteHadoopFS HDFS host URI
    * 
    * @throws IOException
    */
   public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
      conf.set("fs.defaultFS", remoteHadoopFS);
      FileSystem fs = FileSystem.get(conf);

      Path localPath = new Path(from);
      Path hdfsPath = new Path(to);
      boolean deleteSource = true;

      fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
      logger.info("Copied SequenceFile from: " + from + " to: " + to);
   }

   /**
    * Print all the values in Hadoop HDFS configuration object.
    * 
    * @param conf
    */
   public static void listHadoopConfiguration(Configuration conf) {
      int i = 0;
      logger.info("------------------------------------------------------------------------------------------");
      Iterator iterator = conf.iterator();
      while (iterator.hasNext()) {
         i++;
         // call next() only once per iteration; a second call skips entries
         // and can throw NoSuchElementException on the last element
         logger.info(i + " - " + iterator.next());
      }
      logger.info("------------------------------------------------------------------------------------------");
   }
}
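
If you want to exercise the class outside of a unit test, a quick throwaway main() like the one below works as well. This is only a sketch: the file names are the same ones used in the tests below, and file:/// points the client at the local file system instead of a real cluster.

package com.noushin.hadoop.client;

import java.io.File;

public class HadoopClientExample {

   public static void main(String[] args) throws Exception {
      // Write the lines of a local text file into a gzip-compressed sequence file,
      // then read it back, all against the local file system (file:///)
      File dataFile = new File("/tmp/test.txt");
      HadoopClient.writeToSequenceFile(dataFile, "/tmp/nb.sgz", "file:///");
      HadoopClient.readSequenceFile("/tmp/nb.sgz", "file:///");
   }
}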

2. HadoopClientTest.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {

   @Autowired
   HadoopClient hadoopClient;

   String sequenceFileName = "/tmp/nb.sgz";
   String hadoopLocalFS = "file:///";
   String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";


   @Test
   public void testConfig() {
      Configuration conf = new Configuration();
      HadoopClient.listHadoopConfiguration(conf);
   }

   @Test
   public void testWriteSequenceFile() {
      String dataFileName = "/tmp/test.txt";

      try {
         int numOfLines = 20;
         String baseStr = "....Test...";
         List<String> lines = new ArrayList<String>();
         for (int i = 0; i < numOfLines; i++)
            lines.add(i + baseStr + UUID.randomUUID());

         File dataFile = new File(dataFileName);
         FileUtils.writeLines(dataFile, lines, true);
         Thread.sleep(2000);
         HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
      }
      catch (IOException e) {
          e.printStackTrace();
      }
      catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   @Test
   public void testReadSequenceFile() {

      try {       
         HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
      }
      catch (IOException e) {
         e.printStackTrace();
      }
   }

   @Test
   public void testCopySequenceFileToRemoteHDFS() {
      String tempFileName = "/tmp/local-test.txt";
      String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
      String hadoopLocalFS = "file:///";
      String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

      try {
         int numOfLines = 5;
         String baseStr = "....Test...";
         List<String> lines = new ArrayList<String>();
         for (int i = 0; i < numOfLines; i++)
            lines.add(i + baseStr + UUID.randomUUID());

         File dataFile = new File(tempFileName);
         FileUtils.writeLines(dataFile, lines, true);
         Thread.sleep(2000);
         HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
         HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
         HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
         HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
      }
      catch (IOException e) {
          e.printStackTrace();
      }
      catch (InterruptedException e) {
         e.printStackTrace();
      }
   }   
}
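
Note that @ContextConfiguration with no attributes makes Spring look for a default context, such as an XML file named HadoopClientTest-context.xml next to the test class on the classpath. If you would rather not maintain an XML file, one option (a sketch, assuming Spring 3.1+ Java config) is to point the annotation at a small nested configuration class instead:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HadoopClientTest.TestConfig.class)
public class HadoopClientTest {

   // Fully-qualified annotations avoid a clash with org.apache.hadoop.conf.Configuration,
   // which is already imported in this test class.
   @org.springframework.context.annotation.Configuration
   @org.springframework.context.annotation.ComponentScan(basePackages = "com.noushin.hadoop.client")
   static class TestConfig {
   }

   // ... test methods as shown above ...
}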

3. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.noushin.hadoop</groupId>
    <artifactId>client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hdpc</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
        <junit.version>4.10</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <spring.version>3.2.0.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-client.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
If you are using Eclipse for development and testing, as I do, you need one additional step so that your sequence files can be compressed with gzip.

As you may have noticed, I am using Cloudera's Hadoop distribution in my pom file. To use gzip compression, I need to install the Hadoop native libraries on my development machine, which runs Ubuntu 12.10.
sudo apt-get update; sudo apt-get install hadoop

This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. Now, in Eclipse, open the project's Properties -> Java Build Path -> Libraries -> Maven Dependencies -> Native library location, and set "Location path" to /usr/lib/hadoop/lib/native.
Please make sure the version of the hadoop-client dependency in your pom file matches the version of Hadoop you installed on your system; otherwise you will get a runtime error:
ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries
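To check whether the native libraries are actually being picked up by the JVM, a quick sanity check like the one below can help. This is only a sketch; it assumes the Hadoop client jars are on the classpath and uses Hadoop's NativeCodeLoader utility class.

package com.noushin.hadoop.client;

import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibCheck {

   public static void main(String[] args) {
      // True only if libhadoop was found on java.library.path and loaded successfully
      if (NativeCodeLoader.isNativeCodeLoaded()) {
         System.out.println("Hadoop native libraries loaded.");
      } else {
         System.out.println("Hadoop native libraries NOT loaded - check the native library location.");
      }
   }
}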
To verify that the sequence file was created on HDFS, log into one of your Hadoop nodes and run this command:
hadoop fs -ls /tmp/nb.sgz
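You can also verify this from Java, reusing the same Configuration setup as the client above. This is just a sketch; the file name and the HDFS URI are the examples used in this post and should match your own cluster.

package com.noushin.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifySequenceFile {

   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.set("fs.defaultFS", "hdfs://stage-hadoop01:8020");

      FileSystem fs = FileSystem.get(conf);
      Path path = new Path("/tmp/nb.sgz");

      // exists() and getFileStatus() query the NameNode, confirming the copy reached HDFS
      if (fs.exists(path)) {
         System.out.println("Found " + path + ", size: " + fs.getFileStatus(path).getLen() + " bytes");
      } else {
         System.out.println(path + " was not found on HDFS");
      }
   }
}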
And, if you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entry to your log4j.properties:

# Turn on Hadoop logging
log4j.logger.org.apache.hadoop=DEBUG
To run Hive, log in as a Hadoop user such as oozie_job so that the environment is set up correctly:

$ sudo su - oozie_job
Then start Hive:

$ hive
Now you can query data using SQL-like commands:

DESCRIBE my_transactions;

SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14; 
To see where a partition points:

DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);
To add a partition so Hive can find the data for its queries:

ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';
To drop a partition (for example, before re-adding it with a new location):

ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);
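
If you would rather run these queries from the same Java application instead of the Hive shell, Hive also exposes a JDBC interface. The sketch below is only an illustration: it assumes HiveServer2 is running on its default port 10000 and that the hive-jdbc artifact is on the classpath; the host name, user, and table are the examples used above.

package com.noushin.hadoop.client;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQueryExample {

   public static void main(String[] args) throws Exception {
      // HiveServer2 JDBC driver; requires the hive-jdbc dependency on the classpath
      Class.forName("org.apache.hive.jdbc.HiveDriver");

      Connection con = DriverManager.getConnection("jdbc:hive2://stage-hadoop01:10000/default", "oozie_job", "");
      Statement stmt = con.createStatement();

      // Same partitioned query as in the Hive CLI example above
      ResultSet rs = stmt.executeQuery("SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14");
      while (rs.next()) {
         System.out.println(rs.getString(1));
      }

      rs.close();
      stmt.close();
      con.close();
   }
}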
