HAWQ InputFormat for MapReduce
MapReduce is a programming model developed by Google for processing and generating large data sets on an array of commodity servers. You can use the HAWQ InputFormat class to enable MapReduce jobs to access HAWQ data stored in HDFS.
To use HAWQ InputFormat, you need to provide only the URL of the database to connect to and the name of the table you want to access. HAWQ InputFormat fetches only the metadata of the database and table of interest, which is much less data than the table data itself. After obtaining the metadata, HAWQ InputFormat determines where and how the table data is stored in HDFS. It reads and parses those HDFS files and processes the parsed table tuples directly inside a Map task.
This chapter describes the data types supported by HAWQ InputFormat and provides a complete example of accessing HAWQ table data from a MapReduce job.
Supported Data Types
HAWQ InputFormat supports the following data types:
SQL/HAWQ | JDBC/JAVA | setXXX | getXXX |
---|---|---|---|
DECIMAL/NUMERIC | java.math.BigDecimal | setBigDecimal | getBigDecimal |
FLOAT8/DOUBLE PRECISION | double | setDouble | getDouble |
INT8/BIGINT | long | setLong | getLong |
INTEGER/INT4/INT | int | setInt | getInt |
FLOAT4/REAL | float | setFloat | getFloat |
SMALLINT/INT2 | short | setShort | getShort |
BOOL/BOOLEAN | boolean | setBoolean | getBoolean |
VARCHAR/CHAR/TEXT | String | setString | getString |
DATE | java.sql.Date | setDate | getDate |
TIME/TIMETZ | java.sql.Time | setTime | getTime |
TIMESTAMP/TIMESTAMPTZ | java.sql.Timestamp | setTimestamp | getTimestamp |
ARRAY | java.sql.Array | setArray | getArray |
BIT/VARBIT | com.pivotal.hawq.mapreduce.datatype.HAWQVarbit | setVarbit | getVarbit |
BYTEA | byte[] | setByte | getByte |
INTERVAL | com.pivotal.hawq.mapreduce.datatype.HAWQInterval | setInterval | getInterval |
POINT | com.pivotal.hawq.mapreduce.datatype.HAWQPoint | setPoint | getPoint |
LSEG | com.pivotal.hawq.mapreduce.datatype.HAWQLseg | setLseg | getLseg |
BOX | com.pivotal.hawq.mapreduce.datatype.HAWQBox | setBox | getBox |
CIRCLE | com.pivotal.hawq.mapreduce.datatype.HAWQCircle | setCircle | getCircle |
PATH | com.pivotal.hawq.mapreduce.datatype.HAWQPath | setPath | getPath |
POLYGON | com.pivotal.hawq.mapreduce.datatype.HAWQPolygon | setPolygon | getPolygon |
MACADDR | com.pivotal.hawq.mapreduce.datatype.HAWQMacaddr | setMacaddr | getMacaddr |
INET | com.pivotal.hawq.mapreduce.datatype.HAWQInet | setInet | getInet |
CIDR | com.pivotal.hawq.mapreduce.datatype.HAWQCIDR | setCIDR | getCIDR |
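The getXXX methods in the table are called inside a map task to read typed column values from a HAWQRecord by 1-based column index, as the example in the next section does with getInt(1) and getString(2). The following sketch is illustrative only: it assumes a hypothetical table with columns (id INTEGER, name TEXT, salary NUMERIC, hired TIMESTAMP), that the other getters follow the same index-based signature, and the same imports as the example driver below.

// Illustrative mapper for a hypothetical table:
//   CREATE TABLE staff (id INTEGER, name TEXT, salary NUMERIC, hired TIMESTAMP);
// Column values are read with the typed getters from the table above, by 1-based index.
public static class TypedAccessMapper
        extends Mapper<Void, HAWQRecord, IntWritable, Text> {
    @Override
    public void map(Void key, HAWQRecord value, Context context)
            throws IOException, InterruptedException {
        try {
            int id = value.getInt(1);                              // INTEGER   -> getInt
            String name = value.getString(2);                      // TEXT      -> getString
            java.math.BigDecimal salary = value.getBigDecimal(3);  // NUMERIC   -> getBigDecimal
            java.sql.Timestamp hired = value.getTimestamp(4);      // TIMESTAMP -> getTimestamp
            context.write(new IntWritable(id),
                    new Text(name + "\t" + salary + "\t" + hired));
        } catch (HAWQException e) {
            // HAWQException signals a read error for the requested column
            throw new IOException(e.getMessage());
        }
    }
}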
HAWQ InputFormat Example
The following example shows how you can use the HAWQInputFormat
class to access HAWQ table data from MapReduce jobs.
package com.mycompany.app;

import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQInputFormat;
import com.pivotal.hawq.mapreduce.HAWQRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.IntWritable;
import java.io.IOException;

public class HAWQInputFormatDemoDriver extends Configured implements Tool {

    // Example table:
    // CREATE TABLE employees (
    //     id INTEGER NOT NULL, name VARCHAR(32) NOT NULL);

    public static class DemoMapper extends
            Mapper<Void, HAWQRecord, IntWritable, Text> {
        int id = 0;
        String name = null;

        public void map(Void key, HAWQRecord value, Context context)
                throws IOException, InterruptedException {
            try {
                id = value.getInt(1);
                name = value.getString(2);
            } catch (HAWQException hawqE) {
                throw new IOException(hawqE.getMessage());
            }
            context.write(new IntWritable(id), new Text(name));
        }
    }

    private static int printUsage() {
        System.out.println("HAWQInputFormatDemoDriver "
                + "<database_url> <table_name> <output_path> [username] [password]");
        ToolRunner.printGenericCommandUsage(System.out);
        return 2;
    }

    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            return printUsage();
        }
        Job job = Job.getInstance(getConf());
        job.setJobName("hawq-inputformat-demo");
        job.setJarByClass(HAWQInputFormatDemoDriver.class);
        job.setMapperClass(DemoMapper.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputValueClass(Text.class);

        String db_url = args[0];
        String table_name = args[1];
        String output_path = args[2];
        String user_name = null;
        if (args.length > 3) {
            user_name = args[3];
        }
        String password = null;
        if (args.length > 4) {
            password = args[4];
        }

        job.setInputFormatClass(HAWQInputFormat.class);
        HAWQInputFormat.setInput(job.getConfiguration(), db_url,
                user_name, password, table_name);
        FileOutputFormat.setOutputPath(job, new Path(output_path));
        job.setNumReduceTasks(0);

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new HAWQInputFormatDemoDriver(), args);
        System.exit(res);
    }
}
To compile and run the example:
Create a work directory:
$ mkdir mrwork
$ cd mrwork
Copy and paste the Java code above into a .java file:
$ mkdir -p com/mycompany/app
$ cd com/mycompany/app
$ vi HAWQInputFormatDemoDriver.java
Note the following dependencies required for compilation:
- HAWQInputFormat jars (located in the $GPHOME/lib/postgresql/hawq-mr-io directory):
  - hawq-mapreduce-common.jar
  - hawq-mapreduce-ao.jar
  - hawq-mapreduce-parquet.jar
  - hawq-mapreduce-tool.jar
- Required 3rd party jars (located in the $GPHOME/lib/postgresql/hawq-mr-io/lib directory):
  - parquet-common-1.1.0.jar
  - parquet-format-1.1.0.jar
  - parquet-hadoop-1.1.0.jar
  - postgresql-n.n-n-jdbc4.jar
  - snakeyaml-n.n.jar
- Hadoop MapReduce related jars (located in the install directory of your Hadoop distribution).
Compile the Java program. You may choose to use a different compilation command:
javac -classpath /usr/hdp/2.4.2.0-258/hadoop-mapreduce/*:/usr/local/hawq/lib/postgresql/hawq-mr-io/*:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/*:/usr/hdp/current/hadoop-client/* HAWQInputFormatDemoDriver.java
Build the JAR file:
$ cd ../../..
$ jar cf my-app.jar com
$ cp my-app.jar /tmp
Check that HAWQ and HDFS are installed and that your HAWQ cluster is running.
Create a sample table:
Log in to HAWQ:
$ psql -d postgres
Create the table:
CREATE TABLE employees (
    id INTEGER NOT NULL,
    name TEXT NOT NULL);
Or create a Parquet table:
CREATE TABLE employees (
    id INTEGER NOT NULL,
    name TEXT NOT NULL)
WITH (APPENDONLY=true, ORIENTATION=parquet);
Insert one tuple:
INSERT INTO employees VALUES (1, 'Paul');
\q
Ensure the system pg_hba.conf configuration file is set up to allow gpadmin access to the postgres database.
Use the following shell script snippet to run the MapReduce job:
#!/bin/bash
# set up environment variables
HAWQMRLIB=/usr/local/hawq/lib/postgresql/hawq-mr-io
export HADOOP_CLASSPATH=$HAWQMRLIB/hawq-mapreduce-ao.jar:$HAWQMRLIB/hawq-mapreduce-common.jar:$HAWQMRLIB/hawq-mapreduce-tool.jar:$HAWQMRLIB/hawq-mapreduce-parquet.jar:$HAWQMRLIB/lib/postgresql-9.2-1003-jdbc4.jar:$HAWQMRLIB/lib/snakeyaml-1.12.jar:$HAWQMRLIB/lib/parquet-hadoop-1.1.0.jar:$HAWQMRLIB/lib/parquet-common-1.1.0.jar:$HAWQMRLIB/lib/parquet-format-1.0.0.jar
export LIBJARS=$HAWQMRLIB/hawq-mapreduce-ao.jar,$HAWQMRLIB/hawq-mapreduce-common.jar,$HAWQMRLIB/hawq-mapreduce-tool.jar,$HAWQMRLIB/lib/postgresql-9.2-1003-jdbc4.jar,$HAWQMRLIB/lib/snakeyaml-1.12.jar,$HAWQMRLIB/hawq-mapreduce-parquet.jar,$HAWQMRLIB/lib/parquet-hadoop-1.1.0.jar,$HAWQMRLIB/lib/parquet-common-1.1.0.jar,$HAWQMRLIB/lib/parquet-format-1.0.0.jar
# usage: hadoop jar JARFILE CLASSNAME -libjars JARS <database_url> <table_name> <output_path_on_HDFS>
# - writing output to HDFS, so run as hdfs user
# - if not using the default postgres port, replace 5432 with port number for your HAWQ cluster
HADOOP_USER_NAME=hdfs hadoop jar /tmp/my-app.jar com.mycompany.app.HAWQInputFormatDemoDriver -libjars $LIBJARS localhost:5432/postgres employees /tmp/employees
The MapReduce job output is written to the /tmp/employees directory on the HDFS file system. Use the following commands to check the result of the MapReduce job:
$ sudo -u hdfs hdfs dfs -ls /tmp/employees
$ sudo -u hdfs hdfs dfs -cat /tmp/employees/*
The output will appear as follows:
1 Paul
If you choose to run the program again, delete the output file and directory:
$ sudo -u hdfs hdfs dfs -rm /tmp/employees/*
$ sudo -u hdfs hdfs dfs -rmdir /tmp/employees
Accessing HAWQ Data
You can access HAWQ data using the HAWQInputFormat.setInput() interface. The API signature you use depends on whether HAWQ is running.
- When HAWQ is running, use HAWQInputFormat.setInput(Configuration conf, String db_url, String username, String password, String tableName).
- When HAWQ is not running, first extract the table metadata to a file with the Metadata Export Tool and then use HAWQInputFormat.setInput(Configuration conf, String pathStr).
HAWQ is Running
/**
* Initializes the map-part of the job with the appropriate input settings
* by connecting to the database.
*
* @param conf
*          The map-reduce job configuration
* @param db_url
*          The database URL to connect to
* @param username
*          The username for setting up a connection to the database
* @param password
*          The password for setting up a connection to the database
* @param tableName
*          The name of the table to access
* @throws Exception
*/
public static void setInput(Configuration conf, String db_url,
String username, String password, String tableName)
throws Exception;
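For example, a driver running against a live HAWQ cluster might wire up its job as in the following sketch. The helper method name, connection URL, username, and table name are placeholders, and the snippet assumes the same imports as the example driver above.

// A minimal sketch (not part of the product API): wiring a Job to a running HAWQ instance.
// The method name, connection URL, username, and table name below are placeholders.
public static Job createJobForRunningHawq(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf);
    job.setInputFormatClass(HAWQInputFormat.class);
    HAWQInputFormat.setInput(job.getConfiguration(),
            "localhost:5432/postgres",   // db_url: <host>:<port>/<database>
            "gpadmin",                   // username (may be null)
            null,                        // password (may be null)
            "employees");                // name of the table to access
    return job;
}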
HAWQ is not Running
Use the metadata export tool, hawq extract, to export the metadata of the target table into a local YAML file:
$ hawq extract [-h hostname] [-p port] [-U username] [-d database] [-o output_file] [-W] <tablename>
Using the extracted metadata, access HAWQ data through the following interface. Pass the complete path to the .yaml file in the pathStr argument.
/**
* Initializes the map-part of the job with the appropriate input settings
* by reading a metadata file stored in the local filesystem.
*
* To generate the metadata file, run hawq extract first.
*
* @param conf
*          The map-reduce job configuration
* @param pathStr
*          The path to the metadata file in the local filesystem, e.g.
*          /home/gpadmin/metadata/postgres_test
* @throws Exception
*/
public static void setInput(Configuration conf, String pathStr)
throws Exception;
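For example, a driver reading from extracted metadata instead of a live cluster might wire up its job as in the following sketch. The helper method name and metadata path are placeholders; it assumes the YAML file was produced with hawq extract and the same imports as the example driver above.

// A minimal sketch (not part of the product API): wiring a Job to HAWQ table data
// when the cluster is not running. The method name and metadata path are placeholders;
// the metadata is assumed to have been exported beforehand, for example with:
//   hawq extract -d postgres -o /home/gpadmin/metadata/employees.yaml employees
public static Job createJobFromExtractedMetadata(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf);
    job.setInputFormatClass(HAWQInputFormat.class);
    HAWQInputFormat.setInput(job.getConfiguration(),
            "/home/gpadmin/metadata/employees.yaml");  // local path to the extracted YAML file
    return job;
}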