# 发送系统通知 --- 点赞、关注、私信等系统都会发送通知,在流量巨大的社交网站中,这个系统通知的需求是非常庞大的,为保证系统性能,使用消息队列 Kafka 构建 TB 级异步消息系统。 > 掌握 Java 原生 API 阻塞队列 > > ![](https://gitee.com/veal98/images/raw/master/img/20210127204618.png) 下载安装 Kafka(Kafka 自带 Zookeeper,对其配置文件进行相应修改) 1)首先:启动 kafka: 第一步: ![](https://gitee.com/veal98/images/raw/master/img/20210127212202.png) ```shell cd d:\kafka_2.13-2.7.0 bin\windows\zookeeper-server-start.bat config\zookeeper.properties ``` 第二步:开启另一个命令行 ![](https://gitee.com/veal98/images/raw/master/img/20210127212417.png) ```shell cd d:\kafka_2.13-2.7.0 bin\windows\kafka-server-start.bat config\server.properties ``` 2)然后,开启另一个命令行,创建主题 ![](https://gitee.com/veal98/images/raw/master/img/20210127212922.png) 3)生产者生产消息 ![](https://gitee.com/veal98/images/raw/master/img/20210127213405.png) 4)开启另一个命令行,消费者读取消息 ![](https://gitee.com/veal98/images/raw/master/img/20210127213456.png) ## Spring 整合 Kafka ### 引入依赖 ### 配置 Kafka - 配置 server、consumer ## 访问 Kafka - 生产者 ```java KafkaTemplate.send(topic, data) ``` - 消费者 ```java @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record){ } ``` ## 发送系统通知的需求 ![](https://gitee.com/veal98/images/raw/master/img/20210128142133.png) 系统通知也使用私信那张表 `message`,不过 `from_id` 固定为 1,表示是系统发送出来的,注意在 user 表中存储这个系统用户 ### 封装事件对象 ```java /** * 封装事件(用于系统通知) */ public class Event { private String topic; // 事件类型 private int userId; // 事件由谁触发 private int entityType; // 实体类型 private int entityId; // 实体 id private int entityUserId; // 实体的作者 private Map data = new HashMap<>(); // 存储未来可能需要用到的数据 public String getTopic() { return topic; } public Event setTopic(String topic) { this.topic = topic; return this; } public int getUserId() { return userId; } public Event setUserId(int userId) { this.userId = userId; return this; } public int getEntityType() { return entityType; } public Event setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityId() { return entityId; } public Event setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityUserId() { return entityUserId; } public Event setEntityUserId(int entityUserId) { this.entityUserId = entityUserId; return this; } public Map getData() { return data; } public Event setData(String key, Object value) { this.data.put(key, value); return this; } } ``` 小窍门,将 set 方法设置实体类型的返回值,就可以链式调用 解释一下上面的 `userId` 和 `entityUserId` :比如张三给李四点赞了,那么 `userId` 就是张三的 id,系统通知是发送给李四的,即 `entityUserId` 就是李四的 id. ### 事件的生产者 ```java /** * 事件的生产者 */ @Component public class EventProducer { @Autowired private KafkaTemplate kafkaTemplate; /** * 处理事件 * @param event */ public void fireEvent(Event event) { // 将事件发布到指定的主题 kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event)); } } ``` ### 事件的消费者 ```java /** * 事件消费者 */ @Component public class EventConsumer implements CommunityConstant { private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); @Autowired private MessageService messageService; @KafkaListener(topics = {TOPIC_COMMNET, TOPIC_LIKE, TOPIC_FOLLOW}) public void handleMessage(ConsumerRecord record) { if (record == null || record.value() == null) { logger.error("消息的内容为空"); return ; } Event event = JSONObject.parseObject(record.value().toString(), Event.class); if (event == null) { logger.error("消息格式错误"); return ; } // 发送系统通知 Message message = new Message(); message.setFromId(SYSTEM_USER_ID); message.setToId(event.getEntityUserId()); message.setConversationId(event.getTopic()); message.setCreateTime(new Date()); Map content = new HashMap<>(); content.put("userId", event.getUserId()); content.put("entityType", event.getEntityType()); content.put("entityId", event.getEntityId()); if (!event.getData().isEmpty()) { for (Map.Entry entry : event.getData().entrySet()) { content.put(entry.getKey(), entry.getValue()); } } message.setContent(JSONObject.toJSONString(content)); messageService.addMessage(message); } } ``` 存储在 content 中的内容是 JSON 格式的,方便我们后续的读取。 ### 修改表现层逻辑 - 点击评论主题的系统通知(某个用户评论了你的帖子/评论/回复),进入该条评论所属的帖子详情页 - 点击点赞主题的系统通过(某个用户点赞了你的帖子/评论/回复),进入该条点赞所属的帖子详情页 - 点击关注主题的系统通知(某个用户关注了你),进入该用户的个人主页 在表现层添加触发事件(发送系统通知)的逻辑,以 `CommnetController` 为例: ```java /** * 添加评论 * @param discussPostId * @param comment * @return */ @PostMapping("/add/{discussPostId}") public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) { comment.setUserId(hostHolder.getUser().getId()); comment.setStatus(0); comment.setCreateTime(new Date()); commentService.addComment(comment); // 触发评论事件(系统通知) Event event = new Event() .setTopic(TOPIC_COMMNET) .setUserId(hostHolder.getUser().getId()) .setEntityType(comment.getEntityType()) .setEntityId(comment.getEntityId()) .setData("postId", discussPostId); if (comment.getEntityType() == ENTITY_TYPE_POST) { DiscussPost target = discussPostSerivce.findDiscussPostById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) { Comment target = commentService.findCommentById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } eventProducer.fireEvent(event); return "redirect:/discuss/detail/" + discussPostId; } ``` 点击评论的系统通知后,进入该条评论所属的帖子 注意,对 `LikeController` 我们进行了微小的重构: ```java /** * 点赞 * @param entityType * @param entityId * @param entityUserId 赞的帖子/评论的作者 id * @param postId 帖子的 id (点赞了哪个帖子,点赞的评论属于哪个帖子,点赞的回复属于哪个帖子) * @return */ @PostMapping("/like") @ResponseBody public String like(int entityType, int entityId, int entityUserId, int postId) { User user = hostHolder.getUser(); // 点赞 likeService.like(user.getId(), entityType, entityId, entityUserId); // 点赞数量 long likeCount = likeService.findEntityLikeCount(entityType, entityId); // 点赞状态 int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId); Map map = new HashMap<>(); map.put("likeCount", likeCount); map.put("likeStatus", likeStatus); // 触发点赞事件(系统通知) - 取消点赞不通知 if (likeStatus == 1) { Event event = new Event() .setTopic(TOPIC_LIKE) .setUserId(hostHolder.getUser().getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityUserId) .setData("postId", postId); eventProducer.fireEvent(event); } return CommunityUtil.getJSONString(0, null, map); } ``` 方法参数中添加了 帖子的 id,主要是为了:无论是对帖子的点赞,还是对某个帖子评论/回复的点赞,点击该条系统通知后都需要进入对应的帖子。所以此处我们需要传入帖子的 id,并非和 entityId 重复 ## 注意 注意,修改一下 `ServiceLogAspect` 中的逻辑(统一日志记录),加入一个 `ServletRequestAttributes` 非空的判断: ![](https://gitee.com/veal98/images/raw/master/img/20210128154901.png) 这个方法拦截了所有的 Service,而在本节之前我们所有对 Service 的访问都是通过 Controller 的,但是!现在我们多出了一个消费者,它调用了 `MessageService`,不是通过 Controller 去调用的,也就是说在消费者的调用中,是不存在 request 的,也即 `ServletRequestAttributes` 为空。