• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

慕课网_《Kafka流处理平台》学习总结

kafka 来源:妙手空空 4次浏览

慕课网《Kafka流处理平台》学习总结

第一章:课程介绍

1-1 课程介绍

课程介绍

  • Kafka概念解析
  • Kafka结构设计
  • Kafka场景应用
  • Kafka高级特性

第二章:概念解析

2-1 发展背景

LinkedIn 开源

  • Databus 分布式数据同步系统
  • Cubert 高性能计算引擎
  • ParSeq Java异步处理框架
  • Kafka 分布式发布订阅消息系统,流处理平台

Kafka发展历程

  • LinkedIn 开发
  • 2011年初开源,加入Apache基金会
  • 2012年从Apache Incubator毕业
  • Apache顶级开源项目

Kafka的特性

  • 可以发布和订阅且记录数据的流,类似于消息队列
  • 数据流存储的平台,具备容错能力
  • 在数据产生时就可以进行处理

Kafka通常被用于

  • 构建实时数据流管道
  • 构建实时数据流处理

Kafka是什么

  • 面向于数据流的生产、转换、存储、消费整体的流处理平台
  • Kafka不仅仅是一个消息队列

2-2 基本概念

Producer:数据生产者

  • 消息和数据的生产者
  • 向Kafka的一个topic发布消息的进程或代码或服务

Consumer:数据消费者

  • 消息和数据的消费者
  • 向Kafka订阅数据(topic)并且处理其发布的消息的进程或代码或服务

Consumer Group:消费者组

  • 对于同一个topic,会广播给不同的Group
  • 一个Group中,只有一个Consumer可以消费该消息

Broker:服务节点

  • Kafka集群中的每个Kafka节点

Topic:主题

  • Kafka消息的类别
  • 对数据进行区分、隔离

Partition:分区

  • Kafka中数据存储的基本单元
  • 一个topic数据,会被分散存储到多个Partition
  • 一个Partition只会存在一个Broker上
  • 每个Partition是有序的

Replication:分区的副本

  • 同一个Partition可能会有多个Replication
  • 多个Replication之间数据是一样的

Replication Leader:副本的老大

  • 一个Partition的多个Replication上
  • 需要一个Leader负责该Partition上与Producer和Consumer交互

Replication Manager:副本的管理者

  • 负责管理当前Broker所有分区和副本的信息
  • 处理KafkaController发起的一些请求
  • 副本状态的切换
  • 添加、读取消息等

2-3 概念延伸

Partition:分区

  • 每一个Topic被切分为多个Partition
  • 消费者数目少于或等于Partition的数目
  • Broker Group中的每一个Broker保存Topic的一个或多个Partition
  • Consumer Group中的仅有一个Consumer读取Topic的一个或多个Partition,并且是惟一的Consumer

Replication:分区的副本

  • 当集群中有Broker挂掉的情况,系统可以主动地使Replication提供服务
  • 系统默认设置每一个Topic的Replication系数为1,可以在创建Topic时单独设置
  • Replication的基本单位是Topic的Partition
  • 所有的读和写都从Replication Leader进行,Replication Followers只是作为备份
  • Replication Followers必须能够及时复制Replication Leader的数据
  • 增加容错性与可扩展性

第三章:结构设计

3-1 基本结构

Kafka功能结构

Kafka数据流势

Kafka消息结构

  • Offset:当前消息所处于的偏移
  • Length:消息的长度
  • CRC32:校验字段,用于校验当前信息的完整性
  • Magic:很多分布式系统都会设计该字段,固定的数字,用于快速判定当前信息是否为Kafka消息
  • attributes:可选字段,消息的属性
  • Timestamp:时间戳
  • Key Length:Key的长度
  • Key:Key
  • Value Length:Value的长度
  • Value:Value

3-2 功能特点

Kafka特点:分布式

  • 多分区
  • 多副本
  • 多订阅者
  • 基于Zookeeper调度

Kafka特点:高性能

  • 高吞吐量
  • 低延迟
  • 高并发
  • 时间复杂度为O(1)

Kafka特点:持久性与扩展性

  • 数据可持久化
  • 容错性
  • 支持在线水平扩展
  • 消息自动平衡

第四章:场景应用

4-1 应用场景

Kafka应用场景

  • 消息队列
  • 行为跟踪
  • 元信息监控
  • 日志收集
  • 流处理
  • 事件源
  • 持久性日志(commit log)

4-2 应用案例

Kafka简单案例

  • 部署启动
  • 简单生产者
  • 简单消费者

学习笔记

1.下载与安装
Zookeeper下载:https://zookeeper.apache.org/releases.html#download
Kafka下载:http://kafka.apache.org/downloads
安装:解压、配置环境变量

2.Zookeeper启动
解压:tar -zxf zookeeper-3.4.12.tar.gz
目录:cd zookeeper-3.4.12/bin
启动:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties

3.Kafka启动
解压:tar -zxf kafka_2.12-2.0.0.tgz
目录:cd kafka_2.12-2.0.0
启动:sudo bin/kafka-server-start.sh  config/server.properties

4.使用控制台操作生产者与消费者
创建Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic
查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
启动生产者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic
启动消费者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning
生产消息:first message
生产消息:second message

4-3 代码案例

创建49-kafka-example的maven工程pom如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>49-kafka</artifactId>
        <groupId>com.myimooc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>49-kafka-example</artifactId>

    <properties>
        <spring.boot.version>2.0.4.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.编写MessageEntity

package com.myimooc.kafka.example.common;

import java.util.Objects;

/**
 * <br>
 * 标题: 消息实体<br>
 * 描述: 消息实体<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
public class MessageEntity {
    /**
     * 标题
     */
    private String title;
    /**
     * 内容
     */
    private String body;

    @Override
    public String toString() {
        return "MessageEntity{" +
                "title='" + title + '\'' +
                ", body='" + body + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MessageEntity that = (MessageEntity) o;
        return Objects.equals(title, that.title) &&
                Objects.equals(body, that.body);
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, body);
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

2.编写SimpleProducer

package com.myimooc.kafka.example.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * <br>
 * 标题: 生产者<br>
 * 描述: 生产者<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
@Component
public class SimpleProducer<T> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, Object entity) {
        logger.info("发送消息入参:{}", entity);
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                topic,
                key,
                JSON.toJSONString(entity)
        );

        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, Object>> future = this.kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("消息发送失败:{}", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                long elapsedTime = System.currentTimeMillis() - startTime;

                RecordMetadata metadata = result.getRecordMetadata();
                StringBuilder record = new StringBuilder(128);
                record.append("message(")
                        .append("key = ").append(key).append(",")
                        .append("message = ").append(entity).append(")")
                        .append("send to partition(").append(metadata.partition()).append(")")
                        .append("with offset(").append(metadata.offset()).append(")")
                        .append("in ").append(elapsedTime).append(" ms");
                logger.info("消息发送成功:{}", record.toString());
            }
        });
    }
}

3.编写SimpleConsumer

package com.myimooc.kafka.example.consumer;

import com.alibaba.fastjson.JSONObject;
import com.myimooc.kafka.example.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <br>
 * 标题: 消费者<br>
 * 描述: 消费者<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
@Component
public class SimpleConsumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = "${kafka.topic.default}")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判断是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            //获取消息
            Object message = kafkaMessage.get();

            MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class);

            logger.info("接收消息Topic:{}", topic);
            logger.info("接收消息Record:{}", record);
            logger.info("接收消息Message:{}", messageEntity);
        }
    }

}

4.编写Response

package com.myimooc.kafka.example.common;

import java.io.Serializable;

/**
 * <br>
 * 标题: REST请求统一响应对象<br>
 * 描述: REST请求统一响应对象<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
public class Response implements Serializable {

    private static final long serialVersionUID = -972246069648445912L;
    /**
     * 响应编码
     */
    private int code;
    /**
     * 响应消息
     */
    private String message;

    public Response() {
    }

    public Response(int code, String message) {
        this.code = code;
        this.message = message;
    }

    @Override
    public String toString() {
        return "Response{" +
                "code=" + code +
                ", message='" + message + '\'' +
                '}';
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

5.编写ErrorCode

package com.myimooc.kafka.example.common;

/**
 * <br>
 * 标题: 错误编码<br>
 * 描述: 错误编码<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
public class ErrorCode {
    /**
     * 成功
     */
    public final static int SUCCESS = 200;
    /**
     * 失败
     */
    public final static int EXCEPTION = 500;

}

6.编写ProducerController

package com.myimooc.kafka.example.controller;

import com.alibaba.fastjson.JSON;
import com.myimooc.kafka.example.common.ErrorCode;
import com.myimooc.kafka.example.common.MessageEntity;
import com.myimooc.kafka.example.common.Response;
import com.myimooc.kafka.example.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * <br>
 * 标题: 生产者Controller<br>
 * 描述: 生产者Controller<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${kafka.topic.default}")
    private String topic;

    private static final String KEY = "key";

    @PostMapping("/send")
    public Response sendKafka(@RequestBody MessageEntity message) {
        try {
            logger.info("kafka的消息:{}", JSON.toJSONString(message));
            this.simpleProducer.send(topic, KEY, message);
            logger.info("kafka消息发送成功!");
            return new Response(ErrorCode.SUCCESS,"kafka消息发送成功");
        } catch (Exception ex) {
            logger.error("kafka消息发送失败:", ex);
            return new Response(ErrorCode.EXCEPTION,"kafka消息发送失败");
        }
    }
}

7.编写application.properties

##----------kafka配置
## TOPIC
kafka.topic.default=myimooc-kafka-topic
# kafka地址
spring.kafka.bootstrap-servers=192.168.0.105:9092
# 生产者配置
spring.kafka.producer.retries=0
# 批量发送消息的数量
spring.kafka.producer.batch-size=4096
# 缓存容量
spring.kafka.producer.buffer-memory=40960
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
spring.kafka.consumer.group-id=myimooc
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3

8.编写ExampleApplication

package com.myimooc.kafka.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * <br>
 * 标题: 启动类<br>
 * 描述: 启动类<br>
 * 时间: 2018/09/09<br>
 *
 * @author zc
 */
@SpringBootApplication
@EnableKafka
public class ExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}

第五章:高级特性

5-1 消息事务

为什么要支持事务

  • 满足“读取-处理-写入”模式
  • 流处理需求的不断增强
  • 不准确的数据处理的容忍度不断降低

数据传输的事务定义

  • 最多一次:消息不会被重复发送,最多被传输一次,但也有可能一次不传输
  • 最少一次:消息不会被漏发送,最少被传输一次,但也有可能被重复传输
  • 精确的一次(Exactly once):不会漏传输也不会重复传输,每个消息都被传输一次且仅仅被传输一次,这是大家所期望的

事务保证

  • 内部重试问题:Procedure幂等处理
  • 多分区原子写入
  • 避免僵尸实例

    每个事务Procedure分配一个 transactionl. id,在进程重新启动时能够识别相同的Procedure实例

    Kafka增加了一个与transactionl.id相关的epoch,存储每个transactionl.id内部元数据

    一旦epoch被触发,任务具有相同的transactionl.id和更旧的epoch的Producer被视为僵尸,Kafka会拒绝来自这些Producer的后续事务性写入

5-2 零拷贝

零拷贝简介

  • 网络传输持久性日志块
  • Java Nio channel.transforTo()方法
  • Linux sendfile系统调用

文件传输到网络的公共数据路径

  • 第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存
  • 第二次拷贝:应用程序将数据从内核空间读入到用户空间缓存中
  • 第三次拷贝:应用程序将数据写回到内核空间到socket缓存中
  • 第四次拷贝:操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

零拷贝过程(指内核空间和用户空间的交互拷贝次数为零)

  • 第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存
  • 将数据的位置和长度的信息的描述符增加至内核空间(socket缓存区)
  • 第二次拷贝:操作系统将数据从内核拷贝到网卡缓冲区,以便将数据经网络发出

文件传输到网络的公共数据路径演变

第六章:课程总结

6-1 课程总结

课程总结

  • Kafka基础概念与结构
  • Kafka的特点
  • Kafka应用场景
  • Kafka应用案例
  • Kafka高级特性

版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)