Typical usage and code examples of the Java DefaultSinkProcessor class

This article collects typical usage examples of org.apache.flume.sink.DefaultSinkProcessor in Java. If you are wondering how the DefaultSinkProcessor class is used in practice, the selected examples below may help.

The DefaultSinkProcessor class belongs to the org.apache.flume.sink package. Five code examples of the class are shown below, sorted by popularity by default.
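As background for the examples below, here is a simplified stand-alone model of what a single-sink processor does. It is a sketch, not Flume code: `MiniSink` and `SingleSinkProcessorSketch` are hypothetical stand-ins invented for illustration, and the model assumes that the processor accepts exactly one sink and forwards every `process()` call to it without any selection or failover logic.

```java
import java.util.Collections;
import java.util.List;

/** Simplified model of a processor that drives exactly one sink (hypothetical, not Flume's class). */
final class SingleSinkProcessorSketch {
    private MiniSink sink;

    // Assumption: only a single sink is accepted, so any other list size is rejected.
    void setSinks(List<MiniSink> sinks) {
        if (sinks == null || sinks.size() != 1) {
            throw new IllegalArgumentException("exactly one sink is required");
        }
        sink = sinks.get(0);
    }

    // The call is forwarded unchanged to the one configured sink.
    MiniSink.Status process() {
        if (sink == null) {
            throw new IllegalStateException("setSinks must be called first");
        }
        return sink.process();
    }

    public static void main(String[] args) {
        MiniSink alwaysReady = () -> MiniSink.Status.READY;
        SingleSinkProcessorSketch proc = new SingleSinkProcessorSketch();
        proc.setSinks(Collections.singletonList(alwaysReady));
        System.out.println(proc.process()); // prints "READY"
    }
}

/** Hypothetical stand-in for a sink; this is NOT org.apache.flume.Sink. */
interface MiniSink {
    enum Status { READY, BACKOFF }

    Status process();
}
```

In the real examples the same shape appears as `proc.setSinks(Collections.singletonList(sink))` followed by handing the processor to a `SinkRunner`.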

Example 1: testBasic

import org.apache.flume.sink.DefaultSinkProcessor; // import the required package/class
@Test
public void testBasic() throws Exception {
  String consumerKey = System.getProperty("twitter.consumerKey");
  Assume.assumeNotNull(consumerKey);

  String consumerSecret = System.getProperty("twitter.consumerSecret");
  Assume.assumeNotNull(consumerSecret);

  String accessToken = System.getProperty("twitter.accessToken");
  Assume.assumeNotNull(accessToken);

  String accessTokenSecret = System.getProperty("twitter.accessTokenSecret");
  Assume.assumeNotNull(accessTokenSecret);

  Context context = new Context();
  context.put("consumerKey", consumerKey);
  context.put("consumerSecret", consumerSecret);
  context.put("accessToken", accessToken);
  context.put("accessTokenSecret", accessTokenSecret);
  context.put("maxBatchDurationMillis", "1000");

  TwitterSource source = new TwitterSource();
  source.configure(context);

  Map<String, String> channelContext = new HashMap<>();
  channelContext.put("capacity", "1000000");
  channelContext.put("keep-alive", "0"); // for faster tests
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, new Context(channelContext));

  Sink sink = new LoggerSink();
  sink.setChannel(channel);
  sink.start();
  DefaultSinkProcessor proc = new DefaultSinkProcessor();
  proc.setSinks(Collections.singletonList(sink));
  SinkRunner sinkRunner = new SinkRunner(proc);
  sinkRunner.start();

  ChannelSelector rcs = new ReplicatingChannelSelector();
  rcs.setChannels(Collections.singletonList(channel));
  ChannelProcessor chp = new ChannelProcessor(rcs);
  source.setChannelProcessor(chp);
  source.start();

  Thread.sleep(5000);
  source.stop();
  sinkRunner.stop();
  sink.stop();
}
 

Developer ID: moueimei,
Project: flume-release-1.7.0,
Lines of code: 50,
Source file: TestTwitterSource.java

Example 2: runTestAndCheckResult

import org.apache.flume.sink.DefaultSinkProcessor; // import the required package/class
private void runTestAndCheckResult(Context context) throws IOException {
  Configurables.configure(source, context);
  Configurables.configure(channel, context);

  ChannelSelector cs = new KaaLoadChannelSelector();
  cs.setChannels(Lists.newArrayList(channel));

  Configurables.configure(cs, context);

  source.setChannelProcessor(new ChannelProcessor(cs));

  Configurables.configure(sink, context);
  sink.setChannel(channel);

  sinkRunner = new SinkRunner();
  SinkProcessor policy = new DefaultSinkProcessor();
  List<Sink> sinks = new ArrayList<Sink>();
  sinks.add(sink);
  policy.setSinks(sinks);
  sinkRunner.setSink(policy);

  sinkRunner.start();
  source.start();

  RecordHeader header = new RecordHeader();
  header.setApplicationToken(applicationToken);
  header.setEndpointKeyHash(new String(endpointKeyHash));
  header.setHeaderVersion(1);
  header.setTimestamp(System.currentTimeMillis());

  List<TestLogData> testLogs = generateAndSendRecords(header);

  LOG.info("Sent records count: " + testLogs.size());
  LOG.info("Waiting for sink...");

  int maxWaitTime = 5000;
  int elapsed = 0;

  while (sink.getEventDrainSuccessCount() < testLogs.size() && elapsed < maxWaitTime) {
    try {
      Thread.sleep(1000);
      elapsed += 1000;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag and stop waiting
      break;
    }
  }

  Assert.assertTrue(sink.getEventDrainSuccessCount() == testLogs.size());

  source.stop();
  sinkRunner.stop();

  readAndCheckResultsFromHdfs(header, testLogs);
}
 

Developer ID: kaaproject,
Project: kaa,
Lines of code: 54,
Source file: TestKaaHdfsSink.java
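The wait loop in Example 2 polls a counter until it reaches the expected value or a timeout passes. A minimal sketch of that pattern as a reusable helper could look like the following. The class `PollingWaitSketch` and the method `waitUntil` are hypothetical names introduced here, not part of Flume or Kaa. Unlike the loop in the example, the helper measures elapsed time with a clock instead of summing sleep intervals.

```java
import java.util.function.BooleanSupplier;

final class PollingWaitSketch {

    /**
     * Polls the condition until it holds or the timeout expires.
     * Returns true if the condition was observed to hold, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the condition hold
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true roughly 50 ms after start.
        boolean done = waitUntil(() -> System.currentTimeMillis() - start >= 50, 1000, 10);
        System.out.println("condition met: " + done);
    }
}
```

With such a helper, the test body would reduce to a single call such as `waitUntil(() -> sink.getEventDrainSuccessCount() >= testLogs.size(), 5000, 1000)` followed by the assertion.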

Example 3: testBasic

import org.apache.flume.sink.DefaultSinkProcessor; // import the required package/class
@Test
public void testBasic() throws Exception {
    System.out.println(getTestTraceHead("[TwitterSourceTest.basic]")
            + "-------- Start source.");
    Context context = new Context();
    context.put("consumerKey", consumerKey);
    context.put("consumerSecret", consumerSecret);
    context.put("accessToken", accessToken);
    context.put("accessTokenSecret", accessTokenSecret);
    context.put("maxBatchDurationMillis", "1000");

    TwitterSource source = new TwitterSource();
    source.configure(context);

    Map<String, String> channelContext = new HashMap<>();
    channelContext.put("capacity", "1000000");
    channelContext.put("keep-alive", "0"); // for faster tests
    Channel channel = new MemoryChannel();
    Configurables.configure(channel, new Context(channelContext));

    Sink sink = new LoggerSink();
    sink.setChannel(channel);
    sink.start();
    DefaultSinkProcessor proc = new DefaultSinkProcessor();
    proc.setSinks(Collections.singletonList(sink));
    SinkRunner sinkRunner = new SinkRunner(proc);
    sinkRunner.start();

    ChannelSelector rcs = new ReplicatingChannelSelector();
    rcs.setChannels(Collections.singletonList(channel));
    ChannelProcessor chp = new ChannelProcessor(rcs);
    source.setChannelProcessor(chp);

    try {
        source.start();

        Thread.sleep(500);
        source.stop();
        System.out.println(getTestTraceHead("[TwitterSourceTest.basic]")
                + "-  OK  - Twitter source started properly.");
    } catch (AssertionError e) {
        System.out.println(getTestTraceHead("[TwitterSourceTest.basic]")
                + "- FAIL - Twitter source could not start.");
        throw e;
    } // try catch
    sinkRunner.stop();
    sink.stop();
}
 

Developer ID: telefonicaid,
Project: fiware-cygnus,
Lines of code: 49,
Source file: TwitterSourceTest.java

Example 4: loadSinkGroups

import org.apache.flume.sink.DefaultSinkProcessor; // import the required package/class
protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
                              final Map<String, Sink> sinks, final NodeConfiguration conf) {
    final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
    final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
    final Map<String, String> usedSinks = new HashMap<String, String>();
    for (final String groupName : sinkgroupNames) {
        final ComponentConfiguration comp = compMap.get(groupName);
        if (comp != null) {
            final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
            final List<String> groupSinkList = groupConf.getSinks();
            final List<Sink> groupSinks = new ArrayList<Sink>();
            for (final String sink : groupSinkList) {
                final Sink s = sinks.remove(sink);
                if (s == null) {
                    final String sinkUser = usedSinks.get(sink);
                    if (sinkUser != null) {
                        throw new ConfigurationException(String.format(
                            "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
                    }
                    throw new ConfigurationException(String.format(
                            "Sink %s of group %s does not exist or is not properly configured", sink,
                            groupName));
                }
                groupSinks.add(s);
                usedSinks.put(sink, groupName);
            }
            final SinkGroup group = new SinkGroup(groupSinks);
            Configurables.configure(group, groupConf);
            conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
        }
    }
    // add any unassigned sinks to solo collectors
    for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
        if (!usedSinks.containsKey(entry.getKey())) { // usedSinks is keyed by sink name
            final SinkProcessor pr = new DefaultSinkProcessor();
            final List<Sink> sinkMap = new ArrayList<Sink>();
            sinkMap.add(entry.getValue());
            pr.setSinks(sinkMap);
            Configurables.configure(pr, new Context());
            conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
        }
    }
}
 

Developer ID: OuZhencong,
Project: log4j2,
Lines of code: 44,
Source file: FlumeConfigurationBuilder.java

Example 5: loadSinkGroups

import org.apache.flume.sink.DefaultSinkProcessor; // import the required package/class
protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
    final Map<String, Sink> sinks, final NodeConfiguration conf) {
  final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
  final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
  final Map<String, String> usedSinks = new HashMap<String, String>();
  for (final String groupName : sinkgroupNames) {
    final ComponentConfiguration comp = compMap.get(groupName);
    if (comp != null) {
      final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
      final List<String> groupSinkList = groupConf.getSinks();
      final List<Sink> groupSinks = new ArrayList<Sink>();
      for (final String sink : groupSinkList) {
        final Sink s = sinks.remove(sink);
        if (s == null) {
          final String sinkUser = usedSinks.get(sink);
          if (sinkUser != null) {
            throw new ConfigurationException(String.format(
                "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
          } else {
            throw new ConfigurationException(String.format(
                "Sink %s of group %s does not exist or is not properly configured", sink,
                groupName));
          }
        }
        groupSinks.add(s);
        usedSinks.put(sink, groupName);
      }
      final SinkGroup group = new SinkGroup(groupSinks);
      Configurables.configure(group, groupConf);
      conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
    }
  }
  // add any unassigned sinks to solo collectors
  for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
    if (!usedSinks.containsKey(entry.getKey())) { // usedSinks is keyed by sink name
      final SinkProcessor pr = new DefaultSinkProcessor();
      final List<Sink> sinkMap = new ArrayList<Sink>();
      sinkMap.add(entry.getValue());
      pr.setSinks(sinkMap);
      Configurables.configure(pr, new Context());
      conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
    }
  }
}
 

Developer ID: jopecko,
Project: logback-flume-ng,
Lines of code: 45,
Source file: FlumeConfigurationBuilder.java
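The assignment logic shared by Examples 4 and 5 can be hard to follow with the Flume types in the way. The sketch below restates it with plain strings and collections: each group claims its sinks, a sink claimed twice or not defined at all is an error, and every leftover sink gets a runner of its own (the place where the examples create a DefaultSinkProcessor). The class `SinkAssignmentSketch`, the method `assign`, and the use of `IllegalStateException` are assumptions made for this illustration, not Flume API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SinkAssignmentSketch {

    /** Returns a map from runner name to the names of the sinks that runner drives. */
    static Map<String, List<String>> assign(Map<String, List<String>> groups,
                                            List<String> sinkNames) {
        Set<String> unassigned = new LinkedHashSet<>(sinkNames);
        Map<String, String> usedBy = new HashMap<>(); // sink name -> owning group
        Map<String, List<String>> runners = new LinkedHashMap<>();

        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> members = new ArrayList<>();
            for (String sink : group.getValue()) {
                if (!unassigned.remove(sink)) {
                    // Either another group already claimed the sink, or it was never defined.
                    String owner = usedBy.get(sink);
                    throw new IllegalStateException(owner != null
                            ? String.format("Sink %s of group %s already in use by group %s",
                                    sink, group.getKey(), owner)
                            : String.format("Sink %s of group %s does not exist",
                                    sink, group.getKey()));
                }
                members.add(sink);
                usedBy.put(sink, group.getKey());
            }
            runners.put(group.getKey(), members);
        }
        // Every sink that no group claimed gets a single-sink runner of its own.
        for (String sink : unassigned) {
            runners.put(sink, Collections.singletonList(sink));
        }
        return runners;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("g1", Arrays.asList("k1", "k2"));
        // prints {g1=[k1, k2], k3=[k3]}
        System.out.println(assign(groups, Arrays.asList("k1", "k2", "k3")));
    }
}
```

Because claimed sinks are removed from the unassigned set as soon as a group takes them, the second loop needs no extra membership check in this sketch.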

