flume ng进击之路 (三) —— 自定义source API开发

xiaoxiao2021-02-27  313

概述

关于flume ng的简单介绍,可以参考flume ng进击之路 (一)—— 入门,同时flume ng也提供了各种各样的source和sink接口供我们在生成环境中使用,但是在生产环境中,我们常常需要定制的source或者sink来满足我们的要求。

好在flume ng提供了开放接口,我们可以根据这些接口,实现自己定制的source或者sink。下面我们来看一下如何实现自定义source框架。

实现

maven依赖

首先,要根据flume ng提供的接口来实现自定义source,需要我们依赖flume ng的配置,我们引入两个配置flume-ng-core和flume-ng-configuration,具体的maven配置如下:

<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.6.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>1.6.0</version> </dependency>

source都包含哪些东西

在正式开始之前,我们先看看官网上一个source是如何使用的。我们在使用source的时候,只需要简单的配置一个文件,比如我们看看官网的Spooling Directory Source是如何监控一个文件夹的文件变化并且抓取的。 简单的配置如下:

a1.channels = ch-1 a1.sources = src-1 a1.sources.src-1.type = spooldir a1.sources.src-1.channels = ch-1 a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool a1.sources.src-1.fileHeader = true

这是配置一个Spooling Directory Source的最简单的配置,没有填写的配置会使用默认值,参考官网。 从上面可以看到,如果要实现自定义的source,我们要处理相应的自定义配置。 另外,我们知道flume是将消息(采集以及传输)封装成Event来实现通用source,channel和sink的组合,因此,要实现自定义source,必须要处理Event的来源。

自定义source

有了上面的简单了解,我们可以开始我们的自定义开发工作了。从上面可以知道,我们需要处理两个关键问题:配置和Event处理。当然是通过实现flume提供的接口来处理。

我们来实现一个自动间隔发送test文本信息的source。 代码组织结构如下: 具体代码如下: MySourceEventReader.java代码如下:

package flume.plugin; import com.google.common.collect.Lists; import org.apache.flume.Event; import org.apache.flume.client.avro.ReliableEventReader; import org.apache.flume.event.EventBuilder; import java.io.IOException; import java.nio.charset.Charset; import java.util.List; public class MySourceEventReader implements ReliableEventReader { private final Charset outputCharset = Charset.forName("UTF-8"); // 标识是否source的event信息是否已提交到channel private boolean committed = true; // true,已提交,false,有信息待提交 @Override public void close() throws IOException { /** * 执行关闭相关资源操作 */ } @Override public void commit() throws IOException { /** * 被调用,标识是否提交成功 */ if (!committed) { committed = true; } } @Override public Event readEvent() { /** * 发送单独的一个event,内容为test */ return EventBuilder.withBody("test", outputCharset); } @Override public List<Event> readEvents(int numEvents) { /** * 发送多个Event列表 */ List<Event> retList = Lists.newLinkedList(); for (int i = 0; i < numEvents; ++i) { retList.add(EventBuilder.withBody("test", outputCharset)); } if (retList.size() != 0) { // only non-empty events need to commit committed = false; } return retList; } }

MyFlumeSource.java代码如下:

package flume.plugin; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.AbstractSource; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class MyFlumeSource extends AbstractSource implements Configurable, EventDrivenSource { // process info private SourceCounter sourceCounter; private MySourceEventReader reader; private ScheduledExecutorService executor; private int intervalMillis; ; @Override public synchronized void configure(Context context) { //根据Context读取配置,Context会自动加载flume启动时指定的配置 // 读配置文件间隔时间,默认值100ms intervalMillis = context.getInteger("intervalMillis", 100); } @Override public synchronized void start() { // 初始化 if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } executor = Executors.newSingleThreadScheduledExecutor(); reader = new MySourceEventReader(); // 每个2s执行一次 Runnable runner = new MyReaderRunnable(reader, sourceCounter); executor.scheduleWithFixedDelay(runner, 0, 2, TimeUnit.MILLISECONDS); super.start(); sourceCounter.start(); } @Override public synchronized void stop() { executor.shutdown(); try { executor.awaitTermination(10L, TimeUnit.SECONDS); } catch (InterruptedException ex) { ex.printStackTrace(); } executor.shutdownNow(); super.stop(); sourceCounter.stop(); } private class MyReaderRunnable implements Runnable { private MySourceEventReader reader; private SourceCounter sourceCounter; public MyReaderRunnable(MySourceEventReader reader, SourceCounter sourceCounter) { this.reader = reader; this.sourceCounter = sourceCounter; } @Override public void run() { while (!Thread.interrupted()) { // 读事件 List<Event> events = reader.readEvents(5); // 提交 sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); // sleep intervalMillis try { Thread.sleep(intervalMillis); } catch (Exception e) { e.printStackTrace(); } } } } }

上面就是一个最简单的source的实现了,实现每个一段时间自动发送“test”信息,对应的flume配置和启动命令如下: flume-conf.properties配置如下:

producer.sources = s #test producer.sources.s.type = flume.plugin.MyFlumeSource // 注意这个配置,这个配置是上述代码中唯一指定的配置,我们还可以通过这种方式提供更多的配置,都会通过Context来自动读取 producer.sources.s.intervalMillis=50 producer.sources.s.channels = c

启动flume的命令如下: flume-ng agent -n producer –conf conf -f conf/flume-conf.properties

至此,我们完成了flume自定义source的开发,可以根据想要的开发更多的自定义source。

转载请注明原文地址: https://www.6miu.com/read-2990.html

最新回复(0)