仿牛客论坛项目06-Kafka实现系统通知
阻塞队列、spring整合kafka、发送系统通知、显示系统通知
解决系统发送通知的功能。
阻塞队列
kafka 是框架,用阻塞队列也能实现,便于理解 kafka
- BlockingQueue
- 解决线程通信问题
- 阻塞方法:put、take
- 生产者消费者模式。
- 生产者:生产数据的线程
- 消费者:使用数据的线程
- BlockingQueue 避免两个线程直接连接
- 实现类
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
Kafka
- 简介
- 分布式的流媒体平台
- 应用:消息系统、日志收集、用户行为追踪、流式处理
- 特点
- 高吞吐量(适合海量数据,如日志)、消息持久化(存到硬盘)、高可靠性(分布式)、高扩展性(便于增加服务器)
- 术语
- Broker(服务器)、Zookeeper(管理Kafka集群)
- Topic(主题)、Partition(对主题的分区,增强并发能力)、Offset(消息在分区中的位置)
- Leader Replica(主副本)、Follower Replica(随从副本)
安装命令:brew install kafka
启动zookeeper:brew services start zookeeper
zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties`
启动kafka服务:brew services start kafka
kafka-server-start /opt/homebrew/etc/kafka/server.properties`
停止命令:brew services stop zookeeper
创建主题:kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test 查看主题:
kafka-topics –list –bootstrap-server localhost:9092 发布消息(生产者):
kafka-console-producer –broker-list localhost:9092 –topic test 订阅消息(消费者):
kafka-console-consumer –bootstrap-server localhost:9092 –topic test –from-beginning`
Spring 整合 Kafka
- 引入依赖
- spring-kafka
- 配置 Kafka
- 配置server、consumer
- 访问 Kafka
- 生产者:
kafkaTemplate.send(topic, data)
- 消费者:注解
@KafkaListener(topics={"test"})
修饰方法,得到消息
- 生产者:
发送系统通知
发布消息对系统来说很频繁
- 触发事件
评论、点赞、关注后,发布通知
定义三类不同的主题,评论、点赞、关注发生后,生产者放到队列中。
异步,生产者、消费者可以同时处理 - 处理事件
封装事件对象,开发事件的生产者和消费者
- 定义通知消息模型
1
2
3
4
5
6
7
8
9
10public class Event {
private String topic;
// 触发事件的用户
private int userId;
private int entityType;
private int entityId;
// 实体的作者
private int entityUserId;
private Map<String, Object> data = new HashMap<>();
} - 生产者发送通知消息
1
2
3
4
5
6
7
8
9
10
11
12
public class EventProducer {
private KafkaTemplate kafkaTemplate;
// 处理事件(发送消息)
public void fireEvent(Event event) {
// 将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
} - 消费者接收通知消息:消费者订阅相应的 Kafka topic,接收通知消息,并根据消息类型执行相应的通知操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void handleMessage(ConsumerRecord record) {
if(record == null || record.value() == null) {
logger.error("消息内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if(event == null) {
logger.error("消息格式错误!");
return;
}
// 发送站内通知(后台统一发送)
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
Map<String, Object> map = new HashMap<>();
map.put("userId", event.getUserId());
map.put("entityType", event.getEntityType());
map.put("entityId", event.getEntityId());
if(!event.getData().isEmpty()) {
for(Map.Entry<String, Object> entry : event.getData().entrySet()) {
map.put(entry.getKey(), entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(map));
messageService.addMessage(message);
}
显示系统通知
- 通知列表
- 显示评论、点赞、关注三种类型的通知
- 通知详情
- 分页显示某一类主题的通知
- 唯独消息
- 页面头部显示未读消息数(用拦截器实现,每个页面上都有)
1 | // 在模板渲染之前,将未读消息的数量传递给模板 |
视频代码里有不严谨的地方:
查询某一类消息的通知,当页面某一类消息为空的时候,页面会报错
原来模版里写的 th:if="${followNotice.message != null}"
这样当followNotice为空,就获取不到这个属性了th:if="${followNotice != null and followNotice.message != null}"
1 | // 查询点赞类的通知 |
问题
为什么系统发送通知要用 Kafka 实现
需求:有大量用户操作时,触发大量的通知需求,造成传统数据库负载较高
Kafka:而Kafka支持每秒处理数百万条数据,有效处理高并发的通知需求系统发送通知(评论、点赞、关注)如何用 Kafka 实现的
- 定义通知消息模型:封装通知的相关数据(如用户ID、实体类型、消息内容等)。
- 生产者发送通知消息:用户进行点赞、评论或关注时,将事件封装成消息发送到 Kafka。
- 消费者接收通知消息:消费者订阅相应的 Kafka topic,接收通知消息,并根据消息类型执行相应的通知操作。
- 通知服务处理通知:根据消息内容发送通知(如站内信、邮件、短信、推送等)。
消费者接收通知消息时,最后还会把消息放到数据库里,在高并发的情况下,数据库的写入操作依然会成为性能瓶颈
可能的解决方法:消息队列+缓存+异步写入数据库- 消费者将消息先存储到 Redis 缓存中;
- 异步的线程批量将 Redis 中的消息写入数据库。