Source的目的是从外部客户端接收数据并将其存储到配置的Channel中。 Source可以获得自己的ChannelProcessor的一个实例,以串行方式处理在Channel本地事务中提交的事件。在异常的情况下,所需的通道将传播异常,所有通道将回滚其事务,但先前在其他通道上处理的事件将继续执行。
类似于SinkRunner.PollingRunner Runnable,有一个PollingRunner Runnable可以在Flume框架调用PollableSourceRunner.start()时创建的线程上执行。每个配置的PollableSource与运行PollingRunner的自己的线程相关联。该线程管理PollableSource的生命周期,如启动和停止。 PollableSource实现必须实现在LifecycleAware接口中声明的start()和stop()方法。 PollableSource的转轮调用了Source的process()方法。 process()方法应检查新数据,并将其作为Flume事件存储到Channel中。
注意,实际上有两种类型的源。 PollableSource已经被提到了。另一个是EventDrivenSource。 EventDrivenSource与PollableSource不同,必须有自己的回调机制来捕获新数据并将其存储到Channel中。 EventDrivenSources并不是每个人都像PollableSources一样由他们自己的线程驱动。以下是一个自定义PollableSource的示例:
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation, convert to another type, ...)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external client
}
@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// This try clause includes whatever Channel/Event operations you want to do
// Receive new data
Event e = getSomeData();
// Store the Event into this Source's associated Channel(s)
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
转载请注明出处: http://www.julyme.com/20170815/91.html
打赏一个呗~~(微信)
Julyme
感觉还行吧。
Julyme的IT技术分享