- 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧
我正在尝试通过使用BucketingSink和自定义ParquetSinkWriter在HDFS上使用Apache Flink编写Parquet文件。
这是代码,上面的错误指示从下面开始启用检查点(在BucketingSink类中调用snapshotState())刷新方法时无法正常工作。甚至writer也被“ writer.close();”关闭。但仍然收……继续阅读 »
我想拥有一个能够满足两个要求的Kakfa消费者:
要求时使用所有消息(将其获取)放弃/忽略所有尚未提交的消息(需要帮助)
为简单起见,我只运行一个主题和一个分区。这是我设置消费者的方式:
private Consumer<Long, String> createConsumer() {
final Properties props = n……继续阅读 »
按照Avro docs中“默认”属性的定义:“此字段的默认值,在读取缺少此字段的实例时使用(可选)。”
这意味着如果缺少相应的字段,则采用默认值。
但这似乎并非如此。考虑以下student模式:
{
"type": "record",
"namespace": "com.example",
"name": "S……继续阅读 »
我正在尝试从架构注册表中检索给定kafka主题的架构主题版本。我可以使用client.register(schema-name, schema)成功发布新版本,但是不确定如何获取版本。我在下面使用curl请求进行了尝试,但结果立即达到-1(空)。
CachedSchemaRegistryClient client = new CachedSchemaRegi……继续阅读 »
我是Apache Kafka的新手,我正尝试在Android Studio上使用它,以便使用A simple Kafka Consumer and Producer example中的代码将数据生成到位于我PC上的服务器
摇篮代码:
apply plugin: 'com.android.application'
android {
packagin……继续阅读 »
我正在尝试使用Spring云流+ Kafka绑定对Apache Kafka进行“恰好一个交付”概念的一些PoC。春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换”
我安装了Apache Kafka“kafka_2.11-1.0.0”,并在生产者中定义了“transactionIdPrefix”,我知……继续阅读 »
我只是在学习spring boot和kakfa。我进行了一些探索,并配置了一个示例生产者应用程序,如下所示。但是,我无法发布消息。如果我能在这里错过什么得到帮助,那就太好了。我已经启动了zookeeper服务和kakfa服务,并确保该主题可用。
Config:
import java.util.HashMap;
import java.ut……继续阅读 »
我有来自两个主题orders和fsource的两个流。主要是订单是静态的,很少更新,而fsource则以每秒1000的速率更新。在这里,我使用了KTable-KTabke连接,因为它们具有相同的键。
PObject:
private String orderId;{1,2,3,4,5,6}
private Double price;
private Long……继续阅读 »
我有“ N”个Kafka主题,我需要根据业务逻辑使用来自不同主题的消息。我需要处理/过滤它们并将其发送到服务总线。我不想创建N个kafka配置来使用消息,我只想使其成为一个库,在这里我可以简单地将属性外部化以进行选择并配置使用者。这样我就可以将业务逻辑放入应用程序中。以前有没有人做过这种实现。请让我知道这种方法的最佳实践。
编辑:这是我的KafkaConsu……继续阅读 »
我正在运行火花流作业以使用直接方法从kafka消耗(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf <jar file> | grep ConsumerRecord检查jar文件的内容。我得到以下输出
org / apache / kafka / clients / co……继续阅读 »