Echo/docs/230-发送系统通知.md
2021-01-28 22:09:02 +08:00

9.3 KiB
Raw Blame History

发送系统通知


点赞、关注、私信等系统都会发送通知,在流量巨大的社交网站中,这个系统通知的需求是非常庞大的,为保证系统性能,使用消息队列 Kafka 构建 TB 级异步消息系统。

掌握 Java 原生 API 阻塞队列

下载安装 KafkaKafka 自带 Zookeeper对其配置文件进行相应修改

1首先启动 kafka:

第一步:

第二步:开启另一个命令行

2然后开启另一个命令行创建主题

3生产者生产消息

4开启另一个命令行消费者读取消息

Spring 整合 Kafka

引入依赖

配置 Kafka

  • 配置 server、consumer

访问 Kafka

  • 生产者
KafkaTemplate.send(topic, data)
  • 消费者

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){ }
    

发送系统通知的需求

系统通知也使用私信那张表 message,不过 from_id 固定为 1表示是系统发送出来的注意在 user 表中存储这个系统用户

封装事件对象

/**
 * 封装事件(用于系统通知)
 */
public class Event {

    private String topic; // 事件类型
    private int userId; // 事件由谁触发
    private int entityType; // 实体类型
    private int entityId; // 实体 id
    private int entityUserId; // 实体的作者
    private Map<String, Object> data = new HashMap<>(); // 存储未来可能需要用到的数据

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}

小窍门,将 set 方法设置实体类型的返回值,就可以链式调用

解释一下上面的 userIdentityUserId :比如张三给李四点赞了,那么 userId 就是张三的 id系统通知是发送给李四的entityUserId 就是李四的 id.

事件的生产者

/**
 * 事件的生产者
 */
@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 处理事件
     * @param event
     */
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }

}

事件的消费者

/**
 * 事件消费者
 */
@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMNET, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空");
            return ;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误");
            return ;
        }

        // 发送系统通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());
        if (!event.getData().isEmpty()) {
            for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
                content.put(entry.getKey(), entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));

        messageService.addMessage(message);

    }

}

存储在 content 中的内容是 JSON 格式的,方便我们后续的读取。

修改表现层逻辑

  • 点击评论主题的系统通知(某个用户评论了你的帖子/评论/回复),进入该条评论所属的帖子详情页
  • 点击点赞主题的系统通过(某个用户点赞了你的帖子/评论/回复),进入该条点赞所属的帖子详情页
  • 点击关注主题的系统通知(某个用户关注了你),进入该用户的个人主页

在表现层添加触发事件(发送系统通知)的逻辑,以 CommnetController 为例:

/**
 * 添加评论
 * @param discussPostId
 * @param comment
 * @return
 */
@PostMapping("/add/{discussPostId}")
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
    comment.setUserId(hostHolder.getUser().getId());
    comment.setStatus(0);
    comment.setCreateTime(new Date());
    commentService.addComment(comment);

    // 触发评论事件(系统通知)
    Event event = new Event()
            .setTopic(TOPIC_COMMNET)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(comment.getEntityType())
            .setEntityId(comment.getEntityId())
            .setData("postId", discussPostId);
    if (comment.getEntityType() == ENTITY_TYPE_POST) {
        DiscussPost target = discussPostSerivce.findDiscussPostById(comment.getEntityId());
        event.setEntityUserId(target.getUserId());
    }
    else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
        Comment target = commentService.findCommentById(comment.getEntityId());
        event.setEntityUserId(target.getUserId());
    }

    eventProducer.fireEvent(event);

    return "redirect:/discuss/detail/" + discussPostId;
}

点击评论的系统通知后,进入该条评论所属的帖子

注意,对 LikeController 我们进行了微小的重构:

/**
 * 点赞
 * @param entityType
 * @param entityId
 * @param entityUserId 赞的帖子/评论的作者 id
 * @param postId 帖子的 id (点赞了哪个帖子,点赞的评论属于哪个帖子,点赞的回复属于哪个帖子)
 * @return
 */
@PostMapping("/like")
@ResponseBody
public String like(int entityType, int entityId, int entityUserId, int postId) {
    User user = hostHolder.getUser();
    // 点赞
    likeService.like(user.getId(), entityType, entityId, entityUserId);
    // 点赞数量
    long likeCount = likeService.findEntityLikeCount(entityType, entityId);
    // 点赞状态
    int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);

    Map<String, Object> map = new HashMap<>();
    map.put("likeCount", likeCount);
    map.put("likeStatus", likeStatus);

    // 触发点赞事件(系统通知) - 取消点赞不通知
    if (likeStatus == 1) {
        Event event = new Event()
                .setTopic(TOPIC_LIKE)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityUserId)
                .setData("postId", postId);
        eventProducer.fireEvent(event);
    }

    return CommunityUtil.getJSONString(0, null, map);
}

方法参数中添加了 帖子的 id主要是为了无论是对帖子的点赞还是对某个帖子评论/回复的点赞,点击该条系统通知后都需要进入对应的帖子。所以此处我们需要传入帖子的 id并非和 entityId 重复

注意

注意,修改一下 ServiceLogAspect 中的逻辑(统一日志记录),加入一个 ServletRequestAttributes 非空的判断:

这个方法拦截了所有的 Service而在本节之前我们所有对 Service 的访问都是通过 Controller 的,但是!现在我们多出了一个消费者,它调用了 MessageService,不是通过 Controller 去调用的,也就是说在消费者的调用中,是不存在 request 的,也即 ServletRequestAttributes 为空。