线程通信--生产者消费者问题
目录
问题概述 #
这是一种典型的线程通信问题。
- 仓库中只能存放一件产品,生产者将产品放入仓库,消费者将产品取走
- 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走
- 如果仓库中没=有产品,则消费者将产品取走,否则停止消费并等待,直到仓库中的产品被生产者生产
传统synchronized的解决思路 #
java 提供了几个方法解决线程之间的通信问题
注意:均是Object类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常illegalMonitorStateException
管程法 #
生产者:负责生产数据的模块(可能是方法,对象,线程,进程)
消费者:负责处理数据的模块(可能是方法,对象,线程,进程)
缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区,生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据。
/**
* 测试:生产者消费者模型-->利用缓冲区解决:管程法
*/
public class Main {
public static void main(String[] args) {
SynContainer synContainer = new SynContainer();
new Producer(synContainer).start();
new Consumer(synContainer).start();
}
}
//生产者
class Producer extends Thread {
//容缓冲区
SynContainer container;
public Producer(SynContainer container) {
this.container = container;
}
//生产
@Override
public void run() {
for (int i = 0; i < 100; i++) {
container.push(new Product(i));
System.out.println("生产了" + i + "件产品");
}
}
}
//消费者
class Consumer extends Thread {
//容缓冲区
SynContainer container;
public Consumer(SynContainer container) {
this.container = container;
}
//消费
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("消费了-->" + container.pop().id + "件产品");
}
}
}
//产品
class Product {
int id;//产品编号
public Product(int id) {
this.id = id;
}
}
//缓冲区
class SynContainer {
//需要一个容器大小
Product[] products = new Product[10];
//容器计数器
int count = 0;
//生产者放入产品
public synchronized void push(Product product) {
//如果容器满了,需要等待消费者消费
/*如果是if的话,假如消费者1消费了最后一个,
这是index变成0此时释放锁被消费者2拿到而不是生产者拿到,
这时消费者的wait是在if里所以它就直接去消费index-1下标越界,
如果是while就会再去判断一下index得值是不是变成0了*/
while (count == products.length) {
//通知消费者消费,等待生产
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果没有满,需要丢入产品
products[count] = product;
count++;
//通知消费者消费
this.notifyAll();
}
//消费者消费产品
public synchronized Product pop() {
//判断是否能消费
while (count <= 0) {
//等待生产者生产
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果可以消费
count--;
Product product = products[count];
//吃完了 通知生产者生产
this.notifyAll();
return product;
}
}
信号灯法 #
通过一个标志位来阻塞和唤醒对应的线程
public class Main {
public static void main(String[] args) {
TV tv = new TV();
new Player(tv).start();
new Watcher(tv).start();
new Watcher(tv).start();
new Watcher(tv).start();
}
}
//生产者
class Player extends Thread{
TV tv;
public Player(TV tv){
this.tv = tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
if (i%2==0){
this.tv.play("节目:快乐大本营播放中");
}else {
this.tv.play("广告:抖音,记录美好生活");
}
}
}
}
//消费者
class Watcher extends Thread{
TV tv;
public Watcher(TV tv){
this.tv = tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
tv.watch();
}
}
}
//电视
class TV{
//演员说话 , 观众等待
//观众观看 , 演员等待
boolean flag = true;
//说话
String voice;
//表演
public synchronized void play(String voice){
//演员等待
while (!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("表演了"+voice);
this.voice = voice;
//让观众观看
this.notifyAll();
this.flag = !this.flag;
}
//观看
public synchronized void watch(){
//观众等待
while (flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("观众听到了: "+voice);
//通知演员说话
this.notifyAll();
this.flag = !this.flag;
}
}
两个有关线程的疑问? #
为什么一定要用同步方法?存在如下图这种线程不安全的情况。因此判断条件和操作资源时需要每次只能让一个线程完成
为什么一定要用
while
代替if
判断条件?在wait()
方法的解释中:
📌就是用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。 因为线程被唤醒后,执行开始的地方是wait之后
在下面的代码中Product类的produce()方法和consume()方法都需要对资源数量进行判断,如果都用if来判断就会出现“虚假唤醒”的情况。因此需要使用while代替if作为判断条件。
public class Main {
public static void main(String[] args) {
Product p=new Product();
new Thread(()->{
for(int i=0;i<10;i++){
try {
p.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for(int i=0;i<10;i++){
try {
p.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for(int i=0;i<10;i++){
try {
p.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for(int i=0;i<10;i++){
try {
p.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Product{
private int num=1;
public synchronized void produce() throws InterruptedException {
if(num==1){
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName() + "=>" + num);
this.notifyAll();
}
public synchronized void consume() throws InterruptedException {
if(num==0){
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName() + "=>" + num);
this.notifyAll();
}
}
Lock锁实现生产者消费者问题 #
Lock中主要使用 await()
和 signal()
实现生产者消费者问题:
signal
void signal()
//唤醒一个等待线程。
//如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
await
boolean await(long time,
TimeUnit unit)
throws InterruptedException
//造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于:
awaitNanos(unit.toNanos(time)) > 0
/*
参数:
time - 最长等待时间
unit - time 参数的时间单位
返回:
如果在从此方法返回前检测到等待时间超时,则返回 false,否则返回 true
抛出:
InterruptedException - 如果当前线程被中断(并且支持中断线程挂起)
*/
代码实现 #
这种方法存在问题:就是唤醒的全部的线程,线程的执行顺序是随机的,有没有一种方法可以精确唤醒和控制线程的执行?
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockCAP {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data2 {
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
// 判断等待
while (num != 0) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + "=>" + num);
// 通知其他线程 +1 执行完毕
condition.signalAll();
}finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
// 判断等待
while (num == 0) {
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + "=>" + num);
// 通知其他线程 +1 执行完毕
condition.signalAll();
}finally {
lock.unlock();
}
}
}
Condition的优势 #
精准的通知和唤醒的线程!
如果我们要指定通知的下一个进行顺序怎么办呢? 我们可以使用Condition来指定通知进程~
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Description:
* A 执行完 调用B
* B 执行完 调用C
* C 执行完 调用A
**/
public class ConditionDemo {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printA();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printB();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printC();
}
},"C").start();
}
}
class Data3 {
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int num = 1; // 1A 2B 3C
public void printA() {
lock.lock();
try {
// 业务代码 判断 -> 执行 -> 通知
while (num != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "==> AAAA" );
num = 2;
condition2.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
// 业务代码 判断 -> 执行 -> 通知
while (num != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "==> BBBB" );
num = 3;
condition3.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
// 业务代码 判断 -> 执行 -> 通知
while (num != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "==> CCCC" );
num = 1;
condition1.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
/*
A==> AAAA
B==> BBBB
C==> CCCC
A==> AAAA
B==> BBBB
C==> CCCC
...
*/