From ba80910b0dce1a935b640a1b091006b36c22c8e8 Mon Sep 17 00:00:00 2001 From: ronger Date: Thu, 8 Dec 2022 10:01:27 +0800 Subject: [PATCH] =?UTF-8?q?:art:=20=E6=8C=87=E5=AE=9A=20TaskExecutor=20Bea?= =?UTF-8?q?n=20=E5=90=8D=E7=A7=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../forest/config/TaskExecutorConfig.java | 49 +++++++++++++++ .../VisitableThreadPoolTaskExecutor.java | 63 +++++++++++++++++++ .../rymcu/forest/handler/AccountHandler.java | 2 +- .../rymcu/forest/handler/ArticleHandler.java | 4 +- .../rymcu/forest/handler/CommentHandler.java | 2 +- 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java create mode 100644 src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java diff --git a/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java b/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java new file mode 100644 index 0000000..0a69501 --- /dev/null +++ b/src/main/java/com/rymcu/forest/config/TaskExecutorConfig.java @@ -0,0 +1,49 @@ +package com.rymcu.forest.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Created on 2022/12/8 9:42. + * + * @author ronger + * @email ronger-x@outlook.com + * @desc : com.rymcu.forest.config + */ +@Configuration +public class TaskExecutorConfig { + /** + * Set the ThreadPoolExecutor's core pool size. + */ + private final static int CORE_POOL_SIZE = 10; + /** + * Set the ThreadPoolExecutor's maximum pool size. + */ + private final static int MAX_POOL_SIZE = 200; + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue. + */ + private final static int QUEUE_CAPACITY = 10; + + /** + * 自定义异步线程池 + * + * @return + */ + @Bean + public AsyncTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new VisitableThreadPoolTaskExecutor(); + executor.setCorePoolSize(CORE_POOL_SIZE); + executor.setMaxPoolSize(MAX_POOL_SIZE); + executor.setQueueCapacity(QUEUE_CAPACITY); + executor.setThreadNamePrefix("rymcu-Executor"); + // 使用预定义的异常处理类 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java b/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java new file mode 100644 index 0000000..8e47aea --- /dev/null +++ b/src/main/java/com/rymcu/forest/config/VisitableThreadPoolTaskExecutor.java @@ -0,0 +1,63 @@ +package com.rymcu.forest.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Created on 2022/12/8 9:52. + * + * @author ronger + * @email ronger-x@outlook.com + * @desc : com.rymcu.forest.config + */ +public class VisitableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { + private static final Logger logger = LoggerFactory.getLogger(VisitableThreadPoolTaskExecutor.class); + + private void showThreadPoolInfo(String prefix){ + ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); + + logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", + this.getThreadNamePrefix(), + prefix, + threadPoolExecutor.getTaskCount(), + threadPoolExecutor.getCompletedTaskCount(), + threadPoolExecutor.getActiveCount(), + threadPoolExecutor.getQueue().size()); + } + + @Override + public void execute(Runnable task) { + showThreadPoolInfo("1. do execute"); + super.execute(task); + } + + @Override + public Future submit(Runnable task) { + showThreadPoolInfo("1. do submit"); + return super.submit(task); + } + + @Override + public Future submit(Callable task) { + showThreadPoolInfo("2. do submit"); + return super.submit(task); + } + + @Override + public ListenableFuture submitListenable(Runnable task) { + showThreadPoolInfo("1. do submitListenable"); + return super.submitListenable(task); + } + + @Override + public ListenableFuture submitListenable(Callable task) { + showThreadPoolInfo("2. do submitListenable"); + return super.submitListenable(task); + } +} diff --git a/src/main/java/com/rymcu/forest/handler/AccountHandler.java b/src/main/java/com/rymcu/forest/handler/AccountHandler.java index c6fb9e7..8f99204 100644 --- a/src/main/java/com/rymcu/forest/handler/AccountHandler.java +++ b/src/main/java/com/rymcu/forest/handler/AccountHandler.java @@ -23,7 +23,7 @@ public class AccountHandler { @Resource private UserMapper userMapper; - @Async("accountThreadPool") + @Async("taskExecutor") @EventListener public void processAccountLastOnlineTimeEvent(AccountEvent accountEvent) { userMapper.updateLastOnlineTimeByAccount(accountEvent.getAccount()); diff --git a/src/main/java/com/rymcu/forest/handler/ArticleHandler.java b/src/main/java/com/rymcu/forest/handler/ArticleHandler.java index ae01ceb..7588fb4 100644 --- a/src/main/java/com/rymcu/forest/handler/ArticleHandler.java +++ b/src/main/java/com/rymcu/forest/handler/ArticleHandler.java @@ -26,7 +26,7 @@ public class ArticleHandler { private LuceneService luceneService; @EventListener - @Async("articlePostThreadPool") + @Async("taskExecutor") public void processArticlePostEvent(ArticleEvent articleEvent) throws InterruptedException { Thread.sleep(1000); log.info(String.format("执行文章发布相关事件:[%s]", JSON.toJSONString(articleEvent))); @@ -56,7 +56,7 @@ public class ArticleHandler { } @EventListener - @Async("articleDeleteThreadPool") + @Async("taskExecutor") public void processArticleDeleteEvent(ArticleDeleteEvent articleDeleteEvent) throws InterruptedException { Thread.sleep(1000); log.info(String.format("执行文章删除相关事件:[%s]", JSON.toJSONString(articleDeleteEvent))); diff --git a/src/main/java/com/rymcu/forest/handler/CommentHandler.java b/src/main/java/com/rymcu/forest/handler/CommentHandler.java index 74f2688..a558dd1 100644 --- a/src/main/java/com/rymcu/forest/handler/CommentHandler.java +++ b/src/main/java/com/rymcu/forest/handler/CommentHandler.java @@ -29,7 +29,7 @@ public class CommentHandler { @Resource private CommentMapper commentMapper; - @Async("commentThreadPool") + @Async("taskExecutor") @EventListener public void processCommentCreatedEvent(CommentEvent commentEvent) throws InterruptedException { log.info(String.format("开始执行评论发布事件:[%s]", JSON.toJSONString(commentEvent)));