Pre Merge pull request !8 from qzx0712/master

This commit is contained in:
qzx0712 2023-10-12 14:21:48 +00:00 committed by Gitee
commit 3c389013d4
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 256 additions and 25 deletions

14
pom.xml
View File

@ -149,6 +149,20 @@
<version>2.7.0</version>
</dependency>
<!--Canal 用于Mysql和ES的数据源同步-->
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>
<build>

View File

@ -130,12 +130,13 @@ public class DiscussPostController implements CommunityConstant {
discussPostService.addDiscussPost(discussPost);
// 触发发帖事件通过消息队列将其存入 Elasticsearch 服务器
Event event = new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(user.getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(discussPost.getId());
eventProducer.fireEvent(event);
// 使用Canal进行数据源同步
// Event event = new Event()
// .setTopic(TOPIC_PUBLISH)
// .setUserId(user.getId())
// .setEntityType(ENTITY_TYPE_POST)
// .setEntityId(discussPost.getId());
// eventProducer.fireEvent(event);
// 计算帖子分数
String redisKey = RedisKeyUtil.getPostScoreKey();
@ -239,12 +240,13 @@ public class DiscussPostController implements CommunityConstant {
discussPostService.updateType(id, type);
// 触发发帖事件通过消息队列将其存入 Elasticsearch 服务器
Event event = new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(hostHolder.getUser().getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(id);
eventProducer.fireEvent(event);
// 使用Canal进行数据源同步
// Event event = new Event()
// .setTopic(TOPIC_PUBLISH)
// .setUserId(hostHolder.getUser().getId())
// .setEntityType(ENTITY_TYPE_POST)
// .setEntityId(id);
// eventProducer.fireEvent(event);
return CommunityUtil.getJSONString(0);
}
@ -261,12 +263,13 @@ public class DiscussPostController implements CommunityConstant {
discussPostService.updateStatus(id, 1);
// 触发发帖事件通过消息队列将其存入 Elasticsearch 服务器
Event event = new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(hostHolder.getUser().getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(id);
eventProducer.fireEvent(event);
// 使用Canal进行数据源同步
// Event event = new Event()
// .setTopic(TOPIC_PUBLISH)
// .setUserId(hostHolder.getUser().getId())
// .setEntityType(ENTITY_TYPE_POST)
// .setEntityId(id);
// eventProducer.fireEvent(event);
// 计算帖子分数
String redisKey = RedisKeyUtil.getPostScoreKey();
@ -287,12 +290,13 @@ public class DiscussPostController implements CommunityConstant {
discussPostService.updateStatus(id, 2);
// 触发删帖事件通过消息队列更新 Elasticsearch 服务器
Event event = new Event()
.setTopic(TOPIC_DELETE)
.setUserId(hostHolder.getUser().getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(id);
eventProducer.fireEvent(event);
// 使用Canal进行数据源同步
// Event event = new Event()
// .setTopic(TOPIC_DELETE)
// .setUserId(hostHolder.getUser().getId())
// .setEntityType(ENTITY_TYPE_POST)
// .setEntityId(id);
// eventProducer.fireEvent(event);
return CommunityUtil.getJSONString(0);
}

View File

@ -68,4 +68,16 @@ public class Event {
this.data.put(key, value);
return this;
}
@Override
public String toString() {
return "Event{" +
"topic='" + topic + '\'' +
", userId=" + userId +
", entityType=" + entityType +
", entityId=" + entityId +
", entityUserId=" + entityUserId +
", data=" + data +
'}';
}
}

View File

@ -0,0 +1,183 @@
package com.greate.community.sync;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.greate.community.entity.Event;
import com.greate.community.event.EventProducer;
import com.greate.community.util.CommunityConstant;
/**
* Mysql与ES的帖子部分的数据源同步器
* 引入Canal实现Mysql与ES的数据源同步
*/
@Component
public class DiscussPostDataSynchronizer implements InitializingBean, CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(DiscussPostDataSynchronizer.class);
@Value("${canal-monitor.host:127.0.0.1}")
private String canalMonitorHost;
@Value("${canal-monitor.port:11111}")
private int canalMonitorPort;
@Value("${canal-monitor.table-name:discuss_post}")
private String canalMonitorTableName;
@Value("${canal-monitor.db-name:echo}")
private String canalMonitorDBName;
@Value("${canal-monitor.destination}")
private String canalMonitorDestination;
@Value("${canal-monitor.username:}")
private String canalMonitorUsername;
@Value("${canal-monitor.password:}")
private String canalMonitorPassword;
@Resource
private EventProducer eventProducer;
private final static int BATCH_SIZE = 10000;
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
while (true) {
CanalConnector connector =
CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort),
canalMonitorDestination,
canalMonitorUsername, canalMonitorPassword);
try {
//打开连接
connector.connect();
logger.info("Canal connect success! Listening mysql table:" + canalMonitorTableName);
connector.subscribe(canalMonitorDBName+"."+canalMonitorTableName);
//回滚到未进行ack的地方下次fetch的时候可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
handleDataChange(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("Connect wrong! reconnecting...");
} finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private void handleDataChange(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//RowChange对象包含了一行数据变化的所有特征
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
logger.error("Canal parse row has an error, data:{}, exception:{}", entry, e.toString());
throw new RuntimeException("Canal parse has an error , data:" + entry, e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
logger.info("Canal catching updating binlog:【{}】", entry.getHeader().getTableName());
switch (eventType) {
/**
* 删除操作
*/
case DELETE:
deleteDiscussPost(rowChage, entry);
break;
/**
* 添加与更新操作
*/
case INSERT:
case UPDATE:
addOrUpdateDiscussPost(rowChage, entry);
break;
default:
break;
}
}
}
private void deleteDiscussPost(CanalEntry.RowChange rowChange, CanalEntry.Entry entry) {
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
rowDatasList.forEach(each -> {
Event event = new Event()
.setTopic(TOPIC_DELETE)
.setEntityType(ENTITY_TYPE_POST);
List<CanalEntry.Column> beforeColumnsList = each.getBeforeColumnsList();
constructEventAndSend(event, beforeColumnsList);
});
}
private void addOrUpdateDiscussPost(CanalEntry.RowChange rowChange, CanalEntry.Entry entry) {
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
rowDatasList.forEach(each -> {
Event event = new Event()
.setTopic(TOPIC_PUBLISH)
.setEntityType(ENTITY_TYPE_POST);
List<CanalEntry.Column> afterColumnsList = each.getAfterColumnsList();
constructEventAndSend(event, afterColumnsList);
});
}
private void constructEventAndSend(Event event, List<CanalEntry.Column> afterColumnsList) {
afterColumnsList.forEach(eachColumn -> {
String columnName = eachColumn.getName();
String columnValue = eachColumn.getValue();
if ("user_id".equals(columnName)) {
event.setUserId(Integer.parseInt(columnValue));
} else if ("id".equals(columnName)) {
event.setEntityId(Integer.parseInt(columnValue));
}
});
logger.info("send event message:{}", event);
eventProducer.fireEvent(event);
}
}

View File

@ -70,4 +70,13 @@ qiniu.bucket.header.url = http://qnvxyvq1p.hd-bkt.clouddn.com
# Caffeine
caffeine.posts.max-size = 15
caffeine.posts.expire-seconds = 180
caffeine.posts.expire-seconds = 180
# Canal
canal-monitor.host = 127.0.0.1
canal-monitor.port = 11111
canal-monitor.table-name = discuss_post
canal-monitor.db-name = echo
canal-monitor.username = canal
canal-monitor.password = canal
canal-monitor.destination = example

View File

@ -68,4 +68,13 @@ qiniu.bucket.header.url = http://qnvxyvq1p.hd-bkt.clouddn.com
caffeine.posts.max-size = 15
caffeine.posts.expire-seconds = 180
# Canal
canal-monitor.host = 127.0.0.1
canal-monitor.port = 11111
canal-monitor.table-name = discuss_post
canal-monitor.db-name = echo
canal-monitor.username = canal
canal-monitor.password = canal
canal-monitor.destination = example