• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

标签:apache-beam

apache-beam

数据流单元测试

数据流单元测试
我正在尝试为我的管道构建单元测试。 该管道从pubsub读取,执行转换并将结果再次写入pubsub。 为了进一步简化单元测试,直到它起作用为止,该单元测试将只接收一个字符串作为输入,并测试输出是否是某个字符串。 该代码如下所示: @RunWith(JUnit4.class) public class TesterPipeline { // O……继续阅读 »

apache-beam

如何通过键合并两个PCollection KV <>?

如何通过键合并两个PCollection KV <>?
我试图为同一密钥输出SUM和COUNT。例如。给定具有数百万个飞机延误事件的.csv。我想使用Apache Beam(Java)求和每个平面的延迟时间,并计算每个平面有多少延迟。 每行都有plane_id, delay_duration, date,依此类推。 我正在尝试创建两个PCollections,并希望在输出之前进行合并。 PCollection&l……继续阅读 »

apache-beam

从Google Cloud Dataflow内部写入Firestore

从Google Cloud Dataflow内部写入Firestore
我现在遇到的核心问题是,当我运行部署到Google Cloud Dataflow的Dataflow管道时,出现错误: java.lang.IllegalStateException:名称为[DEFAULT]的FirebaseApp不存在。 如果我在本地运行相同的管道,则一切正常。因此,我怀疑是身份验证问题还是环境问题。 代码位: DEPLOY和REAL变量……继续阅读 »

apache-beam

DoFn.Setup和DoFn.StartBundle有什么区别?

DoFn.Setup和DoFn.StartBundle有什么区别?
这两个注释之间有什么区别? DoFn.Setup 用于准备要处理元素束的实例的方法的注释。 使用单词“bundle”,参数为零。 DoFn.StartBundle 用于准备要处理一批元素的实例的方法的注释。 使用单词“batch”,接受零个或一个参数( StartBundleContext ,一种访问PipelineOptions的方法)。 我想做什……继续阅读 »

apache-beam

是否可以使用KafkaIO.read为单个管道为两个不同的集群指定Kafka引导服务器?

是否可以使用KafkaIO.read为单个管道为两个不同的集群指定Kafka引导服务器?
我目前正在使用Google Cloud Dataflow和Apache Beam来消耗来自Kafka主题的消息,该主题存在于两个不同的Kafka集群中,两个集群均包含相同的主题名称,但主题中的数据不同。Kafka群集是分开的,因为它们包含来自不同区域的数据。 我只是想知道是否可以通过在单个KafkaIO.read Dataflow管道步骤中列出两个集群的所有……继续阅读 »