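1. A brief introduction to Source
Source is the component that receives data into a Flume Agent. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The official distribution already ships many source types, but they do not always meet the needs of real-world development. In those cases we have to write a custom source that fits the actual requirement.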
configure(Context context) // initialize from the context (reads the values set in the configuration file)
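As a rough illustration of what configure does, the sketch below mimics reading typed values with defaults from a string-keyed map. `ConfigSketch` and its methods are hypothetical stand-ins written in plain Java so the snippet runs without Flume on the classpath; in a real source the equivalent calls are `context.getLong(key, default)` and `context.getString(key, default)`, as used in section 3.2.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flume's Context: string parameters with typed getters.
class ConfigSketch {
    private final Map<String, String> params;

    ConfigSketch(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    // Return the value parsed as a long, or the default when the key is absent.
    long getLong(String key, long defaultValue) {
        String raw = params.get(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    // Return the raw string value, or the default when the key is absent.
    String getString(String key, String defaultValue) {
        String raw = params.get(key);
        return raw == null ? defaultValue : raw;
    }

    public static void main(String[] args) {
        Map<String, String> fromFile = new HashMap<>();
        fromFile.put("delay", "500"); // pretend this key was set in the config file
        ConfigSketch context = new ConfigSketch(fromFile);

        System.out.println(context.getLong("delay", 2000L));          // key present
        System.out.println(context.getString("field", "buwenbuhuo")); // key absent, default used
    }
}
```

The point of the defaults is that a source keeps working when a key is left out of the configuration file.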
2. Requirements / Analysis
3. Coding
3.1 Add the pom dependencies
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>
3.2 Write the code
package com.buwenbuhuo;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 卜温不火
 * @create 2020-05-04 16:00
 * com.buwenbuhuo - the name of the target package where the new class or interface will be created.
 * flumeplugins0504 - the name of the current project.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields read from the configuration
    // Interval between two events, in milliseconds
    private long delay;
    // Prefix of each event body
    private String field;

    @Override
    public Status process() throws EventDeliveryException {
        try {
            Map<String, String> header = new HashMap<>();
            // Produce the data: five events per call
            for (int i = 0; i < 5; i++) {
                // Use a fresh event each time so an event already handed to the channel is not modified
                SimpleEvent event = new SimpleEvent();
                event.setHeaders(header);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            // Tell the runner that this call failed
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        // Read the settings, falling back to defaults when a key is absent
        delay = context.getLong("delay", 2000L);
        field = context.getString("field", "buwenbuhuo");
    }
}
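To see what the loop in process() sends without starting an agent, here is a minimal plain-Java sketch of just the body-building step. `EventBodySketch` and `buildBodies` are hypothetical names introduced for illustration, and the sketch encodes with UTF-8 explicitly, which is an assumption (the source above calls `getBytes()` with the platform default charset).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces only the body-building part of process().
class EventBodySketch {

    // Build the bodies the loop would send: field + i for i in [0, count).
    static List<byte[]> buildBodies(String field, int count) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Assumption: UTF-8 instead of the platform default charset
            bodies.add((field + i).getBytes(StandardCharsets.UTF_8));
        }
        return bodies;
    }

    public static void main(String[] args) {
        // Same field value and count as the defaults in MySource
        for (byte[] body : buildBodies("buwenbuhuo", 5)) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

With the default field this yields five bodies, buwenbuhuo0 through buwenbuhuo4, which is what the logger sink should receive on each call to process().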
4. Testing
- 1. Package and upload
- 2. Configuration file
[bigdata@hadoop002 job]$ cp flume-netcat-logger.conf flume-mysource-logger.conf
[bigdata@hadoop002 job]$ vim flume-mysource-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.buwenbuhuo.MySource
a1.sources.r1.delay = 2000
a1.sources.r1.field = buwenbuhuo
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
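The keys under `a1.sources.r1.` are the ones the source sees in configure. As far as I understand it, Flume hands the source those keys with the prefix stripped, so `a1.sources.r1.delay` becomes `delay`. The sketch below imitates that step with `java.util.Properties`; `SourceParamsSketch` and `paramsFor` are hypothetical names, and this is not Flume's own parsing code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: collect the keys below a prefix and strip that prefix.
class SourceParamsSketch {

    static Map<String, String> paramsFor(String config, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A subset of the configuration file shown above
        String config = String.join("\n",
            "a1.sources = r1",
            "a1.sources.r1.type = com.buwenbuhuo.MySource",
            "a1.sources.r1.delay = 2000",
            "a1.sources.r1.field = buwenbuhuo",
            "a1.sources.r1.channels = c1");
        System.out.println(paramsFor(config, "a1.sources.r1."));
    }
}
```

This is why MySource asks for `delay` and `field` rather than the full dotted names.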
- 3. Start the job
[bigdata@hadoop002 flume]$ bin/flume-ng agent -c conf/ -f job/flume-mysource-logger.conf -n a1 -Dflume.root.logger=INFO,console
- 4. Results
Source: buwenbuhuo.blog.csdn.net. Author: 不温卜火. Copyright belongs to the original author; please contact the author before reposting.