Friday, November 11, 2011

Hadoop : Hive


Hadoop is a batch processing system, and Hadoop jobs tend to have high latency and incur substantial overhead in job submission and scheduling. As a result, latency for Hive queries is generally very high (minutes) even when the data sets involved are very small (say, a few hundred megabytes). This makes it difficult to compare Hive with systems such as Oracle, where analyses are conducted on a significantly smaller amount of data but proceed much more iteratively, with response times between iterations of less than a few minutes. Hive aims to provide acceptable (but not optimal) latency for interactive data browsing, queries over small data sets, or test queries. Hive also does not provide the kind of data or query cache needed to make repeated queries over the same data set faster.
Hive is not designed for online transaction processing and does not offer real-time queries and row level updates. It is best used for batch jobs over large sets of append-only data (like web logs). What Hive values most are scalability (scale out with more machines added dynamically to the Hadoop cluster), extensibility (with MapReduce framework and UDF/UDAF/UDTF), fault-tolerance, and loose-coupling with its input formats.

Installation and Configuration


Installing Hive is simple and only requires having Java 1.6 and Ant installed on your machine.
Hive is available via SVN at http://svn.apache.org/repos/asf/hive/trunk. You can download it by running the following command.
$ svn co http://svn.apache.org/repos/asf/hive/trunk hive
To build hive, execute the following command on the base directory:
$ ant package
It will create the subdirectory build/dist with the following contents:
  • README.txt: readme file.
  • bin/: directory containing all the shell scripts
  • lib/: directory containing all required jar files
  • conf/: directory with configuration files
  • examples/: directory with sample input and query files
Subdirectory build/dist should contain all the files necessary to run hive. You can run it from there or copy it to a different location, if you prefer.
In order to run Hive, you must have hadoop in your path or have defined the environment variable HADOOP_HOME with the hadoop installation directory.
Moreover, we strongly advise users to create the HDFS directories /tmp and /user/hive/warehouse
(aka hive.metastore.warehouse.dir) and set them chmod g+w before tables are created in Hive.
To use hive command line interface (cli) go to the hive home directory (the one with the contents of build/dist) and execute the following command:
$ bin/hive
Metadata is stored in an embedded Derby database whose disk storage location is determined by the hive configuration variable named javax.jdo.option.ConnectionURL. By default (see conf/hive-default.xml), this location is ./metastore_db
Using Derby in embedded mode allows at most one user at a time. To configure Derby to run in server mode, look at HiveDerbyServerMode.


Requirements

  • Java 1.6
  • Hadoop 0.20.x.

    Installing Hive from a Stable Release

Start by downloading the most recent stable release of Hive from one of the Apache download mirrors (see Hive Releases).
Next you need to unpack the tarball. This will result in the creation of a subdirectory named hive-x.y.z:
  $ tar -xzvf hive-x.y.z.tar.gz
Set the environment variable HIVE_HOME to point to the installation directory:
  $ cd hive-x.y.z
  $ export HIVE_HOME=$(pwd)
Finally, add $HIVE_HOME/bin to your PATH:
  $ export PATH=$HIVE_HOME/bin:$PATH

Building Hive from Source

The Hive SVN repository is located here: http://svn.apache.org/repos/asf/hive/trunk
  $ svn co http://svn.apache.org/repos/asf/hive/trunk hive
  $ cd hive
  $ ant clean package
  $ cd build/dist
  $ ls
  README.txt
  bin/ (all the shell scripts)
  lib/ (required jar files)
  conf/ (configuration files)
  examples/ (sample input and query files)
In the rest of the page, we use build/dist and <install-dir> interchangeably.

Running Hive

Hive uses Hadoop, which means:
  • you must have hadoop in your path OR
  • export HADOOP_HOME=<hadoop-install-dir>
In addition, you must create /tmp and /user/hive/warehouse
(aka hive.metastore.warehouse.dir) and set them chmod g+w in
HDFS before a table can be created in Hive.
Commands to perform this setup
  $ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
  $ $HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
  $ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
  $ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse
I also find it useful but not necessary to set HIVE_HOME
  $ export HIVE_HOME=<hive-install-dir>
To use hive command line interface (cli) from the shell:
  $ $HIVE_HOME/bin/hive

Configuration management overview

  • Hive default configuration is stored in <install-dir>/conf/hive-default.xml
    Configuration variables can be changed by (re-)defining them in <install-dir>/conf/hive-site.xml
  • The location of the Hive configuration directory can be changed by setting the HIVE_CONF_DIR environment variable.
  • Log4j configuration is stored in <install-dir>/conf/hive-log4j.properties
  • Hive configuration is an overlay on top of hadoop - meaning the hadoop configuration variables are inherited by default.
  • Hive configuration can be manipulated by:
    • Editing hive-site.xml and defining any desired variables (including hadoop variables) in it
    • From the cli using the set command (see below)
    • By invoking hive using the syntax:
      $ bin/hive -hiveconf x1=y1 -hiveconf x2=y2
      this sets the variables x1 and x2 to y1 and y2 respectively
    • By setting the HIVE_OPTS environment variable to "-hiveconf x1=y1 -hiveconf x2=y2" which does the same as above
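The layering described above can be pictured as a series of dictionary overlays, where each later source overrides the earlier ones. The following is only a rough sketch: the function name effective_config, the sample values, and the exact precedence order among the layers are assumptions made for illustration, not a description of how Hive implements this.

```python
# Illustrative sketch only: models configuration layering as dict overlays.
# The layer order and all sample values are assumptions, not Hive internals.

def effective_config(*layers):
    """Merge configuration layers; later layers override earlier ones."""
    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged

# Hypothetical contents of each layer.
hadoop_conf = {"mapred.job.tracker": "myhost.mycompany.com:50030"}
hive_default = {"hive.exec.mode.local.auto": "false"}
hive_site = {"hive.metastore.warehouse.dir": "/user/hive/warehouse"}
hiveconf_args = {"mapred.job.tracker": "local"}  # as if passed with -hiveconf

conf = effective_config(hadoop_conf, hive_default, hive_site, hiveconf_args)
print(conf["mapred.job.tracker"])
```

In this sketch the value given on the command line wins over the inherited hadoop value, while untouched variables keep the values from the lower layers.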

    Runtime configuration

  • Hive queries are executed as map-reduce jobs and, therefore, the behavior
    of such queries can be controlled by the hadoop configuration variables.
  • The cli command 'SET' can be used to set any hadoop (or hive) configuration variable. For example:
    hive> SET mapred.job.tracker=myhost.mycompany.com:50030;
    hive> SET -v;
The latter shows all the current settings. Without the -v option, only the
variables that differ from the base hadoop configuration are displayed.

Hive, Map-Reduce and Local-Mode

Hive compiler generates map-reduce jobs for most queries. These jobs are then submitted to the Map-Reduce cluster indicated by the variable:
 
  mapred.job.tracker
While this usually points to a map-reduce cluster with multiple nodes, Hadoop also offers a nifty option to run map-reduce jobs locally on the user's workstation. This can be very useful to run queries over small data sets - in such cases local mode execution is usually significantly faster than submitting jobs to a large cluster. Data is accessed transparently from HDFS. Conversely, local mode only runs with one reducer and can be very slow processing larger data sets.
Starting with v-0.7, Hive fully supports local mode execution. To enable it, set the following option:
  hive> SET mapred.job.tracker=local;
In addition, mapred.local.dir should point to a path that's valid on the local machine (for example /tmp/<username>/mapred/local). (Otherwise, the user will get an exception allocating local disk space).
Starting v-0.7, Hive also supports a mode to run map-reduce jobs in local-mode automatically. The relevant options are:
  hive> SET hive.exec.mode.local.auto=false;
Note that this feature is disabled by default (false, the value shown above); set the option to true to enable it. When enabled, Hive analyzes the size of each map-reduce job in a query and may run it locally if the following thresholds are satisfied:
  • The total input size of the job is lower than: hive.exec.mode.local.auto.inputbytes.max (128MB by default)
  • The total number of map-tasks is less than: hive.exec.mode.local.auto.tasks.max (4 by default)
  • The total number of reduce tasks required is 1 or 0.
So for queries over small data sets, or for queries with multiple map-reduce jobs where the input to subsequent jobs is substantially smaller (because of reduction/filtering in the prior job), jobs may be run locally.
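The three thresholds above amount to a simple eligibility check. Here is a minimal sketch of that check; the function may_run_locally is hypothetical and only mirrors the rules as stated in the text, using the documented defaults.

```python
# Sketch of the local-mode eligibility rules described above.
# The function is hypothetical; only the default thresholds come from the text.

DEFAULT_INPUT_BYTES_MAX = 128 * 1024 * 1024  # hive.exec.mode.local.auto.inputbytes.max
DEFAULT_TASKS_MAX = 4                        # hive.exec.mode.local.auto.tasks.max

def may_run_locally(input_bytes, map_tasks, reduce_tasks,
                    input_bytes_max=DEFAULT_INPUT_BYTES_MAX,
                    tasks_max=DEFAULT_TASKS_MAX):
    """Return True if a job satisfies all three local-mode thresholds."""
    return (input_bytes < input_bytes_max   # total input size below the limit
            and map_tasks < tasks_max       # fewer map tasks than the limit
            and reduce_tasks <= 1)          # at most one reduce task

print(may_run_locally(10 * 1024 * 1024, 2, 1))   # small job
print(may_run_locally(200 * 1024 * 1024, 2, 1))  # input too large
```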
Note that there may be differences in the runtime environment of hadoop server nodes and the machine running the hive client (because of different jvm versions or different software libraries). This can cause unexpected behavior/errors while running in local mode. Also note that local mode execution is done in a separate, child jvm (of the hive client). If the user so wishes, the maximum amount of memory for this child jvm can be controlled via the option hive.mapred.local.mem. By default, it's set to zero, in which case Hive lets Hadoop determine the default memory limits of the child jvm.

Error Logs

Hive uses log4j for logging. By default logs are not emitted to the
console by the CLI. The default logging level is WARN and the logs are stored in the folder:
  • /tmp/<user.name>/hive.log
If the user wishes - the logs can be emitted to the console by adding
the arguments shown below:
  • bin/hive -hiveconf hive.root.logger=INFO,console
Alternatively, the user can change the logging level only by using:
  • bin/hive -hiveconf hive.root.logger=INFO,DRFA
Note that setting hive.root.logger via the 'set' command does not
change logging properties since they are determined at initialization time.
Logging during Hive execution on a Hadoop cluster is controlled by Hadoop configuration. Usually Hadoop will produce one log file per map and reduce task stored on the cluster machine(s) where the task was executed. The log files can be obtained by clicking through to the Task Details page from the Hadoop JobTracker Web UI.
When using local mode (using mapred.job.tracker=local), Hadoop/Hive execution logs are produced on the client machine itself. Starting v-0.6 - Hive uses the hive-exec-log4j.properties (falling back to hive-log4j.properties only if it's missing) to determine where these logs are delivered by default. The default configuration file produces one log file per query executed in local mode and stores it under /tmp/<user.name>. The intent of providing a separate configuration file is to enable administrators to centralize execution log capture if desired (on a NFS file server for example). Execution logs are invaluable for debugging run-time errors.
Error logs are very useful to debug problems. Please send them with any bugs (of which there are many!) to hive-dev@hadoop.apache.org.

DDL Operations

Creating Hive tables and browsing through them
  hive> CREATE TABLE pokes (foo INT, bar STRING);  
Creates a table called pokes with two columns, the first being an integer and the other a string
  hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);  
Creates a table called invites with two columns and a partition column
called ds. The partition column is a virtual column. It is not part
of the data itself but is derived from the partition that a
particular dataset is loaded into.
By default, tables are assumed to be of text input format and the
delimiters are assumed to be ^A(ctrl-a).
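To make the default text layout concrete, the sketch below writes and parses one row of a two-column table such as pokes using ctrl-A (byte 0x01) as the field delimiter. The helper names and the sample values are made up for illustration.

```python
# Sketch: encode and decode a row in a ctrl-A (\x01) delimited text layout,
# matching the default field delimiter mentioned above.
# Helper names and sample values are hypothetical.

FIELD_DELIM = "\x01"  # ^A

def encode_row(foo, bar):
    """Join an (INT, STRING) row into one delimited text line."""
    return FIELD_DELIM.join([str(foo), bar])

def decode_row(line):
    """Split a delimited line back into (int, str)."""
    foo, bar = line.rstrip("\n").split(FIELD_DELIM)
    return int(foo), bar

line = encode_row(238, "val_238")
print(repr(line))
print(decode_row(line))
```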
  hive> SHOW TABLES;
lists all the tables
  hive> SHOW TABLES '.*s';
lists all the tables that end with 's'. The pattern matching follows Java regular
expressions. Check out this link for documentation http://java.sun.com/javase/6/docs/api/java/util/regex/Pattern.html
  hive> DESCRIBE invites;
shows the list of columns
As for altering tables, table names can be changed and additional columns can be added:
  hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);
  hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
  hive> ALTER TABLE events RENAME TO 3koobecaf;
Dropping tables:
  hive> DROP TABLE pokes;

Metadata Store

Metadata is in an embedded Derby database whose disk storage location is determined by the
hive configuration variable named javax.jdo.option.ConnectionURL. By default
(see conf/hive-default.xml), this location is ./metastore_db
Right now, in the default configuration, this metadata can only be seen by
one user at a time.
Metastore can be stored in any database that is supported by JPOX. The
location and the type of the RDBMS can be controlled by the two variables
javax.jdo.option.ConnectionURL and javax.jdo.option.ConnectionDriverName.
Refer to JDO (or JPOX) documentation for more details on supported databases.
The database schema is defined in JDO metadata annotations file package.jdo
at src/contrib/hive/metastore/src/model.
In the future, the metastore itself can be a standalone server.
If you want to run the metastore as a network server so it can be accessed
from multiple nodes try HiveDerbyServerMode.

DML Operations

Loading data from flat files into Hive:
  hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes; 
Loads a file that contains two columns separated by ctrl-a into pokes table.
'local' signifies that the input file is on the local file system. If 'local'
is omitted then it looks for the file in HDFS.
The keyword 'overwrite' signifies that existing data in the table is deleted.
If the 'overwrite' keyword is omitted, data files are appended to existing data sets.
NOTES:
  • NO verification of data against the schema is performed by the load command.
  • If the file is in hdfs, it is moved into the Hive-controlled file system namespace.
    The root of the Hive directory is specified by the option hive.metastore.warehouse.dir
    in hive-default.xml. We advise users to create this directory before
    trying to create tables via Hive.
  hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
  hive> LOAD DATA LOCAL INPATH './examples/files/kv3.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');
The two LOAD statements above load data into two different partitions of the table
invites. Table invites must be created as partitioned by the key ds for this to succeed.
  hive> LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
The above command will load data from an HDFS file/directory to the table.
Note that loading data from HDFS will result in moving the file/directory. As a result, the operation is almost instantaneous.

SQL Operations

Example Queries

Some example queries are shown below. They are available in build/dist/examples/queries.
More are available in the hive sources at ql/src/test/queries/positive

SELECTS and FILTERS

  hive> SELECT a.foo FROM invites a WHERE a.ds='2008-08-15';
selects column 'foo' from all rows of partition ds=2008-08-15 of the invites table. The results are not
stored anywhere, but are displayed on the console.
Note that in all the examples that follow, INSERT (into a hive table, local
directory or HDFS directory) is optional.
  hive> INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='2008-08-15';
selects all rows from partition ds=2008-08-15 of the invites table into an HDFS directory. The result data
is in files (depending on the number of mappers) in that directory.
NOTE: partition columns if any are selected by the use of *. They can also
be specified in the projection clauses.
Partitioned tables must always have a partition selected in the WHERE clause of the statement.
  hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
Selects all rows from pokes table into a local directory
  hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
  hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100; 
  hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
  hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
  hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(*) FROM invites a WHERE a.ds='2008-08-15';
  hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
  hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
Sum of a column. avg, min, max can also be used. Note that for versions of Hive which don't include HIVE-287, you'll need to use COUNT(1) in place of COUNT(*).

GROUP BY

  hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(*) WHERE a.foo > 0 GROUP BY a.bar;
  hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(*) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
Note that for versions of Hive which don't include HIVE-287, you'll need to use COUNT(1) in place of COUNT(*).

JOIN

  hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;

MULTITABLE INSERT

  FROM src
  INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
  INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
  INSERT OVERWRITE TABLE dest3 PARTITION(ds='2008-04-08', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
  INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
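One way to think about the multi-table insert above is as a single pass over src in which every row is tested against each destination's predicate. The sketch below imitates that routing in plain Python; it is an analogy only, with hypothetical names and in-memory lists standing in for the destination tables and directory.

```python
# Analogy only: route rows from one scan of "src" to several destinations,
# mirroring the WHERE clauses of the multi-table insert above.
# Lists stand in for dest1, dest2, dest3 and the local output directory.

def route_rows(rows):
    dest1, dest2, dest3, dest4 = [], [], [], []
    for key, value in rows:               # one pass over the source
        if key < 100:
            dest1.append((key, value))    # SELECT src.*
        if 100 <= key < 200:
            dest2.append((key, value))    # SELECT src.key, src.value
        if 200 <= key < 300:
            dest3.append(key)             # SELECT src.key
        if key >= 300:
            dest4.append(value)           # SELECT src.value
    return dest1, dest2, dest3, dest4

rows = [(5, "a"), (150, "b"), (250, "c"), (400, "d")]
print(route_rows(rows))
```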

STREAMING

  hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
This streams the data in the map phase through the script /bin/cat (like hadoop streaming).
Similarly - streaming can be used on the reduce side (please see the Hive Tutorial for examples)

Simple Example Use Cases

MovieLens User Ratings

First, create a table with tab-delimited text file format:
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
Then, download and extract the data files:
wget http://www.grouplens.org/system/files/ml-data.tar+0.gz
tar xvzf ml-data.tar+0.gz
And load it into the table that was just created:
LOAD DATA LOCAL INPATH 'ml-data/u.data'
OVERWRITE INTO TABLE u_data;
Count the number of rows in table u_data:
SELECT COUNT(*) FROM u_data;
Note that for versions of Hive which don't include HIVE-287, you'll need to use COUNT(1) in place of COUNT(*).
Now we can do some complex data analysis on the table u_data:
Create weekday_mapper.py:
import sys
import datetime

for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print('\t'.join([userid, movieid, rating, str(weekday)]))
Use the mapper script:
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

add FILE weekday_mapper.py;

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;
Note that if you're using Hive 0.5.0 or earlier you will need to use COUNT(1) in place of COUNT(*).
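Before running the job on a cluster, it can help to try the same transformation and aggregation locally on a few rows. The sketch below does that in plain Python. The sample rows are invented, and it uses UTC so the result does not depend on the machine's time zone, whereas the mapper script above uses local time.

```python
# Local dry run of the weekday transformation and the GROUP BY count.
# Sample rows are hypothetical; UTC is used here for reproducibility,
# unlike the mapper script, which uses the local time zone.
from collections import Counter
from datetime import datetime, timezone

def to_weekday(line):
    """Replace the unixtime column with the ISO weekday (1=Monday .. 7=Sunday)."""
    userid, movieid, rating, unixtime = line.strip().split("\t")
    weekday = datetime.fromtimestamp(float(unixtime), tz=timezone.utc).isoweekday()
    return "\t".join([userid, movieid, rating, str(weekday)])

sample = [
    "1\t10\t5\t0",       # 1970-01-01, a Thursday
    "2\t11\t3\t86400",   # 1970-01-02, a Friday
    "3\t12\t4\t86410",   # still 1970-01-02
]

mapped = [to_weekday(line) for line in sample]
counts = Counter(row.split("\t")[3] for row in mapped)  # like GROUP BY weekday
print(mapped)
print(dict(counts))
```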

Apache Weblog Data

The format of the Apache weblog is customizable, but most webmasters use the default.
For the default Apache weblog format, we can create a table with the following command.
More about RegexSerDe can be found here: http://issues.apache.org/jira/browse/HIVE-662
add jar ../build/contrib/hive_contrib.jar;

CREATE TABLE apachelog (
  host STRING,
  identity STRING,
  user STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\".*\") ([^ \"]*|\".*\"))?",
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE;
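A regular expression like this is easier to trust after trying it on a sample line. The sketch below applies the pattern with Python's re module, which accepts the same syntax for the constructs used here. It assumes the intended pattern has a space inside the leading [^ ]* character classes and a bracketed class for the time field; the sample log lines are invented, and a whole-line match is used here as an assumption about how the SerDe applies the pattern.

```python
# Sketch: try the weblog pattern on sample lines with Python's re module.
# The pattern is written in its assumed intended form; sample lines are made up.
import re

LOG_PATTERN = re.compile(
    r'([^ ]*) ([^ ]*) ([^ ]*) (-|\[[^\]]*\]) ([^ "]*|"[^"]*") '
    r'(-|[0-9]*) (-|[0-9]*)(?: ([^ "]*|".*") ([^ "]*|".*"))?'
)

FIELDS = ["host", "identity", "user", "time", "request",
          "status", "size", "referer", "agent"]

def parse_line(line):
    """Return a dict of the nine columns, or None if the line does not match."""
    m = LOG_PATTERN.fullmatch(line)
    if m is None:
        return None
    return dict(zip(FIELDS, m.groups()))

common = ('127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
          '"GET /apache_pb.gif HTTP/1.0" 200 2326')
combined = common + ' "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"'

print(parse_line(common))
print(parse_line(combined))
```

In this sketch the referer and agent columns come back as None when the line has only the first seven fields, because the last group of the pattern is optional.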

What is Hive

Hive is a data warehousing infrastructure based on Hadoop. Hadoop provides massive scale out and fault tolerance capabilities for data storage and processing (using the map-reduce programming paradigm) on commodity hardware.
Hive is designed to enable easy data summarization, ad-hoc querying and analysis of large volumes of data. It provides a simple query language called Hive QL, which is based on SQL and which enables users familiar with SQL to do ad-hoc querying, summarization and data analysis easily. At the same time, Hive QL also allows traditional map/reduce programmers to be able to plug in their custom mappers and reducers to do more sophisticated analysis that may not be supported by the built-in capabilities of the language.

What is NOT Hive

Hadoop is a batch processing system, and Hadoop jobs tend to have high latency and incur substantial overhead in job submission and scheduling. As a result, latency for Hive queries is generally very high (minutes) even when the data sets involved are very small (say, a few hundred megabytes). Hive therefore cannot be compared with systems such as Oracle, where analyses are conducted on a significantly smaller amount of data but proceed much more iteratively, with response times between iterations of less than a few minutes. Hive aims to provide acceptable (but not optimal) latency for interactive data browsing, queries over small data sets, or test queries.
Hive is not designed for online transaction processing and does not offer real-time queries and row level updates. It is best used for batch jobs over large sets of immutable data (like web logs).
In the following sections we provide a tutorial on the capabilities of the system. We start by describing the concepts of data types, tables and partitions (which are very similar to what you would find in a traditional relational DBMS) and then illustrate the capabilities of the QL language with the help of some examples.

Data Units

In order of granularity, Hive data is organized into:
  • Databases: Namespaces that keep tables and other data units from having naming conflicts.
  • Tables: Homogeneous units of data which have the same schema. An example of a table could be the page_views table, where each row could consist of the following columns (schema):
    • timestamp - which is of INT type that corresponds to a unix timestamp of when the page was viewed.
    • userid - which is of BIGINT type that identifies the user who viewed the page.
    • page_url - which is of STRING type that captures the location of the page.
    • referer_url - which is of STRING type that captures the location of the page from where the user arrived at the current page.
    • IP - which is of STRING type that captures the IP address from where the page request was made.
  • Partitions: Each Table can have one or more partition keys, which determine how the data is stored. Partitions, apart from being storage units, also allow the user to efficiently identify the rows that satisfy certain criteria. For example, a table could have a date_partition of type STRING and a country_partition of type STRING. Each unique value of the partition keys defines a partition of the Table. For example, all "US" data from "2009-12-23" is a partition of the page_views table. Therefore, if you run analysis on only the "US" data for 2009-12-23, you can run that query only on the relevant partition of the table, thereby speeding up the analysis significantly. Note, however, that just because a partition is named 2009-12-23 does not mean that it contains all or only data from that date; partitions are named after dates for convenience, but it is the user's job to guarantee the relationship between partition name and data content. Partition columns are virtual columns; they are not part of the data itself but are derived on load.
  • Buckets (or Clusters): Data in each partition may in turn be divided into Buckets based on the value of a hash function of some column of the Table. For example, the page_views table may be bucketed by userid, which is one of the columns, other than the partition columns, of the page_views table. These can be used to efficiently sample the data.
Note that it is not necessary for tables to be partitioned or bucketed, but these abstractions allow the system to prune large quantities of data during query processing, resulting in faster query execution.
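The effect of partitions and buckets on how much data a query has to touch can be illustrated with a small in-memory model. In the sketch below, rows are grouped first by their (date, country) partition and then by a bucket computed from userid. The bucket count, the use of userid modulo the bucket count as a stand-in hash, and all sample rows are assumptions for illustration only.

```python
# In-memory model of partitions and buckets (illustration only).
# NUM_BUCKETS, the modulo "hash", and the sample rows are assumptions.
from collections import defaultdict

NUM_BUCKETS = 4

def bucket_of(userid, num_buckets=NUM_BUCKETS):
    """Stand-in hash function: userid modulo the number of buckets."""
    return userid % num_buckets

def load(rows):
    """Group rows by (date, country) partition, then by bucket of userid."""
    table = defaultdict(lambda: defaultdict(list))
    for date, country, userid, page_url in rows:
        table[(date, country)][bucket_of(userid)].append((userid, page_url))
    return table

def scan_partition(table, date, country):
    """Read only the matching partition instead of every row in the table."""
    part = table.get((date, country), {})
    return [row for bucket in part.values() for row in bucket]

rows = [
    ("2009-12-23", "US", 1, "/a"),
    ("2009-12-23", "US", 6, "/b"),
    ("2009-12-23", "CA", 2, "/c"),
    ("2009-12-24", "US", 3, "/d"),
]
table = load(rows)
print(sorted(scan_partition(table, "2009-12-23", "US")))
```

A query restricted to the "US" data for 2009-12-23 only needs the rows under that one key, and sampling can pick a single bucket within it.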

Type System

Primitive Types

  • Types are associated with the columns in the tables. The following Primitive types are supported:
  • Integers
    • TINYINT - 1 byte integer
    • SMALLINT - 2 byte integer
    • INT - 4 byte integer
    • BIGINT - 8 byte integer
  • Boolean type
    • BOOLEAN - TRUE/FALSE
  • Floating point numbers
    • FLOAT - single precision
    • DOUBLE - Double precision
  • String type
    • STRING - sequence of characters in a specified character set
The Types are organized in the following hierarchy (where the parent is a super type of all the children instances):
  • Type
    • Primitive Type
      • Number
        • DOUBLE
          • BIGINT
            • INT
              • TINYINT
          • FLOAT
            • INT
              • TINYINT
      • STRING
      • BOOLEAN
This type hierarchy defines how the types are implicitly converted in the query language. Implicit conversion is allowed for types from child to an ancestor. So when a query expression expects type1 and the data is of type2, type2 is implicitly converted to type1 if type1 is an ancestor of type2 in the type hierarchy. Apart from these fundamental rules for implicit conversion based on the type system, Hive also allows one special case for conversion:
  • <STRING> to <DOUBLE>
Explicit type conversion can be done using the cast operator as shown in the Built in functions section below.
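The child-to-ancestor rule can be sketched as a walk up a parent table. The code below is a simplified model, not Hive's implementation: it assumes a reading of the hierarchy in which INT sits under both BIGINT and FLOAT, both of those sit under DOUBLE, and STRING and BOOLEAN have no numeric ancestors. SMALLINT is left out because the hierarchy above does not place it.

```python
# Simplified model of implicit conversion (not Hive's implementation).
# The parent table reflects an assumed reading of the hierarchy above.

PARENTS = {
    "TINYINT": ["INT"],
    "INT": ["BIGINT", "FLOAT"],
    "BIGINT": ["DOUBLE"],
    "FLOAT": ["DOUBLE"],
    "DOUBLE": [],
    "STRING": [],
    "BOOLEAN": [],
}

# The one special case named in the text: STRING to DOUBLE.
SPECIAL_CASES = {("STRING", "DOUBLE")}

def can_implicitly_convert(from_type, to_type):
    """True if to_type is from_type itself, an ancestor, or a special case."""
    if from_type == to_type or (from_type, to_type) in SPECIAL_CASES:
        return True
    return any(can_implicitly_convert(parent, to_type)
               for parent in PARENTS[from_type])

print(can_implicitly_convert("TINYINT", "DOUBLE"))
print(can_implicitly_convert("DOUBLE", "INT"))
```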

Complex Types

Complex Types can be built up from primitive types and other composite types using:
  • Structs: the elements within the type can be accessed using the DOT (.) notation. For example, for a column c of type STRUCT {a INT; b INT} the a field is accessed by the expression c.a
  • Maps (key-value tuples): The elements are accessed using ['element name'] notation. For example, in a map M comprising a mapping from 'group' -> gid, the gid value can be accessed using M['group']
  • Arrays (indexable lists): The elements in the array have to be of the same type. Elements can be accessed using the [n] notation where n is an index (zero-based) into the array. For example, for an array A having the elements ['a', 'b', 'c'], A[1] returns 'b'.
Using the primitive types and the constructs for creating complex types, types with arbitrary levels of nesting can be created. For example, a type User may comprise the following fields:
  • gender - which is a STRING.
  • active - which is a BOOLEAN.

Built in operators and functions

Built in operators

  • Relational Operators - The following operators compare the passed operands and generate a TRUE or FALSE value depending on whether the comparison between the operands holds or not.
    Relational operator (operand types) - description:
    • A = B (all primitive types) - TRUE if expression A is equivalent to expression B, otherwise FALSE
    • A != B (all primitive types) - TRUE if expression A is not equivalent to expression B, otherwise FALSE
    • A < B (all primitive types) - TRUE if expression A is less than expression B, otherwise FALSE
    • A <= B (all primitive types) - TRUE if expression A is less than or equal to expression B, otherwise FALSE
    • A > B (all primitive types) - TRUE if expression A is greater than expression B, otherwise FALSE
    • A >= B (all primitive types) - TRUE if expression A is greater than or equal to expression B, otherwise FALSE
    • A IS NULL (all types) - TRUE if expression A evaluates to NULL, otherwise FALSE
    • A IS NOT NULL (all types) - FALSE if expression A evaluates to NULL, otherwise TRUE
    • A LIKE B (strings) - TRUE if string A matches the SQL simple regular expression B, otherwise FALSE. The comparison is done character by character. The _ character in B matches any character in A (similar to . in posix regular expressions), and the % character in B matches an arbitrary number of characters in A (similar to .* in posix regular expressions). For example, 'foobar' LIKE 'foo' evaluates to FALSE, whereas 'foobar' LIKE 'foo___' evaluates to TRUE and so does 'foobar' LIKE 'foo%'. To escape % use \ (\% matches one % character)
    • A RLIKE B (strings) - TRUE if any substring of A matches the Java regular expression B (see Java regular expressions syntax), otherwise FALSE. For example, 'foobar' rlike 'foo' evaluates to TRUE and so does 'foobar' rlike '^f.*r$'
    • A REGEXP B (strings) - Same as RLIKE
  • Arithmetic Operators - The following operators support various common arithmetic operations on the operands. All of them return number types.
    Arithmetic operator (operand types) - description:
    • A + B (all number types) - Gives the result of adding A and B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands. For example, since every integer is a float, float is a containing type of integer, so the + operator on a float and an int results in a float.
    • A - B (all number types) - Gives the result of subtracting B from A. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands.
    • A * B (all number types) - Gives the result of multiplying A and B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands. Note that if the multiplication causes an overflow, you will have to cast one of the operands to a type higher in the type hierarchy.
    • A / B (all number types) - Gives the result of dividing A by B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands. If the operands are integer types, then the result is the quotient of the division.
    • A % B (all number types) - Gives the remainder resulting from dividing A by B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands.
    • A & B (all number types) - Gives the result of bitwise AND of A and B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands.
    • A | B (all number types) - Gives the result of bitwise OR of A and B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands.
    • A ^ B (all number types) - Gives the result of bitwise XOR of A and B. The type of the result is the same as the common parent (in the type hierarchy) of the types of the operands.
    • ~A (all number types) - Gives the result of bitwise NOT of A. The type of the result is the same as the type of A.
  • Logical Operators - The following operators provide support for creating logical expressions. All of them return boolean TRUE or FALSE depending upon the boolean values of the operands.
    Logical operator (operand types) - description:
    • A AND B (boolean) - TRUE if both A and B are TRUE, otherwise FALSE
    • A && B (boolean) - Same as A AND B
    • A OR B (boolean) - TRUE if either A or B or both are TRUE, otherwise FALSE
    • A || B (boolean) - Same as A OR B
    • NOT A (boolean) - TRUE if A is FALSE, otherwise FALSE
    • !A (boolean) - Same as NOT A
  • Operators on Complex Types - The following operators provide mechanisms to access elements in Complex Types
    Operator (operand types) - description:
    • A[n] (A is an Array and n is an int) - returns the nth element in the array A. The first element has index 0, e.g. if A is an array comprising ['foo', 'bar'] then A[0] returns 'foo' and A[1] returns 'bar'
    • M[key] (M is a Map<K, V> and key has type K) - returns the value corresponding to the key in the map, e.g. if M is a map comprising {'f' -> 'foo', 'b' -> 'bar', 'all' -> 'foobar'} then M['all'] returns 'foobar'
    • S.x (S is a struct) - returns the x field of S, e.g. for struct foobar {int foo, int bar} foobar.foo returns the integer stored in the foo field of the struct.
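The LIKE matching rules described in the relational operators above (_ for any single character, % for any run of characters, \ to escape) can be expressed as a translation to an ordinary regular expression. The sketch below is a small Python model of those rules with hypothetical function names; it is not the code Hive uses.

```python
# Model of the LIKE rules described above (not Hive's implementation).
# Function names are hypothetical.
import re

def like_to_regex(pattern):
    """Translate a LIKE pattern into an equivalent regular expression."""
    out = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern):
            out.append(re.escape(pattern[i + 1]))  # escaped char is a literal
            i += 2
            continue
        if ch == "_":
            out.append(".")        # exactly one character
        elif ch == "%":
            out.append(".*")       # any number of characters
        else:
            out.append(re.escape(ch))
        i += 1
    return "".join(out)

def like(a, pattern):
    """True if the whole string a matches the LIKE pattern."""
    return re.fullmatch(like_to_regex(pattern), a, flags=re.DOTALL) is not None

print(like("foobar", "foo"))
print(like("foobar", "foo___"))
print(like("foobar", "foo%"))
```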

Built in functions

Return Type    Function Name (Signature)    Description
BIGINT    round(double a)    returns the rounded BIGINT value of the double
BIGINT    floor(double a)    returns the maximum BIGINT value that is equal to or less than the double
BIGINT    ceil(double a)    returns the minimum BIGINT value that is equal to or greater than the double
double    rand(), rand(int seed)    returns a random number (that changes from row to row). Specifying the seed will make sure the generated random number sequence is deterministic.
string    concat(string A, string B,...)    returns the string resulting from concatenating B after A. For example, concat('foo', 'bar') results in 'foobar'. This function accepts an arbitrary number of arguments and returns the concatenation of all of them.
string    substr(string A, int start)    returns the substring of A starting from start position till the end of string A. For example, substr('foobar', 4) results in 'bar'
string    substr(string A, int start, int length)    returns the substring of A starting from start position with the given length e.g. substr('foobar', 4, 2) results in 'ba'
string    upper(string A)    returns the string resulting from converting all characters of A to upper case e.g. upper('fOoBaR') results in 'FOOBAR'
string    ucase(string A)    Same as upper
string    lower(string A)    returns the string resulting from converting all characters of A to lower case e.g. lower('fOoBaR') results in 'foobar'
string    lcase(string A)    Same as lower
string    trim(string A)    returns the string resulting from trimming spaces from both ends of A e.g. trim(' foobar ') results in 'foobar'
string    ltrim(string A)    returns the string resulting from trimming spaces from the beginning (left hand side) of A. For example, ltrim(' foobar ') results in 'foobar '
string    rtrim(string A)    returns the string resulting from trimming spaces from the end (right hand side) of A. For example, rtrim(' foobar ') results in ' foobar'
string    regexp_replace(string A, string B, string C)    returns the string resulting from replacing all substrings in A that match the Java regular expression B (see Java regular expressions syntax) with C. For example, regexp_replace('foobar', 'oo|ar', '') returns 'fb'
int    size(Map<K, V>)    returns the number of elements in the map type
int    size(Array<T>)    returns the number of elements in the array type
value of <type>    cast(<expr> as <type>)    converts the results of the expression expr to <type> e.g. cast('1' as BIGINT) will convert the string '1' to its integral representation. A null is returned if the conversion does not succeed.
string    from_unixtime(int unixtime)    converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the format of "1970-01-01 00:00:00"
string    to_date(string timestamp)    Returns the date part of a timestamp string: to_date("1970-01-01 00:00:00") = "1970-01-01"
int    year(string date)    Returns the year part of a date or a timestamp string: year("1970-01-01 00:00:00") = 1970, year("1970-01-01") = 1970
int    month(string date)    Returns the month part of a date or a timestamp string: month("1970-11-01 00:00:00") = 11, month("1970-11-01") = 11
int    day(string date)    Returns the day part of a date or a timestamp string: day("1970-11-01 00:00:00") = 1, day("1970-11-01") = 1
string    get_json_object(string json_string, string path)    Extracts a json object from a json string based on the json path specified, and returns the json string of the extracted json object. It will return null if the input json string is invalid
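The 1-based start position of substr is easy to trip over when coming from a 0-based language. As a rough sketch only, the Python helpers below (hive_substr and hive_regexp_replace are hypothetical names, not part of Hive) mimic the behaviour described in the table for a positive start position. Python's re module stands in for Java regular expressions, which differ in some details.

```python
import re

def hive_substr(s, start, length=None):
    """Mimic substr(A, start[, length]) for a positive, 1-based start."""
    begin = start - 1  # the table counts positions from 1, Python from 0
    end = None if length is None else begin + length
    return s[begin:end]

def hive_regexp_replace(s, pattern, replacement):
    """Replace every match of pattern in s (Python's re, not Java's)."""
    return re.sub(pattern, replacement, s)

print(hive_substr('foobar', 4))                    # bar
print(hive_substr('foobar', 4, 2))                 # ba
print(hive_regexp_replace('foobar', 'oo|ar', ''))  # fb
```

The three printed values match the examples given in the table rows for substr and regexp_replace.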
  • The following built in aggregate functions are supported in Hive:
Return Type    Aggregation Function Name (Signature)    Description
BIGINT    count(*), count(expr), count(DISTINCT expr[, expr...])    count(*) - Returns the total number of retrieved rows, including rows containing NULL values; count(expr) - Returns the number of rows for which the supplied expression is non-NULL; count(DISTINCT expr[, expr]) - Returns the number of rows for which the supplied expression(s) are unique and non-NULL.
DOUBLE    sum(col), sum(DISTINCT col)    returns the sum of the elements in the group or the sum of the distinct values of the column in the group
DOUBLE    avg(col), avg(DISTINCT col)    returns the average of the elements in the group or the average of the distinct values of the column in the group
DOUBLE    min(col)    returns the minimum value of the column in the group
DOUBLE    max(col)    returns the maximum value of the column in the group
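The three forms of count differ only in how they treat NULLs and duplicates. The following Python sketch illustrates the description above on a tiny in-memory data set; the rows and the helper names are made up for illustration, and None plays the role of NULL.

```python
# Hypothetical sample rows; None stands in for NULL.
rows = [
    {'userid': 1, 'ip': '10.0.0.1'},
    {'userid': 2, 'ip': None},
    {'userid': 2, 'ip': '10.0.0.1'},
    {'userid': None, 'ip': '10.0.0.2'},
]

def count_star(rows):
    """count(*): every row counts, NULLs included."""
    return len(rows)

def count_expr(rows, col):
    """count(expr): only rows where the expression is non-NULL."""
    return sum(1 for r in rows if r[col] is not None)

def count_distinct(rows, col):
    """count(DISTINCT expr): unique non-NULL values."""
    return len({r[col] for r in rows if r[col] is not None})

print(count_star(rows))            # 4
print(count_expr(rows, 'ip'))      # 3
print(count_distinct(rows, 'ip'))  # 2
```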

Language capabilities

The Hive query language provides basic SQL-like operations that work on tables or partitions. These operations are:
  • Ability to filter rows from a table using a where clause.
  • Ability to select certain columns from the table using a select clause.
  • Ability to do equi-joins between two tables.
  • Ability to evaluate aggregations on multiple "group by" columns for the data stored in a table.
  • Ability to store the results of a query into another table.
  • Ability to download the contents of a table to a local (e.g., nfs) directory.
  • Ability to store the results of a query in a hadoop dfs directory.
  • Ability to manage tables and partitions (create, drop and alter).
  • Ability to plug in custom scripts in the language of choice for custom map/reduce jobs.

Usage and Examples

The following examples highlight some salient features of the system. A detailed set of query test cases can be found at Hive Query Test Cases and the corresponding results can be found at Query Test Case Results

Creating Tables

The following statement would create the page_view table mentioned above:
    CREATE TABLE page_view(viewTime INT, userid BIGINT,
                    page_url STRING, referrer_url STRING,
                    ip STRING COMMENT 'IP Address of the User')
    COMMENT 'This is the page view table'
    PARTITIONED BY(dt STRING, country STRING)
    STORED AS SEQUENCEFILE;
In this example the columns of the table are specified with the corresponding types. Comments can be attached both at the column level as well as at the table level. Additionally the partitioned by clause defines the partitioning columns which are different from the data columns and are actually not stored with the data. When specified in this way, the data in the files is assumed to be delimited with ASCII 001(ctrl-A) as the field delimiter and newline as the row delimiter.
The field delimiter can be parametrized if the data is not in the above format as illustrated in the following example:
    CREATE TABLE page_view(viewTime INT, userid BIGINT,
                    page_url STRING, referrer_url STRING,
                    ip STRING COMMENT 'IP Address of the User')
    COMMENT 'This is the page view table'
    PARTITIONED BY(dt STRING, country STRING)
    ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\001'
    STORED AS SEQUENCEFILE;
The row delimiter currently cannot be changed, since it is determined by Hadoop rather than by Hive.
It is also a good idea to bucket the tables on certain columns so that efficient sampling queries can be executed against the data set. If bucketing is absent, random sampling can still be done on the table but it is not efficient as the query has to scan all the data. The following example illustrates the case of the page_view table that is bucketed on the userid column:
    CREATE TABLE page_view(viewTime INT, userid BIGINT,
                    page_url STRING, referrer_url STRING,
                    ip STRING COMMENT 'IP Address of the User')
    COMMENT 'This is the page view table'
    PARTITIONED BY(dt STRING, country STRING)
    CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
    ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\001'
            COLLECTION ITEMS TERMINATED BY '\002'
            MAP KEYS TERMINATED BY '\003'
    STORED AS SEQUENCEFILE;
In the example above, the table is clustered by a hash function of userid into 32 buckets. Within each bucket the data is sorted in increasing order of viewTime. Such an organization allows the user to do efficient sampling on the clustered column - in this case userid. The sorting property allows internal operators to take advantage of the better-known data structure while evaluating queries with greater efficiency.
The following variant of the table additionally declares an array column (friends) and a map column (properties):
    CREATE TABLE page_view(viewTime INT, userid BIGINT,
                    page_url STRING, referrer_url STRING,
                    friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
                    ip STRING COMMENT 'IP Address of the User')
    COMMENT 'This is the page view table'
    PARTITIONED BY(dt STRING, country STRING)
    CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
    ROW FORMAT DELIMITED
            FIELDS TERMINATED BY '\001'
            COLLECTION ITEMS TERMINATED BY '\002'
            MAP KEYS TERMINATED BY '\003'
    STORED AS SEQUENCEFILE;
In this example the columns that comprise the table row are specified in a similar way to the definition of types. Comments can be attached both at the column level as well as at the table level. Additionally the partitioned by clause defines the partitioning columns, which are different from the data columns and are actually not stored with the data. The CLUSTERED BY clause specifies which column to use for bucketing as well as how many buckets to create. The delimited row format specifies how the rows are stored in the hive table. In the case of the delimited format, this specifies how the fields are terminated, how the items within collections (arrays or maps) are terminated and how the map keys are terminated. STORED AS SEQUENCEFILE indicates that this data is stored in a binary format (using hadoop SequenceFiles) on hdfs. The values shown for the ROW FORMAT and STORED AS clauses in the above example represent the system defaults.
Table names and column names are case insensitive.
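To make the roles of the three delimiters concrete, here is a minimal Python sketch that splits one row of the page_view layout above into fields, array items and map entries. It assumes the delimiters are the control characters ASCII 001, 002 and 003 and that the row is plain text; parse_row and the sample values are hypothetical, and the sketch says nothing about how SequenceFiles frame the data.

```python
FIELD, ITEM, KEY = '\x01', '\x02', '\x03'  # assumed field, collection-item and map-key delimiters

def parse_row(line):
    """Split one delimited text row into the page_view columns."""
    fields = line.rstrip('\n').split(FIELD)
    view_time, userid, page_url, referrer_url, friends, props, ip = fields
    return {
        'viewTime': int(view_time),
        'userid': int(userid),
        'page_url': page_url,
        'referrer_url': referrer_url,
        # array items are separated by the collection-item delimiter
        'friends': [int(x) for x in friends.split(ITEM)] if friends else [],
        # map entries are separated like array items; key and value by the map-key delimiter
        'properties': dict(kv.split(KEY, 1) for kv in props.split(ITEM)) if props else {},
        'ip': ip,
    }

sample = FIELD.join([
    '1212', '42', '/home', '/ref',
    ITEM.join(['7', '8']),
    ITEM.join(['page_type' + KEY + 'news', 'lang' + KEY + 'en']),
    '10.0.0.1',
]) + '\n'

row = parse_row(sample)
print(row['friends'])                  # [7, 8]
print(row['properties']['page_type'])  # news
```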

Browsing Tables and Partitions

    SHOW TABLES;
To list existing tables in the warehouse; there are many of these, likely more than you want to browse.
    SHOW TABLES 'page.*';
To list tables with prefix 'page'. The pattern follows Java regular expression syntax (so the period is a wildcard).
    SHOW PARTITIONS page_view;
To list partitions of a table. If the table is not a partitioned table then an error is thrown.
    DESCRIBE page_view;
To list columns and column types of table.
    DESCRIBE EXTENDED page_view;
To list columns and all other properties of a table. This prints a lot of information, and not in a readable format, so it is usually used for debugging.
    DESCRIBE EXTENDED page_view PARTITION (dt='2008-08-08');
To list columns and all other properties of a partition. This also prints a lot of information, which is usually used for debugging.

Loading Data

There are multiple ways to load data into Hive tables. The user can create an external table that points to a specified location within HDFS. In this particular usage, the user can copy a file into the specified location using the HDFS put or copy commands and create a table pointing to this location with all the relevant row format information. Once this is done, the user can transform the data and insert them into any other Hive table. For example, if the file /tmp/pv_2008-06-08.txt contains comma separated page views served on 2008-06-08, and this needs to be loaded into the page_view table in the appropriate partition, the following sequence of commands can achieve this:
    CREATE EXTERNAL TABLE page_view_stg(viewTime INT, userid BIGINT,
                    page_url STRING, referrer_url STRING,
                    ip STRING COMMENT 'IP Address of the User',
                    country STRING COMMENT 'country of origination')
    COMMENT 'This is the staging page view table'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
    STORED AS TEXTFILE
    LOCATION '/user/data/staging/page_view';

    hadoop dfs -put /tmp/pv_2008-06-08.txt /user/data/staging/page_view

    FROM page_view_stg pvs
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
    SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip
    WHERE pvs.country = 'US';
In the example above nulls are inserted for the array and map types in the destination tables but potentially these can also come from the external table if the proper row formats are specified.
This method is useful if there is already legacy data in HDFS on which the user wants to put some metadata so that the data can be queried and manipulated using Hive.
Additionally, the system also supports syntax that can load the data from a file in the local file system directly into a Hive table where the input data format is the same as the table format. If /tmp/pv_2008-06-08_us.txt already contains the data for US, then we do not need any additional filtering as shown in the previous example. The load in this case can be done using the following syntax:
    LOAD DATA LOCAL INPATH '/tmp/pv_2008-06-08_us.txt' INTO TABLE page_view PARTITION(dt='2008-06-08', country='US');
The path argument can take a directory (in which case all the files in the directory are loaded), a single file name, or a wildcard (in which case all the matching files are uploaded). If the argument is a directory - it cannot contain subdirectories. Similarly - the wildcard must match file names only.
If the input file /tmp/pv_2008-06-08_us.txt is very large, the user may decide to do a parallel load of the data (using tools that are external to Hive). Once the file is in HDFS, the following syntax can be used to load the data into a Hive table:
    LOAD DATA INPATH '/user/data/pv_2008-06-08_us.txt' INTO TABLE page_view PARTITION(dt='2008-06-08', country='US');
It is assumed that the array and map fields in the input.txt files are null fields for these examples.

Simple Query

For all the active users, one can use the query of the following form:
    INSERT OVERWRITE TABLE user_active
    SELECT user.*
    FROM user
    WHERE user.active = 1;
Note that unlike SQL, we always insert the results into a table. We will illustrate later how the user can inspect these results and even dump them to a local file. You can also run the following query on Hive CLI:
    SELECT user.*
    FROM user
    WHERE user.active = 1;
Internally, the results are written to a temporary file and then displayed on the Hive client side.

Partition Based Query

Which partitions to use in a query is determined automatically by the system on the basis of where clause conditions on partition columns. For example, in order to get all the page_views in the month of 03/2008 referred from domain xyz.com, one could write the following query:
    INSERT OVERWRITE TABLE xyz_com_page_views
    SELECT page_views.*
    FROM page_views
    WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND
          page_views.referrer_url like '%xyz.com';
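Note that page_views.date is used here on the assumption that the table was defined with PARTITIONED BY(date DATETIME, country STRING). The page_view table created earlier names its date partition column dt instead, so use whatever name your table definition gives the partition column; .date will not do what you expect otherwise.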
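The effect of partition selection can be pictured as filtering the list of partition values before any data is read. The Python sketch below only illustrates that effect; the partition list and the prune helper are hypothetical, and how Hive actually evaluates the where clause against partition metadata is not modelled.

```python
# Hypothetical date partitions that exist for the table.
partitions = ['2008-02-29', '2008-03-01', '2008-03-15', '2008-03-31', '2008-04-01']

def prune(partitions, low, high):
    """Keep partition values within [low, high].

    Dates written as YYYY-MM-DD sort correctly as plain strings,
    which is why the string comparison in the query works.
    """
    return [p for p in partitions if low <= p <= high]

selected = prune(partitions, '2008-03-01', '2008-03-31')
print(selected)  # ['2008-03-01', '2008-03-15', '2008-03-31']
```

Only the data under the selected partitions would need to be scanned for the referrer_url filter.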

Joins

In order to get a demographic breakdown (by gender) of page_view of 2008-03-03 one would need to join the page_view table and the user table on the userid column. This can be accomplished with a join as shown in the following query:
    INSERT OVERWRITE TABLE pv_users
    SELECT pv.*, u.gender, u.age
    FROM user u JOIN page_view pv ON (pv.userid = u.id)
    WHERE pv.date = '2008-03-03';
In order to do outer joins the user can qualify the join with LEFT OUTER, RIGHT OUTER or FULL OUTER keywords in order to indicate the kind of outer join (left preserved, right preserved or both sides preserved). For example, in order to do a full outer join in the query above, the corresponding syntax would look like the following query:
    INSERT OVERWRITE TABLE pv_users
    SELECT pv.*, u.gender, u.age
    FROM user u FULL OUTER JOIN page_view pv ON (pv.userid = u.id)
    WHERE pv.date = '2008-03-03';
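In order to check the existence of a key in another table, the user can use LEFT SEMI JOIN as illustrated by the following example. Because the right-hand table of a LEFT SEMI JOIN can only be referenced in the ON clause, the date filter on page_view is placed there rather than in a WHERE clause.
    INSERT OVERWRITE TABLE pv_users
    SELECT u.*
    FROM user u LEFT SEMI JOIN page_view pv ON (pv.userid = u.id AND pv.date = '2008-03-03');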
In order to join more than two tables, the user can use the following syntax:
    INSERT OVERWRITE TABLE pv_friends
    SELECT pv.*, u.gender, u.age, f.friends
    FROM page_view pv JOIN user u ON (pv.userid = u.id) JOIN friend_list f ON (u.id = f.uid)
    WHERE pv.date = '2008-03-03';
Note that Hive only supports equi-joins. Also it is best to put the largest table on the rightmost side of the join to get the best performance.
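The difference between the join flavours above is easiest to see on a few rows. The following Python sketch uses made-up users and page_views lists and hypothetical helper names; it shows only the result semantics of an equi-join on userid = id, not how Hive executes joins.

```python
users = [{'id': 1, 'gender': 'F'}, {'id': 2, 'gender': 'M'}, {'id': 3, 'gender': 'F'}]
page_views = [
    {'userid': 1, 'page_url': '/a'},
    {'userid': 1, 'page_url': '/b'},
    {'userid': 4, 'page_url': '/c'},
]

def inner_join(users, views):
    """JOIN: one output row per matching (user, view) pair."""
    return [(u['id'], v['page_url']) for u in users for v in views if v['userid'] == u['id']]

def left_semi_join(users, views):
    """LEFT SEMI JOIN: each user at most once, if any matching view exists."""
    keys = {v['userid'] for v in views}
    return [u['id'] for u in users if u['id'] in keys]

def full_outer_join(users, views):
    """FULL OUTER JOIN: matches plus unmatched rows from both sides, padded with None."""
    user_ids = {u['id'] for u in users}
    view_ids = {v['userid'] for v in views}
    rows = inner_join(users, views)
    rows += [(u['id'], None) for u in users if u['id'] not in view_ids]
    rows += [(None, v['page_url']) for v in views if v['userid'] not in user_ids]
    return rows

print(inner_join(users, page_views))      # [(1, '/a'), (1, '/b')]
print(left_semi_join(users, page_views))  # [1]
print(full_outer_join(users, page_views))
```

Note how the semi join returns user 1 once even though two page views match, which is what makes it suitable for existence checks.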

Aggregations

In order to count the number of distinct users by gender one could write the following query:
    INSERT OVERWRITE TABLE pv_gender_sum
    SELECT pv_users.gender, count (DISTINCT pv_users.userid)
    FROM pv_users
    GROUP BY pv_users.gender;
Multiple aggregations can be done at the same time; however, no two aggregations can have different DISTINCT columns. For example, the following is possible:
    INSERT OVERWRITE TABLE pv_gender_agg
    SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(*), sum(DISTINCT pv_users.userid)
    FROM pv_users
    GROUP BY pv_users.gender;
however, the following query is not allowed
    INSERT OVERWRITE TABLE pv_gender_agg
    SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip)
    FROM pv_users
    GROUP BY pv_users.gender;

Multi Table/File Inserts

The output of the aggregations or simple selects can be further sent into multiple tables or even to hadoop dfs files (which can then be manipulated using hdfs utilities). e.g. if along with the gender breakdown, one needed to find the breakdown of unique page views by age, one could accomplish that with the following query:
    FROM pv_users
    INSERT OVERWRITE TABLE pv_gender_sum
        SELECT pv_users.gender, count(DISTINCT pv_users.userid)
        GROUP BY pv_users.gender

    INSERT OVERWRITE DIRECTORY '/user/data/tmp/pv_age_sum'
        SELECT pv_users.age, count(DISTINCT pv_users.userid)
        GROUP BY pv_users.age;
The first insert clause sends the results of the first group by to a Hive table while the second one sends the results to a hadoop dfs directory.
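The appeal of a multi-insert is that the input is named once and feeds several outputs. As an illustration only, the Python sketch below computes both breakdowns in a single pass over a made-up pv_users list; multi_insert is a hypothetical helper and the sketch does not claim to reflect Hive's actual execution plan.

```python
from collections import defaultdict

# Hypothetical input rows.
pv_users = [
    {'userid': 1, 'gender': 'F', 'age': 25},
    {'userid': 1, 'gender': 'F', 'age': 25},
    {'userid': 2, 'gender': 'M', 'age': 25},
    {'userid': 3, 'gender': 'F', 'age': 31},
]

def multi_insert(rows):
    """Compute distinct-user counts by gender and by age in one scan."""
    by_gender = defaultdict(set)
    by_age = defaultdict(set)
    for r in rows:  # one pass over the input feeds both outputs
        by_gender[r['gender']].add(r['userid'])
        by_age[r['age']].add(r['userid'])
    gender_sum = {g: len(ids) for g, ids in by_gender.items()}
    age_sum = {a: len(ids) for a, ids in by_age.items()}
    return gender_sum, age_sum

gender_sum, age_sum = multi_insert(pv_users)
print(gender_sum)  # {'F': 2, 'M': 1}
print(age_sum)     # {25: 2, 31: 1}
```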

Dynamic-partition Insert

In the previous examples, the user has to know which partition to insert into and only one partition can be inserted in one insert statement. If you want to load into multiple partitions, you have to use multi-insert statement as illustrated below.
    FROM page_view_stg pvs
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
           SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'US'
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='CA')
           SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'CA'
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='UK')
           SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'UK';
In order to load data into all country partitions for a particular day, you have to add an insert statement for each country in the input data. This is very inconvenient, since you need a priori knowledge of the list of countries that exist in the input data and must create the partitions beforehand. If the list changes for another day, you have to modify your insert DML as well as the partition creation DDLs. It is also inefficient, since each insert statement may be turned into a MapReduce job.
Dynamic-partition insert (or multi-partition insert) is designed to solve this problem by dynamically determining which partitions should be created and populated while scanning the input table. This is a newly added feature that is only available from version 0.6.0 (trunk now). In the dynamic partition insert, the input column values are evaluated to determine which partition this row should be inserted into. If that partition has not been created, it will be created automatically. Using this feature you need only one insert statement to create and populate all necessary partitions. In addition, since there is only one insert statement, there is only one corresponding MapReduce job. This significantly improves performance and reduces the Hadoop cluster workload compared to the multiple-insert case.
Below is an example of loading data to all country partitions using one insert statement:
    FROM page_view_stg pvs
    INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
           SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.country;
There are several syntactic differences from the multi-insert statement:
  • country appears in the PARTITION specification, but with no value associated. In this case, country is a dynamic partition column. On the other hand, dt has a value associated with it, which means it is a static partition column. If a column is a dynamic partition column, its value comes from the input column. Currently dynamic partition columns are only allowed to be the last column(s) in the partition clause, because the partition column order indicates its hierarchical order (meaning dt is the root partition, and country is the child partition). You cannot specify a partition clause with (dt, country='US') because that would mean updating every date partition whose country sub-partition is 'US'.
  • An additional pvs.country column is added in the select statement. This is the corresponding input column for the dynamic partition column. Note that you do not need to add an input column for the static partition column because its value is already known in the PARTITION clause. Note that the dynamic partition values are selected by ordering, not name, and taken as the last columns from the select clause.
Semantics of the dynamic partition insert statement:
  • When non-empty partitions already exist for the dynamic partition columns (e.g., country='CA' exists under some dt root partition), they will be overwritten if the dynamic partition insert sees the same value (say 'CA') in the input data. This is in line with the 'insert overwrite' semantics. However, if the partition value 'CA' does not appear in the input data, the existing partition will not be overwritten.
  • Since a Hive partition corresponds to a directory in HDFS, the partition value has to conform to the HDFS path format (URI in Java). Any character having a special meaning in URI (e.g., '%', ':', '/', '#') will be escaped with '%' followed by the two hexadecimal digits of its ASCII value.
  • If the input column is a type different than STRING, its value will be first converted to STRING to be used to construct the HDFS path.
  • If the input column value is NULL or an empty string, the row will be put into a special partition, whose name is controlled by the hive parameter hive.exec.default.partition.name. The default value is __HIVE_DEFAULT_PARTITION__. Basically this partition will contain all "bad" rows whose values are not valid partition names. The caveat of this approach is that the bad value is lost and replaced by __HIVE_DEFAULT_PARTITION__ if you select these rows in Hive. JIRA HIVE-1309 is a solution to let the user specify a "bad file" to retain the input partition column values as well.
  • Dynamic partition insert could potentially be a resource hog in that it could generate a large number of partitions in a short time. To guard against this, three parameters are defined:
    • hive.exec.max.dynamic.partitions.pernode (default value being 100) is the maximum number of dynamic partitions that can be created by each mapper or reducer. If one mapper or reducer creates more than the threshold, a fatal error will be raised from the mapper/reducer (through a counter) and the whole job will be killed.
    • hive.exec.max.dynamic.partitions (default value being 1000) is the total number of dynamic partitions that can be created by one DML. If no mapper/reducer exceeds its limit but the total number of dynamic partitions does, then an exception is raised at the end of the job before the intermediate data are moved to the final destination.
    • hive.exec.max.created.files (default value being 100000) is the maximum total number of files created by all mappers and reducers. This is implemented by having each mapper/reducer update a Hadoop counter whenever a new file is created. If the total number exceeds hive.exec.max.created.files, a fatal error will be thrown and the job will be killed.
  • Another situation to protect against is that the user may accidentally specify all partitions to be dynamic partitions without specifying one static partition, while the original intention is to just overwrite the sub-partitions of one root partition. The parameter hive.exec.dynamic.partition.mode=strict prevents the all-dynamic partition case. In the strict mode, you have to specify at least one static partition. The default mode is strict. In addition, the parameter hive.exec.dynamic.partition=true/false controls whether to allow dynamic partitions at all. The default value is false.
  • In Hive 0.6, dynamic partition insert does not work with hive.merge.mapfiles=true or hive.merge.mapredfiles=true, so it internally turns off the merge parameters. Merging files in dynamic partition inserts is supported in Hive 0.7 (see JIRA HIVE-1307 for details).
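The routing rules in the list above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Hive's implementation: escape, partition_dir and route are hypothetical helpers, SPECIAL covers only the four example characters named above, and upper-case hexadecimal digits are assumed for the escape sequence.

```python
DEFAULT_PARTITION = '__HIVE_DEFAULT_PARTITION__'  # default of hive.exec.default.partition.name
SPECIAL = set('%:/#')  # only the example characters from the text; the real set is larger

def escape(value):
    """Replace special characters by '%' plus two hex digits (assumed upper case)."""
    return ''.join('%{:02X}'.format(ord(c)) if c in SPECIAL else c for c in value)

def partition_dir(dt, country):
    """Build the relative directory for a static dt and a dynamic country value."""
    if country is None or country == '':
        country = DEFAULT_PARTITION  # NULL or empty values go to the default partition
    return 'dt={}/country={}'.format(escape(str(dt)), escape(str(country)))

def route(rows, dt):
    """Group rows by target partition; the last column is the dynamic partition value."""
    out = {}
    for *data, country in rows:
        out.setdefault(partition_dir(dt, country), []).append(tuple(data))
    return out

print(partition_dir('2008-06-08', 'US'))  # dt=2008-06-08/country=US
print(partition_dir('2008-06-08', None))  # dt=2008-06-08/country=__HIVE_DEFAULT_PARTITION__
print(escape('a/b'))                      # a%2Fb
```

The route helper mirrors the rule that dynamic partition values are taken by position from the end of the select list rather than by name.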
Troubleshooting and best practices:
  • As stated above, if too many dynamic partitions are created by a particular mapper/reducer, a fatal error is raised and the job is killed. The error message looks something like:
        hive> set hive.exec.dynamic.partition.mode=nonstrict;
        hive> FROM page_view_stg pvs
              INSERT OVERWRITE TABLE page_view PARTITION(dt, country)
                     SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip,
                            from_unixtime(pvs.viewTime, 'yyyy-MM-dd') ds, pvs.country;
    ...
    2010-05-07 11:10:19,816 Stage-1 map = 0%,  reduce = 0%
    [Fatal Error] Operator FS_28 (id=41): fatal error. Killing the job.
    Ended Job = job_201005052204_28178 with errors
    ...
    
    The problem is that each mapper takes a random set of rows, and it is very likely that the number of distinct (dt, country) pairs will exceed the limit of hive.exec.max.dynamic.partitions.pernode. One way around it is to group the rows by the dynamic partition columns in the mapper and distribute them to the reducers where the dynamic partitions will be created. In this case the number of distinct dynamic partitions per task will be significantly reduced. The above example query could be rewritten to:
        hive> set hive.exec.dynamic.partition.mode=nonstrict;
        hive> FROM page_view_stg pvs
              INSERT OVERWRITE TABLE page_view PARTITION(dt, country)
                     SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip,
                            from_unixtime(pvs.viewTime, 'yyyy-MM-dd') ds, pvs.country
                     DISTRIBUTE BY ds, country;
    
    This query will generate a MapReduce job rather than Map-only job. The SELECT-clause will be converted to a plan to the mappers and the output will be distributed to the reducers based on the value of (ds, country) pairs. The INSERT-clause will be converted to the plan in the reducer which writes to the dynamic partitions.
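    Why DISTRIBUTE BY helps can be seen with a small simulation. The Python sketch below is only an illustration with made-up numbers: 200 distinct (dt, country) pairs, 3 tasks, a round-robin assignment standing in for a map-only job, and a simple arithmetic stand-in for the hash used to distribute rows. distinct_keys_per_task is a hypothetical helper.

```python
def distinct_keys_per_task(rows, assign, n_tasks):
    """Count the distinct partition keys each task ends up writing."""
    seen = [set() for _ in range(n_tasks)]
    for i, key in enumerate(rows):
        seen[assign(i, key) % n_tasks].add(key)
    return [len(s) for s in seen]

# 200 distinct (dt, country) pairs, each appearing 5 times in the input.
keys = [(d, c) for d in range(10) for c in range(20)]
rows = keys * 5

# Map-only job: a row's task does not depend on its key (round-robin stand-in).
map_only = distinct_keys_per_task(rows, lambda i, key: i, 3)

# DISTRIBUTE BY: all rows with the same key go to the same reducer.
# key[0] * 20 + key[1] is a deterministic stand-in for a hash of the key.
distributed = distinct_keys_per_task(rows, lambda i, key: key[0] * 20 + key[1], 3)

print(map_only)     # [200, 200, 200]
print(distributed)  # [67, 67, 66]
```

    In this toy setup every map-only task sees all 200 partitions, which is above the default per-node limit of 100, while each reducer after distribution sees about a third of them.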

Inserting into local files

In certain situations you would want to write the output into a local file so that you could load it into an Excel spreadsheet. This can be accomplished with the following command:
    INSERT OVERWRITE LOCAL DIRECTORY '/tmp/pv_gender_sum'
    SELECT pv_gender_sum.*
    FROM pv_gender_sum;

Sampling

The sampling clause allows the users to write queries for samples of the data instead of the whole table. Currently the sampling is done on the columns that are specified in the CLUSTERED BY clause of the CREATE TABLE statement. In the following example we choose 3rd bucket out of the 32 buckets of the pv_gender_sum table:
    INSERT OVERWRITE TABLE pv_gender_sum_sample
    SELECT pv_gender_sum.*
    FROM pv_gender_sum TABLESAMPLE(BUCKET 3 OUT OF 32);
In general the TABLESAMPLE syntax looks like:
    TABLESAMPLE(BUCKET x OUT OF y)
y has to be a multiple or divisor of the number of buckets in that table as specified at the table creation time. A bucket is chosen if its bucket number modulo y is equal to x. So in the above example the following tablesample clause
      TABLESAMPLE(BUCKET 3 OUT OF 16)
would pick out the 3rd and 19th buckets. The buckets are numbered starting from 0.
On the other hand the tablesample clause
     TABLESAMPLE(BUCKET 3 OUT OF 64 ON userid)
would pick out half of the 3rd bucket.
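The bucket selection rule described above is small enough to write down directly. The Python sketch below follows the text as stated (buckets numbered from 0, a bucket chosen when its number modulo y equals x); sampled_buckets is a hypothetical helper, and the fraction it returns is a simplification of "half of the bucket" for the case where y exceeds the number of buckets.

```python
def sampled_buckets(x, y, num_buckets):
    """Return (bucket numbers selected, fraction of each selected bucket read)."""
    if y % num_buckets != 0 and num_buckets % y != 0:
        raise ValueError('y must be a multiple or divisor of the number of buckets')
    buckets = [b for b in range(num_buckets) if b % y == x]
    # When y is larger than the bucket count, only part of the bucket is sampled.
    fraction = min(1.0, num_buckets / y)
    return buckets, fraction

print(sampled_buckets(3, 32, 32))  # ([3], 1.0)
print(sampled_buckets(3, 16, 32))  # ([3, 19], 1.0)
print(sampled_buckets(3, 64, 32))  # ([3], 0.5)
```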

Union all

The language also supports union all, e.g. if we suppose there are two different tables that track which user has published a video and which user has published a comment, the following query joins the results of a union all with the user table to create a single annotated stream for all the video publishing and comment publishing events:
    INSERT OVERWRITE TABLE actions_users
    SELECT u.id, actions.date
    FROM (
        SELECT av.uid AS uid, av.date AS date
        FROM action_video av
        WHERE av.date = '2008-06-03'

        UNION ALL

        SELECT ac.uid AS uid, ac.date AS date
        FROM action_comment ac
        WHERE ac.date = '2008-06-03'
        ) actions JOIN users u ON(u.id = actions.uid);

Array Operations

Array columns in tables can only be created programmatically currently. We will be extending this soon to be available as part of the create table statement. For the purpose of the current example assume that pv.friends is of the type array<INT>, i.e. it is an array of integers. The user can get a specific element in the array by its index as shown in the following command:
    SELECT pv.friends[2]
    FROM page_views pv;
The select expression gets the third item in the pv.friends array.
The user can also get the length of the array using the size function as shown below:
   SELECT pv.userid, size(pv.friends)
   FROM page_view pv;

Map(Associative Arrays) Operations

Maps provide collections similar to associative arrays. Such structures can only be created programmatically currently. We will be extending this soon. For the purpose of the current example assume that pv.properties is of the type map<String, String>, i.e. it is an associative array from strings to strings. Accordingly, the following query:
    INSERT OVERWRITE TABLE page_views_map
    SELECT pv.userid, pv.properties['page_type']
    FROM page_views pv;
can be used to select the 'page_type' property from the page_views table.
Similar to arrays, the size function can also be used to get the number of elements in a map as shown in the following query:
   SELECT size(pv.properties)
   FROM page_view pv;

Custom map/reduce scripts

Users can also plug in their own custom mappers and reducers in the data stream by using features natively supported in the Hive language. e.g. in order to run a custom mapper script - map_script - and a custom reducer script - reduce_script - the user can issue the following command which uses the TRANSFORM clause to embed the mapper and the reducer scripts.
Note that columns will be transformed to string and delimited by TAB before feeding to the user script, and the standard output of the user script will be treated as TAB-separated string columns. User scripts can output debug information to standard error which will be shown on the task detail page on hadoop.
   FROM (
        FROM pv_users
        MAP pv_users.userid, pv_users.date
        USING 'map_script'
        AS dt, uid
        CLUSTER BY dt) map_output

    INSERT OVERWRITE TABLE pv_users_reduced
        REDUCE map_output.dt, map_output.uid
        USING 'reduce_script'
        AS date, count;
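Sample map script (weekday_mapper.py)
import sys
import datetime

# Read TAB-separated (userid, unixtime) rows from Hive on standard input.
for line in sys.stdin:
  line = line.strip()
  userid, unixtime = line.split('\t')
  # Convert the Unix timestamp to an ISO weekday (1 = Monday ... 7 = Sunday).
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  # Hive reads the script's standard output as TAB-separated string columns.
  print('\t'.join([userid, str(weekday)]))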
Of course, both MAP and REDUCE are "syntactic sugar" for the more general SELECT TRANSFORM. The inner query could also have been written as:
    SELECT TRANSFORM(pv_users.userid, pv_users.date) USING 'map_script' AS dt, uid FROM pv_users CLUSTER BY dt;
Schema-less map/reduce: If there is no "AS" clause after "USING map_script", Hive assumes the output of the script contains two parts: the key, which is everything before the first tab, and the value, which is everything after the first tab. Note that this is different from specifying "AS key, value", because in that case the value contains only the portion between the first tab and the second tab if there are multiple tabs.
In this way, users can migrate old map/reduce scripts without knowing the schema of the map output. The user still needs to know the reduce output schema, because it has to match the schema of the table being inserted into.
    FROM (
        FROM pv_users
        MAP pv_users.userid, pv_users.date
        USING 'map_script'
        CLUSTER BY key) map_output

    INSERT OVERWRITE TABLE pv_users_reduced
        REDUCE map_output.key, map_output.value
        USING 'reduce_script'
        AS date, count;
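The difference between schema-less output and "AS key, value" described above can be illustrated with a small Python sketch. It only mimics the two splitting rules on a single output line; the function names and the sample line are hypothetical, and lines without any tab are left out of scope.

```python
def split_schemaless(line):
    """No AS clause: key is the text before the first TAB,
    value is everything after it (further TABs included)."""
    key, _, value = line.partition('\t')
    return key, value


def split_as_key_value(line):
    """'AS key, value': only the first two TAB-separated columns are kept."""
    columns = line.split('\t')
    key = columns[0]
    value = columns[1]
    return key, value


# Hypothetical script output with three TAB-separated fields.
line = '2008-08-08\t42\textra'
print(split_schemaless(line))    # value keeps the second TAB and 'extra'
print(split_as_key_value(line))  # value is just '42'
```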
Distribute By and Sort By: Instead of specifying "cluster by", the user can specify "distribute by" and "sort by", so the partition columns and sort columns can be different. The usual case is that the partition columns are a prefix of sort columns, but that is not required.
    FROM (
        FROM pv_users
        MAP pv_users.userid, pv_users.date
        USING 'map_script'
        AS c1, c2, c3
        DISTRIBUTE BY c2
        SORT BY c2, c1) map_output

    INSERT OVERWRITE TABLE pv_users_reduced
        REDUCE map_output.c1, map_output.c2, map_output.c3
        USING 'reduce_script'
        AS date, count;
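To make the separation of partition columns and sort columns concrete, here is a rough Python simulation of what each reducer would see. It is only a sketch: the function name distribute_and_sort is hypothetical, rows are modeled as dictionaries, and crc32 stands in for whatever hash Hive actually uses to assign rows to reducers.

```python
import zlib
from collections import defaultdict


def distribute_and_sort(rows, num_reducers, distribute_col, sort_cols):
    """Simulate DISTRIBUTE BY <distribute_col> SORT BY <sort_cols>.

    Returns {reducer_id: [rows sorted within that reducer]}.
    """
    buckets = defaultdict(list)
    for row in rows:
        # Stand-in hash for this sketch; not necessarily Hive's partitioner.
        h = zlib.crc32(str(row[distribute_col]).encode('utf-8'))
        buckets[h % num_reducers].append(row)
    # Sorting happens per reducer, not globally.
    return {
        reducer_id: sorted(bucket, key=lambda r: tuple(r[c] for c in sort_cols))
        for reducer_id, bucket in buckets.items()
    }


# Hypothetical map output with columns c1, c2, c3.
rows = [
    {'c1': 'b', 'c2': 'x', 'c3': '1'},
    {'c1': 'a', 'c2': 'x', 'c3': '2'},
    {'c1': 'c', 'c2': 'y', 'c3': '3'},
]
for reducer_id, bucket in distribute_and_sort(rows, 2, 'c2', ['c2', 'c1']).items():
    print(reducer_id, bucket)
```

All rows with the same c2 end up on the same reducer, and within each reducer the rows are ordered by (c2, c1); no ordering is implied across reducers.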

Co-Groups

Amongst the user community using map/reduce, cogroup is a fairly common operation wherein the data from multiple tables are sent to a custom reducer such that the rows are grouped by the values of certain columns on the tables. With the UNION ALL operator and the CLUSTER BY specification, this can be achieved in the Hive query language in the following way. Suppose we want to cogroup the rows from the action_video and action_comment tables on the uid column and send them to the 'reduce_script' custom reducer. The following syntax can be used:
    FROM (
        FROM (
                FROM action_video av
                SELECT av.uid AS uid, av.id AS id, av.date AS date

                UNION ALL

                FROM action_comment ac
                SELECT ac.uid AS uid, ac.id AS id, ac.date AS date
        ) union_actions
        SELECT union_actions.uid, union_actions.id, union_actions.date
        CLUSTER BY union_actions.uid) map

    INSERT OVERWRITE TABLE actions_reduced
        SELECT TRANSFORM(map.uid, map.id, map.date) USING 'reduce_script' AS (uid, id, reduced_val);
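On the script side, the grouping that the reducer performs can be sketched in Python as follows. This assumes that rows arrive as TAB-separated (uid, id, date) lines and that rows with the same uid are consecutive, which is what CLUSTER BY on uid provides. The function name cogroup and the sample rows are hypothetical, and the computation of reduced_val is deliberately left out because the text does not define it.

```python
from itertools import groupby


def cogroup(lines):
    """Group TAB-separated 'uid, id, date' rows by uid.

    Assumption: rows with the same uid arrive consecutively.
    Yields (uid, [(id, date), ...]) once per uid.
    """
    parsed = (line.rstrip('\n').split('\t') for line in lines if line.strip())
    for uid, group in groupby(parsed, key=lambda cols: cols[0]):
        yield uid, [(cols[1], cols[2]) for cols in group]


# Hypothetical rows coming from both action tables, clustered by uid.
sample = [
    'u1\tv1\t2008-01-01',
    'u1\tc7\t2008-01-02',
    'u2\tv9\t2008-01-03',
]
for uid, actions in cogroup(sample):
    print(uid, actions)
```

A real reduce_script would read sys.stdin instead of the sample list and print one TAB-separated (uid, id, reduced_val) row per result.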

Altering Tables

To rename an existing table (an error is returned if a table with the new name already exists):
    ALTER TABLE old_table_name RENAME TO new_table_name;
To rename the columns of an existing table (be sure to use the same column types and to include an entry for each preexisting column):
    ALTER TABLE old_table_name REPLACE COLUMNS (col1 TYPE, ...);
To add columns to an existing table:
    ALTER TABLE tab1 ADD COLUMNS (c1 INT COMMENT 'a new int column', c2 STRING DEFAULT 'def val');
Note that a change in the schema (such as adding columns) preserves the schema for the old partitions of the table if it is a partitioned table. All queries that access these columns and run over the old partitions implicitly return a null value or the specified default values for these columns.
In later versions, the behavior of assuming certain values, as opposed to throwing an error when a column is not found in a particular partition, may be made configurable.

Dropping Tables and Partitions

Dropping tables is fairly trivial. A drop on the table would implicitly drop any indexes (a future feature) that have been built on the table. The associated command is:
    DROP TABLE pv_users;
To drop a partition, alter the table to drop the partition:
    ALTER TABLE pv_users DROP PARTITION (ds='2008-08-08');
Note that any data for this table or partition will be dropped and may not be recoverable.


