Java多线程编程学习笔记

/ 0评 / 0

Java多线程编程

Java进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是操作系统进行资源分配与调度的基本单位。

可以把进程简单的理解为正在操作系统中运行的一个程序。

Java线程

线程(thread)是进程的一个执行单元。

一个线程就是进程中一个单一顺序的控制流, 进程的一个执行分支。

进程是线程的容器,一个进程至少有一个线程.一个进程中也可以有多个线程。

在操作系统中是以进程为单位分配资源,如虚拟存储空间,文件描述符等. 每个线程都有各自的线程栈,自己的寄存器环境,自己的线程本地存储。

Java主线程与子线程

JVM启动时会创建一个主线程,该主线程负责执行main方法 . 主线程就是运行main方法的线程。

Java中的线程不孤立的,线程之间存在一些联系. 如果在A线程中创建了B线程, 称B线程为A线程的子线程, 相应的A线程就是B线程的父线程。

Java串行、并发与并行

img

并发可以提高以事物的处理效率, 即一段时间内可以处理或者完成更多的事情。

并行是一种更为严格,理想的并发。

从硬件角度来说, 如果单核CPU,一个处理器一次只能执行一个线程的情况下,处理器可以使用时间片轮转技术 ,可以让CPU快速的在各个线程之间进行切换, 对于用来来说,感觉是三个线程在同时执行.如果是多核心CPU,可以为不同的线程分配不同的CPU内核。

Java多线程的创建与启动方式

在Java中,创建一个线程就是创建一个Thread类(子类)的对象(实例)。

Thread类有两个常用的构造方法:Thread()与Thread(Runnable).对应的创建线程的两种方式:

这两种创建线程的方式没有本质的区别。

Thread类子类:

public class MyThread extends Thread{
    @Override
    public void run() {
        System.out.println("这是子线程打印的内容");
    }
}
---------
public class App
{
    public static void main( String[] args )
    {
        System.out.println("JVM启动main线程,main线程执行main方法");
        // 创建子线程对象
        MyThread myThread = new MyThread();
        /*
            调用线程的start()方法来启动线程, 启动线程的实质就是请求JVM运行相应的线程,这个线程具体在什么时候运行由线程调度器(Scheduler)决定
            注意:
                start()方法调用结束并不意味着子线程开始运行
                新开启的线程会执行run()方法
                如果开启了多个线程,start()调用的顺序并不一定就是线程启动的顺序
                多线程运行结果与代码执行顺序或调用顺序无关
        */
        // 启动线程
        myThread.start();
    }
}

Runnable接口的实现类:

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println("这是子线程打印的内容");
    }
}
--------
public class App
{
    public static void main( String[] args )
    {
        // 创建Runnable接口的实现类对象
        MyRunnable myRunnable = new MyRunnable();
        // 创建线程对象
        Thread thread = new Thread(myRunnable);
        // 开启线程
        thread.start();


        // 有时调用Thread(Runnable)构造方法时,实参也会传递匿名内部类对象
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("匿名内部类对象打印的内容");
            }
        });
        thread1.start();
    }
}

Java多线程常用的方法

currentThread()方法

Thread.currentThread()方法可以获得当前线程。

Java中的任何一段代码都是执行在某个线程当中的. 执行当前代码的线程就是当前线程。

同一段代码可能被不同的线程执行,因此当前线程是相对的,Thread.currentThread()方法的返回值是在代码实际运行时候的线程对象。

public class SubThread extends Thread{

    public SubThread() {
        System.out.println("构造方法打印当前线程的名称:"+Thread.currentThread().getName());
    }

    @Override
    public void run() {
        System.out.println("run方法打印当前线程名称:" + Thread.currentThread().getName());
    }
}

------
    @Test
    public void Test01CurrentThread()
    {
        System.out.println("main方法中打印当前线程:"  + Thread.currentThread().getName());
        //创建子线程, 调用SubThread1()构造方法, 在main线程中调用构造方法,所以构造方法中 的当前线程就是main线程
        SubThread subThread = new SubThread();
        //启动子线程,子线程会调用run()方法,所以run()方法中 的当前线程就是Thread-0子线程
        //subThread.start();
        //在main方法中直接调用run()方法,没有开启新的线程,所以在run方法中的当前线程就是main线程
        subThread.run();
    }

main方法中打印当前线程:main
构造方法打印当前线程的名称:main
run方法打印当前线程名称:main
public class SubThread2 extends Thread{

    public SubThread2() {
        System.out.println("构造方法中,Thread.currentThread().getName() : " + Thread.currentThread().getName() );
        System.out.println("构造方法,this.getName() : " + this.getName());
    }

    @Override
    public void run() {
        System.out.println("run方法中,Thread.currentThread().getName() : " + Thread.currentThread().getName() );
        System.out.println("run方法,this.getName() : " + this.getName());
    }
}

    @Test
    public void Test02CurrentThread() throws InterruptedException {

        SubThread2 t2 = new SubThread2();
        // 设置线程名称
        t2.setName("t2");
        t2.start();

        Thread.sleep(200);

        //Thread(Runnable)构造方法形参是Runnable接口,调用时传递的实参是接口的实现类对象
        Thread t3 = new Thread(t2);
        t3.start();
    }

构造方法中,Thread.currentThread().getName() : main
构造方法,this.getName() : Thread-0
run方法中,Thread.currentThread().getName() : t2
run方法,this.getName() : t2
run方法中,Thread.currentThread().getName() : Thread-1
run方法,this.getName() : t2

thread.setName(线程名称), 设置线程名称。

thread.getName()返回线程名称。

通过设置线程名称,有助于程序调试,提高程序的可读性, 建议为每个线程都设置一个能够体现线程功能的名称。

isAlive()

thread.isAlive()判断当前线程是否处于活动状态。

活动状态就是线程已启动并且尚未终止。

public class SubThread3 extends Thread{
    @Override
    public void run() {
        //运行状态,true
        System.out.println("run方法,isAlive = "+this.isAlive());
    }
}


    @Test
    public void Test03CurrentThread() throws InterruptedException {
        SubThread3 t3 = new SubThread3();
        System.out.println("begin==" + t3.isAlive());//false,在启动线程之前
        t3.start();
        Thread.sleep(500);
        // 结果不一定,打印这一行时,如果t3线程还没结束就返回true, 如果t3线程已结束,返回false
        System.out.println("end==" + t3.isAlive());
    }

sleep()

Thread.sleep(millis)让当前线程休眠指定的毫秒数。

getId()

thread.getId()可以获得线程的唯一标识。

注意:

某个编号的线程运行结束后,该编号可能被后续创建的线程使用。

重启的JVM后,同一个线程的编号可能不一样。

public class SubThread5 extends Thread{
    @Override
    public void run() {
        System.out.print("thread name = "+ Thread.currentThread().getName());
        System.out.println(",thread id = "+ this.getId());
    }
}

    @Test
    public void Test05CurrentThread() throws InterruptedException {

        //子线程的id
        for(int i = 1; i <= 10; i++){
            new SubThread5().start();
            Thread.sleep(100);
        }
    }
-----    
thread name = Thread-0,thread id = 14
thread name = Thread-1,thread id = 15
thread name = Thread-2,thread id = 16
thread name = Thread-3,thread id = 17
thread name = Thread-4,thread id = 18
thread name = Thread-5,thread id = 19
thread name = Thread-6,thread id = 20
thread name = Thread-7,thread id = 21
thread name = Thread-8,thread id = 22
thread name = Thread-9,thread id = 23

yield()

Thread.yield()方法的作用是放弃当前的CPU资源。(线程让步)

用一个简单的比喻来形容就是:

三个人塞米赛跑,三人都快跑到90m位置的时候,2号突然被传送到了起点,三人继续跑,这样1号和3号赢的机会就大得多

2号就相当于调用了yield()的线程,线程调用了yiled()之后回退到可运行状态,将抢占资源的机会让给其他相同优先级的线程。

public class SubThread6 extends Thread {
    @Override
    public void run() {
        long begin = System.currentTimeMillis();
        long sum = 0;
        for (int i =1;i<=100000;i++){
            sum +=i;
            //线程让步, 放弃CPU执行权
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        System.out.println("用时: " + (end - begin)+",sum="+sum);
    }
}
--------
    @Test
    public void Test06CurrentThread(){
        SubThread6 t6 = new SubThread6();
        t6.start();

        long begin = System.currentTimeMillis();
        long sum = 0;
        for(int i = 1; i <= 1000000; i++){
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("用时: " + (end - begin)+",sum="+sum);
    }

setPriority()

thread.setPriority( num ) 设置线程的优先级。

Java线程的优先级取值范围是 1 ~ 10 , 如果超出这个范围会抛出异常IllegalArgumentException

在操作系统中,优先级较高的线程获得CPU的资源越多。

线程优先级本质上是只是给线程调度器一个提示信息,以便于调度器决定先调度哪些线程. 注意不能保证优先级高的线程先运行。

Java优先级设置不当或者滥用可能会导致某些线程永远无法得到运行,即产生了线程饥饿。

线程的优先级并不是设置的越高越好,一般情况下使用普通的优先级即可,即在开发时不必设置线程的优先级。

线程的优先级具有继承性, 在A线程中创建了B线程,则B线程的优先级与A线程是一样的。

interrupt()

中断线程。

注意调用interrupt()方法仅仅是在当前线程打一个停止标志,并不是真正的停止线程。

public class SubThread8 extends Thread{
    @Override
    public void run() {
        super.run();
        int i = 100000;
        for (int i1 = 0; i1 < i; i1++) {
            //判断线程的中断标志,线程有 isInterrupted()方法,该方法返回线程的中断标志
            if (this.isInterrupted()) {
                System.out.println("当前线程的中断标志为true, 我要退出了");
                return;
            }
            System.out.println("sub thread --> " + i1);
        }
    }
}

    @Test
    public void Test08CurrentThread(){
        SubThread8 t8 = new SubThread8();
        t8.start();

        for (int i = 0;i<10;i++){
            System.out.println("main ==> " + i);
        }
        //中断子线程
        t8.interrupt();
    }
main ==> 0
main ==> 1
main ==> 2
main ==> 3
main ==> 4
main ==> 5
main ==> 6
main ==> 7
sub thread --> 0
main ==> 8
sub thread --> 1
main ==> 9
sub thread --> 2
当前线程的中断标志为true, 我要退出了

setDaemon()

Java中的线程分为用户线程与守护线程。

守护线程是为其他线程提供服务的线程,如垃圾回收器(GC)就是一个典型的守护线程。

守护线程不能单独运行, 当JVM中没有其他用户线程,只有守护线程时,守护线程会自动销毁, JVM会退出。

Java多线程的生命周期

线程的生命周期是线程对象的生老病死,即线程的状态。

线程生命周期可以通过getState()方法获得, 线程的状态是Thread.State枚举类型定义的, 由以下几种:

image-20210907211400969

Java多线程编程的优势与风险

Java多线程编程具有以下优势:

1、提高系统的吞吐率(Throughout). 多线程编程可以使一个进程有多个并发(concurrent,即同时进行的)的操作。

2、提高响应性(Responsiveness).Web服务器会采用一些专门的线程负责用户的请求处理,缩短了用户的等待时间。

3、充分利用多核(Multicore)处理器资源. 通过多线程可以充分的利用CPU资源。

Java多线程编程存在的问题与风险:

1、线程安全(Thread safe)问题.多线程共享数据时,如果没有采取正确的并发访问控制措施,就可能会产生数据一致性问题,如读取脏数据(过期的数据), 如丢失数据更新。

2、线程活性(thread liveness)问题.由于程序自身的缺陷或者由资源稀缺性导致线程一直处于非RUNNABLE状态,这就是线程活性问题,常见的活性故障有以下几种:

3、上下文切换(Context Switch). 处理器从执行一个线程切换到执行另外一个线程。

4、可靠性. 可能会由一个线程导致JVM意外终止,其他的线程也无法执行。

Java多线程原子性与可见性

非线程安全主要是指多个线程对同一个对象的实例变量进行操作时,会出现值被更改,值不同步的情况。

线程安全问题表现为三个方面: 原子性,可见性和有序性

原子性

原子(Atomic)就是不可分割的意思. 原子操作的不可分割有两层含义:

如现实生活中从ATM机取款, 对于用户来说,要么操作成功,用户拿到钱, 余额减少了,增加了一条交易记录; 要么没拿到钱,相当于取款操作没有发生。

Java有两种方式实现原子性:

一种是使用锁; 另一种利用处理器的CAS(Compare and Swap)指令。

锁具有排它性,保证共享变量在某一时刻只能被一个线程访问。

CAS指令直接在硬件(处理器和内存)层次上实现,看作是硬件锁。

可见性

在多线程环境中, 一个线程对某个共享变量进行更新之后 , 后续其他的线程可能无法立即读到这个更新的结果, 这就是线程安全问题的另外一种形式: 可见性(visibility)。

如果一个线程对共享变量更新后, 后续访问该变量的其他线程可以读到更新的结果, 称这个线程对共享变量的更新对其他线程可见, 否则称这个线程对共享变量的更新对其他线程不可见。

多线程程序因为可见性问题可能会导致其他线程读取到了旧数据(脏数据)。

有序性

有序性(Ordering)是指在什么情况下一个处理器上运行的一个线程所执行的 内存访问操作在另外一个处理器运行的其他线程看来是乱序的(Out of Order)。

乱序是指内存访问操作的顺序看起来发生了变化。

重排序

在多核处理器的环境下,编写的顺序结构,这种操作执行的顺序可能是没有保障的:

1.编译器可能会改变两个操作的先后顺序;

2.处理器也可能不会按照目标代码的顺序执行;

这种一个处理器上执行的多个操作,在其他处理器来看它的顺序与目标代码指定的顺序可能不一样,这种现象称为重排序

重排序是对内存访问有序操作的一种优化,可以在不影响单线程程序正确的情况下提升程序的性能.但是,可能对多线程程序的正确性产生影响,即可能导致线程安全问题。

重排序与可见性问题类似,不是必然出现的。

与内存操作顺序有关的几个概念:

源代码顺序, 就是源码中指定的内存访问顺序。

程序顺序, 处理器上运行的目标代码所指定的内存访问顺序。

执行顺序,内存访问操作在处理器上的实际执行顺序。

感知顺序,给定处理器所感知到的该处理器及其他处理器的内存访问操作的顺序。

可以把重排序分为指令重排序与存储子系统重排序两种:

1.指令重排序主要是由JIT编译器,处理器引起的, 指程序顺序与执行顺序不一样。

2.存储子系统重排序是由高速缓存,写缓冲器引起的, 感知顺序与执行顺序不一致。

指令重排序

在源码顺序与程序顺序不一致,或者 程序顺序与执行顺序不一致的情况下,我们就说发生了指令重排序(Instruction Reorder)。

指令重排是一种动作,确实对指令的顺序做了调整, 重排序的对象指令。

javac编译器一般不会执行指令重排序, 而JIT编译器可能执行指令重排序。

处理器也可能执行指令重排序, 使得执行顺序与程序顺序不一致。

指令重排不会对单线程程序的结果正确性产生影响,可能导致多线程程序出现非预期的结果。

存储子系统重排序

存储子系统是指写缓冲器与高速缓存。

高速缓存(Cache)是CPU中为了匹配与主内存处理速度不匹配而设计的一个高速缓存。

写缓冲器(Store buffer, Write buffer)用来提高写高速缓存操作的效率。

即使处理器严格按照程序顺序执行两个内存访问操作,在存储子系统的作用下, 其他处理器对这两个操作的感知顺序与程序顺序不一致,即这两个操作的顺序顺序看起来像是发生了变化, 这种现象称为存储子系统重排序。

存储子系统重排序并没有真正的对指令执行顺序进行调整,而是造成一种指令执行顺序被调整的现象。

存储子系统重排序对象是内存操作的结果。

保证内存访问的顺序性

可以使用volatile关键字, synchronized关键字实现有序性。

Java多线程内存模型

img

img

Java多线程同步机制简介

线程同步机制是一套用于协调线程之间的数据访问的机制.该机制可以保障线程安全。

Java平台提供的线程同步机制包括: 锁, volatile关键字, final关键字,static关键字,以及相关的API,如Object.wait()/Object.notify()等。

每一个服务进程的运行,都包含若干进程(Thread),线程是调度的基本单位,进程则是资源拥有的基本单位。

线程有自己的私有数据,比如栈和寄存器,同时与其它线程共享相同的虚拟内存和全局变量等资源,当多个线程同时读写同一份共享资源的时候,会引起冲突,这时候就需要引入线程同步机制使各个线程排队一个一个的对共享资源进行操作,而不是同时进行。

img

  1. 线程同步其实实现的是线程排队
  2. 防止线程同步访问共享资源造成冲突
  3. 变量需要同步,常量不需要(常量存放于方法区)。
  4. 多个线程访问共享资源的代码有可能是同一份代码,也有可能是不同的代码;无论是否执行同一份代码,只要这些线程的代码访问同一份可变的共享资源,这些线程之间就需要同步。

线程同步好处:

多个线程同时访问共享数据时,防止数据被损坏。

线程同步带来的问题:

1、实现比较繁琐,而且容易出错。

必须对多个线程可能同时访问的所有数据,用额外的代码包围起来,以获得和释放一个同步锁。这需要由程序员来保证没有遗漏,对多线程共享的数据的加锁工作。并且,在程序完成时,需要进行压力测试以保证多个线程并发时,结果如预期。

2、它会损害性能。

获取和释放一个锁是需要时间的。因为需要额外的调用一些方法,并且需要协调调度下一个获得锁的线程。

3、每次只能允许一个线程访问资源。这是锁的全部意义所在,但也是问题所在,因为阻塞一个线程可能会造成更多的线程被创建。

线程同步建议:

1、线程同步并不是一件好事,设计自己的应用程序是应酌情考虑,尽量避免线程同步。

2、避免使用一些共享数据,如静态字段。(如果有多线程同时读写这个静态字段就有问题)

3、试着用值类型,因为值类型总是会被复制,每个线程操作的都是自己的那个副本。

4、多线程对共享数据进行只读访问是没有任何问题的。

Java多线程锁

线程安全问题的产生前提是多个线程并发访问共享数据。

将多个线程对共享数据的并发访问转换为串行访问,即一个共享数据一次只能被一个线程访问.锁就是复用这种思路来保障线程安全的。

锁(Lock)可以理解为对共享数据进行保护的一个许可证. 对于同一个许可证保护的共享数据来说,任何线程想要访问这些共享数据必须先持有该许可证. 一个线程只有在持有许可证的情况下才能对这些共享数据进行访问; 并且一个许可证一次只能被一个线程持有; 许可证线程在结束对共享数据的访问后必须释放其持有的许可证。

一线程在访问共享数据前必须先获得锁; 获得锁的线程称为锁的持有线程; 一个锁一次只能被一个线程持有. 锁的持有线程在获得锁之后 和释放锁之前这段时间所执行的代码称为临界区(Critical Section)。

锁具有排他性(Exclusive), 即一个锁一次只能被一个线程持有.这种锁称为排它锁或互斥锁(Mutex)

img

JVM把锁分为内部锁和显示锁两种. 内部锁通过synchronized关键字实现; 显示锁通过java.concurrent.locks.Lock接口的实现类实现的。

Java线程锁的作用

锁可以实现对共享数据的安全访问. 保障线程的原子性,可见性与有序性。

锁是通过互斥保障原子性. 一个锁只能被一个线程持有, 这就保证临界区的代码一次只能被一个线程执行.使得临界区代码所执行的操作自然而然的具有不可分割的特性,即具备了原子性

可见性的保障是通过写线程冲刷处理器的缓存和读线程刷新处理器缓存这两个动作实现的. 在java平台中,锁的获得隐含着刷新处理器缓存的动作, 锁的释放隐含着冲刷处理器缓存的动作。

锁能够保障有序性.写线程在临界区所执行的在读线程所执行的临界区看来像是完全按照源码顺序执行的。

注意:

使用锁保障线程的安全性,必须满足以下条件:

线程锁相关的概念

可重入性

可重入性(Reentrancy)描述这样一个问题: 一个线程持有该锁的时候能再次(多次)申请该锁。

如果一个线程持有一个锁的时候还能够继续成功申请该锁,称该锁是可重入的, 否则就称该锁为不可重入的。

锁的争用与调度

Java平台中内部锁属于非公平锁, 显示Lock锁既支持公平锁又支持非公平锁。

锁的粒度

一个锁可以保护的共享数据的数量大小称为锁的粒度。

锁保护共享数据量大,称该锁的粒度粗, 否则就称该锁的粒度细。

锁的粒度过粗会导致线程在申请锁时会进行不必要的等待.锁的粒度过细会增加锁调度的开销。

Java多线程同步代码块Synchronized

Java中的每个对象都有一个与之关联的内部锁(Intrinsic lock). 这种锁也称为监视器(Monitor), 这种内部锁是一种排他锁,可以保障原子性,可见性与有序性。

内部锁是通过synchronized关键字实现的.synchronized关键字修饰代码块,修饰该方法。

修饰代码块的语法:

synchronized( 对象锁 ) {
   同步代码块,可以在同步代码块中访问共享数据
}

修饰实例方法就称为同步实例方法

修饰静态方法称称为同步静态方法

synchronized同步代码块实例:

this锁对象

public class Test01 {

    public void mm(){
        synchronized (this){
            for (int i=0;i<=100;i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }
}
    ---
    @Test
    public void Test10CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test01 t1 = new Test01();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

    }

如果线程的锁不同, 不能实现同步,想要同步必须使用同一个锁对象

    @Test
    public void Test11CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test01 t1 = new Test01();
        Test01 t2 = new Test01();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t2.mm();//使用的锁对象this就是t2对象
            }
        }).start();
    }

使用一个常量对象作为锁对象

public class Test01 {

    //定义一个常量
    public static final  Object obj = new Object();

    public void mm(){
        synchronized (obj){
            for (int i=0;i<=100;i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }
}

    @Test
    public void Test12CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test01 t1 = new Test01();
        Test01 t2 = new Test01();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t2.mm();//使用的锁对象this就是t2对象
            }
        }).start();
    }

使用一个常量对象作为锁对象,不同方法中 的同步代码块也可以同步

public class Test01 {

    //定义一个常量
    public static final  Object obj = new Object();

    //方法1
    public void mm(){
        //使用一个常量对象作为锁对象
        synchronized (obj){
            for (int i=0;i<=100;i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }
    //方法2
    public void mmm(){
        //使用一个常量对象作为锁对象
        synchronized (obj){
            for (int i=0;i<=100;i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }
}

    @Test
    public void Test13CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test01 t1 = new Test01();
        Test01 t2 = new Test01();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t2.mmm();//使用的锁对象this就是t2对象
            }
        }).start();
    }

Java多线程同步方法

synchronized同步实例方法

默认的锁对象是this对象

public class Test02 {
    public void mm(){
        synchronized (this){
            for (int i = 1; i <= 100; i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }

    public synchronized void mmm (){
        for (int i = 1; i <= 100; i++){
            System.out.println(Thread.currentThread().getName() + " --> " + i);
        }
    }
}

    @Test
    public void Test14CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test02 t1 = new Test02();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
//                t1.mmm();//使用的锁对象this也是obj对象, 可以同步
                new Test02().mmm();//使用的锁对象this是刚刚new创建的一个新对象,不是同一个锁对象不能同步
            }
        }).start();
    }

synchronized同步静态方法

默认的锁对象是当前类的运行时类对象, Test03.class, 有人称它为类锁

public class Test02 {
    public void mm(){
        synchronized (this){
            for (int i = 1; i <= 100; i++){
                System.out.println(Thread.currentThread().getName() + " --> " + i);
            }
        }
    }

    public synchronized void mmm (){
        for (int i = 1; i <= 100; i++){
            System.out.println(Thread.currentThread().getName() + " --> " + i);
        }
    }
}
----------
    @Test
    public void Test14CurrentThread(){
        //创建两个线程,分别调用mm()方法
        Test02 t1 = new Test02();

        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.mm();//使用的锁对象this就是t1对象
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
//                t1.mmm();//使用的锁对象this也是obj对象, 可以同步
                new Test02().mmm();//使用的锁对象this是刚刚new创建的一个新对象,不是同一个锁对象不能同步
            }
        }).start();
    }

同步方法与同步代码块如何选择?-->同步方法锁的粒度粗, 执行效率低, 同步代码块执行效率高

public class Test03 {
    /**
     * 同步方法, 执行效率低
     */
    public synchronized void doLongTimeTask(){
        try {
            System.out.println("Task Begin");
            Thread.sleep(3000);
            for(int i = 1; i <= 100; i++){
                System.out.println(Thread.currentThread().getName() + "-->" + i);
            }
            System.out.println("Task end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 同步代码块,锁的粒度细, 执行效率高
     */
    public void doLongTimeTask2(){
        try {
            System.out.println("Task Begin");
            Thread.sleep(3000);
            synchronized (this){
                for(int i = 1; i <= 100; i++){
                    System.out.println(Thread.currentThread().getName() + "-->" + i);
                }
            }
            System.out.println("Task end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

-----
    @Test
    public void Test15CurrentThread(){
        Test03 t3 = new Test03();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t3.doLongTimeTask();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t3.doLongTimeTask2();
            }
        }).start();
    }

Java多线程脏读

脏读:

出现读取属性值出现了一些意外, 读取的是中间值,而不是修改之后的值.

出现脏读的原因是:对共享数据的修改与对共享数据的读取不同步

解决方法:

不仅对修改数据的代码块进行同步,还要对读取数据的代码块同步

public class Test04 {
    public static void main(String[] args) throws InterruptedException {
        PublicValue publicValue = new PublicValue();
        SubThread thread = new SubThread(publicValue);
        thread.start();

        //为了确定设置成功
        Thread.sleep(100);
        //在main线程中读取用户名,密码
        publicValue.getValue();
    }

    /**
     * 定义线程,设置用户名和密码
     */
    static class SubThread extends Thread{
        private PublicValue publicValue;
        public SubThread( PublicValue publicValue){
            this.publicValue = publicValue;
        }

        @Override
        public void run() {
            publicValue.setValue("test", "test");
        }
    }

    static class PublicValue{
        private String name = "glj";
        private String pwd = "glj123";

        public synchronized void getValue(){
            System.out.println(Thread.currentThread().getName() + ", getter -- name: " + name + ",--pwd: " + pwd);
        }

        public synchronized void setValue(String name, String pwd){
            this.name = name;
            try {
                //模拟操作name属性需要一定时间
                Thread.sleep(1000);
                this.pwd = pwd;
                System.out.println(Thread.currentThread().getName() + ", setter --name:" + name + ", --pwd: " + pwd );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Java多线程出现异常会自动释放锁

同步过程中线程出现异常, 会自动释放锁对象

public class Test05 {
    public static void main(String[] args) {
        Test05 t5 = new Test05();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t5.m1();
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                Test05.m2();
            }
        }).start();
    }
    //定义方法,打印100行字符串
    public void m1(){
        //使用当前类的运行时类对象作为锁对象,可以简单的理解为把Test05类的字节码文件作为锁对象
        synchronized (Test05.class){
            for (int i = 1; i <= 100; i++) {
                System.out.println(Thread.currentThread().getName() + " --> " + i);
                if ( i == 50){
                    //把字符串转换为int类型时,如果字符串不符合 数字格式会产生异常
                    Integer.parseInt("abc");
                }
            }
        }
    }
    //使用synchronized修饰静态方法,同步静态方法, 默认运行时类Test05.class作为锁对象
    public synchronized static void m2(){
        for (int i = 1; i <= 100; i++) {
            System.out.println(Thread.currentThread().getName() + " --> " + i);
        }
    }
}

Java多线程死锁

死锁:

在多线程程序中,同步时可能需要使用多个锁,如果获得锁的顺序不一致,可能会导致死锁

如何避免死锁?当需要获得多个锁时,所有线程获得锁的顺序保持一致即可

下面例子会造成死锁:

public class Test06 {

    public static void main(String[] args) {
        SubThread t1 = new SubThread();
        t1.setName("a");
        t1.start();

        SubThread t2 = new SubThread();
        t2.setName("b");
        t2.start();
    }

    static class  SubThread  extends Thread{
        private static final  Object lock1 = new Object();
        private static final  Object lock2 = new Object();

        @Override
        public void run() {
            if ("a".equals(Thread.currentThread().getName())){
                synchronized (lock1){
                    System.out.println("a线程获得了lock1锁,还需要获得lock2锁");
                    synchronized (lock2){
                        System.out.println("a线程获得lock1后又获得了lock2,可以想干任何想干的事");
                    }
                }
            }
            if ("b".equals(Thread.currentThread().getName())){
                synchronized (lock2){
                    System.out.println("b线程获得了lock2锁,还需要获得lock1锁");
                    synchronized (lock1){
                        System.out.println("b线程获得lock2后又获得了lock1,可以想干任何想干的事");
                    }
                }
            }
        }
    }
}

--------

所有线程获得锁的顺序保持一致即可,就是把线程b也是先获取lock1再获取lock2即可

Java volatile关键字的作用

volatile的作用可以强制线程从公共内存中读取变量的值,而不是从工作内存中读取

public class Test7 {

    public static void main(String[] args) throws InterruptedException {
        //创建PrintString对象
        PrintString printString = new PrintString();

        new Thread(new Runnable() {
            @Override
            public void run() {
                printString.printStringMethod();
            }
        }).start();

        //main线程睡眠1000毫秒
        Thread.sleep(1000);
        System.out.println("在main线程中修改打印标志");
        printString.setContinuePrint(false);
    }

    static class  PrintString{
        private volatile boolean continuePrint = true;

        public PrintString setContinuePrint(boolean continuePrint) {
            this.continuePrint = continuePrint;
            return this;
        }

        public void printStringMethod(){
            System.out.println(Thread.currentThread().getName() + "开始....");
            while ( continuePrint ){

            }
            System.out.println(Thread.currentThread().getName() + "结束++++++++++++++");
        }

    }

}

volatile与synchronized比较

volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized要好;

volatile只能修饰变量,而synchronized可以修饰方法,代码块. 随着JDK新版本的发布,synchronized的执行效率也有较大的提升,在开发中使用sychronized的比率还是很大的。

多线程访问volatile变量不会发生阻塞,而synchronized可能会阻塞。

volatile能保证数据的可见性,但是不能保证原子性; 而synchronized可以保证原子性,也可以保证可见性。

关键字volatile解决的是变量在多个线程之间的可见性; synchronized关键字解决多个线程之间访问公共资源的同步性。

volatile关键字增加了实例变量在多个线程之间的可见性,但是不具备原子性。

Java原子类自增自减操作

我们知道i++操作不是原子操作, 除了使用Synchronized进行同步外,也可以使用AtomicInteger/AtomicLong原子类进行实现。

public class Test08 {
    public static void main(String[] args) throws InterruptedException {
        //在main线程中创建10个子线程
        for (int i = 0; i < 1000; i++) {
            new MyThread().start();
        }
        Thread.sleep(1000);
        System.out.println( MyThread.count.get());
    }

    static class MyThread extends Thread{
        private static AtomicInteger count = new AtomicInteger();

        public static void addCounter(){
            for (int i = 0; i < 10000; i++) {
                //自增的后缀形式
                count.getAndIncrement();
            }
            System.out.println(Thread.currentThread().getName() + " count=" + count.get());
        }

        @Override
        public void run() {
            addCounter();
        }
    }
}

AtomicInteger具体的用法:https://www.jianshu.com/p/e5102928f7b2

Java CAS多线程

CAS(Compare And Swap)是由硬件实现的。

CAS可以将read- modify - write这类的操作转换为原子操作。

i++自增操作包括三个子操作:

  1. 从主内存读取i变量值
  2. 对i的值加1
  3. 再把加1之后 的值保存到主内存

CAS原理:在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。

img

使用CAS实现线程安全的计数器

Public class CASTest {

    public static void main(String[] args) {
        CASCounter casCounter = new CASCounter();
        for (int i = 0; i < 100000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(casCounter.incrementAndGet());
                }
            }).start();
        }
    }
}
class CASCounter{
    //使用volatile修饰value值,使线程可见
    volatile private long value;

    public long getValue(){
        return value;
    }

    //定义comare and swap方法
    private boolean compareAndSwap(long expectedValue, long newValue){
        //如果当前value的值与期望的expectedVAlue值一样,就把当前的Value字段替换为newValue值
        synchronized (this){
            if ( value == expectedValue){
                value = newValue;
                return true;
            }else {
                return false;
            }
        }
    }

    //定义自增的方法
    public long incrementAndGet(){
        long oldvalue ;
        long newValue;
        do {
            oldvalue = value;
            newValue = oldvalue+1;
        }while ( !compareAndSwap(oldvalue, newValue) );
        return newValue;
    }

}

Java原子变量

原子变量类基于CAS实现的, 当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性.对变量的read-modify-write更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++. 由于volatile只能保证可见性,无法保障原子性, 原子变量类内部就是借助一个Volatile变量,并且保障了该变量的read-modify-write操作的原子性, 有时把原子变量类看作增强的volatile变量. 原子变量类有12个,如:

分组 原子变量类
基础数据型 AtomicInteger, AtomicLong, AtomicBoolean
数组型 AtomicIntegerArray, AtomicLongArray,AtomicReferenceArray
字段更新器 AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater
引用型 AtomicReference,AtomicStampedReference, AtomicMarkableReference

Java线程间的通信方式

Java线程间的通信方式

1、同步

这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信。

参考示例:

public class MyObject {

    synchronized public void methodA() {
        //do something....
    }

    synchronized public void methodB() {
        //do some other thing
    }
}
public class ThreadA extends Thread {

    private MyObject object;
    //省略构造方法
    @Override
    public void run() {
        super.run();
        object.methodA();
    }
}

------------

public class ThreadB extends Thread {

    private MyObject object;
    //省略构造方法
    @Override
    public void run() {
        super.run();
        object.methodB();
    }
}
public class Run {
    public static void main(String[] args) {
        MyObject object = new MyObject();

        //线程A与线程B 持有的是同一个对象:object
        ThreadA a = new ThreadA(object);
        ThreadB b = new ThreadB(object);
        a.start();
        b.start();
    }
}

由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,比如:线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了通信。这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

2、while轮询的方式

代码如下:

import java.util.ArrayList;
import java.util.List;

public class MyList {

    private List<String> list = new ArrayList<String>();
    public void add() {
        list.add("elements");
    }
    public int size() {
        return list.size();
    }
}
import mylist.MyList;

public class ThreadA extends Thread {

    private MyList list;

    public ThreadA(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                list.add();
                System.out.println("添加了" + (i + 1) + "个元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import mylist.MyList;

public class ThreadB extends Thread {

    private MyList list;

    public ThreadB(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == 5) {
                    System.out.println("==5, 线程b准备退出了");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {

            e.printStackTrace();
        }
    }
}
import mylist.MyList;
import extthread.ThreadA;
import extthread.ThreadB;

public class Test {

    public static void main(String[] args) {
        MyList service = new MyList();

        ThreadA a = new ThreadA(service);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(service);
        b.setName("B");
        b.start();
    }
}

在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。

这种方式还存在另外一个问题:轮询的条件的可见性问题

线程都是先把变量读取到本地线程栈空间,然后再去再去修改的本地变量。因此,如果线程B每次都在取本地的 条件变量,那么尽管另外一个线程已经改变了轮询的条件,它也察觉不到,这样也会造成死循环。

3、wait/notify机制

代码如下:

import java.util.ArrayList;
import java.util.List;

public class MyList {

    private static List<String> list = new ArrayList<String>();

    public static void add() {
        list.add("anyString");
    }

    public static int size() {
        return list.size();
    }
}
public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {


        super();
        this.lock = lock;
    }


    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end  "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {
    private Object lock;

    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + (i + 1) + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Run {

    public static void main(String[] args) {

        try {
            Object lock = new Object();

            ThreadA a = new ThreadA(lock);
            a.start();

            Thread.sleep(50);

            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

线程A要等待某个条件满足时(list.size()5),才执行操作。线程B则向list中添加元素,改变list 的size。

A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

这里用到了Object类的 wait() 和 notify() 方法。

当条件未满足时(list.size() !=5),线程A调用wait()放弃CPU,并进入阻塞状态。

当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。

这种方式的一个好处就是CPU的利用率提高了。

但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。

4、管道通信

管道通信就是使用java.io.PipedInputStreamjava.io.PipedOutputStream进行通信。

Java多线程的等待通知机制

什么是等待通知机制

在单线程编程中,要执行的操作需要满足一定的条件才能执行,可以把这个操作放在if语句块中。

在多线程编程中,可能A线程的条件没有满足只是暂时的, 稍后其他的线程B可能会更新条件使得A线程的条件得到满足. 可以将A线程暂停,直到它的条件得到满足后再将A线程唤醒.它的伪代码:

atomics{        //原子操作
    while( 条件不成立 ){
        //等待
    }
    //当前线程被唤醒条件满足后,继续执行下面的操作
}

等待/通知机制的实现

Object类中的wait()方法可以使执行当前代码的线程等待,暂停执行,直到接到通知或被中断为止。

注意:

  1. wait()方法只能在同步代码块中由锁对象调用。
  2. 调用wait()方法,当前线程会释放锁。

其伪代码如下:

//在调用wait()方法前获得对象的内部锁
synchronized( 锁对象 ){
    while( 条件不成立 ){
        //通过锁对象调用 wait()方法暂停线程,会释放锁对象
        锁对象.wait();
    }
    //线程的条件满足了继续向下执行
}

Object类的notify()可以唤醒线程,该方法也必须在同步代码块中由锁对象调用. 没有使用锁对象调用wait()/notify()会抛出IlegalMonitorStateExeption异常. 如果有多个等待的线程,notify()方法只能唤醒其中的一个. 在同步代码块中调用notify()方法后,并不会立即释放锁对象,需要等当前同步代码块执行完后才会释放锁对象,一般将notify()方法放在同步代码块的最后. 它的伪代码如下:

synchronized( 锁对象 ){
    //执行修改保护条件 的代码
    //唤醒其他线程
    锁对象.notify();
}

需要通过notify()唤醒等待的线程

public class Test01 {

    public static void main(String[] args) {
        String lock = "glj";
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    System.out.println("线程1开始等待: " + System.currentTimeMillis());
                    //线程等待,会释放锁对象,当前线程转入blocked阻塞状态
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程1结束等待:" + System.currentTimeMillis());
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                //notify()方法也需要在同步代码块中,由锁对象调用
                synchronized (lock){
                    System.out.println("线程2开始唤醒 : " + System.currentTimeMillis());
                    lock.notify();      //唤醒在lock锁对象上等待的某一个线程
                    System.out.println("线程2结束唤醒 : " + System.currentTimeMillis());
                }
            }
        });
        //开启t1线程,t1线程等待
        t1.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //t1线程开启3秒后,再开启t2线程唤醒t1线程
        t2.start();
    }

}

notify()方法后不会立即释放锁对象

public class Test02 {

    public static void main(String[] args) throws InterruptedException {
        //定义一个List集合存储String数据
        List<String> list = new ArrayList<>();

        //定义第一个线程,当list集合中元素的数量不等于5时线程等待
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    if (list.size()!=5) {
                        System.out.println("线程1开始等待: " + System.currentTimeMillis());
                        try {
                            System.out.println("等待");
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程1被唤醒:" + System.currentTimeMillis());
                    }
                }
            }
        });

        //定义第二个线程,向list集合中添加元素
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    for(int i=0; i<10;i++){
                        list.add("data-->"+i);
                        System.out.println("线程2添加了第" + (i+1) + "个数据");

                        if (list.size()==5) {
                            //唤醒线程, 不会立即释放锁对象,需要等到当前同步代码块都执行完后才能释放锁对象
                            list.notify();
                            System.out.println("线程2发送唤醒通知");
                        }

                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }
            }
        });

        t1.start();
        Thread.sleep(3000);
        t2.start();

    }

}

interrupt()会中断线程的wait()等待

当线程处于wait()等待状态时, 调用线程对象的interrupt()方法会中断线程的等待状态, 会产生InterruptedException异常。

public class Test03 {

    public static void main(String[] args) throws InterruptedException {
        SunThread sub = new SunThread();
        sub.start();
        System.out.println("当前时间:"+System.currentTimeMillis());
        Thread.sleep(3000);
        System.out.println("当前时间:"+System.currentTimeMillis());
        sub.interrupt();
        System.out.println("当前时间:"+System.currentTimeMillis());
    }


    private static final Object LOCK = new Object();

    static class SunThread extends Thread{
        @Override
        public void run() {
            synchronized (LOCK){
                try {
                    System.out.println("begin wait...");
                    LOCK.wait();
                    System.out.println("end wait..");
                } catch (InterruptedException e) {
                    System.out.println("wait等待被中断了****");
                }
            }
        }
    }

}

notify()与notifyAll()

notify()一次只能唤醒一个线程,如果有多个等待的线程,只能随机唤醒其中的某一个; 想要唤醒所有等待线程,需要调用notifyAll()。

public class Test04 {

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        SubThread t1 = new SubThread(lock);
        SubThread t2 = new SubThread(lock);
        SubThread t3 = new SubThread(lock);
        t1.setName("t1");
        t2.setName("t2");
        t3.setName("t3");
        t1.start();
        t2.start();
        t3.start();

        Thread.sleep(3000);
        synchronized (lock){
            // 调用一次notify()只能唤醒其中的一个线程,其他等待的线程依然处于等待状态,
            // 对于处于等待状态的线程来说,错过了通知信号,这种现象也称为信号丢失
            // lock.notify();
            lock.notifyAll();
        }
    }

    static class SubThread extends Thread{
        private Object lock;        //定义实例变量作为锁对象
        public SubThread(Object lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock){
                try {
                    System.out.println("begin wait...");
                    lock.wait();
                    System.out.println("end wait..");
                } catch (InterruptedException e) {
                    System.out.println("wait等待被中断了****");
                }
            }
        }
    }
}

wait(long)的使用

wait(long)带有long类型参数的wait()等待,如果在参数指定的时间内没有被唤醒,超时后会自动唤醒。

public class Test05 {
    public static void main(String[] args) {
        final Object obj = new Object();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (obj){
                    try {
                        System.out.println("thread begin wait");
                        obj.wait(5000);         //如果5000毫秒内没有被唤醒 ,会自动唤醒
                        System.out.println("end wait....");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}

避免通知过早

线程wait()等待后,可以调用notify()唤醒线程, 如果notify()唤醒的过早,在等待之前就调用了notify()可能会打乱程序正常的运行逻辑

wait等待条件发生了变化

在使用wait/nofity模式时,注意wait条件发生了变化,也可能会造成逻辑的混乱。

  1. 定义一个集合
  2. 定义一个线程向集合中添加数据,添加完数据后通知另外的线程从集合中取数据
  3. 定义一个线程从集合中取数据,如果集合中没有数据就等待
public class Test06 {

    public static void main(String[] args) {
        //定义添加数据的线程对象
        ThreadAdd threadAdd = new ThreadAdd();
        //定义取数据的线程对象
        ThreadSubtract threadSubtract = new ThreadSubtract();
        threadSubtract.setName("subtract 1 ");

        ThreadSubtract threadSubtract2 = new ThreadSubtract();
        threadSubtract2.setName("subtract 2 ");

        threadSubtract.start();
        threadSubtract2.start();

        threadAdd.start();
    }

    //定义一个集合
    static List list = new ArrayList();

    //定义一个从集合取数据的方法
    public static void subtract(){
        synchronized (list){
            while (list.size() == 0) {
                try {
                    System.out.println(Thread.currentThread().getName() + " begin wait....");
                    list.wait();        //等待
                    System.out.println(Thread.currentThread().getName() + " end wait..");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //定义方法向集合中添加数据后,通知等待的线程取数据
    public static void add(){
        synchronized (list){
            list.add("data");
            System.out.println( Thread.currentThread().getName() + "存储了一个数据");
            list.notifyAll();
        }
    }

    static class ThreadAdd extends Thread{
        @Override
        public void run() {
            add();
        }
    }

    static class ThreadSubtract extends Thread{
        @Override
        public void run() {
            subtract();
        }
    }

}

Java生产者消费者模式

在Java中,负责产生数据的模块是生产者,负责使用数据的模块是消费者. 生产者消费者解决数据的平衡问题,即先有数据然后才能使用,没有数据时,消费者需要等待。

生产-消费:操作值

package com.glj.producerdata;

/**
 * 定义一个操作数据的类
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:10
 */
public class ValueOP {

    private String value = "";

    public void getValue() {
        synchronized (this){
            while ("".equalsIgnoreCase(value)) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //不是空串,读取 字段值
            System.out.println("get的值是: " + this.value);
            this.value = "";
            this.notifyAll();
        }
    }

    //定义方法修改value字段的值
    public void setValue() {
        synchronized (this){
            while (!"".equalsIgnoreCase(value)){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //如果value字段值是容串, 就设置value字段的值
            String value = System.currentTimeMillis() + " - " + System.nanoTime();
            System.out.println("set设置的值是: " + value);
            this.value = value;
            this.notifyAll();
        }
    }

}

package com.glj.producerdata;

/**
 * 生产者
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:20
 */
public class ProducerThread extends Thread{

    private ValueOP valueOP;

    public ProducerThread(ValueOP valueOP) {
        this.valueOP = valueOP;
    }

    @Override
    public void run() {
        while (true){
            valueOP.setValue();
        }
    }
}

package com.glj.producerdata;

/**
 * 消费者
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:22
 */
public class ConsumerThread extends Thread{
    //消费者使用数据, 就是使用ValueOP类的value字段值
    private ValueOP valueOP;

    public ConsumerThread(ValueOP valueOP) {
        this.valueOP = valueOP;
    }

    @Override
    public void run() {
        while (true){
            valueOP.getValue();
        }
    }
}

package com.glj.producerdata;

/**
 * 测试类
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:24
 */
public class Test01 {
    public static void main(String[] args) {
        ValueOP valueOP = new ValueOP();

        ProducerThread p1 = new ProducerThread(valueOP);
        ProducerThread p2 = new ProducerThread(valueOP);
        ProducerThread p3 = new ProducerThread(valueOP);
        ConsumerThread c1 = new ConsumerThread(valueOP);
        ConsumerThread c2 = new ConsumerThread(valueOP);
        ConsumerThread c3 = new ConsumerThread(valueOP);

        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

模拟操作栈

使生产者把数据存储到List集合中, 消费者从List集合中取数据,使用List集合模拟栈。

package com.glj.producerdata;

import java.util.ArrayList;
import java.util.List;

/**
 * 模拟栈
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 17:05
 */
public class MyStack {

    //定义集合模拟栈
    private List list = new ArrayList();
    //集合的最大容量
    private static final int MAXSIZE = 3;

    public synchronized void push(){
        //当栈中的数据已满 就等待
        while (list.size() >= MAXSIZE) {
            System.out.println(Thread.currentThread().getName() + " begin  wait....");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        String data = "data--" + Math.random();
        System.out.println( Thread.currentThread().getName() + "添加了数据: " + data);
        list.add(data);
        this.notifyAll();
    }

    public synchronized void pop(){
        while (list.size() == 0) {
            try {
                System.out.println(Thread.currentThread().getName() + " begin  wait....");
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println( Thread.currentThread().getName() + "出栈数据:" + list.remove(0) );
        this.notifyAll();
    }

}

package com.glj.producerdata;

/**
 * 生产者
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:20
 */
public class ProducerThreadStack extends Thread{

    private MyStack myStack;

    public ProducerThreadStack(MyStack myStack) {
        this.myStack = myStack;
    }

    @Override
    public void run() {
        while (true){
            myStack.push();
        }
    }


}

package com.glj.producerdata;

/**
 * 消费者
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:22
 */
public class ConsumerThreadStack extends Thread{
    //消费者使用数据, 就是使用ValueOP类的value字段值
    private MyStack myStack;

    public ConsumerThreadStack(MyStack myStack) {
        this.myStack = myStack;
    }

    @Override
    public void run() {
        while (true){
            myStack.pop();
        }
    }
}

package com.glj.producerdata;

/**
 * 测试类
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 15:24
 */
public class TestStack {
    public static void main(String[] args) {
        MyStack stack = new MyStack();

        ProducerThreadStack  p = new ProducerThreadStack(stack);
        ProducerThreadStack  p2 = new ProducerThreadStack(stack);
        ProducerThreadStack  p3 = new ProducerThreadStack(stack);
        ProducerThreadStack c1 = new ProducerThreadStack(stack);
        ProducerThreadStack c2 = new ProducerThreadStack(stack);
        ProducerThreadStack c3 = new ProducerThreadStack(stack);
        p.setName("生产者1号");
        p2.setName("生产者2号");
        p3.setName("生产者3号");
        c1.setName("消费者1号");
        c2.setName("消费者2号");
        c3.setName("消费者3号");

        p.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

Java线程中join方法

在主线程中启动了子线程,如果子线程要进行大量耗时运行,主线程往往早于子线程结束,如果主线程想等在子线程结束后再结束,如主线程想要使用子线程运算结果,这时就需要使用join()方法, join()方法的作用是等待线程对象销毁,在当前线程中加入子线程,当前线程会转为等待状态,等到子线程运行结束后,当前线程再继续向下执行。

package com.glj.join;

import java.util.Random;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 17:23
 */
public class Test {


    public static void main(String[] args) throws InterruptedException {

        SubThread t = new SubThread();
        t.start();
        t.join();


    }

    static  class SubThread extends Thread{
        @Override
        public void run() {
            int xx = new Random().nextInt(10000);
            System.out.println("在子线程中产生了数据: " + xx);
            try {
                Thread.sleep(xx);       //模拟子线程运行耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

Java ThreadLocal使用

除了控制资源的访问外, 还可以通过增加资源来保证线程安全,ThreadLocal主要解决为每个线程绑定自己的值。

ThreadLocal的基本使用

package com.glj.threadlocal;

/**
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 17:30
 */
public class Test01 {

    static ThreadLocal threadLocal = new ThreadLocal();

    public static void main(String[] args) {
        SubThread t1 = new SubThread();
        SubThread t2 = new SubThread();

        t1.start();
        t2.start();
    }

    static class SubThread extends Thread{
        @Override
        public void run() {
            for (int i = 0;i<20;i++){
                //设置线程关联的的值
                threadLocal.set( Thread.currentThread().getName() + " - " + i);
                //调用get()方法读取关联的值
                System.out.println(Thread.currentThread().getName() + " value = " + threadLocal.get());
            }
        }

    }

}

Java中锁的可重入性

在JDK5中增加了Lock锁接口,有ReentrantLock实现类,ReentrantLock锁称为可重入锁, 它功能比synchronized多。

锁的可重入性

锁的可重入是指,当一个线程获得一个对象锁后,再次请求该对象锁时是可以获得该对象的锁的。

演示锁的可重入性:

package com.glj.lock;

/**
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 17:34
 */
public class Test01 {

    public static void main(String[] args) {
        Test01 t1 = new Test01();
        new Thread(new Runnable() {
            @Override
            public void run() {
                t1.sm1();
            }
        }).start();
    }

    private synchronized void sm1(){
        System.out.println("同步方法1");
        //线程执行sm1()方法,默认this作为锁对象,在sm1()方法中调用了sm2()方法,注意当前线程还是持有this锁对象的
        //sm2()同步方法默认的锁对象也是this对象, 要执行sm2()必须先获得this锁对象,当前this对象被当前线程持有,可以再次获得this对象, 这就是锁的可重入性. 假设锁不可重入的话,可能会造成死锁
        sm2();
    }

    private synchronized void sm2(){
        System.out.println("同步方法2");
        sm3();
    }

    private synchronized void sm3(){
        System.out.println("同步方法3");
    }

}

Java ReentrantLock使用

调用lock()方法获得锁, 调用unlock()释放锁。

Lock锁的基本使用:

package com.glj.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 21:26
 */
public class Test02 {
    //自定义一个显示锁
    static Lock lock = new ReentrantLock();

    public static void sm(){
        lock.lock();
        for (int i=0;i<100;i++){
            System.out.println(Thread.currentThread().getName() + " -- " + i);
        }
        lock.unlock();
    }

    public static void main(String[] args) {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                sm();
            }
        };

        new Thread(r).start();
        new Thread(r).start();
    }

}

使用Lock锁同步不同方法中的同步代码块

package com.glj.lock;

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 21:42
 */
public class Test03 {

    static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                sm1();
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                sm2();
            }
        };

        new Thread(r1).start();
        new Thread(r1).start();
        new Thread(r1).start();
        new Thread(r2).start();
        new Thread(r2).start();
        new Thread(r2).start();
    }

    public static void sm1(){
        lock.lock();//获得锁
        try{
            System.out.println(Thread.currentThread().getName() + "-- method 1 -- " + System.currentTimeMillis() );
            Thread.sleep(new Random().nextInt(1000));
            System.out.println(Thread.currentThread().getName() + "-- method 1 -- " + System.currentTimeMillis() );
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void sm2(){
        lock.lock();//获得锁
        try{
            System.out.println(Thread.currentThread().getName() + "-- method 1 -- " + System.currentTimeMillis() );
            Thread.sleep(new Random().nextInt(1000));
            System.out.println(Thread.currentThread().getName() + "-- method 1 -- " + System.currentTimeMillis() );
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

}

ReentrantLock锁的可重入性

package com.glj.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 22:32
 */
public class Test04 {

    static class SubThread extends Thread{
        private static Lock lock = new ReentrantLock();

        public static int num = 0;

        public static void main(String[] args) throws InterruptedException {
            SubThread t1 = new SubThread();
            SubThread t2 = new SubThread();
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println( SubThread.num );
        }


        @Override
        public void run() {
            for (int i = 0; i <10000 ; i++) {
                try {
                    //可重入锁指可以反复获得该锁
                    lock.lock();
                    lock.lock();
                    num++;
                }finally {
                    lock.unlock();
                    lock.unlock();
                }
            }
        }
    }
}

lockInterruptibly()方法

lockInterruptibly()方法的作用:如果当前线程未被中断则获得锁,如果当前线程被中断则出现异常。

package com.glj.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/20
 * @Time: 22:38
 */
public class Test05 {

    public static void main(String[] args) throws InterruptedException {
        Servier  s = new Servier();
        Runnable r = new Runnable() {
            @Override
            public void run() {
                s.serviceMethod();
            }
        };
        Thread t1 = new Thread(r);
        t1.start();

        Thread.sleep(50);
        Thread t2 = new Thread(r);
        t2.start();
        Thread.sleep(50);
        t2.interrupt();     //中断t2线程
    }

    static class Servier{
        private Lock lock1 = new ReentrantLock();

        public void serviceMethod(){
            try {
                //获得锁定,即使调用了线程的interrupt()方法,也没有真正的中断线程
                lock1.lock();
                //如果线程被中断了,不会获得锁,会产生异常
                lock1.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "-- begin lock");
                //执行一段耗时的操作
                for (int i = 0; i < Integer.MAX_VALUE; i++) {
                    new StringBuilder();
                }
                System.out.println( Thread.currentThread().getName() + " -- end lock");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock1.unlock();
                System.out.println( Thread.currentThread().getName() + " ***** 释放锁");
            }
        }
    }

}

Java公平锁与非公平锁

大多数情况下,锁的申请都是非公平的, 如果线程1与线程2都在请求锁A,当锁A可用时, 系统只是会从阻塞队列中随机的选择一个线程, 不能保证其公平性。

公平的锁会按照时间先后顺序,保证先到先得, 公平锁的这一特点不会出现线程饥饿现象。

synchronized内部锁就是非公平的. ReentrantLock重入锁提供了一个构造方法:ReentrantLock(boolean fair) ,当在创建锁对象时实参传递true可以把该锁设置为公平锁. 公平锁看起来很公平,但是要实现公平锁必须要求系统维护一个有序队列,公平锁的实现成本较高,性能也低. 因此默认情况下锁是非公平的. 不是特别的需求,一般不使用公平锁。

/**
 * 公平 锁与非公平锁
 */
public class Test01 {
//    static ReentrantLock lock = new ReentrantLock();        //默认是非公平锁
    static ReentrantLock lock = new ReentrantLock(true);        //定义公平锁

    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true){
                    try {
                        lock.lock();
                        System.out.println(Thread.currentThread().getName() + " 获得了锁对象");
                    }finally {
                        lock.unlock();
                    }
                }
            }
        };

        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
        /*
            运行程序
                1)如果是非公平锁, 系统倾向于让一个线程再次获得已经持有的锁, 这种分配策略是高效的,非公平的
                2)如果是公平锁, 多个线程不会发生同一个线程连续多次获得锁的可能,保证了公平性
         */
    }
}

Lock锁的常用方法

int getHoldCount() 返回当前线程调用lock()方法的次数。

int getQueueLength() 返回正等待获得锁的线程预估数。

int getWaitQueueLength(Condition condition) 返回与Condition条件相关的等待的线程预估数。

boolean hasQueuedThread(Thread thread) 查询参数指定的线程是否在等待获得锁。

boolean hasQueuedThreads() 查询是否还有线程在等待获得该锁。

boolean hasWaiters(Condition condition) 查询是否有线程正在等待指定的Condition条件。

boolean isFair() 判断是否为公平锁。

boolean isHeldByCurrentThread() 判断当前线程是否持有该锁。

boolean isLocked() 查询当前锁是否被线程持有。

Java线程组

类似于在计算机中使用文件夹管理文件,也可以使用线程组来管理线程,在线程组中定义一组相似(相关)的线程,在线程组中也可以定义子线程组。

Thread类有几个构造方法允许在创建线程时指定线程组,如果在创建线程时没有指定线程组则该线程就属于父线程所在的线程组,JVM在创建main线程时会为它指定一个线程组,因此每个Java线程都有一个线程组与之关联, 可以调用线程的getThreadGroup()方法返回线程组。

线程组开始是出于安全的考虑设计用来区分不同的Applet,然而ThreadGroup并未实现这一目标,在新开发的系统中,已经不常用线程组,现在一般会将一组相关的线程存入一个数组或一个集合中,如果仅仅是用来区分线程时,可以使用线程名称来区分,多数情况下,可以忽略线程组。

Java创建线程组

package com.glj.threadgroup;

/**
 * 演示创建线程组
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/21
 * @Time: 23:34
 */
public class Test01 {

    public static void main(String[] args) {
        // 1) 返回当前main线程的线程组
        ThreadGroup mainGroup  = Thread.currentThread().getThreadGroup();
        System.out.println("mainGroup:"+mainGroup);

        //2) 定义线程组,如果不指定所属线程组,则自动归属当前线程所属的线程组中
        ThreadGroup group1 = new ThreadGroup("group1");
        System.out.println(group1);

        //3)定义线程组, 同时指定父线程组
        ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");

        System.out.println( group1.getParent() == mainGroup);
        System.out.println( group2.getParent() == mainGroup);

        //4) 在创建线程时指定所属线程组
        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread());
            }
        };

        //在创建线程时,如果没有指定线程组,则默认线程归属到父线程的线程组中
        //在main线程中创建了t1线程,称main线程为父线程,t1线程为子线程, t1没有指定线程组则t1线程就归属到父线程main线程的线程组中
        Thread t1 = new Thread(r, "t1");
        //Thread[t1,5,main], t1的线程组是main线程组
        System.out.println( t1 );
        //创建线程时,可以指定线程所属线程组
        Thread t2 = new Thread(group1, r, "t2");
        Thread t3 = new Thread(group2, r, "t3");
        System.out.println(t2);
        System.out.println(t3);

    }

}

Java线程组的基本操作

activeCount()

返回当前线程组及子线程组中活动线程的数量(近似值)。

activeGroupCount()

返回当前线程组及子线程组中活动线程组的数量(近似值)。

int enumerate(Thread[] list)

将当前线程组中的活动线程复制到参数数组中。

enumerate(ThreadGroup[] list)

将当前线程组中的活动线程组复制到参数数组中。

getMaxPriority()

返回线程组的最大优先级,默认是10。

getName()

返回线程组的名称。

getParent()

返回父线程组。

interrupt()

中断线程组中所有的线程。

isDaemon()

判断当前线程组是否为守护线程组。

list()

将当前线程组中的活动线程打印出来。

parentOf(ThreadGroup g)

判断当前线程组是否为参数线程组的父线程组。

setDaemon(boolean daemon)

设置线程组为守护线程组。

演示线程组的基本操作

package com.glj.threadgroup;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/21
 * @Time: 23:56
 */
public class Test02 {

    public static void main(String[] args) {
        ThreadGroup mainGroup  = Thread.currentThread().getThreadGroup();

        ThreadGroup threadGroup = new ThreadGroup("threadGroup");

        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println("-----------当前线程: " + Thread.currentThread());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread t1 = new Thread(r,"t1");

        Thread t2 = new Thread(threadGroup,r,"t2");

        t1.start();
        t2.start();

        //打印线程组的相关属性
        System.out.println("main 线程组中活动线程数量: " + mainGroup.activeCount());
        System.out.println("group 子线程组中活动线程数量: " + threadGroup.activeCount());    //1,  t2
        System.out.println("main线程组中子线程组数量: " + mainGroup.activeGroupCount());  //1,  group
        System.out.println("group子线程组中子线程组数量: " + threadGroup.activeGroupCount());    //0
        System.out.println("main线程组的父线程组: " + mainGroup.getParent());   //main线程组的父线程组是system
        System.out.println("group线程组的父线程组: " + threadGroup.getParent());  //main
        System.out.println( mainGroup.parentOf(mainGroup));  //true, 线程组也是它自己的父线程组
        System.out.println( mainGroup.parentOf(threadGroup));     //true
        mainGroup.list();   //把main线程组中所有的线程打印输出
    }

}

复制线程组中的线程及子线程组

enumerate(Thread[] list)

把当前线程组和子线程组中所有的线程复制到参数数组中。

enumerate(Thread[] list, boolean recursive)

如果第二个参数设置为false,则只复制当前线程组中所有的线程,不复制子线程组中的线程。

enumerate(ThreadGroup[] list)

把当前线程组和子线程组中所有的线程组复制到参数数组中。

enumerate(ThreadGroup[] list, boolean recurse)

第二个参数设置false,则只复制当前线程组的子线程组。

演示复制线程组中的内容

package com.glj.threadgroup;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 0:08
 */
public class Test03 {

    public static void main(String[] args) {
        ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();

        //main线程组中定义了两个子线程组

        ThreadGroup group1 = new ThreadGroup("group1");
        ThreadGroup group2 = new ThreadGroup(mainGroup, "group2");


        Runnable r = new Runnable() {
            @Override
            public void run() {
                while (true){
                    System.out.println("----当前线程: " + Thread.currentThread());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        //创建并启动三个线程
        Thread t1 = new Thread(r, "t1");
        Thread t2 = new Thread(group1, r, "t2");
        Thread t3 = new Thread(group2, r, "t3");

        t1.start();
        t2.start();
        t3.start();

        Thread[] threadList = new Thread[mainGroup.activeCount()];

        //把main线程组包括子线程组中的所有的线程复制到数组中
        mainGroup.enumerate(threadList);
        for (Thread thread : threadList) {
            System.out.println(thread);
        }
        System.out.println("----------------------------");

        //只把main线程组中的线程复制到数组中,不包含子线程组的线程
        mainGroup.enumerate(threadList, false);
        //遍历threadList数组
        for (Thread thread : threadList) {
            System.out.println(thread);
        }
        System.out.println("----------------------------");

        //2) 把main线程组中的子线程组复制到数组中
        //定义数组存储线程组
        ThreadGroup [] threadGroups = new ThreadGroup[mainGroup.activeGroupCount()];
        //把main线程组中的子线程组复制到数组中
        mainGroup.enumerate(threadGroups);
        System.out.println("============================");
        for (ThreadGroup threadGroup : threadGroups) {
            System.out.println(threadGroup);
        }

    }


}

线程组的批量中断

线程组的interrupt() 可以给该线程组中所有的活动线程添加中断标志。

package com.glj.threadgroup;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 0:15
 */
public class Test04 {

    public static void main(String[] args) throws InterruptedException {

        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程--" + Thread.currentThread() + "--开始循环");
                //当线程没有被中断就一直循环
                while (Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread().getName() + "------------------");
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        //如果中断睡眠中的线程,产生中断异常, 同时会清除中断标志
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "循环结束");
            }
        };

        //创建线程组
        ThreadGroup group = new ThreadGroup("group");
        //在group线程组中创建5个线程
        for (int i = 0; i < 5; i++) {
            new Thread(group,r,"T"+i).start();
        }

        //main线程睡眠2秒
        Thread.sleep(50);
        //中断线程组, 会中断线程组中所有的线程
        group.interrupt();


    }
}

设置守护线程组

守护线程是为其他线程提供服务的,当JVM中只有守护线程时,守护线程会自动销毁,JVM会退出。

调用线程组的setDaemon(true)可以把线程组设置为守护线程组,当守护线程组中没有任何活动线程时,守护线程组会自动销毁。

注意线程组的守护属性,不影响线程组中线程的守护属性,或者说守护线程组中的线程可以是非守护线程。

package com.glj.threadgroup;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 0:18
 */
public class Test05 {

    public static void main(String[] args) throws InterruptedException {

        ThreadGroup threadGroup = new ThreadGroup("group");

        //设置线程组为守护线程组
        threadGroup.setDaemon(true);

        Runnable r = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 20; j++) {
                    System.out.println(Thread.currentThread().getName() + " -- " + j);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        for (int i=0;i<5;i++){
            new Thread(threadGroup,r,"t"+i).start();
        }

        //main线程睡眠5秒
        Thread.sleep(5000);
        System.out.println("main...end....");

    }

}

Java多线程捕获异常处理

在线程的run方法中,如果有受检异常必须进行捕获处理,如果想要获得run()方法中出现的运行时异常信息,可以通过回调UncaughtExceptionHandler接口获得哪个线程出现了运行时异常,在Thread类中有关处理运行异常的方法有:

getDefaultUncaughtExceptionHandler()

获得全局的(默认的)UncaughtExceptionHandler

getUncaughtExceptionHandler()

获得当前线程的UncaughtExceptionHandler

setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

设置全局的UncaughtExceptionHandler

setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

设置当前线程的UncaughtExceptionHandler

当线程运行过程中出现异常,JVM会调用Thread类的dispatchUncaughtException(Throwable e)方法, 该方法会调用getUncaughtExceptionHandler().uncaughtException(this, e); 如果想要获得线程中出现异常的信息,就需要设置线程的UncaughtExceptionHandler

package com.glj.threadexception;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 15:37
 */
public class Test01 {

    public static void main(String[] args) {

         /*
            在实际开发中,这种设计异常处理的方式还是比较常用的,尤其是异常执行的方法
            如果线程产生了异常, JVM会调用dispatchUncaughtException()方法,在该方法中调用了getUncaughtExceptionHandler().uncaughtException(this, e);
            如果当前线程设置了UncaughtExceptionHandler回调接口就直接调用它自己的uncaughtException方法,
            如果没有设置则调用当前线程所在线程组UncaughtExceptionHandler回调接口的uncaughtException方法,如果线程组也没有设置回调接口,则直接把异常的栈信息定向到System.err中
         */

        //设置线程全局的回调接口
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                //t参数接收发生异常的线程, e就是该线程中的异常
                System.out.println(t.getName() + "线程产生了异常: " + e.getMessage());
            }
        });


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "开始运行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    //线程中的受检异常必须捕获处理
                    e.printStackTrace();
                }
                //会产生算术异常
                System.out.println(12 / 0 );
            }
        });

        t1.start();

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "开始运行");
                String txt = null;
                //会产生空指针异常
                System.out.println( txt.length());
            }
        });
        t2.start();

    }

}

Hook钩子线程注入

现在很多软件包括MySQL, Zookeeper, kafka等都存在Hook线程的校验机制, 目的是校验进程是否已启动,防止重复启动程序。

Hook线程也称为钩子线程, 当JVM退出的时候会执行Hook线程.经常在程序启动时创建一个.lock文件, 用.lock文件校验程序是否启动,在程序退出(JVM退出)时删除该.lock文件, 在Hook线程中除了防止重新启动进程外,还可以做资源释放, 尽量避免在Hook线程中进行复杂的操作。

package com.glj.hook;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 15:47
 */
public class Test01 {


    public static void main(String[] args) {
        //1)注入Hook线程,在程序退出时删除.lock文件
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                System.out.println("JVM退出,会启动当前Hook线程,在Hook线程中删除.lock文件");
                getLockFile().toFile().delete();
            }
        });

        //2)程序运行时,检查lock文件是否存在,如果lock文件存在,则抛出异常
        System.out.println("getLockFile().toFile():"+getLockFile().toFile());
        if (getLockFile().toFile().exists()) {
            throw  new RuntimeException("程序已启动");
        }else {
            //文件不存在,说明程序是第一次启动,创建lock文件
            try {
                getLockFile().toFile().createNewFile();
                System.out.println("程序在启动时创建了lock文件");
            } catch (IOException e) {
                e.printStackTrace();
            }

        }

        //模拟程序运行
        for (int i = 0; i < 10; i++) {
            System.out.println("程序正在运行");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    private static Path getLockFile(){
        return Paths.get("", "tmp.lock");
    }


}

Java线程池是什么

可以以 new Thread( () -> { 线程执行的任务 }).start(); 这种形式开启一个线程. 当run()方法运行结束,线程对象会被GC释放。

在真实的生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时 ,反而会耗尽CPU资源. 如果不对线程进行控制与管理,反而会影响程序的性能. 线程开销主要包括: 创建与启动线程的开销; 线程销毁开销; 线程调度的开销; 线程数量受限CPU处理器数量。

线程池就是有效使用线程的一种常用方式. 线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池, 线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行。

img

多线程JDK线程池

JDK提供了一套Executor框架,可以帮助开发人员有效的使用线程池。

img

线程池的基本使用

package com.glj.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 16:04
 */
public class Test01 {

    public static void main(String[] args) {

        //创建有5个线程大小的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

        //向线程池中提交18个任务,这18个任务存储到线程池的阻塞队列中, 线程池中这5个线程就从阻塞队列中取任务执行
        for (int i = 0; i < 18; i++) {

            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getId() + " 编号的任务在执行任务,开始时间: " + System.currentTimeMillis());
                    try {
                        Thread.sleep(3000);     //模拟任务执行时长
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

线程池的计划任务

package com.glj.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 16:10
 */
public class Test02 {

    public static void main(String[] args) {
        //创建一个有调度功能的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);


        //在延迟2秒后执行任务, schedule(  Runnable任务, 延迟时长, 时间单位)
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + " -- " + System.currentTimeMillis() );
            }
        },2, TimeUnit.SECONDS);


        //以固定的频率执行任务,开启任务的时间是固定的, 在3秒后执行任务,以后每隔5秒重新执行一次
/*        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + "----在固定频率开启任务---" + System.currentTimeMillis());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },3,2,TimeUnit.SECONDS);*/


        //在上次任务结束后,在固定延迟后再次执行该任务,不管执行任务耗时多长,总是在任务结束后的2秒再次开启新的任务
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + "----在固定频率开启任务---" + System.currentTimeMillis());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },3,2,TimeUnit.SECONDS);

    }

}

总结:

scheduleAtFixedRate :

是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。

scheduleWithFixedDelay:

是以上一个任务结束时开始计时,period时间过去后,立即执行

Java线程池的底层实现

查看Executors工具类中newCachedThreadPool(), newSingleThreadExecutor(), newFixedThreadPool()源码

newCachedThreadPool()

该线程池在极端情况下,每次提交新的任务都会创建新的线程执行. 适合用来执行大量耗时短并且提交频繁的任务

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}   

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

imgExcutors工具类中返回线程池的方法底层都使用了ThreadPoolExecutor线程池,这些方法都是ThreadPoolExecutor线程池的封装。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

img各个参数含义:

corePoolSize 指定线程池中核心线程的数量。

maxinumPoolSize 指定线程池中最大线程数量。

keepAliveTime 当线程池线程的数量超过corePoolSize时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁。

unit 是keepAliveTime时长单位。

workQueue 任务队列,把任务提交到该任务队列中等待执行。

threadFactory 线程工厂,用于创建线程。

handler 拒绝策略,当任务太多来不及处理时,如何拒绝。

说明:

workQueue工作队列是指提交未执行的任务队列,它是BlockingQueue接口的对象,仅用于存储Runnable任务.根据队列功能分类,在ThreadPoolExecutor构造方法中可以使用以下几种阻塞队列:

1、直接提交队列,由SynchronousQueue 对象提供,该队列没有容量,提交给线程池的任务不会被真实的保存,总是将新的任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果线程数量已经达到maxinumPoolSize规定的最大值则执行拒绝策略。

2、有界任务队列,由ArrayBlockingQueue实现,在创建ArrayBlockingQueue对象时,可以指定一个容量. 当有任务需要执行时,如果线程池中线程数小于corePoolSize核心线程数则创建新的线程;如果大于corePoolSize核心线程数则加入等待队列.如果队列已满则无法加入,在线程数小于maxinumPoolSize指定的最大线程数前提下会创建新的线程来执行,如果线程数大于maxinumPoolSize最大线程数则执行拒绝策略。

img

3、无界任务队列,由LinkedBlockingQueue对象实现,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况. 当有新的任务时,在系统线程数小于corePoolSize核心线程数则创建新的线程来执行任务;当线程池中线程数量大于corePoolSize核心线程数则把任务加入阻塞队列。

4、优先任务队列是通过PriorityBlockingQueue实现的,是带有任务优先级的队列,是一个特殊的无界队列.不管是ArrayBlockingQueue队列还是LinkedBlockingQueue队列都是按照先进先出算法处理任务的.在PriorityBlockingQueue队列中可以根据任务优先级顺序先后执行。

Java线程池的拒绝策略

ThreadPoolExecutor构造方法的最后一个参数指定了拒绝策略.当提交给线程池的任务量超过实际承载能力时,如何处理? 即线程池中的线程已经用完了,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题. JDK提供了四种拒绝策略:

● AbortPolicy策略,会抛出异常。

● CallerRunsPolicy策略,只要线程池没关闭,会在调用者线程中运行当前被丢弃的任务。

● DiscardOldestPolicy将任务队列中最老的任务丢弃,尝试再次提交新任务。

● DiscardPolicy直接丢弃这个无法处理的任务。

Executors工具类提供的静态方法返回的线程池默认的拒绝策略是AbortPolicy抛出异常,如果内置的拒绝策略无法满足实际需求,可以扩展RejectedExecutionHandler接口。

自定义拒绝策略列子

package com.glj.threadpool;

import java.util.Random;
import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 17:13
 */
public class Test03 {

    public static void main(String[] args) {

        Runnable r = new Runnable() {
            @Override
            public void run() {
                int num = new Random().nextInt(5);
                System.out.println(Thread.currentThread().getId() + "--" + System.currentTimeMillis() + "开始睡眠" + num + "秒");
                try {
                    TimeUnit.SECONDS.sleep(num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler(){
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //r就是请求的任务, executor就是当前线程池
                System.out.println(r + " is discarding..");
            }
        });

        //向线程池提交若干任务
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            threadPoolExecutor.submit(r);
        }

    }

}

Java线程池ThreadFactory

线程池中的线程从哪儿来的? 答案就是ThreadFactory

ThreadFactory是一个接口,只有一个用来创建线程的方法:

Thread newThread(Runnable r);

当线程池中需要创建线程时就会调用该方法。

自定义线程工厂列子:

package com.glj.threadpool;

import java.util.Random;
import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 17:21
 */
public class Test04 {

    public static void main(String[] args) {
        //定义任务
        Runnable r = new Runnable() {
            @Override
            public void run() {
                int num = new Random().nextInt(10);
                System.out.println(Thread.currentThread().getId() + "--" + System.currentTimeMillis() + "开始睡眠:" + num + "秒");
                try {
                    TimeUnit.SECONDS.sleep(num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //创建线程池, 使用自定义线程工厂, 采用默认的拒绝策略是抛出异常
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("创建了线程: " + t);
                return t ;
            }
        });

        //提交5个任务, 当给当前线程池提交的任务超过5个时,线程池默认抛出异常
        for (int i = 0; i < 5; i++) {
            executorService.submit(r);
        }

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}

Java监控线程池

ThreadPoolExecutor提供了一组方法用于监控线程池。

int getActiveCount()

获得线程池中当前活动线程的数量。

long getCompletedTaskCount()

返回线程池完成任务的数量。

int getCorePoolSize()

线程池中核心线程的数量。

int getLargestPoolSize()

返回线程池曾经达到的线程的最大数。

int getMaximumPoolSize()

返回线程池的最大容量。

int getPoolSize()

当前线程池的大小。

BlockingQueue getQueue()

返回阻塞队列。

long getTaskCount()

返回线程池收到的任务总数。

监控线程池列子:

package com.glj.threadpool;

import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 17:26
 */
public class Test05 {

    public static void main(String[] args) {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + " 编号 的线程开始执行: " + System.currentTimeMillis());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };


        TimeUnit unit;
        BlockingQueue workQueue;
        unit = TimeUnit.SECONDS;
        workQueue =  new ArrayBlockingQueue<>(5);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,5 ,0 , unit, workQueue,
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());

        //向线程池提交30个任务
        for (int i = 0; i < 20; i++) {
            threadPoolExecutor.submit(r);
            /*System.out.println("当前线程池核心线程数量: " + threadPoolExecutor.getCorePoolSize()
                    + ", 最大线程数:" + threadPoolExecutor.getMaximumPoolSize()
                    + ",当前线程池大小:" + threadPoolExecutor.getPoolSize()
                    + ",活动线程数量:" + threadPoolExecutor.getActiveCount()
                    + ",收到任务数量:" + threadPoolExecutor.getTaskCount()
                    + ",完成任务数: " + threadPoolExecutor.getCompletedTaskCount()
                    + ",等待任务数:" + threadPoolExecutor.getQueue().size()) ;
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
        }

        while (threadPoolExecutor.getActiveCount() > 0) {

            System.out.println("当前线程池核心线程数量:"+ threadPoolExecutor.getCorePoolSize()
                    + ",最大线程数:" + threadPoolExecutor.getMaximumPoolSize()
                    + ",当前线程池的大小:" + threadPoolExecutor.getPoolSize()
                    + ",活动线程数量:" + threadPoolExecutor.getActiveCount()
                    + ",线程池收到的任务总数:" + threadPoolExecutor.getTaskCount()
                    + ",线程池完成任务的数量:" + threadPoolExecutor.getCompletedTaskCount()
                    + ",等待任务数:" + threadPoolExecutor.getQueue().size()
                    + ",线程池曾经达到的线程的最大数:" + threadPoolExecutor.getLargestPoolSize());
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }

}

Java线程池扩展

有时需要对线程池进行扩展,如在监控每个任务的开始和结束时间,或者自定义一些其他增强的功能。

ThreadPoolExecutor线程池提供了两个方法:

● protected void afterExecute(Runnable r, Throwable t)

● protected void beforeExecute(Thread t, Runnable r)

在线程池执行某个任务前会调用beforeExecute()方法,在任务结束后(任务异常退出)会执行afterExecute()方法。

查看ThreadPoolExecutor源码,在该类中定义了一个内部类Worker, ThreadPoolExecutor线程池中的工作线程就是Worker类的实例, Worker实例在执行时会调用beforeExecute()afterExecute()方法

列子:

package com.glj.threadpool;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: GuoLiangJun
 * @Date: 2021/9/22
 * @Time: 17:44
 */
public class Test06 {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(t.getId() + "线程准备执行任务: " + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println( ((MyTask)r).name + "任务执行完毕");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("task-" + i);
            threadPoolExecutor.execute(task);
        }

        //关闭线程池
        threadPoolExecutor.shutdown();

    }

    //定义任务类
    private static class  MyTask implements  Runnable{
        String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(name + "任务正在被线程 " + Thread.currentThread().getId() + " 执行");
            try {
                Thread.sleep(1000);     //模拟任务执行时长
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

Java线程池的大小与线程池死锁

优化线程池大小

线程池大小对系统性能是有一定影响的,过大或者过小都会无法发挥最优的系统性能, 线程池大小不需要非常精确,只要避免极大或者极小的情况即可, 一般来说,线程池大小需要考虑CPU数量,内存大小等因素. 在书中给出一个估算线程池大小的公式:

线程池大小 = CPU的数量 * 目标CPU的使用率*( 1 + 等待时间与计算时间的比)

线程池死锁

如果在线程池中执行的任务A在执行过程中又向线程池提交了任务B, 任务B添加到了线程池的等待队列中, 如果任务A的结束需要等待任务B的执行结果. 就有可能会出现这种情况: 线程池中所有的工作线程都处于等待任务处理结果,而这些任务在阻塞队列中等待执行, 线程池中没有可以对阻塞队列中的任务进行处理的线程,这种等待会一直持续下去,从而造成死锁。

适合给线程池提交相互独立的任务,而不是彼此依赖的任务. 对于彼此依赖的任务,可以考虑分别提交给不同的线程池来执行。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注