diff --git a/pom.xml b/pom.xml index 9765fdc9..69ac50a9 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,20 @@ 2.7.0 + + + + com.alibaba.otter + canal.client + 1.1.5 + + + + com.alibaba.otter + canal.protocol + 1.1.5 + + diff --git a/src/main/java/com/greate/community/controller/DiscussPostController.java b/src/main/java/com/greate/community/controller/DiscussPostController.java index b21e39ff..8c170d01 100644 --- a/src/main/java/com/greate/community/controller/DiscussPostController.java +++ b/src/main/java/com/greate/community/controller/DiscussPostController.java @@ -130,12 +130,13 @@ public class DiscussPostController implements CommunityConstant { discussPostService.addDiscussPost(discussPost); // 触发发帖事件,通过消息队列将其存入 Elasticsearch 服务器 - Event event = new Event() - .setTopic(TOPIC_PUBLISH) - .setUserId(user.getId()) - .setEntityType(ENTITY_TYPE_POST) - .setEntityId(discussPost.getId()); - eventProducer.fireEvent(event); + // 使用Canal进行数据源同步 +// Event event = new Event() +// .setTopic(TOPIC_PUBLISH) +// .setUserId(user.getId()) +// .setEntityType(ENTITY_TYPE_POST) +// .setEntityId(discussPost.getId()); +// eventProducer.fireEvent(event); // 计算帖子分数 String redisKey = RedisKeyUtil.getPostScoreKey(); @@ -239,12 +240,13 @@ public class DiscussPostController implements CommunityConstant { discussPostService.updateType(id, type); // 触发发帖事件,通过消息队列将其存入 Elasticsearch 服务器 - Event event = new Event() - .setTopic(TOPIC_PUBLISH) - .setUserId(hostHolder.getUser().getId()) - .setEntityType(ENTITY_TYPE_POST) - .setEntityId(id); - eventProducer.fireEvent(event); + // 使用Canal进行数据源同步 +// Event event = new Event() +// .setTopic(TOPIC_PUBLISH) +// .setUserId(hostHolder.getUser().getId()) +// .setEntityType(ENTITY_TYPE_POST) +// .setEntityId(id); +// eventProducer.fireEvent(event); return CommunityUtil.getJSONString(0); } @@ -261,12 +263,13 @@ public class DiscussPostController implements CommunityConstant { discussPostService.updateStatus(id, 1); // 触发发帖事件,通过消息队列将其存入 Elasticsearch 服务器 - Event event = new Event() - .setTopic(TOPIC_PUBLISH) - .setUserId(hostHolder.getUser().getId()) - .setEntityType(ENTITY_TYPE_POST) - .setEntityId(id); - eventProducer.fireEvent(event); + // 使用Canal进行数据源同步 +// Event event = new Event() +// .setTopic(TOPIC_PUBLISH) +// .setUserId(hostHolder.getUser().getId()) +// .setEntityType(ENTITY_TYPE_POST) +// .setEntityId(id); +// eventProducer.fireEvent(event); // 计算帖子分数 String redisKey = RedisKeyUtil.getPostScoreKey(); @@ -287,12 +290,13 @@ public class DiscussPostController implements CommunityConstant { discussPostService.updateStatus(id, 2); // 触发删帖事件,通过消息队列更新 Elasticsearch 服务器 - Event event = new Event() - .setTopic(TOPIC_DELETE) - .setUserId(hostHolder.getUser().getId()) - .setEntityType(ENTITY_TYPE_POST) - .setEntityId(id); - eventProducer.fireEvent(event); + // 使用Canal进行数据源同步 +// Event event = new Event() +// .setTopic(TOPIC_DELETE) +// .setUserId(hostHolder.getUser().getId()) +// .setEntityType(ENTITY_TYPE_POST) +// .setEntityId(id); +// eventProducer.fireEvent(event); return CommunityUtil.getJSONString(0); } diff --git a/src/main/java/com/greate/community/entity/Event.java b/src/main/java/com/greate/community/entity/Event.java index 656147f4..f01b1f3e 100644 --- a/src/main/java/com/greate/community/entity/Event.java +++ b/src/main/java/com/greate/community/entity/Event.java @@ -68,4 +68,16 @@ public class Event { this.data.put(key, value); return this; } + + @Override + public String toString() { + return "Event{" + + "topic='" + topic + '\'' + + ", userId=" + userId + + ", entityType=" + entityType + + ", entityId=" + entityId + + ", entityUserId=" + entityUserId + + ", data=" + data + + '}'; + } } diff --git a/src/main/java/com/greate/community/sync/DiscussPostDataSynchronizer.java b/src/main/java/com/greate/community/sync/DiscussPostDataSynchronizer.java new file mode 100644 index 00000000..c2cba57f --- /dev/null +++ b/src/main/java/com/greate/community/sync/DiscussPostDataSynchronizer.java @@ -0,0 +1,183 @@ +package com.greate.community.sync; + +import java.net.InetSocketAddress; +import java.util.List; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.greate.community.entity.Event; +import com.greate.community.event.EventProducer; +import com.greate.community.util.CommunityConstant; + +/** + * Mysql与ES的帖子部分的数据源同步器 + * 引入Canal实现Mysql与ES的数据源同步 + */ + +@Component +public class DiscussPostDataSynchronizer implements InitializingBean, CommunityConstant { + + private static final Logger logger = LoggerFactory.getLogger(DiscussPostDataSynchronizer.class); + + @Value("${canal-monitor.host:127.0.0.1}") + private String canalMonitorHost; + + @Value("${canal-monitor.port:11111}") + private int canalMonitorPort; + + @Value("${canal-monitor.table-name:discuss_post}") + private String canalMonitorTableName; + + @Value("${canal-monitor.db-name:echo}") + private String canalMonitorDBName; + + @Value("${canal-monitor.destination}") + private String canalMonitorDestination; + + @Value("${canal-monitor.username:}") + private String canalMonitorUsername; + + @Value("${canal-monitor.password:}") + private String canalMonitorPassword; + + @Resource + private EventProducer eventProducer; + + private final static int BATCH_SIZE = 10000; + + + @Override + public void afterPropertiesSet() throws Exception { + new Thread(() -> { + while (true) { + CanalConnector connector = + CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), + canalMonitorDestination, + canalMonitorUsername, canalMonitorPassword); + try { + //打开连接 + connector.connect(); + logger.info("Canal connect success! Listening mysql table:" + canalMonitorTableName); + connector.subscribe(canalMonitorDBName+"."+canalMonitorTableName); + //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 + connector.rollback(); + while (true) { + // 获取指定数量的数据 + Message message = connector.getWithoutAck(BATCH_SIZE); + long batchId = message.getId(); + int size = message.getEntries().size(); + if (batchId == -1 || size == 0) { + } else { + handleDataChange(message.getEntries()); + } + // 提交确认 + connector.ack(batchId); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("Connect wrong! reconnecting..."); + } finally { + connector.disconnect(); + //防止频繁访问数据库链接: 线程睡眠 10秒 + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }).start(); + } + + + /** + * 打印canal server解析binlog获得的实体类信息 + */ + private void handleDataChange(List entrys) { + for (CanalEntry.Entry entry : entrys) { + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || + entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + continue; + } + //RowChange对象,包含了一行数据变化的所有特征 + CanalEntry.RowChange rowChage; + try { + rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + logger.error("Canal parse row has an error, data:{}, exception:{}", entry, e.toString()); + throw new RuntimeException("Canal parse has an error , data:" + entry, e); + } + CanalEntry.EventType eventType = rowChage.getEventType(); + logger.info("Canal catching updating binlog:【{}】", entry.getHeader().getTableName()); + switch (eventType) { + /** + * 删除操作 + */ + case DELETE: + + deleteDiscussPost(rowChage, entry); + break; + /** + * 添加与更新操作 + */ + case INSERT: + case UPDATE: + addOrUpdateDiscussPost(rowChage, entry); + break; + default: + break; + } + + } + } + + private void deleteDiscussPost(CanalEntry.RowChange rowChange, CanalEntry.Entry entry) { + List rowDatasList = rowChange.getRowDatasList(); + rowDatasList.forEach(each -> { + Event event = new Event() + .setTopic(TOPIC_DELETE) + .setEntityType(ENTITY_TYPE_POST); + List beforeColumnsList = each.getBeforeColumnsList(); + constructEventAndSend(event, beforeColumnsList); + }); + + + } + + private void addOrUpdateDiscussPost(CanalEntry.RowChange rowChange, CanalEntry.Entry entry) { + List rowDatasList = rowChange.getRowDatasList(); + rowDatasList.forEach(each -> { + Event event = new Event() + .setTopic(TOPIC_PUBLISH) + .setEntityType(ENTITY_TYPE_POST); + List afterColumnsList = each.getAfterColumnsList(); + constructEventAndSend(event, afterColumnsList); + }); + } + + private void constructEventAndSend(Event event, List afterColumnsList) { + afterColumnsList.forEach(eachColumn -> { + String columnName = eachColumn.getName(); + String columnValue = eachColumn.getValue(); + if ("user_id".equals(columnName)) { + event.setUserId(Integer.parseInt(columnValue)); + } else if ("id".equals(columnName)) { + event.setEntityId(Integer.parseInt(columnValue)); + } + }); + logger.info("send event message:{}", event); + eventProducer.fireEvent(event); + } +} + diff --git a/src/main/resources/application-develop.properties b/src/main/resources/application-develop.properties index 2a2a77de..ea74f173 100644 --- a/src/main/resources/application-develop.properties +++ b/src/main/resources/application-develop.properties @@ -70,4 +70,13 @@ qiniu.bucket.header.url = http://qnvxyvq1p.hd-bkt.clouddn.com # Caffeine caffeine.posts.max-size = 15 -caffeine.posts.expire-seconds = 180 \ No newline at end of file +caffeine.posts.expire-seconds = 180 + +# Canal +canal-monitor.host = 127.0.0.1 +canal-monitor.port = 11111 +canal-monitor.table-name = discuss_post +canal-monitor.db-name = echo +canal-monitor.username = canal +canal-monitor.password = canal +canal-monitor.destination = example \ No newline at end of file diff --git a/src/main/resources/application-produce.properties b/src/main/resources/application-produce.properties index 8929aee2..2febe664 100644 --- a/src/main/resources/application-produce.properties +++ b/src/main/resources/application-produce.properties @@ -68,4 +68,13 @@ qiniu.bucket.header.url = http://qnvxyvq1p.hd-bkt.clouddn.com caffeine.posts.max-size = 15 caffeine.posts.expire-seconds = 180 +# Canal +canal-monitor.host = 127.0.0.1 +canal-monitor.port = 11111 +canal-monitor.table-name = discuss_post +canal-monitor.db-name = echo +canal-monitor.username = canal +canal-monitor.password = canal +canal-monitor.destination = example +