1. HadoopClient.java
package com.noushin.hadoop.client;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
/**
* This class handles interactions with Hadoop.
*
* @author nbashir
*
*/
@Component
public class HadoopClient {
    private static Configuration conf = new Configuration();
    private final static Logger logger = Logger.getLogger(HadoopClient.class);

    /**
     * Convert the lines of text in a file to binary and write to a Hadoop
     * sequence file.
     *
     * @param dataFile File containing lines of text
     * @param sequenceFileName Name of the sequence file to create
     * @param hadoopFS Hadoop file system
     *
     * @throws IOException
     */
    public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {
        IntWritable key = null;
        BytesWritable value = null;
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        if ((conf != null) && (dataFile != null) && (dataFile.exists())) {
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
                    SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class));
            List<String> lines = FileUtils.readLines(dataFile);
            for (int i = 0; i < lines.size(); i++) {
                value = new BytesWritable(lines.get(i).getBytes());
                key = new IntWritable(i);
                writer.append(key, value);
            }
            IOUtils.closeStream(writer);
        }
    }

    /**
     * Read a Hadoop sequence file on HDFS.
     *
     * @param sequenceFileName Name of the sequence file to read
     * @param hadoopFS Hadoop file system
     *
     * @throws IOException
     */
    public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, value)) {
            // BytesWritable.getBytes() returns the padded backing array, so only decode up to getLength()
            logger.info("key : " + key + " - value : " + new String(value.getBytes(), 0, value.getLength()));
        }
        IOUtils.closeStream(reader);
    }

    /**
     * Copy a local sequence file to a remote file on HDFS.
     *
     * @param from Name of the sequence file to copy
     * @param to Name of the sequence file to copy to
     * @param remoteHadoopFS HDFS host URI
     *
     * @throws IOException
     */
    public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
        conf.set("fs.defaultFS", remoteHadoopFS);
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(from);
        Path hdfsPath = new Path(to);
        boolean deleteSource = true;
        fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
        logger.info("Copied SequenceFile from: " + from + " to: " + to);
    }

    /**
     * Print all the values in a Hadoop configuration object.
     *
     * @param conf
     */
    public static void listHadoopConfiguration(Configuration conf) {
        int i = 0;
        logger.info("------------------------------------------------------------------------------------------");
        Iterator<Map.Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            i++;
            // Call next() only once per iteration; calling it twice skips entries
            logger.info(i + " - " + iterator.next());
        }
        logger.info("------------------------------------------------------------------------------------------");
    }
}
2. HadoopClientTest.java
package com.noushin.hadoop.client;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {
    @Autowired
    HadoopClient hadoopClient;

    String sequenceFileName = "/tmp/nb.sgz";
    String hadoopLocalFS = "file:///";
    String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

    @Test
    public void testConfig() {
        Configuration conf = new Configuration();
        HadoopClient.listHadoopConfiguration(conf);
    }

    @Test
    public void testWriteSequenceFile() {
        String dataFileName = "/tmp/test.txt";
        try {
            int numOfLines = 20;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(dataFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testReadSequenceFile() {
        try {
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testCopySequenceFileToRemoteHDFS() {
        String tempFileName = "/tmp/local-test.txt";
        String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
        String hadoopLocalFS = "file:///";
        String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";
        try {
            int numOfLines = 5;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(tempFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
            HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
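A note on the test setup: @ContextConfiguration is used without a locations attribute, so the Spring test runner looks for a context file named HadoopClientTest-context.xml in the same package on the test classpath (for example under src/test/resources/com/noushin/hadoop/client/). That file is not shown here; a minimal sketch, assuming component scanning is all the test needs, would be:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Picks up @Component classes such as HadoopClient -->
    <context:component-scan base-package="com.noushin.hadoop.client" />
</beans>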
3. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.noushin.hadoop</groupId>
    <artifactId>client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hdpc</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
        <junit.version>4.10</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <spring.version>3.2.0.RELEASE</spring.version>
    </properties>
    <dependencies>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-client.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
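One thing to note: the CDH build of hadoop-client is published in Cloudera's Maven repository rather than Maven Central, so the pom also needs a repository entry along these lines (a sketch; adjust it to whatever repository or internal mirror you actually use):

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>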
If you are using Eclipse for development and testing like I do, you need one more step before you can compress your sequence files with GZip. As you may have noticed, I am using Cloudera's distribution of Hadoop in my pom file. To use GZip, I need to add the Hadoop native libraries to my development environment, which is Ubuntu 12.10:
sudo apt-get update; sudo apt-get install hadoop
This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. Now, in Eclipse, edit the run configuration for your tests so the JVM can find those libraries (for example, by adding -Djava.library.path=/usr/lib/hadoop/lib/native to the VM arguments); otherwise you will get: ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries
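If you run the tests from Maven rather than Eclipse, a sketch of the equivalent setup is to pass the same system property through the Surefire plugin in the pom (plugin version omitted; use whatever your build already pins):

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
            <argLine>-Djava.library.path=/usr/lib/hadoop/lib/native</argLine>
        </configuration>
    </plugin>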
To verify the sequence file was created on HDFS, log into one of your Hadoop nodes and run this command:
hadoop fs -ls /tmp/nb.sgz
If you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entry to your log4j.properties:
#Turn on hadoop logging
log4j.logger.org.apache.hadoop=DEBUG
To run Hive:
Log in as a Hadoop user such as oozie_job, so that the environment is already set up:
$ sudo su - oozie_job
Then start the Hive shell:
$ hive
Now you can query data using SQL-like commands:
DESCRIBE my_transactions;
SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14;
To see where a partition points:
DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);
To create a partition, so Hive can find data for its queries:
ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';
To drop a partition (for example, so it can be re-created pointing at a new location):
ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);
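After dropping it, you can add the partition back with a new LOCATION; for example (the path below is just an illustration):
ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/new/2013/03/26';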