Skip to content
鼓励作者:欢迎打赏犒劳

多线程

spring中使用线程池

java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

@Configuration
public class ExecutorServiceConfig {
     private static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    @Bean
    @Qualifier("buyerPerformanceThreadPool")
    public ExecutorService buyerPerformanceThreadPool() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("buyerPerformanceThreadPool-thread-%d").build();
    
        return new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE * 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1024 * 2), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null) {
                    log.error("预警|buyerPerformanceThreadPool 线程运行异常", t);
                }
            }
        };
    }
}

多任务等待执行

awaitTermination实现

shutdown():停止接收新任务,原来的任务继续执行

shutdownNow():停止接收新任务,原来的任务停止执行

awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞

当前线程阻塞,直到:

  • 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
  • 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
  • 或者 线程被中断,抛出InterruptedException
java
/**
 * awaitTermination 必须搭配 shutdown使用。如果没有shutdown,则会一直阻塞在awaitTermination方法上。
 * 如果线程池的方法都执行完了,则会返回true,否则返回false
 * @throws Exception
*/
@Test
public void test3() throws Exception {
    // 创建一个线程池对象
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    long a = System.currentTimeMillis();
    // 循环提交100个 SQL 查询任务给线程池
    for (int i = 1; i <= 10; i++) {
        int finalI = i;
        executorService.execute(() -> {
            int randomNumber = (int) (Math.random() * 100);
            System.out.println(finalI + "--开始" + "等待:" + randomNumber);
            try {
                Thread.sleep(randomNumber);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(finalI + "--执行完了");
        });
    }
    // 等待所有任务执行完成
    executorService.shutdown();
    boolean boo = executorService.awaitTermination(40, TimeUnit.MILLISECONDS);
    long b = System.currentTimeMillis();
    System.out.println( "总耗时:"+  (b-a) +",线程池是否完成:" + boo);
}

CountDownLatch实现

在主线程中,调用 CountDownLatch 的 await() 方法在所有 SQL 查询任务执行完成前阻塞线程,直到计数器减为 0 时,所有任务都已经执行完成可以继续往下执行,此时可以对查询结果进行汇总处理。

java
@Test
public void test3() throws Exception {
    int TASK_COUNT = 10;
    // 创建 CountDownLatch 对象
    CountDownLatch latch = new CountDownLatch(TASK_COUNT);
    long a = System.currentTimeMillis();
    // 创建线程池
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    // 提交 SQL 查询任务
    for (int i = 1; i <= TASK_COUNT; i++) {
        int finalI = i;
        executorService.execute(() -> {
            int randomNumber = (int) (Math.random() * 100);
            System.out.println(finalI + "--开始" + "等待:" + randomNumber);
            try {
                Thread.sleep(randomNumber);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // CountDownLatch 计数器减 1
            latch.countDown();
            System.out.println(finalI + "--执行完了");
        });
    }

    // 等待所有任务执行完成
    latch.await();

    // 关闭线程池
    executorService.shutdown();
    long b = System.currentTimeMillis();
    System.out.println( "总耗时:"+  (b-a));
}

信号量-Semaphore

可以用到限流的场景

java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @Description
 * @auther 公众号:干货食堂
 * @create 2022/6/4 20:52
 * 信号量:主要可以控制最大的并发数量。首先提前设置好最大的并发这里是3 然后由线程首先获取到许可。然后进行业务操作。完成后释放
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService pool =  Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3,true);

        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();//获取信号灯许可
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("Thread "+Thread.currentThread().getName()+" 进入" +"当前系统的并发数是:"+(3-semaphore.availablePermits()));
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("Thread "+Thread.currentThread().getName()+" 即将离开");
                    semaphore.release();//释放信号灯
                    System.out.println("Thread "+Thread.currentThread().getName()+" 已经离开,当前系统的并发数是:"+(3-semaphore.availablePermits()));
                }
            };
            pool.execute(runnable);

        }
    }
}

结果:

text
Thread pool-1-thread-1 进入当前系统的并发数是:1
Thread pool-1-thread-7 进入当前系统的并发数是:2
Thread pool-1-thread-4 进入当前系统的并发数是:3
Thread pool-1-thread-7 即将离开
Thread pool-1-thread-7 已经离开,当前系统的并发数是:2
Thread pool-1-thread-3 进入当前系统的并发数是:3
Thread pool-1-thread-1 即将离开
Thread pool-1-thread-1 已经离开,当前系统的并发数是:2
Thread pool-1-thread-5 进入当前系统的并发数是:3
Thread pool-1-thread-4 即将离开
Thread pool-1-thread-4 已经离开,当前系统的并发数是:2

栅栏-递增CyclicBarrier

CyclicBarrier 适用于需要多个线程相互等待,到达一个共同屏障点后再继续执行的场景

CyclicBarrier支持reset()方法重置,而CountDownLatch没有此功能

java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐7颗龙珠召唤神龙
         */
        // 召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功!");
            System.out.println("====================");
        });

        for (int i = 1; i <=7 ; i++) {
            final int temp = i;
            // lambda能操作到 i 吗
            new Thread(()->{
                System.out.println("线程" + Thread.currentThread().getName()+"收集"+temp+"个龙珠");
                try {
                    cyclicBarrier.await(); // 等待
                     // 所有线程到达后继续执行
                     continueWorking();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    private static void continueWorking() {
        System.out.println("线程" + Thread.currentThread().getName() + " 继续执行其他任务");
    }
}

结果:

text
线程Thread-0收集1个龙珠
线程Thread-4收集5个龙珠
线程Thread-6收集7个龙珠
线程Thread-3收集4个龙珠
线程Thread-2收集3个龙珠
线程Thread-1收集2个龙珠
线程Thread-5收集6个龙珠
召唤神龙成功!
====================
线程Thread-5 继续执行其他任务
线程Thread-0 继续执行其他任务
线程Thread-3 继续执行其他任务
线程Thread-1 继续执行其他任务
线程Thread-2 继续执行其他任务
线程Thread-4 继续执行其他任务
线程Thread-6 继续执行其他任务

如有转载或 CV 的请标注本站原文地址