• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

标签:apache-flink

apache-flink

Flink ParquetSinkWriter FileAlreadyExistsException

Flink ParquetSinkWriter FileAlreadyExistsException
我正在尝试通过使用BucketingSink和自定义ParquetSinkWriter在HDFS上使用Apache Flink编写Parquet文件。 这是代码,上面的错误指示从下面开始启用检查点(在BucketingSink类中调用snapshotState())刷新方法时无法正常工作。甚至writer也被“ writer.close();”关闭。但仍然收……继续阅读 »

apache

如何在Flink中获得另一个工作的工作结果?

如何在Flink中获得另一个工作的工作结果?
Closed. This question needs details or clarity。它当前不接受答案。 想改善这个问题吗?添加详细信息,并通过 editing this post阐明问题。 3年前关闭。 Improve this question 这是一种情况,我有两个数据源,分别是消息队列和MySQL表,它们可以分别称为Data……继续阅读 »

apache-flink

YARN上Spark提交和Flink的HDFS路径

YARN上Spark提交和Flink的HDFS路径
我使用cloudera live vm,那里有一个hadoop和spral独立集群。现在我想用spark提交和flink运行脚本来提交我的工作。这也有效。但是我的应用程序可以在hdfs中找到输入和输出文件的路径。我将路径设置为:hdfs://127.0.0.1:50010 / user / cloudera / outputs我从港口得到的信息是这样的: 如……继续阅读 »

apache-flink

MapFunction的实现不可序列化Flink

MapFunction的实现不可序列化Flink
我正在尝试实现一个类,该类使用户可以在不限制输入流类型的情况下操纵N个输入流。 首先,我想将所有输入数据流转换为keyedStreams。因此,我将输入数据流映射到一个元组中,然后,我应用KeyBy将其转换为键流。 我总是遇到序列化的问题,我尝试按照本指南https://ci.apache.org/projects/flink/flink-docs-stab……继续阅读 »

apache-flink

Flink动态更新流作业

Flink动态更新流作业
我收到了有关不同主题的一系列Avro格式的事件。我想使用这些并以拼花格式写入s3。我写了下面的工作,为每个事件创建一个不同的流,并从合计架构注册表中获取其架构,以创建事件的拼花槽。这工作正常,但是我面临的唯一问题是,每当有新事件开始时,我都必须更改YAML配置并每次都重新启动作业。有什么办法我不必重新启动作业,它就开始消耗新的事件集了。 YamlReader……继续阅读 »

apache-flink

Apache Flink:如何使用Java Map(或包含DTO的Map)流?

Apache Flink:如何使用Java Map(或包含DTO的Map)流?
我正在使用Flink,并且具有动态更改的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON模拟并将其转换为静态POJO,而必须依赖于Map。 我的第一个转换是使用GSON解析将JSON字符串流转换为Map对象流,然后将地图包装在名为Data的DTO中。 (inside the first map transformation) Li……继续阅读 »

apache-flink

Flink和Cassandra连接问题

Flink和Cassandra连接问题
当正常在Flink的DataStream外部进行连接时,是否有人遇到过从Flink作业连接到Cassandra的任何问题? Session session = clusterBuilder.getCluster().connect(); ResultSet resultSet = session.execute(resultStatement……继续阅读 »