多线程实现与同步工具包详解

作者 : 开心源码 本文共14979个字,预计阅读时间需要38分钟 发布时间: 2022-05-12 共214人阅读

多线程学习

概念

简述如下:

并发:指一个CPU可以异步的解决多个进程
并行:则是一个CPU同时解决多个进程
进程:程序运行的执行过程,是一个程序的实例。每个进程都有自己的虚拟地址空间和控制线程
线程:是进程的一个执行单元,是操作系统调度器(Schduler)分配解决器时间的基础单元。

一句话总结:

线程是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来解决。

Java代码实现方式

  • 方式一、继承Thread类,由于Java 是单继承,继承Thread的类不能再继承其余类,并且继承Thread其本身就是执行线程,每个子线程之间相互独立,资源不共享
  • 方式二、实现Runnable接口,资源共享,多个线程可以对共享数据进行操作
  • 方式三、实现Callable接口通过FutureTask包装器来创立Thread线程,可以接收一个返回值
  • 方式四、使用ExecutorService、Callable、Future实现有返回结果的线程

完整代码如下:

import java.util.concurrent.*;public class Demo1 {    /**     * 第一种实现方式:     * 继承Thread类创立线程     */    public class MyThread extends Thread{        private int ticket = 10;        @Override        public void run() {            for (int i=0;i<=10;i++){                if(ticket>0){                    System.out.println(Thread.currentThread().getName()+"工号 还有 "+ticket--+" 张传单需要派发");                }else{                    System.out.println(Thread.currentThread().getName()+"工号 的传单一律派发完了,可以下班了");                }            }        }    }    /**     * 第二种实现方式:     * 实现Runnable接口创立线程     */    public class MyRunable implements Runnable{        private int ticket = 10;        @Override        public void run() {            for (int i=0;i<10;i++){                //加锁互斥                synchronized (this){                    if(ticket>0){                        System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");                    }else {                        System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");                        break;                    }                }            }        }    }    /**     * 第三种实现方式:     * 实现Callable接口通过FutureTask包装器来创立Thread线程     */    public class MyCallRunable implements Callable<Integer>{        @Override        public Integer call() throws Exception {            int sum=0;            for (int i = 0; i <= 100; i++) {                sum += i;            }            return sum;        }    }    /**     * 第四种方式:     * 线程池的方式,使用ExecutorService、Callable、Future实现有返回结果的线程     */    public class OddCallRunable implements Callable<Integer>{        @Override        public Integer call() throws Exception {            int sum=0;            for (int i = 0; i <= 100; i++) {                if(i%2 == 0)                    sum += i;            }            return sum;        }    }    public class EvenCallRunable implements Callable<Integer>{        @Override        public Integer call() throws Exception {            int sum=0;            for (int i = 0; i <= 100; i++) {                if(i%2 == 1)                    sum += i;            }            return sum;        }    }    /**     * 总结:     * 方式一、继承Thread类,由于Java 是单继承,继承Thread的类不能再继承其余类,并且继承Thread 其本身就是执行线程,每个子线程之间相互独立,资源不共享     * 方式二、实现Runnable接口,资源共享,多个线程可以对共享数据进行操作     * 方式三、实现Callable接口通过FutureTask包装器来创立Thread线程,可以接收一个返回值     * 方式四、使用ExecutorService、Callable、Future实现有返回结果的线程     * @param args     * @throws InterruptedException     */    public static void main(String[] args) throws InterruptedException, ExecutionException {        System.out.println("第一种方式:");        MyThread thread1 = new Demo1().new MyThread();        MyThread thread2 = new Demo1().new MyThread();        MyThread thread3 = new Demo1().new MyThread();        thread1.start();        thread2.start();        thread3.start();        Thread.sleep(1000);        System.out.println();        System.out.println("第二种方式,");        MyRunable myRunable = new Demo1().new MyRunable();        Thread threadTwo1 = new Thread(myRunable);        Thread threadTwo2 = new Thread(myRunable);        Thread threadTwo3 = new Thread(myRunable);        threadTwo1.start();        threadTwo2.start();        threadTwo3.start();        Thread.sleep(1000);        System.out.println();        System.out.println("第三种方式:");        MyCallRunable myCallRunable = new Demo1().new MyCallRunable();        FutureTask<Integer> futureTask = new FutureTask<>(myCallRunable);        Thread threadCall = new Thread(futureTask);        threadCall.start();        Integer integer = futureTask.get();        System.out.println("result sum="+integer);        Thread.sleep(1000);        System.out.println();        System.out.println("第四种方式:");        // 创立一个线程池        ExecutorService pool = Executors.newFixedThreadPool(2);        // 创立多个有返回值的任务        OddCallRunable oddCallRunable = new Demo1().new OddCallRunable();        EvenCallRunable evenCallRunable = new Demo1().new EvenCallRunable();        Future<Integer> submit1 = pool.submit(oddCallRunable);        Future<Integer> submit2 = pool.submit(evenCallRunable);        // 关闭线程池        pool.shutdown();        System.out.println("1-100 偶数和:"+submit1.get());        System.out.println("1-100 奇数和:"+submit2.get());        System.out.println("1-100 总和:"+(submit1.get()+submit2.get()));    }}

线程的类型

  • 主线程:
    JVM调用程序main()所产生的线程。
  • 当前线程:
    这个是容易混淆的概念。一般指通过Thread.currentThread()来获取的进程。
  • 后端线程:
    指为其余线程提供服务的线程,也称为守护线程。JVM的垃圾回收线程就是一个后端线程。客户线程和守护线程的区别在于,能否等待主线程依赖于主线程结束而结束
  • 前端线程:
    是指接受后端线程服务的线程,其实前端后端线程是联络在一起,就像傀儡和幕后操纵者一样的关系。傀儡是前端线程、幕后操纵者是后端线程。由前端线程创立的线程默认也是前端线程。
    可以通过isDaemon()和setDaemon()方法来判断和设置一个线程能否为后端线程。但必需在start()方法前调用,否则会报错

线程的常用方法

  • sleep()
    强迫一个线程睡眠N毫秒。
  • isAlive()
    判断一个线程能否存活。
  • join()
    等待线程终止。
  • activeCount()
    程序中活跃的线程数。
  • enumerate()
    枚举程序中的线程。
  • currentThread()
    得到当前线程。
  • isDaemon()
    一个线程能否为守护线程。
  • setDaemon()
    设置一个线程为守护线程。(客户线程和守护线程的区别在于,能否等待主线程依赖于主线程结束而结束)
  • setName()
    为线程设置一个名称。
  • wait()
    强迫一个线程等待。
  • notify()
    通知一个线程继续运行。
  • setPriority()
    设置一个线程的优先级
  • stop()
    线程中断,弃用,这个方法是有问题。
  • interrupt()
    给线程设置一个中断状态,而后通过isInterruptd()方法获取能否中断的状态,但是假如线程中调用 join()、wait()、slettp() 等线程阻塞的方法,会清理掉 interrupt状态,所以慎用。

synchronized关键字

因为每个线程执行的过程是不可控的,所以很可能导致最终的结果与实际上的愿望相违反或者者直接导致程序出错。
比方,当多个线程同时访问临界资源(一个对象,对象中的属性,一个文件,一个数据库等)时,即可能会产生线程安全问题。

临界资源就是共享资源。比方共享变量等

怎样处理线程安全?专业术语叫序列化访问临界资源

线程安全处理方案

synchronized 就是来处理线程安全的方案之一,互斥锁:顾名思义,能到达到互斥访问目的的锁,即在同一时刻,只能有一个线程访问临界资源,也称作同步互斥访问
假如对临界资源加上互斥锁,当一个线程在访问该临界资源时,其余线程便只能等待。
使用synchronized关键字来标记一个方法或者者代码块,当某个线程调用该对象的synchronized方法或者者访问synchronized代码块时,这个线程便取得了该对象的锁,其余线程暂时无法访问这个方法,只有等待这个方法执行完毕或者者代码块执行完毕,这个线程才会释放该对象的锁,其余线程才能执行这个方法或者者代码块。
就比方上述代码实现的方式二,片段代码如下,

public void run() {    for (int i=0;i<10;i++){        //加锁互斥        synchronized (this){            if(ticket>0){                System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");            }else {                System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");                break;            }        }    }}

synchronized (this) 中的this 就是一把锁,以当前对象作为锁,多个窗口售票,但却保证了卖票的数量不会出错。
上述的例子是修饰一段代码块,同样的也是可以修饰方法,看情况选择,修饰代码块比较灵活,比方一个方法里面可能你只有一小部分需要同步的话,就没必要放在方法上

synchronized的作用域

  • 当一个线程正在访问一个对象的synchronized方法,那么其余线程不能访问该对象的其余synchronized方法。这个起因很简单,由于一个对象只有一把锁,当一个线程获取了该对象的锁之后,其余线程无法获取该对象的锁,所以无法访问该对象的其余synchronized方法。

  • 当一个线程正在访问一个对象的synchronized方法,那么其余线程能访问该对象的非synchronized方法。这个起因很简单,访问非synchronized方法不需要取得该对象的锁,如果一个方法没用synchronized关键字修饰,说明它不会使用到临界资源,那么其余线程是可以访问这个方法的,

  • 假如一个线程A需要访问对象object1的synchronized方法method1,另外一个线程B需要访问对象object2的synchronized方法method1,即便object1和object2是同一类型),也不会产生线程安全问题,由于他们访问的是不同的对象,所以不存在互斥问题。

  • synchronized关键字是不能继承的,也就是说,基类的方法synchronized method1(){} 在继承类中并不自动是synchronized method1(){},而是变成了method1(){}。继承类需要你显式的指定它的某个方法为synchronized方法;

Lock

从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问,那就是Lock。
可以说Lock就是synchronized的加强版,区别:

  • synchronized是java内置关键字,在jvm层面,Lock是个java类;
  • synchronized无法判断能否获取锁的状态,Lock可以判断能否获取到锁;
  • synchronized会自动释放锁,Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
  • 用synchronized关键字的两个线程1和线程2,假如当前线程1取得锁,线程2线程等待。假如线程1阻塞,线程2则会一直等待下去,而Lock锁可以设置一个超时时间
  • synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
  • Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码一些的同步问题。
    小栗子:
public class Demo2 {    public static void main(String[] args) {        MyRunable myRunable = new MyRunable();        Thread thread1 = new Thread(myRunable);        Thread thread2 = new Thread(myRunable);        Thread thread3 = new Thread(myRunable);        thread1.start();        thread2.start();        thread3.start();    }}class MyRunable implements Runnable{    private static int ticket = 10;    private static Lock lock = new ReentrantLock();    @Override    public void run() {        while (true){            //加锁互斥            try {                lock.lock();                if(ticket>0){                    System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");                }else {                    System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");                    break;                }            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }    }}

wait()、notify/notifyAll()

这三个方法是用于线程间通信的基础方法,但实际上,它们不是Thread类中的方法,而是Object类中的本地方法
当线程执行wait()方法时候,会释放当前的锁,而后让出CPU,进入等待状态。
只有当 notify/notifyAll() 被执行时候,才会唤醒一个或者多个正处于等待状态的线程,而后继续往下执行,直到执行完synchronized 代码块的代码或者是中途遇到wait() ,再次释放锁。
notify方法只唤醒一个等待(对象的)线程并使该线程开始执行。所以假如有多个线程等待一个对象,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。notifyAll 会唤醒所有等待(对象的)线程,虽然哪一个线程将会第一个解决取决于操作系统的实现
需求:交替打印0-100 的奇偶数

// 打印的数据体public class Number {    int i=1;    volatile boolean isOdd=false;    public int getI() {        return i;    }    public void setI(int i) {        this.i = i;    }    public boolean isOdd() {        return isOdd;    }    public void setOdd(boolean odd) {        isOdd = odd;    }}//奇数线程public class EvenRunable implements Runnable{    Number number;    int maxNumber;    public EvenRunable(Number number, int maxNumber) {        this.number=number;        this.maxNumber=maxNumber;    }    @Override    public void run() {        while (number.getI() <= maxNumber) {            synchronized (number) {                if (!number.isOdd) {                    System.out.println("奇数i=" + number.getI());                    number.setI(number.getI()+1);                    number.setOdd(true);                    //唤醒其余线程                    number.notify();                } else {                    try {                        //释放锁,阻塞                        number.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }    }}// 偶数线程public class OddRunable implements Runnable {    Number number;    int maxNumber;    public OddRunable(Number number, int maxNumber) {        this.number = number;        this.maxNumber = maxNumber;    }    @Override    public void run() {        while (number.getI() <= maxNumber) {            synchronized (number) {                if (number.isOdd) {                    System.out.println("偶数i=" + number.getI());                    number.setI(number.getI()+1);                    number.setOdd(false);                    number.notify();                } else {                    try {                        number.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }    }}// 运行public class Main {    public static void main(String[] args) {        Number number = new Number();        int maxNumber=100;        EvenRunable evenRunable = new EvenRunable(number,maxNumber);        OddRunable oddRunable = new OddRunable(number,maxNumber);        Thread evenThread = new Thread(evenRunable);        Thread oddThread = new Thread(oddRunable);        oddThread.start();        evenThread.start();    }}//输出奇数i=1偶数i=2奇数i=3.....偶数i=98奇数i=99偶数i=100

JUC(java.util.concurrent)包同步工具

AtomicInteger

AtomicInteger原子类型,线程安全,如何保证线程安全,它的部分源码如下:

// setup to use Unsafe.compareAndSwapInt for updatesprivate static final Unsafe unsafe = Unsafe.getUnsafe();    private static final long valueOffset;    static {        try {            valueOffset = unsafe.objectFieldOffset                (AtomicInteger.class.getDeclaredField("value"));        } catch (Exception ex) { throw new Error(ex); }    }private volatile int value;

我们看到value使用了volatile修饰符,volatile相当于synchronized的弱实现,也就是说volatile实现了相似synchronized的语义,却又没有锁机制。
它确保对volatile字段的升级以可预见的方式告知其余的线程。

例子:多线程顺序打印abc
import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class Demo5 implements Runnable{    private static AtomicInteger currentCount = new AtomicInteger(0);    private static final Integer MAX_COUNT = 30;    private static String [] chars = {"a", "b", "c"};    private String name;    public Demo5(String name) {        this.name =  name;    }    @Override    public void run() {        while(currentCount.get()<MAX_COUNT){            if(this.name.equals(chars[currentCount.get()%3])){                printAndPlusOne(this.name + "\t" + currentCount);            }        }    }    public void printAndPlusOne(String content){        System.out.println(content);        currentCount.getAndIncrement();    }        public static void main(String [] args){        ExecutorService executorService = Executors.newCachedThreadPool();        executorService.submit(new Demo5("a"));        executorService.submit(new Demo5("b"));        executorService.submit(new Demo5("c"));        executorService.shutdown();    }}

CyclicBarrier

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再一律同时执行。
回环的意思是由于当所有等待线程都被释放以后,CyclicBarrier可以被重用。

例子:多线程顺序打印abc
import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class Demo6 implements Runnable{    //其参数表示屏障阻拦的线程数量    private static int threadNum = 3;    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum);    private static Integer currentCount = 0;    private static final Integer MAX_COUNT = 30;    private static String [] chars = {"a", "b", "c"};    private String name;    public Demo6(String name) {        this.name =  name;    }    @Override    public void run() {        while(currentCount<MAX_COUNT){            while(this.name.equals(chars[currentCount%3]))                printAndPlusOne(this.name + "\t" + currentCount);            try {                //使用await()方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。等threadNum 个一律await之后再一律同时执行                cyclicBarrier.await();            }catch (Exception e){                e.printStackTrace();            }        }    }    public void printAndPlusOne(String name){        System.out.println(name);        currentCount ++;    }    public static void main(String [] args){        ExecutorService executorService = Executors.newCachedThreadPool();        executorService.submit(new Demo6("a"));        executorService.submit(new Demo6("b"));        executorService.submit(new Demo6("c"));        executorService.shutdown();    }}

参考自:https://blog.csdn.net/u013968384/article/details/82584944

CountDownLatch

倒计时计数器,countDown() 方法调用递减锁存器的计数,假如计数到达零,则释放所有等待的线程。
调用await()方法的线程会被挂起,它会等待直到计数值为0才继续执行

import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;public class Demo8 {    public static void main(String [] args) throws InterruptedException {        //赛道个数,一人一道        int count = 6;        //指挥官        final CountDownLatch masterCountDownLatch = new CountDownLatch(1);        //闭锁,可实现计数器递减        final CountDownLatch countDownLatch = new CountDownLatch(count);        AtomicInteger atomicInteger = new AtomicInteger(1);        System.out.println("校园400米赛跑,即将开始");        Thread.sleep(1*1000);        ExecutorService executorService = Executors.newCachedThreadPool();        for (int i = 0; i < count ; i++) {            executorService.execute(() -> {                try {                    System.out.println(Thread.currentThread().getName()+"准备好了");                    //比赛选择准备                    masterCountDownLatch.await();                    System.out.println(Thread.currentThread().getName()+"拼命奔跑中");                    Thread.sleep((long)(Math.random()*10000));                    if(atomicInteger.decrementAndGet() >= 0){                        System.out.println("冠军诞生了 "+Thread.currentThread().getName()+"首先到达终点");                    }else{                        System.out.println("其次 "+Thread.currentThread().getName()+"到达终点");                    }                } catch (Exception e) {                    e.printStackTrace();                }                //闭锁减一                countDownLatch.countDown();            });        }        Thread.sleep(5*1000);        System.out.println("预备,开始比赛");        // 准备结束,开始比赛,处于等待的线程继续执行任务        masterCountDownLatch.countDown();        countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行        System.out.println("比赛结束");        executorService.shutdown();    }}

控制台打印:

校园400米赛跑,即将开始pool-1-thread-1准备好了pool-1-thread-2准备好了pool-1-thread-3准备好了pool-1-thread-4准备好了pool-1-thread-5准备好了pool-1-thread-6准备好了预备,开始比赛pool-1-thread-1拼命奔跑中pool-1-thread-2拼命奔跑中pool-1-thread-6拼命奔跑中pool-1-thread-5拼命奔跑中pool-1-thread-3拼命奔跑中pool-1-thread-4拼命奔跑中冠军诞生了 pool-1-thread-4首先到达终点其次 pool-1-thread-6到达终点其次 pool-1-thread-1到达终点其次 pool-1-thread-2到达终点其次 pool-1-thread-5到达终点其次 pool-1-thread-3到达终点比赛结束

Semaphore

信号量,是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证正当的使用公共资源。
构造器参数 permits(许可数),定义资源可以并发访问的最大个数。
常用方法:

  • acquire()
    获取执行许可,当总计未释放的许可数不超过permits时,允许通行,否则线程阻塞等待,直到获取到许可。
  • release()
    释放许可,可以让其余线程取得许可

并发访问如下demo:

import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.*;public class Demo7{    public static void main(String [] args) throws InterruptedException {        int clientTotal = 12;        // 同时并发执行的线程数,这里        int threadTotal = 5;        int count = 0;        ExecutorService executorService = Executors.newCachedThreadPool();        //信号量,此处用于控制并发的线程数        final Semaphore semaphore = new Semaphore(threadTotal);        //闭锁,可实现计数器递减        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);        for (int i = 0; i < clientTotal ; i++) {            executorService.execute(() -> {                try {                    //执行此方法用于获取执行许可,当总计未释放的许可数不超过threadTotal时,                    //允许通行,否则线程阻塞等待,直到获取到许可。                    semaphore.acquire();                    //任务                    goToWC();                    //释放许可                    semaphore.release();                } catch (Exception e) {                    //log.error("exception", e);                    e.printStackTrace();                }                //闭锁减一                countDownLatch.countDown();            });        }        countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行        executorService.shutdown();    }    //上厕所    public static void goToWC() throws InterruptedException {        System.out.println(new SimpleDateFormat("HH:mm:ss    ").format(new Date())+Thread.currentThread().getName()+"-正在使用厕所 ");        Thread.sleep(2000);        System.out.println(Thread.currentThread().getName()+"上完厕所了,回去上班");    }}

上面的场景是这样的,一个楼层有5个厕所。现在有12 个人要上厕所。只能排队
控制台打印输出,如下

14:12:28    pool-1-thread-5-正在使用厕所 14:12:28    pool-1-thread-7-正在使用厕所 14:12:28    pool-1-thread-3-正在使用厕所 14:12:28    pool-1-thread-2-正在使用厕所 14:12:28    pool-1-thread-1-正在使用厕所 pool-1-thread-5上完厕所了,回去上班pool-1-thread-1上完厕所了,回去上班pool-1-thread-2上完厕所了,回去上班pool-1-thread-3上完厕所了,回去上班14:12:30    pool-1-thread-9-正在使用厕所 14:12:30    pool-1-thread-6-正在使用厕所 pool-1-thread-7上完厕所了,回去上班14:12:30    pool-1-thread-10-正在使用厕所 14:12:30    pool-1-thread-11-正在使用厕所 14:12:30    pool-1-thread-4-正在使用厕所 pool-1-thread-6上完厕所了,回去上班pool-1-thread-9上完厕所了,回去上班14:12:32    pool-1-thread-8-正在使用厕所 14:12:32    pool-1-thread-12-正在使用厕所 pool-1-thread-4上完厕所了,回去上班pool-1-thread-10上完厕所了,回去上班pool-1-thread-11上完厕所了,回去上班pool-1-thread-8上完厕所了,回去上班pool-1-thread-12上完厕所了,回去上班

通过前面打印的时间,可知只有threadTotal 个取得许可,其余只能等待取得许可的线程释放许可才能继续执行。
这几个demo算是对多线程有一个理解吧,多线程还有更难的,一起加油!!!

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 多线程实现与同步工具包详解

发表回复