Flume 插件开发--source

大数据

2017-08-15

1271

0

     Source的目的是从外部客户端接收数据并将其存储到配置的Channel中。 Source可以获得自己的ChannelProcessor的一个实例,以串行方式处理在Channel本地事务中提交的事件。在异常的情况下,所需的通道将传播异常,所有通道将回滚其事务,但先前在其他通道上处理的事件将继续执行。

     类似于SinkRunner.PollingRunner Runnable,有一个PollingRunner Runnable可以在Flume框架调用PollableSourceRunner.start()时创建的线程上执行。每个配置的PollableSource与运行PollingRunner的自己的线程相关联。该线程管理PollableSource的生命周期,如启动和停止。 PollableSource实现必须实现在LifecycleAware接口中声明的start()和stop()方法。 PollableSource的转轮调用了Source的process()方法。 process()方法应检查新数据,并将其作为Flume事件存储到Channel中。

     注意,实际上有两种类型的源。 PollableSource已经被提到了。另一个是EventDrivenSource。 EventDrivenSource与PollableSource不同,必须有自己的回调机制来捕获新数据并将其存储到Channel中。 EventDrivenSources并不是每个人都像PollableSources一样由他们自己的线程驱动。以下是一个自定义PollableSource的示例:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

 

转载请注明出处: http://www.julyme.com/20170815/91.html

发表评论

全部评论:0条

Julyme

感觉还行吧。

Julyme的IT技术分享



/sitemap