跳到主要内容
  1. 所有文章/
  2. Java多线程基础笔记/

线程通信--生产者消费者问题

·📄 2757 字·🍵 6 分钟

问题概述 #

这是一种典型的线程通信问题。

  • 仓库中只能存放一件产品,生产者将产品放入仓库,消费者将产品取走
  • 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走
  • 如果仓库中没=有产品,则消费者将产品取走,否则停止消费并等待,直到仓库中的产品被生产者生产

传统synchronized的解决思路 #

java 提供了几个方法解决线程之间的通信问题

image-20220125211336538.png

注意:均是Object类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常illegalMonitorStateException

管程法 #

生产者:负责生产数据的模块(可能是方法,对象,线程,进程)

消费者:负责处理数据的模块(可能是方法,对象,线程,进程)

缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区,生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据。

image-20220125214556302.png

/**
 * 测试:生产者消费者模型-->利用缓冲区解决:管程法
 */
public class Main {
    public static void main(String[] args) {
        SynContainer synContainer = new SynContainer();
        new Producer(synContainer).start();
        new Consumer(synContainer).start();
    }
}

//生产者
class Producer extends Thread {
    //容缓冲区
    SynContainer container;

    public Producer(SynContainer container) {
        this.container = container;
    }

    //生产
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            container.push(new Product(i));
            System.out.println("生产了" + i + "件产品");
        }
    }
}

//消费者
class Consumer extends Thread {
    //容缓冲区
    SynContainer container;

    public Consumer(SynContainer container) {
        this.container = container;
    }

    //消费
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("消费了-->" + container.pop().id + "件产品");
        }
    }
}

//产品
class Product {
    int id;//产品编号

    public Product(int id) {
        this.id = id;
    }
}

//缓冲区
class SynContainer {
    //需要一个容器大小
    Product[] products = new Product[10];
    //容器计数器
    int count = 0;

    //生产者放入产品
    public synchronized void push(Product product) {
        //如果容器满了,需要等待消费者消费
        /*如果是if的话,假如消费者1消费了最后一个,
        这是index变成0此时释放锁被消费者2拿到而不是生产者拿到,
        这时消费者的wait是在if里所以它就直接去消费index-1下标越界,
        如果是while就会再去判断一下index得值是不是变成0了*/
        while (count == products.length) {
            //通知消费者消费,等待生产
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //如果没有满,需要丢入产品
        products[count] = product;
        count++;
        //通知消费者消费
        this.notifyAll();
    }

    //消费者消费产品
    public synchronized Product pop() {
        //判断是否能消费
        while (count <= 0) {
            //等待生产者生产
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //如果可以消费
        count--;
        Product product = products[count];
        //吃完了 通知生产者生产
        this.notifyAll();
        return product;
    }
}

信号灯法 #

通过一个标志位来阻塞和唤醒对应的线程

image-20220125214803353.png

public class Main {
    public static void main(String[] args) {
        TV tv = new TV();
        new Player(tv).start();
        new Watcher(tv).start();
        new Watcher(tv).start();
        new Watcher(tv).start();
    }
}


//生产者
class Player extends Thread{
    TV tv;

    public Player(TV tv){
        this.tv = tv;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            if (i%2==0){
                this.tv.play("节目:快乐大本营播放中");
            }else {
                this.tv.play("广告:抖音,记录美好生活");
            }
        }
    }
}

//消费者
class Watcher extends Thread{
    TV tv;
    public Watcher(TV tv){
        this.tv = tv;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            tv.watch();
        }
    }
}
//电视
class TV{
    //演员说话 , 观众等待
    //观众观看 , 演员等待
    boolean flag = true;

    //说话
    String voice;

    //表演
    public synchronized void play(String voice){
        //演员等待
        while (!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("表演了"+voice);
        this.voice = voice;
        //让观众观看
        this.notifyAll();
        this.flag = !this.flag;

    }

    //观看
    public synchronized void watch(){
        //观众等待
        while (flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("观众听到了: "+voice);
        //通知演员说话
        this.notifyAll();
        this.flag = !this.flag;
    }

}

两个有关线程的疑问? #

  1. 为什么一定要用同步方法?存在如下图这种线程不安全的情况。因此判断条件和操作资源时需要每次只能让一个线程完成

    image-20220128191942800.png

  2. 为什么一定要用 while 代替 if判断条件?在 wait() 方法的解释中:

    image-20220128192600609.png

📌就是用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。 因为线程被唤醒后,执行开始的地方是wait之后

在下面的代码中Product类的produce()方法和consume()方法都需要对资源数量进行判断,如果都用if来判断就会出现“虚假唤醒”的情况。因此需要使用while代替if作为判断条件。

public class Main {
    public static void main(String[] args) {
        Product p=new Product();
        new Thread(()->{
            for(int i=0;i<10;i++){
                try {
                    p.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for(int i=0;i<10;i++){
                try {
                    p.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for(int i=0;i<10;i++){
                try {
                    p.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        new Thread(()->{
            for(int i=0;i<10;i++){
                try {
                    p.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Product{
    private int num=1;

    public synchronized void produce() throws InterruptedException {
        if(num==1){
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() + "=>" + num);
        this.notifyAll();
    }
    public synchronized void consume() throws InterruptedException {
        if(num==0){
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName() + "=>" + num);
        this.notifyAll();
    }
}

Lock锁实现生产者消费者问题 #

Lock中主要使用 await()signal() 实现生产者消费者问题:

signal
void signal()
//唤醒一个等待线程。
//如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

await
boolean await(long time,
              TimeUnit unit)
              throws InterruptedException
//造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于:
   awaitNanos(unit.toNanos(time)) > 0

/*
参数:
time - 最长等待时间
unit - time 参数的时间单位
返回:
如果在从此方法返回前检测到等待时间超时,则返回 false,否则返回 true
抛出:
InterruptedException - 如果当前线程被中断(并且支持中断线程挂起)
*/

image-20220206190930531.png

代码实现 #

这种方法存在问题:就是唤醒的全部的线程,线程的执行顺序是随机的,有没有一种方法可以精确唤醒和控制线程的执行?

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockCAP {
    public static void main(String[] args) {
        Data2 data = new Data2();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {

                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

class Data2 {
    private int num = 0;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    // +1
    public  void increment() throws InterruptedException {
        lock.lock();
        try {
            // 判断等待
            while (num != 0) {
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "=>" + num);
            // 通知其他线程 +1 执行完毕
            condition.signalAll();
        }finally {
            lock.unlock();
        }

    }

    // -1
    public  void decrement() throws InterruptedException {
        lock.lock();
        try {
            // 判断等待
            while (num == 0) {
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() + "=>" + num);
            // 通知其他线程 +1 执行完毕
            condition.signalAll();
        }finally {
            lock.unlock();
        }

    }
}

Condition的优势 #

精准的通知和唤醒的线程!

如果我们要指定通知的下一个进行顺序怎么办呢? 我们可以使用Condition来指定通知进程~

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Description:
 * A 执行完 调用B
 * B 执行完 调用C
 * C 执行完 调用A
 **/
public class ConditionDemo {
    public static void main(String[] args) {
        Data3 data3 = new Data3();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data3.printA();
            }
        },"A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data3.printB();
            }
        },"B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data3.printC();
            }
        },"C").start();
    }

}
class Data3 {
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int num = 1; // 1A 2B 3C

    public void printA() {
        lock.lock();
        try {
            // 业务代码 判断 -> 执行 -> 通知
            while (num != 1) {
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName() + "==> AAAA" );
            num = 2;
            condition2.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printB() {
        lock.lock();
        try {
            // 业务代码 判断 -> 执行 -> 通知
            while (num != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName() + "==> BBBB" );
            num = 3;
            condition3.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printC() {
        lock.lock();
        try {
            // 业务代码 判断 -> 执行 -> 通知
            while (num != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName() + "==> CCCC" );
            num = 1;
            condition1.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}
/*
A==> AAAA
B==> BBBB
C==> CCCC
A==> AAAA
B==> BBBB
C==> CCCC
...
*/