Java实现 生产者/消费者模式


Java实现 生产者/消费者模式

消费者/生产者模式是一种常见的并发设计模式,用于解决多个线程之间的协作问题。该模式包含两个角色:生产者和消费者。生产者负责生成数据,并将其存放在缓冲区中;消费者从缓冲区中取出数据并进行消费。

当生产者生成数据时,如果缓冲区已满,生产者将被阻塞,直到有足够的空间来存放数据。当消费者消费数据时,如果缓冲区为空,消费者将被阻塞,直到有足够的数据可用。

消费者/生产者模式通常使用一个先进先出的队列来实现缓冲区。生产者将数据放入队列的末尾,而消费者从队列的头部取出数据。这种方法可以确保数据的顺序正确。

消费者/生产者模式的优点在于可以提高系统的并发性能,减少线程间的竞争。生产者和消费者之间的缓冲区充当了一个解耦器,使得它们可以在不同的速度下运行,从而实现了高效的协作。

接下来我们用阻塞队列的方式实现生产者消费者模式

BlockingQueue 实现

数据 Data

public class Data {
    private int id;
    //生产量
    private int num;
    public Data(int id,int num){
        this.id=id;
        this.num=num;
    }
    public Data(){

    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}

生产者

/**
 * 生产者
 */
public class Producer implements Runnable{

    //共享阻塞队列
    private BlockingDeque<Data> queue;

    //是否还在运行
    private volatile boolean isRunning = true;

    //id生成器原子操作
    private static AtomicInteger count = new AtomicInteger();
    // 生成随机数
    private static Random random = new Random();
    public Producer(BlockingDeque<Data> queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        try{
            while (isRunning){
                //模拟生产耗时
                Thread.sleep(random.nextInt());
                int num = count.incrementAndGet();
                Data data = new Data(num, num);
                System.out.println("当前》》生产者:"+Thread.currentThread().getName()+"生产量"+num);
                if (!queue.offer(data,2, TimeUnit.SECONDS)){
                    System.out.println("生产失败");
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void stop(){
        isRunning = false;
    }
}

消费者java

/**
 * 消费者
 */
public class Consumer implements  Runnable {
    //双端队列,加入或者取出元素都是线程安全的
    private BlockingDeque<Data> queue;
    private static Random random=new  Random();
    public Consumer(BlockingDeque<Data> queue){
        this.queue=queue;
    }
    @Override
    public void run(){
        while (true){
            try{
                // 检索并删除,如果需要等待、直到元素可用。
                Data data= queue.take();
                //模拟消费耗时
                Thread.sleep(random.nextInt(1000));
                if(data!=null){
                    System.out.println("当前<<消费者:"+Thread.currentThread().getName()+",消费量"+data.getNum());
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }java
    }
}

生产者和消费者共用一个阻塞队列,

接下来我们编写测试代码

public class Test {
    public static void main(String[] args) throws InterruptedException {

        BlockingDeque<Data> queue = new LinkedBlockingDeque<>(10);

        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);

        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(producer1);
        executorService.execute(producer2);
        executorService.execute(producer3);

        executorService.execute(consumer1);
        executorService.execute(consumer2);
        executorService.execute(consumer3);

        Thread.sleep(3000);

        producer1.stop();
        producer2.stop();
        producer3.stop();
        Thread.sleep(1000);

        executorService.shutdown();
    }
}

image-20230314232201743

我们可以看到消费者顺序消费,最后一次生产20此时所有的生产者都停止生产了,但是此时产品池还没空,于是消费者继续消费,直到把产品池中的数据消耗完

生产者消费者可以做到解耦,实际应用如短信发送

我们的代码是自上而下同步执行的。发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。用户点击发送短信验证码按钮后,向后端发送报文后端接收到报文件,生成短信验证码,需要第三方服务器容联云发送短信阿里云将短信发送成功或失败结果反馈后,后端再根据结果,返回前端响应报文响应延迟会造成用户界面的倒计时延迟。前端收到后端发送短信成功的报文响应后,开始倒计时显示

解决思路:1. 异步发送短信

2.使用生产者消费者模式

  • 为了将发送短信从主业务中解耦出来,我们引入生产者消费者设计模式。
  • 它是最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。 在这里插入图片描述

生产者只负责生产任务,生产者生产的数据发送到消息队列中。 消费者只负责处理处理数据,消费者处理的数据来源是从消息队列中取的。 生产者与消费者并不直接对接,所有数据和消息都通过中间人(消息队列broker)转交。 生产者消费者设计模式需要明确以下内容 任务:生产者生产的任务是什么 生产者、消费者、中间人是谁 结合本项目: 生产者: 商城后端,根据前端发送过来的请求,生成短信验证码(需要将短信验证码通过容联云发送到客户的手机上) 任务:发送短信 消费者:celery 中间人:redis 或 MQ 生产者(商城)生成短信验证码,通过些设计模式发送短信,将短信验证码发送到 中间人(redis)中,就直接将响应结果返回前端,不再等待短信发送状态 消费者:一直开启监听服务,当中间人(redis)有发送短信的需求时,取出短信验证码和相关信息,通过容联云发送短信 此方法避免了,生产者等待消费者把短信发完成后再将响应结果返回前端,实现了后端与发送短信操作的解耦。


Author: qwq小小舒
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source qwq小小舒 !
  TOC