kafka精通1-创新互联
使用同步方式来解决多个服务之间的通信
成都创新互联公司是专业的乌拉特后网站建设公司,乌拉特后接单;提供网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行乌拉特后网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
同步通信的方式方式会存在性能和稳定性的问题使用异步的通信方式
异步的优势:可以让上游快速成功,明显提升系统吞吐量;即使有服务失败,也可以通过分布式事务解决方案来保证最终是成功的,也能保障业务执行之后的最终一致性。
消息队列解决具体的是什么问题--------通信问题
目前消息队列的中间件选型有很多种:
- rabbitMQ
- rocketMQ
- kafka(全球消息处理性能最快的一款MQ)
- zeroMQ
这些消息队列中间件有什么区别?
- 有broker
重topic:kafka、rocketMQ
整个topic,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic的存在
轻topic:RabbitMQ
topic只是一种中转模式 - 无broker
在生产者和消费者之间没有使用broker,例如zeroMQ,直接使用socket来进行通信
kafka是一个分布式、支持分区(partition)、多副本(replica),基于zookeeper协调的分布式消息系统,大特点是可以实时的处理大量数据
kafka使用场景- 日志收集:用kafka收集各种服务的日志,通过kafka以统一接口服务的方式开放给consumer
- 消息系统:解耦和生产者和消费者、缓存消息等
- 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,比如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
- 运营指标:kafka也经常用来记录运营监控指标。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
kafka的安装
1.部署一台zookeeper服务器 2.安装jdk 3.下载kafka的安装包:https:kafka.apache.org/download 4.上传到kafka服务器上并解压:/usr/local/kafka 5. 进入conf目录内,修改server.properties ```powershell #broker.id属性在kafka集群中必须要唯一 broker.id=0 #kafka部署的机器ip和提供服务的端口 listeners=PLAINTEXT://10.234.252.122:9092 #kafka消息存储文件 log.dirs=/usr/local/kafka #kafka连接zookeeper的地址 zookeeper.connect=10.234.252.122:2181 ``` ```powershell #进入到bin目录内,执行以下命令来启动kafka服务器(带着配置文件) ./kafka-server-start.sh -daemon ../config/server.properties #校验kafka是否启动成功,进入到zk内查看是否有kafka的节点 ls /brokers/ids #查询出有broker的id则存在 ```
创建topic
执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1:
#./kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看当前kafka内有那些topic
./kafka-topic.sh --zookeeper localhost:2182 --list
- 发送消息
kafka自带一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中,在默认情况下,每一行会被当成一个独立的消息,使用kafka的发送消息的客户端,指定发送到kafka服务器地址和topic
./kafka-console-producer.sh --broker-list 10.234.252.122:9092 --topic test
>hello
>world
>1111
>22222222
- 消费消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息,使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
方式1:从最后一条消息的偏移量(offset)+1开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test
方式2:从开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --from-beginning
>hello
>world
>1111
>22222222
- 关于消费消息的细节
- 生产者将消息发送给broker,broker会将消息保存在本地日志文件中
/usr/local/kafka/kafka-logs/主题-分区/000000.log
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置
- 单播消息
如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息;换言之,同一个消费组中只能有一个消费者收到一个topic中的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup
- 多播消息
不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息;实际上也是多个消费组中的多个消费者收到同一个topic的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup1
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup2
- 查看消费组及信息
查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --list
查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --describe --group testgroup
注意:
- current-offset:当前消费组的已消费偏移量
- log-end-offset:主题对应分区消费的结束偏移量
- lag:当前消费组未消费的消息数
主题topic
主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类,不同的topic会被订阅该topic的消费者消费。
但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的,为了解决这个文件过大的问题,kafka提出来分区的概念partition分区
通过partition将一个topic中的消息分区来存储,这样的好处:
- 分区存储,可以解决统一存储文件过大的问题
- 提高了读写的吞吐量,读和写可以同时在多个分区中进行
分区的作用:
分布式存储
可以并行写
**为一个主题创建多个分区**
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test1
kafka中消息日志文件中保存的内容
00000.log:这个文件中保存的就是消息 __consumer_offsets-49:kafka内部自己创建了 __consumer_offsets主题,包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量,因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主的上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区 提交到那个分区:通过hash函数:hash(consumergroupid)%_consumer_offsets主题的分区数 提交到该主题中的内容是:key是consumergroupid+topic+分区号,value就是当前offsets的值 文件中保存的消息,默认保存7天,7天后消息会被删除
在创建主题时,除了指明主题的分区数以外,还指明了副本数
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower
- leader:
kafka的读和写的操作,都发生在leader上,leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader - follower:
接受leader的同步的数据 - isr:
可以同步和已同步的节点会被存入到isr集合中,这里有一个细节:如果isr中的节点性能较差,会被踢出isr集合
./kafka-console-producer.sh --broker-list 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --from-beginning --topic my-replicated-topic
关于分区消费组消费者的细节- 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partition的多个消费者消费的总的顺序性是得不到保证的。
- partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
- 如果消费者挂了,会触发rebalance机制,会让其他消费者来消费该分区
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次
异步发送,生产者发完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞,那么集群什么时候返回ACK呢?此时ACK有三个配置:
- ACK=0:kafka-cluster不需要任何的broker收到消息,就立即返回ACK给生产者,最容易丢消息的,效率是最高的
- ACK=1(默认):多副本之间的leader已经收到消息,并把消息写入到本地log中,才会返回ACK给生产者,性能和安全性是最均衡的
- ACK=-1/all:里面有默认的配置
min.insync.replica=2
(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完成后,才会返回ACK给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差
其他一些细节:
- 发送默认会重试3次,每次间隔100ms
- 发送的消息会先进入到本地缓冲区(32MB),用来存放要发送的消息,kafka会跑一个线程,该线程去缓冲区中取16k(也是可以配置的,默认是16k)的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送一次
消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets
主题提交当前主题-分区消费的偏移量
提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的主题+消费的某个分区及消费的偏移量
这样的信息提交到集群的_consumer_offsets
主题里面自动提交
消费者poll消息下来以后就会自动提交offset
自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就会自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息,之前未被消费的消息就会丢失掉了手动提交
需要把自动提交的配置改成false
手动提交又分成两种- 手动同步提交 在消费完消息后调用同步提交的方法,当集群返回ACK前一直阻塞,返回ACK后表示提交成功,执行之后的逻辑 - 手动异步提交 在消息消费完后提交,不需要等到集群ACK,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
- 默认情况下,消费者一次会poll 500条消息
- 长轮询的时间默认是1000毫秒
意味着:
1、如果一次poll到500条,就执行for循环
2、如果这一次没有poll到500
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网站名称:kafka精通1-创新互联
URL链接:http://pcwzsj.com/article/doocps.html