diff --git a/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java b/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java new file mode 100644 index 0000000..0a69501 --- /dev/null +++ b/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java @@ -0,0 +1,49 @@ +package com.rymcu.forest.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Created on 2022/12/8 9:42. + * + * @author ronger + * @email ronger-x@outlook.com + * @desc : com.rymcu.forest.config + */ +@Configuration +public class TaskExecutorConfig { + /** + * Set the ThreadPoolExecutor's core pool size. + */ + private final static int CORE_POOL_SIZE = 10; + /** + * Set the ThreadPoolExecutor's maximum pool size. + */ + private final static int MAX_POOL_SIZE = 200; + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue. + */ + private final static int QUEUE_CAPACITY = 10; + + /** + * 自定义异步线程池 + * + * @return + */ + @Bean + public AsyncTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new VisitableThreadPoolTaskExecutor(); + executor.setCorePoolSize(CORE_POOL_SIZE); + executor.setMaxPoolSize(MAX_POOL_SIZE); + executor.setQueueCapacity(QUEUE_CAPACITY); + executor.setThreadNamePrefix("rymcu-Executor"); + // 使用预定义的异常处理类 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java b/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java new file mode 100644 index 0000000..8e47aea --- /dev/null +++ b/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java @@ -0,0 +1,63 @@ +package com.rymcu.forest.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Created on 2022/12/8 9:52. + * + * @author ronger + * @email ronger-x@outlook.com + * @desc : com.rymcu.forest.config + */ +public class VisitableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { + private static final Logger logger = LoggerFactory.getLogger(VisitableThreadPoolTaskExecutor.class); + + private void showThreadPoolInfo(String prefix){ + ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); + + logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", + this.getThreadNamePrefix(), + prefix, + threadPoolExecutor.getTaskCount(), + threadPoolExecutor.getCompletedTaskCount(), + threadPoolExecutor.getActiveCount(), + threadPoolExecutor.getQueue().size()); + } + + @Override + public void execute(Runnable task) { + showThreadPoolInfo("1. do execute"); + super.execute(task); + } + + @Override + public Future submit(Runnable task) { + showThreadPoolInfo("1. do submit"); + return super.submit(task); + } + + @Override + public Future submit(Callable task) { + showThreadPoolInfo("2. do submit"); + return super.submit(task); + } + + @Override + public ListenableFuture submitListenable(Runnable task) { + showThreadPoolInfo("1. do submitListenable"); + return super.submitListenable(task); + } + + @Override + public ListenableFuture submitListenable(Callable task) { + showThreadPoolInfo("2. do submitListenable"); + return super.submitListenable(task); + } +} diff --git a/src/main/java/com/rymcu/forest/handler/AccountHandler.java b/src/main/java/com/rymcu/forest/handler/AccountHandler.java index c6fb9e7..8f99204 100644 --- a/src/main/java/com/rymcu/forest/handler/AccountHandler.java +++ b/src/main/java/com/rymcu/forest/handler/AccountHandler.java @@ -23,7 +23,7 @@ public class AccountHandler { @Resource private UserMapper userMapper; - @Async("accountThreadPool") + @Async("taskExecutor") @EventListener public void processAccountLastOnlineTimeEvent(AccountEvent accountEvent) { userMapper.updateLastOnlineTimeByAccount(accountEvent.getAccount()); diff --git a/src/main/java/com/rymcu/forest/handler/ArticleHandler.java b/src/main/java/com/rymcu/forest/handler/ArticleHandler.java index ae01ceb..7588fb4 100644 --- a/src/main/java/com/rymcu/forest/handler/ArticleHandler.java +++ b/src/main/java/com/rymcu/forest/handler/ArticleHandler.java @@ -26,7 +26,7 @@ public class ArticleHandler { private LuceneService luceneService; @EventListener - @Async("articlePostThreadPool") + @Async("taskExecutor") public void processArticlePostEvent(ArticleEvent articleEvent) throws InterruptedException { Thread.sleep(1000); log.info(String.format("执行文章发布相关事件:[%s]", JSON.toJSONString(articleEvent))); @@ -56,7 +56,7 @@ public class ArticleHandler { } @EventListener - @Async("articleDeleteThreadPool") + @Async("taskExecutor") public void processArticleDeleteEvent(ArticleDeleteEvent articleDeleteEvent) throws InterruptedException { Thread.sleep(1000); log.info(String.format("执行文章删除相关事件:[%s]", JSON.toJSONString(articleDeleteEvent))); diff --git a/src/main/java/com/rymcu/forest/handler/CommentHandler.java b/src/main/java/com/rymcu/forest/handler/CommentHandler.java index 74f2688..a558dd1 100644 --- a/src/main/java/com/rymcu/forest/handler/CommentHandler.java +++ b/src/main/java/com/rymcu/forest/handler/CommentHandler.java @@ -29,7 +29,7 @@ public class CommentHandler { @Resource private CommentMapper commentMapper; - @Async("commentThreadPool") + @Async("taskExecutor") @EventListener public void processCommentCreatedEvent(CommentEvent commentEvent) throws InterruptedException { log.info(String.format("开始执行评论发布事件:[%s]", JSON.toJSONString(commentEvent)));