Friday, November 11, 2011

Hadoop: Chukwa


Chukwa: Architecture and Design

Introduction

Log processing was one of the original purposes of MapReduce. Unfortunately, using Hadoop for MapReduce processing of logs is somewhat troublesome. Logs are generated incrementally across many machines, but Hadoop MapReduce works best on a small number of large files. And HDFS doesn't currently support appends, making it difficult to keep the distributed copy fresh.
Chukwa aims to provide a flexible and powerful platform for distributed data collection and rapid data processing. Our goal is to produce a system that's usable today, but that can be modified to take advantage of newer storage technologies (HDFS appends, HBase, etc) as they mature. In order to maintain this flexibility, Chukwa is structured as a pipeline of collection and processing stages, with clean and narrow interfaces between stages. This will facilitate future innovation without breaking existing code.
Chukwa has four primary components:
  1. Agents that run on each machine and emit data.
  2. Collectors that receive data from the agent and write it to stable storage.
  3. MapReduce jobs for parsing and archiving the data.
  4. HICC, the Hadoop Infrastructure Care Center; a web-portal style interface for displaying data.
Below is a figure showing the Chukwa data pipeline, annotated with data dwell times at each stage. A more detailed figure is available at the end of this document.
[Figure: the Chukwa data pipeline, annotated with data dwell times at each stage]

Agents and Adaptors

Chukwa agents do not collect some particular fixed set of data. Rather, they support dynamically starting and stopping Adaptors, which are small, dynamically controllable modules that run inside the Agent process and are responsible for the actual collection of data.
These dynamically controllable data sources are called adaptors, since they generally wrap some other data source, such as a file or a Unix command-line tool. The Chukwa agent guide includes an up-to-date list of available Adaptors.
Data sources need to be dynamically controllable because the particular data being collected from a machine changes over time, and varies from machine to machine. For example, as Hadoop tasks start and stop, different log files must be monitored. We might want to increase our collection rate if we detect anomalies. And of course, it makes no sense to collect Hadoop metrics on an NFS server.

Data Model

Chukwa Adaptors emit data in Chunks. A Chunk is a sequence of bytes, with some metadata. Several of these are set automatically by the Agent or Adaptors. Two of them require user intervention: cluster name and datatype. Cluster name is specified in conf/chukwa-env.sh, and is global to each Agent process. Datatype describes the expected format of the data collected by an Adaptor instance, and it is specified when that instance is started.
The following table lists the Chunk metadata fields.
Field       | Meaning                            | Source
Source      | Hostname where Chunk was generated | Automatic
Cluster     | Cluster host is associated with    | Specified by user in agent config
Datatype    | Format of output                   | Specified by user when Adaptor started
Sequence ID | Offset of Chunk in stream          | Automatic, initial offset specified when Adaptor started
Name        | Name of data source                | Automatic, chosen by Adaptor
Conceptually, each Adaptor emits a semi-infinite stream of bytes, numbered starting from zero. The sequence ID specifies how many bytes each Adaptor has sent, including the current chunk. So if an adaptor emits a chunk containing the first 100 bytes from a file, the sequenceID of that Chunk will be 100. And the second hundred bytes will have sequence ID 200. This may seem a little peculiar, but it's actually the same way that TCP sequence numbers work.
Adaptors need to take the sequence ID as a parameter so that they can resume correctly after a crash, without sending redundant data. When starting an adaptor, it's usually safe to specify 0 as the initial ID, but it's sometimes useful to specify something else. For instance, it lets you tail only the second half of a file.
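As an illustration of this bookkeeping, here is a minimal sketch (plain Java, not the real Chukwa Adaptor API) of a file tailer that resumes from a supplied offset and advances its sequence ID by the number of bytes in each chunk it emits:

// Illustrative sketch only; the real Adaptor interface differs.
import java.io.RandomAccessFile;

public class OffsetTailerSketch {
  private long sequenceId; // total bytes emitted so far, including the current chunk

  public OffsetTailerSketch(long initialOffset) {
    this.sequenceId = initialOffset; // 0 for a fresh start, or a saved checkpoint value
  }

  // Reads up to maxBytes starting at the current offset and advances the sequence ID.
  public byte[] nextChunk(String path, int maxBytes) throws Exception {
    RandomAccessFile raf = new RandomAccessFile(path, "r");
    try {
      raf.seek(sequenceId);                  // resume where we left off
      byte[] buf = new byte[maxBytes];
      int read = raf.read(buf);
      if (read <= 0) return new byte[0];     // nothing new to send
      sequenceId += read;                    // sequence ID of this chunk = bytes sent so far
      byte[] chunk = new byte[read];
      System.arraycopy(buf, 0, chunk, 0, read);
      return chunk;
    } finally {
      raf.close();
    }
  }
}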

Collectors

Rather than have each adaptor write directly to HDFS, data is sent across the network to a collector process, which does the HDFS writes. Each collector receives data from up to several hundred hosts, and writes all of it to a single sink file, which is a Hadoop sequence file of serialized Chunks. Periodically, collectors close their sink files, rename them to mark them available for processing, and begin writing a new file. Data is sent to collectors over HTTP.
Collectors thus drastically reduce the number of HDFS files generated by Chukwa, from one per machine or adaptor per unit time, to a handful per cluster. The decision to put collectors between data sources and the data store has other benefits. Collectors hide the details of the HDFS file system in use, such as its Hadoop version, from the adaptors. This is a significant aid to configuration. It is especially helpful when using Chukwa to monitor a development cluster running a different version of Hadoop or when using Chukwa to monitor a non-Hadoop cluster.
For more information on configuring collectors, see the Collector documentation.
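The rotation just described boils down to "write a sequence file, close it, rename it". Here is a minimal sketch of that handshake, assuming the Hadoop FileSystem and SequenceFile APIs and using Text placeholders rather than real serialized Chunks:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SinkRotationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Open a new sink file. Real collectors write ChukwaArchiveKey/ChunkImpl pairs;
    // Text is used here only to keep the sketch self-contained.
    Path open = new Path("/chukwa/logs/" + System.currentTimeMillis() + ".chukwa");
    SequenceFile.Writer writer =
        SequenceFile.createWriter(fs, conf, open, Text.class, Text.class);
    writer.append(new Text("key"), new Text("serialized chunk bytes"));
    writer.close();

    // Renaming to *.done marks the file as available for downstream processing.
    fs.rename(open, new Path(open.toString().replace(".chukwa", ".done")));
  }
}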

MapReduce processing

Collectors write data in sequence files. This is convenient for rapidly getting data committed to stable storage. But it's less convenient for analysis or finding particular data items. As a result, Chukwa has a toolbox of MapReduce jobs for organizing and processing incoming data.
These jobs come in two kinds: Archiving and Demux. The archiving jobs simply take Chunks from their input and output new sequence files of Chunks, ordered and grouped. They do no parsing or modification of the contents. (There are several different archiving jobs, which differ in precisely how they group the data.)
The Demux job, in contrast, takes Chunks as input and parses them to produce ChukwaRecords, which are sets of key-value pairs.
For details on controlling this part of the pipeline, see the Administration guide. For details about the file formats, and how to use the collected data, see the Programming guide.

HICC

HICC, the Hadoop Infrastructure Care Center, is a web-portal style interface for displaying data. Data is fetched from a MySQL database, which in turn is populated by a MapReduce job that runs on the collected data after Demux. The Administration guide has details on setting up HICC.
And now, the full-size picture of Chukwa:
[Figure: Chukwa components]

Chukwa Administration Guide

Purpose

Chukwa is a system for large-scale reliable log collection and processing with Hadoop. The Chukwa design overview discusses the overall architecture of Chukwa. You should read that document before this one. The purpose of this document is to help you install and configure Chukwa.

Pre-requisites

Chukwa should work on any POSIX platform, but GNU/Linux is the only production platform that has been tested extensively. Chukwa has also been used successfully on Mac OS X, which several members of the Chukwa team use for development.
The only absolute software requirements are Java 1.6 or better and Hadoop 0.18+. HICC, the Chukwa visualization interface, requires MySQL 5.1.30+.
The Chukwa cluster management scripts rely on ssh; these scripts, however, are not required if you have some alternate mechanism for starting and stopping daemons.

Installing Chukwa

A minimal Chukwa deployment has three components:
  • A Hadoop cluster on which Chukwa will store data (referred to as the Chukwa cluster).
  • A collector process, which writes collected data to HDFS, the Hadoop file system.
  • One or more agent processes, which send monitoring data to the collector. The nodes with active agent processes are referred to as the monitored source nodes.
In addition, you may wish to run the Chukwa Demux jobs, which parse collected data, or HICC, the Chukwa visualization tool.
[Figure: Chukwa components]

First Steps

  1. Obtain a copy of Chukwa. You can find the latest release on the Chukwa release page.
  2. Un-tar the release, via tar xzf.
  3. Make sure a copy of Chukwa is available on each node being monitored, and on each node that will run a collector.
  4. We refer to the directory containing Chukwa as CHUKWA_HOME. It may be helpful to set CHUKWA_HOME explicitly in your environment, but Chukwa does not require that you do so.

General Configuration

Agents and collectors are configured differently, but part of the process is common to both.
  • Make sure that JAVA_HOME is set correctly and points to a Java 1.6 JRE. It's generally best to set this in conf/chukwa-env.sh.
  • In conf/chukwa-env.sh, set CHUKWA_LOG_DIR and CHUKWA_PID_DIR to the directories where Chukwa should store its console logs and pid files. The pid directory must not be shared between different Chukwa instances: it should be local, not NFS-mounted.
  • Optionally, set CHUKWA_IDENT_STRING. This string is used to name Chukwa's own console log files.

Agents

Agents are the Chukwa processes that actually produce data. This section describes how to configure and run them. More details are available in the Agent configuration guide.

Configuration

This section describes how to set up the agent process on the source nodes.
The one mandatory configuration step is to set up $CHUKWA_HOME/conf/collectors. This file should contain a list of hosts that will run Chukwa collectors. Agents will pick a random collector from this list to send to, and will fail over to another listed collector on error. The file should look something like:
http://<collector1HostName>:<collector1Port>/
http://<collector2HostName>:<collector2Port>/
http://<collector3HostName>:<collector3Port>/
Edit the CHUKWA_HOME/conf/initial_adaptors configuration file. This is where you tell Chukwa what log files to monitor. See the adaptor configuration guide for a list of available adaptors.
There are a number of optional settings in $CHUKWA_HOME/conf/chukwa-agent-conf.xml:
  • The most important of these is the cluster/group name that identifies the monitored source nodes. This value is stored in each Chunk of collected data; you can therefore use it to distinguish data coming from different groups of machines.
    <property>
      <name>chukwaAgent.tags</name>
      <value>cluster="demo"</value>
      <description>The cluster's name for this agent</description>
    </property>
  • Another important option is chukwaAgent.checkpoint.dir. This is the directory Chukwa will use for its periodic checkpoints of running adaptors. It must not be a shared directory; use a local directory, not an NFS mount.
  • Setting the option chukwaAgent.control.remote will disallow remote connections to the agent control socket.

Starting, stopping, and monitoring

To run an agent process on a single node, use bin/chukwa agent.
Typically, agents run as daemons. The script bin/start-agents.sh will ssh to each machine listed in conf/agents and start an agent, running in the background. The script bin/stop-agents.sh does the reverse.
You can, of course, use any other daemon-management system you like. For instance, tools/init.d includes init scripts for running Chukwa agents.
To check if an agent is working properly, you can telnet to the control port (9093 by default) and hit "enter". You will get a status message if the agent is running normally.

Configuring Hadoop for monitoring

One of the key goals for Chukwa is to collect logs from Hadoop clusters. This section describes how to configure Hadoop to send its logs to Chukwa. Note that these directions require Hadoop 0.20.0+. Earlier versions of Hadoop do not have the hooks that Chukwa requires in order to grab MapReduce job logs.
The Hadoop configuration files are located in HADOOP_HOME/conf. To set up Chukwa to collect logs from Hadoop, you need to change some of the Hadoop configuration files.
  1. Copy CHUKWA_HOME/conf/hadoop-log4j.properties file to HADOOP_HOME/conf/log4j.properties
  2. Copy CHUKWA_HOME/conf/hadoop-metrics.properties file to HADOOP_HOME/conf/hadoop-metrics.properties
  3. Edit the HADOOP_HOME/conf/hadoop-metrics.properties file and change @CHUKWA_LOG_DIR@ to your actual Chukwa log directory (i.e., CHUKWA_HOME/var/log)

Collectors

This section describes how to set up the Chukwa collectors. For more details, see the collector configuration guide.

Configuration

First, edit $CHUKWA_HOME/conf/chukwa-env.sh. In addition to the general directions given above, you should set HADOOP_HOME. This should be the Hadoop deployment Chukwa will use to store collected data. You will get a version mismatch error if this is configured incorrectly.
Next, edit $CHUKWA_HOME/conf/chukwa-collector-conf.xml. The one mandatory configuration parameter is writer.hdfs.filesystem. This should be set to the HDFS root URL on which Chukwa will store data. Various optional configuration options are described in the collector configuration guide and in the collector configuration file itself.

Starting, stopping, and monitoring

To run a collector process on a single node, use bin/chukwa collector.
Typically, collectors run as daemons. The script bin/start-collectors.sh will ssh to each collector listed in conf/collectors and start a collector, running in the background. The script bin/stop-collectors.sh does the reverse.
You can, of course, use any other daemon-management system you like. For instance, tools/init.d includes init scripts for running Chukwa collectors.
To check if a collector is working properly, you can simply access http://collectorhost:collectorport/chukwa?ping=true with a web browser. If the collector is running, you should see a status page with a handful of statistics.
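If you prefer to script this health check rather than use a browser, the same ping can be issued from Java; the host and port below are placeholders for your collector:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CollectorPingSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder host and port; substitute your collector's values.
    URL url = new URL("http://collectorhost:8080/chukwa?ping=true");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    System.out.println("HTTP status: " + conn.getResponseCode());

    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line);   // the status page with a handful of statistics
    }
    in.close();
  }
}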

Demux and HICC


Start the Chukwa Processes

The Chukwa startup scripts are located in the CHUKWA_HOME/tools/init.d directory.
  • Start the Chukwa data processors script (execute this command only on the data processor node):
CHUKWA_HOME/tools/init.d/chukwa-data-processors start 
  • Create the daily down-sampling cron job:
CHUKWA_HOME/bin/downSampling.sh --config <path to chukwa conf> -n add 

Set Up the Database

Set up and configure the MySQL database.

Install MySQL

Download MySQL 5.1 from the MySQL site.
tar fxvz mysql-*.tar.gz -C $CHUKWA_HOME/opt
cd $CHUKWA_HOME/opt/mysql-*
Configure and then copy the my.cnf file to the CHUKWA_HOME/opt/mysql-* directory:
./scripts/mysql_install_db
./bin/mysqld_safe&
./bin/mysqladmin -u root create <clustername>
./bin/mysql -u root <clustername> < $CHUKWA_HOME/conf/database_create_table
Edit the CHUKWA_HOME/conf/jdbc.conf configuration file.
Set the clustername to the MySQL root URL:
<clustername>=jdbc:mysql://localhost:3306/<clustername>?user=root
Download the MySQL Connector/J 5.1 from the MySQL site, and place the jar file in $CHUKWA_HOME/lib.
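To sanity-check this configuration, you can open a connection with the same URL from Java once the Connector/J jar is on the classpath. This is just a sketch; "demo" stands in for your actual cluster name:

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcCheckSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("com.mysql.jdbc.Driver");   // Connector/J 5.1 driver class
    // Same form as the jdbc.conf entry; "demo" is a placeholder cluster name.
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/demo?user=root");
    System.out.println("Connected: " + !conn.isClosed());
    conn.close();
  }
}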

Set Up MySQL for Replication

Start the MySQL shell:
mysql -u root -p
Enter password:
From the MySQL shell, enter these commands (replace <username> and <password> with actual values):
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' IDENTIFIED BY '<password>';
FLUSH PRIVILEGES;

Set Up HICC

The Hadoop Infrastructure Care Center (HICC) is the Chukwa web user interface. To set up HICC, do the following:
  • Download apache-tomcat 6.0.18+ from Apache Tomcat and decompress the tarball to CHUKWA_HOME/opt.
  • Copy CHUKWA_HOME/hicc.war to apache-tomcat-6.0.18/webapps.
  • Start up HICC by running:
$CHUKWA_HOME/bin/hicc.sh start
  • Point your favorite browser to: http://<server>:8080/hicc

Troubleshooting Tips


UNIX Processes For Chukwa Agents

The Chukwa agent process name is identified by:
  • org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent
Command line to use to search for the process name:
  • ps ax | grep org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent

UNIX Processes For Chukwa Collectors

Chukwa Collector name is identified by:
  • org.apache.hadoop.chukwa.datacollection.collector.CollectorStub

UNIX Processes For Chukwa Data Processes

Chukwa Data Processors are identified by:
  • org.apache.hadoop.chukwa.extraction.demux.Demux
  • org.apache.hadoop.chukwa.extraction.database.DatabaseLoader
  • org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder
These processes run on a schedule, so they are not always visible in the process list.

Checks for MySQL Replication

At slave server, MySQL prompt, run:
show slave status\G
Make sure that Slave_IO_Running and Slave_SQL_Running are both "Yes".
Things to check if MySQL replication fails:
  • Make sure grant permission has been enabled on master MySQL server.
  • Check disk space availability.
  • Check Error status in slave status.
To reset MySQL replication, run these commands on MySQL:
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='hostname',
MASTER_USER='username',
MASTER_PASSWORD='password',
MASTER_PORT=3306,
MASTER_LOG_FILE='master2-bin.001',
MASTER_LOG_POS=4,
MASTER_CONNECT_RETRY=10;
START SLAVE;

Checks For Disk Full

If anything is wrong, use /etc/init.d/chukwa-agent and CHUKWA_HOME/tools/init.d/chukwa-system-metrics stop to shut down Chukwa. Look at the agent.log and collector.log files to determine the problem.
The most common problem is that the log files grow without bound. Set up a cron job to remove old log files:
 0 12 * * * CHUKWA_HOME/tools/expiration.sh 10 CHUKWA_HOME/var/log nowait
This will set up the log file expiration for CHUKWA_HOME/var/log for log files older than 10 days.

Emergency Shutdown Procedure

If the system is not functioning properly and you cannot find an answer in the Administration Guide, execute the kill command. The current state of the java process will be written to the log files. You can analyze these files to determine the cause of the problem.
kill -3 <pid>

Agent Setup

Overview

In a normal Chukwa installation, an Agent process runs on every machine being monitored. This process is responsible for all the data collection on that host. Data collection might mean periodically running a Unix command, or tailing a file, or listening for incoming UDP packets.
Each particular data source corresponds to a so-called Adaptor. Adaptors are dynamically loadable modules that run inside the Agent process. There is generally one Adaptor for each data source: for each file being watched or for each Unix command being executed. Each adaptor has a unique name. If you do not specify a name, one will be auto-generated by hashing the Adaptor type and parameters.
There are a number of Adaptors built into Chukwa, and you can also develop your own. Chukwa will use them if you add them to the Chukwa library search path (e.g., by putting them in a jarfile in $CHUKWA_HOME/lib.)

Agent Control

Once an Agent process is running, there are a number of commands that you can use to inspect and control it. By default, Agents listen for incoming commands on port 9093. Commands are case-insensitive.
Command          | Purpose                              | Options
add              | Start an adaptor.                    | See below
close            | Close socket connection to agent.    | None
help             | Display a list of available commands | None
list             | List currently running adaptors      | None
reloadcollectors | Re-read list of collectors           | None
stop             | Stop adaptor, abruptly               | Adaptor name
stopall          | Stop all adaptors, abruptly          | Adaptor name
shutdown         | Stop adaptor, gracefully             | Adaptor name
stopagent        | Stop agent process                   | None
The add command is by far the most complex; it takes several mandatory and optional parameters. The general form is as follows:
add [name =] <adaptor_class_name> <datatype> <adaptor specific params> <initial offset>
There are four mandatory fields: the word add, the class name for the Adaptor, the datatype of the Adaptor's output, and the sequence number for the first byte. There are two optional fields: the adaptor instance name and the adaptor parameters.
The adaptor name, if specified, should go after the add command, and be followed with an equals sign. It should be a string of printable characters, without whitespace or '='. Chukwa Adaptor names all start with "adaptor_". If you specify an adaptor name which does not start with that prefix, it will be added automatically.
Adaptor parameters aren't required by the Chukwa agent, but each class of adaptor may itself specify both mandatory and optional parameters. See below.
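Because the control protocol is plain newline-delimited text, you can drive it from a program as well as from telnet. Here is a small sketch that connects to a local agent and issues the list command; the assumption that a blank line terminates the reply is mine, not documented:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class AgentControlSketch {
  public static void main(String[] args) throws Exception {
    Socket s = new Socket("localhost", 9093);   // default agent control port
    PrintWriter out = new PrintWriter(s.getOutputStream(), true);
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));

    out.println("list");                        // ask for the running adaptors
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line);
      if (line.trim().length() == 0) break;     // assumption: blank line ends the reply
    }
    out.println("close");                       // close the control connection politely
    s.close();
  }
}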

Command-line options

Normally, agents are configured via the file conf/chukwa-agent-conf.xml. However, there are a few command-line options that are sometimes useful in troubleshooting. If you specify "local" as an option, then the agent will print chunks to standard out, rather than to a collector. If you specify a URI, then that will be used as collector, overriding the collectors specified in conf/collectors. These options are intended for testing and debugging, not for production use.
bin/chukwa agent local

Adaptors

This section lists the standard adaptors, and the arguments they take.
  • FileAdaptor: Pushes a whole file, as one Chunk, then exits. Takes one mandatory parameter; the file to push.
    add FileTailer FooData /tmp/foo 0
    This pushes file /tmp/foo as one chunk, with datatype FooData.
  • filetailer.LWFTAdaptor Repeatedly tails a file, treating the file as a sequence of bytes and ignoring the content. Chunk boundaries are arbitrary. This is useful for streaming binary data. Takes one mandatory parameter; a path to the file to tail. If the log file is rotated while there is unread data, this adaptor will not attempt to recover it.
    add filetailer.LWFTAdaptor BarData /foo/bar 0
    This pushes /foo/bar in a sequence of Chunks of type BarData
  • filetailer.FileTailingAdaptor Repeatedly tails a file, again ignoring content and with unspecified Chunk boundaries. Takes one mandatory parameter; a path to the file to tail. Keeps a file handle open in order to detect log file rotation.
    add filetailer.FileTailingAdaptor BarData /foo/bar 0
    This pushes /foo/bar in a sequence of Chunks of type BarData
  • filetailer.RCheckFTAdaptor An experimental modification of the above, which avoids the need to keep a file handle open. Same parameters and usage as the above.
  • filetailer.CharFileTailingAdaptorUTF8 The same as the base FileTailingAdaptor, except that chunks are guaranteed to end only at carriage returns. This is useful for most ASCII log file formats.
  • filetailer.CharFileTailingAdaptorUTF8NewLineEscaped The same, except that chunks are guaranteed to end only at non-escaped carriage returns. This is useful for pushing Chukwa-formatted log files, where exception stack traces stay in a single chunk.
  • DirTailingAdaptor Takes a directory path and an adaptor name as mandatory parameters; repeatedly scans that directory and all subdirectories, and starts the indicated adaptor running on each file. Since the DirTailingAdaptor does not, itself, emit data, the datatype parameter is applied to the newly-spawned adaptors. Note that if you try this on a large directory with an adaptor that keeps file handles open, it is possible to exceed your system's limit on open files. A file pattern can be specified as an optional second parameter.
    add DirTailingAdaptor logs /var/log/ *.log filetailer.CharFileTailingAdaptorUTF8 0
  • ExecAdaptor Takes a frequency (in milliseconds) as optional parameter, and then program name as mandatory parameter. Runs that program repeatedly at a rate specified by frequency.
    add ExecAdaptor Df 60000 /bin/df -x nfs -x none 0
    This adaptor will run df every minute, labeling output as Df.
  • UDPAdaptor Takes a port number as mandatory parameter. Binds to the indicated UDP port, and emits one Chunk for each received packet.
    add UdpAdaptor Packets 1234 0
    This adaptor will listen for incoming traffic on port 1234, labeling output as Packets.
  • edu.berkeley.chukwa_xtrace.XtrAdaptor (available in contrib) Takes an Xtrace ReportSource class name [without package] as mandatory argument, and no optional parameters. Listens for incoming reports in the same way as that ReportSource would.
    add edu.berkeley.chukwa_xtrace.XtrAdaptor Xtrace UdpReportSource 0
    This adaptor will create and start a UdpReportSource, labeling its output datatype as Xtrace.

Chukwa Storage Layout

HDFS File System Structure

The general layout of the Chukwa filesystem is as follows.
/chukwa/
   archivesProcessing/
   dataSinkArchives/
   finalArchives/
   demuxProcessing/
   logs/
   postProcess/
   repos/
   rolling/
   temp/

Raw Log Collection and Aggregation Workflow

What data is stored where is best described by stepping through the Chukwa workflow.
  1. Collectors write chunks to logs/*.chukwa files until a 64MB chunk size is reached or a given time interval has passed.
    • logs/*.chukwa
  2. Collectors close chunks and rename them to *.done
    • from logs/*.chukwa
    • to logs/*.done
  3. DemuxManager checks for *.done files every 20 seconds.
    1. If *.done files exist, moves files in place for demux processing:
      • from: logs/*.done
      • to: demuxProcessing/mrInput
    2. The Demux MapReduce job is run on the data in demuxProcessing/mrInput.
    3. If demux is successful within 3 attempts, archives the completed files:
      • from: demuxProcessing/mrOutput
      • to: dataSinkArchives/[yyyyMMdd]/*/*.done
    4. Otherwise moves the completed files to an error folder:
      • from: demuxProcessing/mrOutput
      • to: dataSinkArchives/InError/[yyyyMMdd]/*/*.done
  4. PostProcessManager wakes up every few minutes and aggregates, orders and de-dups record files.
    • from: postProcess/demuxOutputDir_*/[clusterName]/[dataType]/[dataType]_[yyyyMMdd]_[HH].R.evt
    • to: repos/[clusterName]/[dataType]/[yyyyMMdd]/[HH]/[mm]/[dataType]_[yyyyMMdd]_[HH]_[N].[N].evt
  5. HourlyChukwaRecordRolling runs M/R jobs at 16 past the hour to group 5 minute logs to hourly.
    • from: repos/[clusterName]/[dataType]/[yyyyMMdd]/[HH]/[mm]/[dataType]_[yyyyMMdd]_[mm].[N].evt
    • to: temp/hourlyRolling/[clusterName]/[dataType]/[yyyyMMdd]
    • to: repos/[clusterName]/[dataType]/[yyyyMMdd]/[HH]/[dataType]_HourlyDone_[yyyyMMdd]_[HH].[N].evt
    • leaves: repos/[clusterName]/[dataType]/[yyyyMMdd]/[HH]/rotateDone/
  6. DailyChukwaRecordRolling runs M/R jobs at 1:30AM to group hourly logs to daily.
    • from: repos/[clusterName]/[dataType]/[yyyyMMdd]/[HH]/[dataType]_[yyyyMMdd]_[HH].[N].evt
    • to: temp/dailyRolling/[clusterName]/[dataType]/[yyyyMMdd]
    • to: repos/[clusterName]/[dataType]/[yyyyMMdd]/[dataType]_DailyDone_[yyyyMMdd].[N].evt
    • leaves: repos/[clusterName]/[dataType]/[yyyyMMdd]/rotateDone/
  7. ChukwaArchiveManager every half hour or so aggregates and removes dataSinkArchives data using M/R.
    • from: dataSinkArchives/[yyyyMMdd]/*/*.done
    • to: archivesProcessing/mrInput
    • to: archivesProcessing/mrOutput
    • to: finalArchives/[yyyyMMdd]/*/chukwaArchive-part-*
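To see which sink files are currently sitting in logs/ waiting for Demux (the *.done files produced early in this workflow), a short sketch like the following will list them; paths follow the layout above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DoneFileLister {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // List closed sink files (renamed from *.chukwa to *.done) awaiting Demux.
    for (FileStatus status : fs.listStatus(new Path("/chukwa/logs"))) {
      if (status.getPath().getName().endsWith(".done")) {
        System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
      }
    }
  }
}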

Log Directories Requiring Cleanup

The following directories will grow over time and will need to be periodically pruned:
  • finalArchives/[yyyyMMdd]/*
  • repos/[clusterName]/[dataType]/[yyyyMMdd]/*.evt

Chukwa User and Programming Guide

At the core of Chukwa is a flexible system for collecting and processing monitoring data, particularly log files. This document describes how to use the collected data. (For an overview of the Chukwa data model and collection pipeline, see the Design Guide.)
In particular, this document discusses the Chukwa archive file formats, the demux and archiving mapreduce jobs, and the layout of the Chukwa storage directories.

Reading data from the sink or the archive

Chukwa gives you several ways of inspecting or processing collected data.

Dumping some data

It very often happens that you want to retrieve one or more files that have been collected with Chukwa. If the total volume of data to be recovered is not too great, you can use bin/chukwa dumpArchive, a command-line tool that does the job. The dump tool does an in-memory sort of the data, so you'll be constrained by the Java heap size (typically a few hundred MB).
The dump tool takes a search pattern as its first argument, followed by a list of files or file-globs. It will then print the contents of every data stream in those files that matches the pattern. (A data stream is a sequence of chunks with the same host, source, and datatype.) Data is printed in order, with duplicates removed. No metadata is printed. Separate streams are separated by a row of dashes.
For example, the following command will dump all data from every file that matches the glob pattern. Note the use of single quotes to pass glob patterns through to the application, preventing the shell from expanding them.
$CHUKWA_HOME/bin/chukwa dumpArchive 'datatype=.*' 'hdfs://host:9000/chukwa/archive/*.arc'
The patterns used by dump are based on normal regular expressions. They are of the form field1=regex&field2=regex. That is, they are a sequence of rules, separated by ampersand signs. Each rule is of the form metadatafield=regex, where metadatafield is one of the Chukwa metadata fields, and regex is a regular expression. The valid metadata field names are: datatype, host, cluster, content, and name. Note that the name field matches the stream name -- often the filename that the data was extracted from.
In addition, you can match arbitrary tags via tags.tagname. So for instance, to match chunks with tag foo="bar" you could say tags.foo=bar. Note that quotes are present in the tag, but not in the filter rule.
A stream matches the search pattern only if every rule matches. So to retrieve HadoopLog data from cluster foo, you might search for cluster=foo&datatype=HadoopLog.
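This is not the dump tool's actual implementation, but a short sketch of how such a filter can be parsed and applied to a stream's metadata, mirroring the every-rule-must-match semantics described above:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class FilterSketch {
  // Returns true only if every field=regex rule matches the stream's metadata.
  public static boolean matches(String filter, Map<String, String> metadata) {
    for (String rule : filter.split("&")) {
      int eq = rule.indexOf('=');
      String field = rule.substring(0, eq);
      Pattern regex = Pattern.compile(rule.substring(eq + 1));
      String value = metadata.get(field);
      if (value == null || !regex.matcher(value).matches()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> stream = new HashMap<String, String>();
    stream.put("cluster", "foo");
    stream.put("datatype", "HadoopLog");
    System.out.println(matches("cluster=foo&datatype=HadoopLog", stream)); // prints true
  }
}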

Exploring the Sink or Archive

Another common task is finding out what data has been collected. Chukwa offers a specialized tool for this purpose: DumpArchive. This tool has two modes: summarize and verbose, with the latter being the default.
In summarize mode, DumpArchive prints a count of chunks in each data stream. In verbose mode, the chunks themselves are dumped.
You can invoke the tool by running $CHUKWA_HOME/bin/dumpArchive.sh. To specify summarize mode, pass --summarize as the first argument.
bin/chukwa dumpArchive --summarize 'hdfs://host:9000/chukwa/logs/*.done'

Using MapReduce

A key goal of Chukwa was to facilitate MapReduce processing of collected data. The next section discusses the file formats. An understanding of MapReduce and SequenceFiles is helpful in understanding the material.

Sink File Format

As data is collected, Chukwa dumps it into sink files in HDFS. By default, these are located in hdfs:///chukwa/logs. If the file name ends in .chukwa, that means the file is still being written to. Every few minutes, the collector will close the file, and rename the file to '*.done'. This marks the file as available for processing.
Each sink file is a Hadoop sequence file, containing a succession of key-value pairs, and periodic sync markers to facilitate MapReduce access. The key type is ChukwaArchiveKey; the value type is ChunkImpl. See the Chukwa Javadoc for details about these classes.
Data in the sink may include duplicate and omitted chunks.
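Here is a sketch of reading a sink file directly with the standard SequenceFile reader. The ChunkImpl accessor names used below (getBlankChunk, getDataType, getSource, getData) are my assumptions about the API; check them against the Chukwa Javadoc:

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class SinkReaderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // args[0] is a sink file, e.g. a *.done file under /chukwa/logs
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(args[0]), conf);

    ChukwaArchiveKey key = new ChukwaArchiveKey();
    ChunkImpl chunk = ChunkImpl.getBlankChunk();   // assumed factory for an empty, reusable chunk
    while (reader.next(key, chunk)) {
      System.out.println(chunk.getDataType() + " from " + chunk.getSource()
          + ": " + chunk.getData().length + " bytes");
    }
    reader.close();
  }
}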

Demux and Archiving

It's possible to write MapReduce jobs that directly examine the data sink, but it's not extremely convenient. Data is not organized in a useful way, so jobs will likely discard most of their input. Data quality is imperfect, since duplicates and omissions may exist. And MapReduce and HDFS are optimized to deal with a modest number of large files, not many small ones.
Chukwa therefore supplies several MapReduce jobs for organizing collected data and putting it into a more useful form; these jobs are typically run regularly from cron. Knowing how to use Chukwa-collected data requires understanding how these jobs lay out storage. For now, this document only discusses one such job: the Simple Archiver.

Simple Archiver

The simple archiver is designed to consolidate a large number of data sink files into a small number of archive files, with the contents grouped in a useful way. Archive files, like raw sink files, are in Hadoop sequence file format. Unlike the data sink, however, duplicates have been removed. (Future versions of the Simple Archiver will indicate the presence of gaps.)
The simple archiver moves every .done file out of the sink, and then runs a MapReduce job to group the data. Output Chunks will be placed into files with names of the form hdfs:///chukwa/archive/clustername/Datatype_date.arc. Date corresponds to when the data was collected; Datatype is the datatype of each Chunk.
If archived data corresponds to an existing filename, a new file will be created with a disambiguating suffix.

Demux

A key use for Chukwa is processing arriving data, in parallel, using MapReduce. The most common way to do this is using the Chukwa demux framework. As data flows through Chukwa, the demux job is often the first job that runs.
By default, Chukwa uses the TsProcessor. This parser tries to extract the timestamp of the real log statement from the log entry using the ISO 8601 date format. If it fails, it uses the time at which the chunk was written to disk (the collector timestamp).
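That fallback behaviour amounts to something like the following sketch; the exact date pattern accepted by TsProcessor is an assumption here (log4j-style ISO 8601 with milliseconds):

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class TimestampSketch {
  // Assumed pattern, e.g. "2011-11-11 10:15:30,123 INFO ..."
  private static final SimpleDateFormat ISO8601 =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

  public static long extractTime(String logLine, long collectorTimestamp) {
    try {
      return ISO8601.parse(logLine.substring(0, 23)).getTime();
    } catch (ParseException e) {
      return collectorTimestamp;   // fall back to when the chunk was written to disk
    } catch (StringIndexOutOfBoundsException e) {
      return collectorTimestamp;   // line too short to hold a timestamp
    }
  }
}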

Writing a custom demux Mapper

If you want to extract some specific information and perform more processing, you need to write your own parser. Like any M/R program, you have to write at least the Map side for your parser. The reduce side is Identity by default.
On the Map side, you can write your own parser from scratch or extend the AbstractProcessor class, which hides all the low-level handling of the chunk. See org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Df for an example of a Map class for use with Demux.
For Chukwa to invoke your Mapper code, you have to specify which data types it should run on. Edit ${CHUKWA_HOME}/conf/chukwa-demux-conf.xml and add the following lines:
<property>
<name>MyDataType</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MyParser</value>
<description>Parser class for MyDataType.</description>
</property>
You can use the same parser for several different recordTypes.
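For concreteness, here is a minimal sketch of such a parser. The parse(...) signature is an assumption modeled on the Df example, and the key layout is purely illustrative; check both against the Javadoc for your Chukwa version:

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Sketch only: a trivial parser for the hypothetical "MyDataType" records.
public class MyParser extends AbstractProcessor {
  @Override
  protected void parse(String recordEntry,
                       OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                       Reporter reporter) throws Throwable {
    ChukwaRecord record = new ChukwaRecord();
    record.add("rawLine", recordEntry);              // keep the whole log line as one field

    ChukwaRecordKey key = new ChukwaRecordKey();
    key.setReduceType("MyDataType");                 // selects the ReduceProcessor (see below)
    key.setKey("someTimePartition/someHost");        // illustrative key layout only
    output.collect(key, record);
  }
}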

Writing a custom reduce

You only need to implement a reduce side if you need to group records together. The interface that you need to implement is ReduceProcessor:
public interface ReduceProcessor
{
  public String getDataType();
  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
                      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                      Reporter reporter);
}
The link between the Map side and the reduce is done by setting your reduce class into the reduce type: key.setReduceType("MyReduceClass");. Note that in the current version of Chukwa, your class needs to be in the package org.apache.hadoop.chukwa.extraction.demux.processor. See org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics for an example of a Demux reducer.
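A minimal sketch of such a reducer appears below. It assumes ChukwaRecord exposes getFields()/getValue() for copying fields; SystemMetrics shows the real pattern, so treat this only as a shape to follow:

package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Sketch only: merges all values for a key into a single output record.
public class MyReduceClass implements ReduceProcessor {
  public String getDataType() {
    return "MyDataType";
  }

  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
                      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                      Reporter reporter) {
    try {
      ChukwaRecord merged = new ChukwaRecord();
      while (values.hasNext()) {
        ChukwaRecord r = values.next();
        for (String field : r.getFields()) {   // assumed accessor for the record's field names
          merged.add(field, r.getValue(field));
        }
      }
      output.collect(key, merged);
    } catch (IOException e) {
      throw new RuntimeException(e);           // process() declares no checked exceptions
    }
  }
}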

Output

Your data is going to be sorted by RecordType and then by the key field. The default implementation uses the following grouping for all records:
  1. Time partition (Time up to the hour)
  2. Machine name (physical input source)
  3. Record timestamp
The demux process will use the recordType to save similar records (those with the same recordType) together in the same directory: <cluster name>/<record type>/
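Purely as an illustration of that grouping (this is not necessarily the key layout Demux actually uses), a key following the three components above could be built like this:

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;

// Illustrative only: time partition (truncated to the hour), source machine,
// and record timestamp combined into one key string.
public class KeySketch {
  public static ChukwaRecordKey buildKey(String recordType, String host, long timestamp) {
    long hourPartition = (timestamp / (60L * 60 * 1000)) * (60L * 60 * 1000);
    ChukwaRecordKey key = new ChukwaRecordKey();
    key.setReduceType(recordType);    // drives which ReduceProcessor handles the record
    key.setKey(hourPartition + "/" + host + "/" + timestamp);
    return key;
  }
}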
