0%

仿牛客论坛项目06-Kafka实现系统通知

仿牛客论坛项目06-Kafka实现系统通知

阻塞队列、spring整合kafka、发送系统通知、显示系统通知

解决系统发送通知的功能。

阻塞队列

kafka 是框架,用阻塞队列也能实现,便于理解 kafka

  • BlockingQueue
    • 解决线程通信问题
    • 阻塞方法:put、take
  • 生产者消费者模式。
    • 生产者:生产数据的线程
    • 消费者:使用数据的线程
    • BlockingQueue 避免两个线程直接连接
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue
    • SynchronousQueue

Kafka

  • 简介
    • 分布式的流媒体平台
    • 应用:消息系统、日志收集、用户行为追踪、流式处理
  • 特点
    • 高吞吐量(适合海量数据,如日志)、消息持久化(存到硬盘)、高可靠性(分布式)、高扩展性(便于增加服务器)
  • 术语
    • Broker(服务器)、Zookeeper(管理Kafka集群)
    • Topic(主题)、Partition(对主题的分区,增强并发能力)、Offset(消息在分区中的位置)
    • Leader Replica(主副本)、Follower Replica(随从副本)
      安装命令:brew install kafka
      启动zookeeper:brew services start zookeeper zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties`

启动kafka服务:brew services start kafka kafka-server-start /opt/homebrew/etc/kafka/server.properties`

停止命令:brew services stop zookeeper
创建主题:kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test 查看主题:kafka-topics –list –bootstrap-server localhost:9092 发布消息(生产者):kafka-console-producer –broker-list localhost:9092 –topic test 订阅消息(消费者):kafka-console-consumer –bootstrap-server localhost:9092 –topic test –from-beginning`

Spring 整合 Kafka

  • 引入依赖
    • spring-kafka
  • 配置 Kafka
    • 配置server、consumer
  • 访问 Kafka
    • 生产者:kafkaTemplate.send(topic, data)
    • 消费者:注解@KafkaListener(topics={"test"}) 修饰方法,得到消息

发送系统通知

发布消息对系统来说很频繁

  • 触发事件
    评论、点赞、关注后,发布通知
    定义三类不同的主题,评论、点赞、关注发生后,生产者放到队列中。
    异步,生产者、消费者可以同时处理
  • 处理事件
    封装事件对象,开发事件的生产者和消费者
  1. 定义通知消息模型
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class Event {  
    private String topic;
    // 触发事件的用户
    private int userId;
    private int entityType;
    private int entityId;
    // 实体的作者
    private int entityUserId;
    private Map<String, Object> data = new HashMap<>();
    }
  2. 生产者发送通知消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Component  
    public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件(发送消息)
    public void fireEvent(Event event) {
    // 将事件发布到指定的主题
    kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
    }
  3. 消费者接收通知消息:消费者订阅相应的 Kafka topic,接收通知消息,并根据消息类型执行相应的通知操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})  
    public void handleMessage(ConsumerRecord record) {
    if(record == null || record.value() == null) {
    logger.error("消息内容为空!");
    return;
    }

    Event event = JSONObject.parseObject(record.value().toString(), Event.class);
    if(event == null) {
    logger.error("消息格式错误!");
    return;
    }

    // 发送站内通知(后台统一发送)
    Message message = new Message();
    message.setFromId(SYSTEM_USER_ID);
    message.setToId(event.getEntityUserId());
    message.setConversationId(event.getTopic());
    message.setCreateTime(new Date());

    Map<String, Object> map = new HashMap<>();
    map.put("userId", event.getUserId());
    map.put("entityType", event.getEntityType());
    map.put("entityId", event.getEntityId());


    if(!event.getData().isEmpty()) {
    for(Map.Entry<String, Object> entry : event.getData().entrySet()) {
    map.put(entry.getKey(), entry.getValue());
    }
    }

    message.setContent(JSONObject.toJSONString(map));
    messageService.addMessage(message);
    }

显示系统通知

  • 通知列表
    • 显示评论、点赞、关注三种类型的通知
  • 通知详情
    • 分页显示某一类主题的通知
  • 唯独消息
    • 页面头部显示未读消息数(用拦截器实现,每个页面上都有)
1
2
3
4
5
6
7
8
9
10
// 在模板渲染之前,将未读消息的数量传递给模板
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
User user = hostHolder.getUser();
if(user != null && modelAndView != null) {
int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
}
}

视频代码里有不严谨的地方:
查询某一类消息的通知,当页面某一类消息为空的时候,页面会报错
原来模版里写的 th:if="${followNotice.message != null}" 这样当followNotice为空,就获取不到这个属性了
th:if="${followNotice != null and followNotice.message != null}"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 查询点赞类的通知
message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);
messageVO = new HashMap<>();
if(message != null) {
messageVO.put("message", message);
String content = HtmlUtils.htmlUnescape(message.getContent());
// {&quot;entityType&quot;:1,&quot;entityId&quot;:234,&quot;postId&quot;:234,&quot;userId&quot;:156}
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
messageVO.put("entityType", data.get("entityType"));
messageVO.put("entityId", data.get("entityId"));
messageVO.put("postId", data.get("postId"));
int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);
messageVO.put("count", count);
int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);

messageVO.put("unread", unread);

}
// 当页面某一类消息为空的时候,页面会报错
model.addAttribute("likeNotice", messageVO);

问题

  1. 为什么系统发送通知要用 Kafka 实现
    需求:有大量用户操作时,触发大量的通知需求,造成传统数据库负载较高
    Kafka:而Kafka支持每秒处理数百万条数据,有效处理高并发的通知需求

  2. 系统发送通知(评论、点赞、关注)如何用 Kafka 实现的

    1. 定义通知消息模型:封装通知的相关数据(如用户ID、实体类型、消息内容等)。
    2. 生产者发送通知消息:用户进行点赞、评论或关注时,将事件封装成消息发送到 Kafka。
    3. 消费者接收通知消息:消费者订阅相应的 Kafka topic,接收通知消息,并根据消息类型执行相应的通知操作。
    4. 通知服务处理通知:根据消息内容发送通知(如站内信、邮件、短信、推送等)。
  3. 消费者接收通知消息时,最后还会把消息放到数据库里,在高并发的情况下,数据库的写入操作依然会成为性能瓶颈
    可能的解决方法:消息队列+缓存+异步写入数据库

    1. 消费者将消息先存储到 Redis 缓存中;
    2. 异步的线程批量将 Redis 中的消息写入数据库。