• 周五. 12月 9th, 2022

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

kafka原理剖析(3)-producer消息发送之缓冲区

[db:作者]

1月 6, 2022

1 整体发消息流程
kafka发送消息总流程.png
(1)第一步,等元数据拉取,上一回说过。
(2)元数据到位,对topic和key进行序列化。
(3)选取partition,3种情况

 a 如果消息里指定了patition的序号,先用指定的。但一般不会这么
b 没指定key,就用个原子int自增,和size取模选择partition,相当于轮询。
C 指定了key,那就把序列化后的key+topic,转化成hash,再取模。

2BD8FFD3-25DD-49E0-8077-E680FC5CA8A7.png

(4)对消息做大小校验,包括消息本身大小是否超过单条限制,是否超过缓冲区大小。
(5)消息进入缓冲区,Acumulator,重点,后面sender线程会操作缓冲区,进行网络发送/接收
(6)如果batch满了,或者有新batch了,唤醒send,准备发老batch。

2 缓冲区RecordAcumulator里加入消息-概览
52C92DB5-6C12-4028-A2E4-F4AB8D3FEFD2.png

(1)RecordAccumulator的数据结构,重点是batches这个map,topic->deque队列->batchs的三层结构
(2)发消息的第一步,就是把消息放入这个accumulator里,里面的deque,batch,batch对应的buffer,都需要初始化过程,初始化完成以后,只需要把对应消息放到某个topic的batch里就行了。
(3)每个组件的初始化,都保证了线程安全。

3 RecordAccumulator加入消息细节解析
根据上图,看看实现细节:
(1)getOrCreateDeque, 第一次来是空的,建立一个加入batches,上图看到batches是个concurrentHashMap,key是topicPartition,对应的hash值是topic + “-” + partition,就算是并发send,这里也能保证线程安全。
C63F6650-0542-46DD-8A53-22A44A7AB942.png
(2)tryappend, 尝试写入消息,
7E1B389F-78A1-4585-9CF8-C942529F2E2E.png

a 可以看到,tryappend是拿到topic+partition的最近一个recordbatch,加入消息。
b RecordBatch里面有MemoryRecords,封装了底层buffer。
c 写入数据的过程,其实是通过compressor组件,把消息格式解析成offset|size|crc|magic|attributes|timestamp|key size|key| value size | value 固定的kafka报文格式,然后通过输入流写入对应的buffer。

对于一般的异步写消息过程,其实这里写入成功了send就返回了。
由此可见,一个topic+patition,对应一个发送队列,对应n个发送批次,同时也对应n个发送缓冲区。
(3)第一次发这个topic-partition的消息,是没有batch的,也就没有对应的内存缓冲区,对应上图右上角返回null的时候,就要去申请内存缓冲buffer
(4)得到buffer以后,buffer封装到MemoryRecords里面,然后封装到RecordBatch代表一个批次,然后调用该batch的tryAppend,这次就可以写入了,最后把batch放入acummulator对应的topic-partition队列里。

4 申请内存缓冲区的过程
2DC01559-FA1B-41D2-B5CF-10F6AED08ADF.png

(1)第一次准备往某个topic的partition发消息的时候,RecordAccumulator里的对应的deque里肯定没有它的缓冲队列batch,那么就需要申请batch批次。bach批次的本质是封装的ByteBuffer,这又需要从机器内存申请。
(2)申请内存,需要从RecordAccumulator的BufferPool来申请,这是个buffer池,记录了buffer队列Deque<ByteBuffer> , 还维护了缓冲区的总内存totalMemory,可用内存availableMemory,每个batch的buffer大小poolableSize等。
(3)申请缓存,如果剩余缓存够,那么分两种情况

 a 如果来申请的内存就是批次大小,并且buffer池里有现成的,那么直接poll一个返回

B68E58AB-1C21-4BEF-A9B2-78A70E6472C3.png

 b 如果申请的大小大于一个批次大小,并且剩余缓存够,那么就从buffer池里释放一些缓存出来,分配一个大的缓存buffer出去。freeup就是while循环释放buffer池的buffer,直到剩余缓存够分配buffer为止。

FA1C0275-0623-44CF-A806-9830CA98D3F1.png
6205E67D-2BF0-4EEE-ABBA-2316211C65A6.png
(4)如果剩余缓存不够申请的,那就阻塞,等到别的线程释放出内存资源再来唤醒本线程,这里的唤醒可能是发送完一个batch的时候,batch空了就把占用的buffer还回队列里。
6F1923B3-2A00-4B32-905A-274BD5E749F7.png
假设唤醒了,这里要检查两个方面,一个是唤醒后剩余缓存或buffer池有没有空余了,一个是等待时间有没有超时(这里时间指的是producer的send的超时时间)。然后从buffer池里分配一个或者内存池里分配一份buffer出来
018BBEEE-712D-4A76-B0DD-80BFDF70AAAB.png

5 发消息缓冲区的数据结构
由此可见,简化版的发送缓冲区的数据结构,集成了消息批次划分,缓冲区暂存和系统内存分配三个功能RecordAccumulator-整体结构.png

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注