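Flume ships with many sources, such as exec and kafka. One of the simplest is the HTTP source: when it is configured, Flume starts a web server at agent startup that listens on the specified IP and port. A typical use case is an application environment where the Flume SDK and its dependencies cannot be deployed, so the code sends data over HTTP instead of Flume's RPC. The HTTP source then receives that data into Flume.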
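As a rough illustration of "sending data over HTTP from code", the sketch below builds a JSON payload in the array-of-events shape that Flume's default JSON handler accepts and posts it with the JDK's `HttpURLConnection`. The class name `FlumeHttpClientSketch` and its helper methods are hypothetical, and the target URL is an assumption supplied on the command line (for example `http://localhost:50000` if an agent listens there).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical client sketch; not part of Flume.
final class FlumeHttpClientSketch {

    // Escapes characters that must not appear raw inside a JSON string.
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds a one-element array: [{"headers":{...},"body":"..."}]
    static String toJson(Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder("[{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(jsonEscape(e.getKey())).append("\":\"")
              .append(jsonEscape(e.getValue())).append('"');
            first = false;
        }
        return sb.append("},\"body\":\"").append(jsonEscape(body)).append("\"}]").toString();
    }

    // Posts the payload and returns the HTTP status code.
    static int post(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("h1", "v1");
        headers.put("h2", "v2");
        String json = toJson(headers, "hello body");
        System.out.println(json);
        // Only send when a target URL is given, e.g. http://localhost:50000 (assumed).
        if (args.length > 0) {
            System.out.println("HTTP status: " + post(args[0], json));
        }
    }
}
```

Run without arguments, the program only prints the payload; with a URL argument it also posts it. Hand-building JSON keeps the sketch dependency-free; a real client would more likely use a JSON library.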

1. HTTP source parameters:

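Note: a log4j configuration file must exist in the directory given by the --conf parameter. Alternatively, the log4j settings can be specified manually on the command line at startup with -Dflume.root.logger=INFO,console.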

3. A simple HTTP source example:

1) Download and extract Flume:

    cd /usr/local/
    wget http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
    tar -xvzf apache-flume-1.7.0-bin.tar.gz
Configure the Flume environment variables:

    vim /etc/profile
    # add the following lines to /etc/profile
    export PS1="[\u@`/sbin/ifconfig eth0|grep 'inet '|awk -F'[: ]+' '{print $4}'` \W]"'$ '
    export FLUME_HOME=/usr/local/apache-flume-1.7.0-bin
    export PATH=$PATH:$FLUME_HOME/bin

2) Install the JDK and configure its environment variables.

3) Configure Flume (log4j settings):

    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n
    ### Output to a log file ###
    log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.D.File = /data/logs/flume/flume.log
    log4j.appender.D.Append = true
    log4j.appender.D.Threshold = info
    log4j.appender.D.layout = org.apache.log4j.PatternLayout
    log4j.appender.D.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n
    ### Save error messages to a separate file ###
    log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.E.File = /data/logs/flume/flume_error.log
    log4j.appender.E.Append = true
    log4j.appender.E.Threshold = ERROR
    log4j.appender.E.layout = org.apache.log4j.PatternLayout
    log4j.appender.E.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n
    ### Sink ###
    log4j.logger.com.iqiyi.ttbrain.log.flume.sink.MysqlSink = INFO, F, EE
    log4j.additivity.com.iqiyi.ttbrain.log.flume.sink.MysqlSink = false
    log4j.appender.F = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.F.File = /data/logs/flume/flume_sink.log
    log4j.appender.F.Append = true
    log4j.appender.F.Threshold = info
    log4j.appender.F.layout = org.apache.log4j.PatternLayout
    log4j.appender.F.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n
    log4j.appender.EE = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.EE.File = /data/logs/flume/flume_sink_error.log
    log4j.appender.EE.Append = true
    log4j.appender.EE.Threshold = ERROR
    log4j.appender.EE.layout = org.apache.log4j.PatternLayout
    log4j.appender.EE.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n
4) Configure the HTTP source:

Send a test event to the source with curl:

    curl -X POST -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://localhost:50000
The received event can then be seen in /data/logs/flume/flume.log.

1) The pom.xml of the custom handler project:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.pq</groupId>
      <artifactId>flume-demo</artifactId>
      <packaging>jar</packaging>
      <version>1.0</version>
      <name>flume-demo Maven jar</name>
      <url>http://maven.apache.org</url>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.8.2</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.7</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.6.0</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
      <build>
        <finalName>flume-demo</finalName>
      </build>
    </project>
2) Custom handler:

    package org.pq.flumeDemo.sources;

    import com.google.common.base.Preconditions;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.http.HTTPBadRequestException;
    import org.apache.flume.source.http.HTTPSourceHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    import org.xml.sax.SAXException;

    import javax.servlet.http.HttpServletRequest;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Turns an XML request body (events / event / headers + body) into Flume events. */
    public class HTTPSourceXMLHandler implements HTTPSourceHandler {

        private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);

        private static final String ROOT = "events";
        private static final String EVENT_TAG = "event";
        private static final String HEADERS_TAG = "headers";
        private static final String BODY_TAG = "body";
        private static final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
        private static final String TIMESTAMP_HEADER = "timestamp";

        private final DocumentBuilderFactory documentBuilderFactory
                = DocumentBuilderFactory.newInstance();
        // Document builders are not thread-safe,
        // so make sure we have one for each thread.
        private final ThreadLocal<DocumentBuilder> docBuilder
                = new ThreadLocal<DocumentBuilder>();
        private boolean insertTimestamp;

        @Override
        public List<Event> getEvents(HttpServletRequest httpServletRequest)
                throws HTTPBadRequestException, Exception {
            if (docBuilder.get() == null) {
                docBuilder.set(documentBuilderFactory.newDocumentBuilder());
            }
            // Fall back to UTF-8 when the request declares no charset;
            // getBytes(null) would otherwise throw a NullPointerException.
            String charset = httpServletRequest.getCharacterEncoding();
            if (charset == null) {
                charset = "UTF-8";
            }
            Document doc;
            final List<Event> events;
            try {
                doc = docBuilder.get().parse(httpServletRequest.getInputStream());
                Element root = doc.getDocumentElement();
                root.normalize();
                // Verify that the root element is "events"
                Preconditions.checkState(
                        ROOT.equalsIgnoreCase(root.getTagName()));
                NodeList nodes = root.getElementsByTagName(EVENT_TAG);
                int eventCount = nodes.getLength();
                LOG.info("got {} event nodes", eventCount);
                events = new ArrayList<Event>(eventCount);
                for (int i = 0; i < eventCount; i++) {
                    Element event = (Element) nodes.item(i);
                    // Get all headers. If there are multiple header sections,
                    // combine them.
                    NodeList headerNodes
                            = event.getElementsByTagName(HEADERS_TAG);
                    Map<String, String> eventHeaders
                            = new HashMap<String, String>();
                    for (int j = 0; j < headerNodes.getLength(); j++) {
                        Node headerNode = headerNodes.item(j);
                        NodeList headers = headerNode.getChildNodes();
                        for (int k = 0; k < headers.getLength(); k++) {
                            Node header = headers.item(k);
                            // Read only element nodes
                            if (header.getNodeType() != Node.ELEMENT_NODE) {
                                continue;
                            }
                            // Make sure a header is inserted only once,
                            // else the event is malformed
                            Preconditions.checkState(
                                    !eventHeaders.containsKey(header.getNodeName()),
                                    "Header expected only once " + header.getNodeName());
                            eventHeaders.put(
                                    header.getNodeName(), header.getTextContent());
                        }
                    }
                    Node body = event.getElementsByTagName(BODY_TAG).item(0);
                    if (insertTimestamp) {
                        eventHeaders.put(TIMESTAMP_HEADER,
                                String.valueOf(System.currentTimeMillis()));
                    }
                    events.add(EventBuilder.withBody(
                            body.getTextContent().getBytes(charset),
                            eventHeaders));
                }
            } catch (SAXException ex) {
                throw new HTTPBadRequestException(
                        "Request could not be parsed into valid XML", ex);
            } catch (Exception ex) {
                throw new HTTPBadRequestException(
                        "Request is not in expected format. "
                        + "Please refer documentation for expected format.", ex);
            }
            return events;
        }

        @Override
        public void configure(Context context) {
            insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP, false);
        }
    }
Package the project as a jar and place it in Flume's lib directory.
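To make the request format expected by the handler above more concrete, here is a minimal sketch that parses a sample payload with the JDK's DOM API alone, following the same `events` / `event` / `headers` / `body` element names. The sample document, the class name `XmlEventFormatSketch` and its helpers are hypothetical and only illustrate the structure; they are not part of Flume or of the handler.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical illustration of the XML shape; uses only JDK classes.
final class XmlEventFormatSketch {

    // Assumed sample payload: one event with two headers and a body.
    static final String SAMPLE =
        "<events>"
      + "<event><headers><h1>v1</h1><h2>v2</h2></headers><body>hello body</body></event>"
      + "</events>";

    // Parses the XML text; wraps checked exceptions to keep callers simple.
    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception ex) {
            throw new IllegalArgumentException("not well-formed XML", ex);
        }
    }

    // Collects the element children of every <headers> section of one event.
    static Map<String, String> headersOf(Element event) {
        Map<String, String> headers = new LinkedHashMap<>();
        NodeList sections = event.getElementsByTagName("headers");
        for (int i = 0; i < sections.getLength(); i++) {
            NodeList children = sections.item(i).getChildNodes();
            for (int k = 0; k < children.getLength(); k++) {
                Node n = children.item(k);
                if (n.getNodeType() == Node.ELEMENT_NODE) {
                    headers.put(n.getNodeName(), n.getTextContent());
                }
            }
        }
        return headers;
    }

    // Returns the text of the first <body> element of one event.
    static String bodyOf(Element event) {
        return event.getElementsByTagName("body").item(0).getTextContent();
    }

    public static void main(String[] args) {
        Element root = parse(SAMPLE).getDocumentElement();
        NodeList events = root.getElementsByTagName("event");
        for (int i = 0; i < events.getLength(); i++) {
            Element e = (Element) events.item(i);
            // prints: {h1=v1, h2=v2} -> hello body
            System.out.println(headersOf(e) + " -> " + bodyOf(e));
        }
    }
}
```

A payload like `SAMPLE` is what a client would POST to the source once the handler is configured.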

3) Flume configuration file:

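    # Declare the components of agent a1
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = http
    a1.sources.r1.port = 50000
    a1.sources.r1.channels = c1
    a1.sources.r1.handler = org.pq.flumeDemo.sources.HTTPSourceXMLHandler
    # Handler options need the "handler." prefix to reach the handler's configure()
    a1.sources.r1.handler.insertTimestamp = true
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100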
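The HTTP source passes to the handler's `configure()` only the source properties that sit under the `handler.` prefix (the `handler.*` row in the Flume user guide). The sketch below mimics that filtering with plain maps to show which keys a handler would see. The method `subProperties` and the class name are hypothetical stand-ins, not Flume's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of prefix-based handler configuration.
final class HandlerPrefixSketch {

    static final String HANDLER_PREFIX = "handler.";

    // Keeps only keys that start with the prefix and strips the prefix from them.
    static Map<String, String> subProperties(Map<String, String> sourceProps, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : sourceProps.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Properties of source r1, with the "a1.sources.r1." part already removed.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("port", "50000");
        props.put("handler", "org.pq.flumeDemo.sources.HTTPSourceXMLHandler");
        props.put("handler.insertTimestamp", "true");
        props.put("insertTimestamp", "ignored"); // no prefix, so the handler never sees it
        // prints: {insertTimestamp=true}
        System.out.println(subProperties(props, HANDLER_PREFIX));
    }
}
```

In other words, a key written directly on the source (without `handler.`) is visible to the source itself but not to the handler.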
4) Start the agent: