
Typical usage and code examples of the Java FlinkKafkaConsumer082 class


This article collects typical usage examples of the Java class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082. If you are wondering what FlinkKafkaConsumer082 is for and how to use it in practice, the curated class examples below may help.

The FlinkKafkaConsumer082 class belongs to the org.apache.flink.streaming.connectors.kafka package. Six code examples of the class are shown below, sorted by popularity by default.

Example 1: main

Likes: 3

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer082<>(
                        parameterTool.getRequired("topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // print() writes the contents of the stream to the TaskManager's standard output
        // the rebalance() call repartitions the data so that all machines
        // see the messages (for example when "num kafka partitions" < "num flink operators")
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        }).print();

        env.execute();
    }
 

Developer: jdc91
Project: StreamProcessingInfrastructure
Lines of code: 30
Source file: FlinkPoc.java
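The example above relies on Flink's ParameterTool to turn command-line arguments such as `--topic test --bootstrap.servers localhost:9092` into a Properties object for the Kafka source. As a rough illustration of that behavior, here is a hypothetical minimal stand-in (MiniParameterTool is not a real Flink class, just a sketch of what ParameterTool.fromArgs does with `--key value` pairs):

```java
import java.util.Properties;

// Hypothetical, minimal stand-in for ParameterTool.fromArgs(args).getProperties():
// it turns "--key value" argument pairs into a Properties object.
public class MiniParameterTool {
    public static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                props.setProperty(args[i].substring(2), args[i + 1]);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = fromArgs(new String[]{
                "--topic", "test", "--bootstrap.servers", "localhost:9092"});
        System.out.println(p); // both keys are now visible to the Kafka source
    }
}
```

This is why the example can pass `parameterTool.getProperties()` straight to the consumer: whatever Kafka settings the user supplies on the command line end up in the Properties object unchanged.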

Example 2: stringStreamFromKafka

Likes: 3

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
@Override
public WorkloadOperator<String> stringStreamFromKafka(String zkConStr,
                                                      String kafkaServers,
                                                      String group,
                                                      String topics,
                                                      String offset,
                                                      String componentId,
                                                      int parallelism) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.commit.enable", "false"); // pass as a String: Properties.getProperty ignores non-String values
    properties.put("auto.offset.reset", offset);

    env.setParallelism(parallelism);
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<>(topics, new SimpleStringSchema(), properties));
    return new FlinkWorkloadOperator<>(stream, parallelism);
}
 

Developer: wangyangjun
Project: StreamBench
Lines of code: 22
Source file: FlinkOperatorCreator.java
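A subtlety in the snippet above: writing `properties.put("auto.commit.enable", false)` stores a Boolean, but `java.util.Properties` only exposes String values through `getProperty`, which is what the Kafka 0.8 consumer configuration ultimately reads, so a Boolean value is silently ignored. A quick demonstration of the pitfall:

```java
import java.util.Properties;

public class PropertiesPitfall {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("auto.commit.enable", false);     // stored as a Boolean
        props.put("auto.offset.reset", "smallest"); // stored as a String

        // getProperty only returns String values; the Boolean is invisible to it.
        System.out.println(props.getProperty("auto.commit.enable")); // null
        System.out.println(props.getProperty("auto.offset.reset"));  // smallest
    }
}
```

Passing the value as the String `"false"` avoids the problem.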

Example 3: main

Likes: 3

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
public static void main(String[] args) throws Exception {
	// create execution environment
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// parse user parameters
	ParameterTool parameterTool = ParameterTool.fromArgs(args);

	DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

	// print() writes the contents of the stream to the TaskManager's standard output
	// the rebalance() call repartitions the data so that all machines
	// see the messages (for example when "num kafka partitions" < "num flink operators")
	messageStream.rebalance().map(new MapFunction<String, String>() {
		private static final long serialVersionUID = -6867736771747690202L;

		@Override
		public String map(String value) throws Exception {
			return "Kafka and Flink says: " + value;
		}
	}).print();

	env.execute();
}
 

Developer: dataArtisans
Project: kafka-example
Lines of code: 24
Source file: ReadFromKafka.java

Example 4: stringStreamFromKafkaWithTime

Likes: 2

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
@Override
public WorkloadOperator<WithTime<String>> stringStreamFromKafkaWithTime(String zkConStr,
                                                                        String kafkaServers,
                                                                        String group,
                                                                        String topics,
                                                                        String offset,
                                                                        String componentId,
                                                                        int parallelism) {
    /*
    * Note that the Kafka source is expecting the following parameters to be set
    *  - "bootstrap.servers" (comma separated list of kafka brokers)
    *  - "zookeeper.connect" (comma separated list of zookeeper servers)
    *  - "group.id" the id of the consumer group
    *  - "topic" the name of the topic to read data from.
    *  "--bootstrap.servers host:port,host1:port1 --zookeeper.connect host:port --topic testTopic"
    */
    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.commit.enable", "false"); // pass as a String: Properties.getProperty ignores non-String values
    properties.put("auto.offset.reset", offset);

    env.setParallelism(parallelism);
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<>(topics, new SimpleStringSchema(), properties));
    DataStream<WithTime<String>> withTimeDataStream = stream.map(new MapFunction<String, WithTime<String>>() {
        @Override
        public WithTime<String> map(String value) throws Exception {
            String[] list = value.split(Constant.TimeSeparatorRegex);
            if (list.length == 2) {
                return new WithTime<>(list[0], Long.parseLong(list[1]));
            }
            return new WithTime<>(value, System.currentTimeMillis());
        }
    });
    return new FlinkWorkloadOperator<>(withTimeDataStream, parallelism);
}
 

Developer: wangyangjun
Project: StreamBench
Lines of code: 40
Source file: FlinkOperatorCreator.java
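The map function above splits each record into a payload and an optional event timestamp, falling back to processing time when no timestamp is present. The same logic can be sketched in plain Java, with the separator passed in explicitly (the original uses the project's Constant.TimeSeparatorRegex, whose value is not shown here, so "|" below is an assumption for illustration):

```java
// Plain-Java sketch of the timestamp-splitting logic in stringStreamFromKafkaWithTime.
public class TimeTagging {

    public static final class WithTime<T> {
        public final T value;
        public final long time;
        public WithTime(T value, long time) { this.value = value; this.time = time; }
    }

    public static WithTime<String> parse(String record, String separatorRegex) {
        String[] parts = record.split(separatorRegex);
        if (parts.length == 2) {
            // the record carries its own event time
            return new WithTime<>(parts[0], Long.parseLong(parts[1]));
        }
        // otherwise fall back to processing time
        return new WithTime<>(record, System.currentTimeMillis());
    }

    public static void main(String[] args) {
        WithTime<String> w = parse("hello|12345", "\\|"); // "|" assumed as separator
        System.out.println(w.value + " @ " + w.time);
    }
}
```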

Example 5: pointStreamFromKafka

Likes: 2

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
@Override
public WorkloadOperator<Point> pointStreamFromKafka(String zkConStr, String kafkaServers, String group, String topics, String offset, String componentId, int parallelism) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.commit.enable", "false"); // pass as a String: Properties.getProperty ignores non-String values
    properties.put("auto.offset.reset", offset);

    env.setParallelism(parallelism);
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<>(topics, new SimpleStringSchema(), properties));
    DataStream<Point> pointStream = stream.map(new MapFunction<String, Point>() {
        @Override
        public Point map(String value) throws Exception {
            String[] list = value.split(Constant.TimeSeparatorRegex);
            long time = System.currentTimeMillis();
            if (list.length == 2) {
                time = Long.parseLong(list[1]);
            }
            String[] strs = list[0].split("\t");
            double[] position = new double[strs.length];
            for (int i = 0; i < strs.length; i++) {
                position[i] = Double.parseDouble(strs[i]);
            }
            return new Point(position, time);
        }
    });
    return new FlinkWorkloadOperator<>(pointStream, parallelism);
}
 

Developer: wangyangjun
Project: StreamBench
Lines of code: 32
Source file: FlinkOperatorCreator.java
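Example 5's map function parses a payload of tab-separated coordinates into a Point, optionally reading a trailing timestamp. A plain-Java sketch of that parsing (again with the separator passed in, since Constant.TimeSeparatorRegex is defined elsewhere in StreamBench; "|" below is only an assumption):

```java
// Plain-Java sketch of the Point parsing in pointStreamFromKafka.
public class PointParsing {

    public static final class Point {
        public final double[] position;
        public final long time;
        public Point(double[] position, long time) { this.position = position; this.time = time; }
    }

    public static Point parse(String record, String separatorRegex) {
        String[] parts = record.split(separatorRegex);
        // use the embedded timestamp if present, else processing time
        long time = (parts.length == 2) ? Long.parseLong(parts[1])
                                        : System.currentTimeMillis();
        String[] coords = parts[0].split("\t");
        double[] position = new double[coords.length];
        for (int i = 0; i < coords.length; i++) {
            position[i] = Double.parseDouble(coords[i]);
        }
        return new Point(position, time);
    }

    public static void main(String[] args) {
        Point p = parse("1.0\t2.5|99", "\\|"); // two coordinates, timestamp 99
        System.out.println(p.position.length + " dims @ " + p.time);
    }
}
```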

Example 6: main

Likes: 2

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; // import the required package/class
public static void main(final String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        Map conf = Utils.findAndReadConfigFile(parameterTool.getRequired("confPath"), true);
        int kafkaPartitions = ((Number)conf.get("kafka.partitions")).intValue();
        int hosts = ((Number)conf.get("process.hosts")).intValue();
        int cores = ((Number)conf.get("process.cores")).intValue();

        ParameterTool flinkBenchmarkParams = ParameterTool.fromMap(getFlinkConfs(conf));

        LOG.info("conf: {}", conf);
        LOG.info("Parameters used: {}", flinkBenchmarkParams.toMap());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(flinkBenchmarkParams);

        // Set the buffer timeout (default 100)
        // Lowering the timeout will lead to lower latencies, but will eventually reduce throughput.
        env.setBufferTimeout(flinkBenchmarkParams.getLong("flink.buffer-timeout", 100));

        if(flinkBenchmarkParams.has("flink.checkpoint-interval")) {
            // enable checkpointing for fault tolerance
            env.enableCheckpointing(flinkBenchmarkParams.getLong("flink.checkpoint-interval", 1000));
        }
        // set default parallelism for all operators (recommended value: number of available worker CPU cores in the cluster (hosts * cores))
        env.setParallelism(hosts * cores);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer082<String>(
                        flinkBenchmarkParams.getRequired("topic"),
                        new SimpleStringSchema(),
                        flinkBenchmarkParams.getProperties())).setParallelism(Math.min(hosts * cores, kafkaPartitions));

        messageStream
                .rebalance()
                // Parse the String as JSON
                .flatMap(new DeserializeBolt())

                //Filter the records if event type is "view"
                .filter(new EventFilterBolt())

                // project the event
                .<Tuple2<String, String>>project(2, 5)

                // perform join with redis data
                .flatMap(new RedisJoinBolt())

                // process campaign
                .keyBy(0)
                .flatMap(new CampaignProcessor());


        env.execute();
    }
 

Developer: yahoo
Project: streaming-benchmarks
Lines of code: 56
Source file: AdvertisingTopologyNative.java
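Note how the benchmark above sets the source's parallelism to `Math.min(hosts * cores, kafkaPartitions)`: each Kafka partition is read by exactly one source subtask, so any subtasks beyond the partition count would sit idle. As a tiny helper capturing that calculation:

```java
public class SourceParallelism {
    // Cap the Kafka source's parallelism at the partition count: extra
    // source subtasks beyond the number of partitions would receive no data.
    public static int forSource(int hosts, int cores, int kafkaPartitions) {
        return Math.min(hosts * cores, kafkaPartitions);
    }

    public static void main(String[] args) {
        System.out.println(forSource(4, 8, 20)); // 20: capped by partitions
        System.out.println(forSource(2, 2, 20)); // 4: capped by cluster slots
    }
}
```

Downstream operators are not capped this way; the `rebalance()` call after the source redistributes records round-robin so the full `hosts * cores` parallelism can be used for processing.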

