• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Apache-Beam将序列号添加到PCollection中

python 来源:Manuel Valero 4次浏览

我试图构建一个ETL来加载一个Dimension表。我使用Python和DataFlow和BigQuery来分配Apache Bea。Apache-Beam将序列号添加到PCollection中

我需要为pcollection的每个元素分配一个序列号,以便将其加载到BigQuery中,但我找不到任何方法来执行此操作。

我想我需要DataFlow使先前的聚合和连接,以获得我最后的pcollection添加序列号,但在这一刻我需要停止并行处理,并将我的pcollection投到列表(如在Spark中,当你使用.collect()),然后制作一个简单的循环来分配序列号。这样对吗?

这是管道,我编码:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

我读过没有办法摆脱pcollection列表: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?

我怎样才能实现呢?任何帮助?


===========解决方案如下:

如果您想要的是获取PCollection中每个元素的列表,则可以使用侧面输入。请记住,这将从您的结果中删除所有并行性,并且您的管道可能会变慢。

如果您仍然想这样做的话:

side_input_coll = beam.pvalue.AsIterable(my_collection) 

(p 
| beam.Create([0]) 
| beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)], 
       my_seq=side_input_coll)) 

但是不要忘记,为了维护并行性,它可能是最好简单地生成一个随机ID。请记住PCollections本质上是无序的。

要了解更多关于侧面输入,看到Beam Programming Guide on Side Inputs


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)