
多线程
spring中使用线程池
java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@Configuration
public class ExecutorServiceConfig {
private static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
@Bean
@Qualifier("buyerPerformanceThreadPool")
public ExecutorService buyerPerformanceThreadPool() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("buyerPerformanceThreadPool-thread-%d").build();
return new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE * 2, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1024 * 2), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
log.error("预警|buyerPerformanceThreadPool 线程运行异常", t);
}
}
};
}
}
多任务等待执行
awaitTermination实现
shutdown():停止接收新任务,原来的任务继续执行
shutdownNow():停止接收新任务,原来的任务停止执行
awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞
当前线程阻塞,直到:
- 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
- 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
- 或者 线程被中断,抛出InterruptedException
java
/**
* awaitTermination 必须搭配 shutdown使用。如果没有shutdown,则会一直阻塞在awaitTermination方法上。
* 如果线程池的方法都执行完了,则会返回true,否则返回false
* @throws Exception
*/
@Test
public void test3() throws Exception {
// 创建一个线程池对象
ExecutorService executorService = Executors.newFixedThreadPool(10);
long a = System.currentTimeMillis();
// 循环提交100个 SQL 查询任务给线程池
for (int i = 1; i <= 10; i++) {
int finalI = i;
executorService.execute(() -> {
int randomNumber = (int) (Math.random() * 100);
System.out.println(finalI + "--开始" + "等待:" + randomNumber);
try {
Thread.sleep(randomNumber);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(finalI + "--执行完了");
});
}
// 等待所有任务执行完成
executorService.shutdown();
boolean boo = executorService.awaitTermination(40, TimeUnit.MILLISECONDS);
long b = System.currentTimeMillis();
System.out.println( "总耗时:"+ (b-a) +",线程池是否完成:" + boo);
}
CountDownLatch实现
在主线程中,调用 CountDownLatch 的 await() 方法在所有 SQL 查询任务执行完成前阻塞线程,直到计数器减为 0 时,所有任务都已经执行完成可以继续往下执行,此时可以对查询结果进行汇总处理。
java
@Test
public void test3() throws Exception {
int TASK_COUNT = 10;
// 创建 CountDownLatch 对象
CountDownLatch latch = new CountDownLatch(TASK_COUNT);
long a = System.currentTimeMillis();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 提交 SQL 查询任务
for (int i = 1; i <= TASK_COUNT; i++) {
int finalI = i;
executorService.execute(() -> {
int randomNumber = (int) (Math.random() * 100);
System.out.println(finalI + "--开始" + "等待:" + randomNumber);
try {
Thread.sleep(randomNumber);
} catch (InterruptedException e) {
e.printStackTrace();
}
// CountDownLatch 计数器减 1
latch.countDown();
System.out.println(finalI + "--执行完了");
});
}
// 等待所有任务执行完成
latch.await();
// 关闭线程池
executorService.shutdown();
long b = System.currentTimeMillis();
System.out.println( "总耗时:"+ (b-a));
}
信号量-Semaphore
可以用到限流的场景
java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @Description
* @auther 公众号:干货食堂
* @create 2022/6/4 20:52
* 信号量:主要可以控制最大的并发数量。首先提前设置好最大的并发这里是3 然后由线程首先获取到许可。然后进行业务操作。完成后释放
*/
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3,true);
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取信号灯许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread "+Thread.currentThread().getName()+" 进入" +"当前系统的并发数是:"+(3-semaphore.availablePermits()));
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread "+Thread.currentThread().getName()+" 即将离开");
semaphore.release();//释放信号灯
System.out.println("Thread "+Thread.currentThread().getName()+" 已经离开,当前系统的并发数是:"+(3-semaphore.availablePermits()));
}
};
pool.execute(runnable);
}
}
}
结果:
text
Thread pool-1-thread-1 进入当前系统的并发数是:1
Thread pool-1-thread-7 进入当前系统的并发数是:2
Thread pool-1-thread-4 进入当前系统的并发数是:3
Thread pool-1-thread-7 即将离开
Thread pool-1-thread-7 已经离开,当前系统的并发数是:2
Thread pool-1-thread-3 进入当前系统的并发数是:3
Thread pool-1-thread-1 即将离开
Thread pool-1-thread-1 已经离开,当前系统的并发数是:2
Thread pool-1-thread-5 进入当前系统的并发数是:3
Thread pool-1-thread-4 即将离开
Thread pool-1-thread-4 已经离开,当前系统的并发数是:2
栅栏-递增CyclicBarrier
CyclicBarrier 适用于需要多个线程相互等待,到达一个共同屏障点后再继续执行的场景
CyclicBarrier支持reset()方法重置,而CountDownLatch没有此功能
java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
System.out.println("====================");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i;
// lambda能操作到 i 吗
new Thread(()->{
System.out.println("线程" + Thread.currentThread().getName()+"收集"+temp+"个龙珠");
try {
cyclicBarrier.await(); // 等待
// 所有线程到达后继续执行
continueWorking();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
private static void continueWorking() {
System.out.println("线程" + Thread.currentThread().getName() + " 继续执行其他任务");
}
}
结果:
text
线程Thread-0收集1个龙珠
线程Thread-4收集5个龙珠
线程Thread-6收集7个龙珠
线程Thread-3收集4个龙珠
线程Thread-2收集3个龙珠
线程Thread-1收集2个龙珠
线程Thread-5收集6个龙珠
召唤神龙成功!
====================
线程Thread-5 继续执行其他任务
线程Thread-0 继续执行其他任务
线程Thread-3 继续执行其他任务
线程Thread-1 继续执行其他任务
线程Thread-2 继续执行其他任务
线程Thread-4 继续执行其他任务
线程Thread-6 继续执行其他任务