前言

最近在看线程池的实现原理,后面又看了源码,发现其内部是一个生产者-消费者模型,用户是提交的任务(task)是生产者,线程池中的线程(worker)是消费者。

然后感觉对生产者-消费者模型的实现,磕磕绊绊的,所以记录一下。下面会涉及多种生产者-消费者模型的实现,可以先抽象出关键的接口,并实现一些抽象类:

准备

生产者消费者接口

生产者接口

public interface Producer {
    void produce() throws InterruptedException;
}

消费者接口

public interface Consumer {
    void consume() throws InterruptedException;
}

生产者消费者抽象类

生产者抽象类

abstract class AbstractProducer implements Runnable, Producer {
    @Override
    public void run() {
        while (true){
            try{
                produce();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

消费者抽象类

abstract class AbstractConsumer implements Consumer, Runnable {
    @Override
    public void run() {
        while (true){
            try{
                consume();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

Model接口

对应的实现生产者-消费者模型,不同的生产者-消费者实现模型都实现该接口,

public interface Model {
    Runnable newRunnableConsumer();
    Runnable newRunnableProducer();
}

Task

将Task作为生产和消费的基本单位

public class Task {
    private int no;

    public Task(int no){
        this.no = no;

    }

    public int getNo() {
        return no;
    }

    public void setNo(int no) {
        this.no = no;
    }
}

实现一:BlockingQueue

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中。而BlockingQueue的性质天生满足这个要求。

package com.train.concurrent.procon;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Time: 2020/4/21下午8:54
 * @Author: kongwiki
 * @Email: kongwiki@163.com
 */
public class BlockingQueueModel implements Model {
    private BlockingQueue<Task> queue;
    private final AtomicInteger increTaskNo = new AtomicInteger(0);

    public BlockingQueueModel(int cap){
        this.queue = new LinkedBlockingDeque<>(cap);
    }


    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    private class ConsumerImpl extends AbstractConsumer implements Runnable, Consumer{

        @Override
        public void consume() throws InterruptedException {
            Task task = queue.take();
            Thread.sleep(500 + (long) (Math.random() * 500));
            System.out.println("consume: " + task.getNo());

        }
    }

    private class ProducerImpl extends AbstractProducer implements Runnable, Producer{
        @Override
        public void produce() throws InterruptedException {
            Thread.sleep((long) (500 + (Math.random()) * 1000));
            Task task = new Task(increTaskNo.getAndIncrement());
            System.out.println("produce: " + task.getNo());
            queue.put(task);
        }
    }

    public static void main(String[] args) {
        Model model = new BlockingQueueModel(3);
        for (int i = 0; i < 2; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}

实现二:wait&notify

package com.train.concurrent.procon;


import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Time: 2020/4/22上午7:40
 * @Author: kongwiki
 * @Email: kongwiki@163.com
 */
public class WaitNotifyModel implements Model {
    private int integer;
    private final Object buffer_lock = new Object();
    private final Queue<Task> buffer = new LinkedList<>();
    private int cap;

    public WaitNotifyModel(int cap){
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    public class ProducerImpl extends AbstractProducer implements Runnable, Producer{

        @Override
        public void produce() throws InterruptedException {
            Thread.sleep((long) (500 + (Math.random() * 1000)));
            synchronized (buffer_lock){
                while (buffer.size() == cap){
                    buffer_lock.wait();
                }
                Task task = new Task(integer++);
                buffer.add(task);
                System.out.println("produce: " + task.getNo());
                buffer_lock.notifyAll();
            }
        }
    }

    public class ConsumerImpl extends AbstractConsumer implements Runnable, Consumer{

        @Override
        public void consume() throws InterruptedException {
            synchronized (buffer_lock){
                while (buffer.size() == 0){
                    buffer_lock.wait();
                }
                Task task = buffer.poll();
                assert task != null;
                Thread.sleep((long) (500 + (Math.random() * 1000)));
                System.out.println("consume: " + task.getNo());
                buffer_lock.notifyAll();

            }
        }
    }

    public static void main(String[] args) {
        Model model = new WaitNotifyModel(3);
        for (int i = 0; i < 3; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
 }

实现三:lock && condition

package com.train.concurrent.procon;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Time: 2020/4/22上午8:01
 * @Author: kongwiki
 * @Email: kongwiki@163.com
 */
public class LockConditionModel implements Model {
    private final Lock BUFFER_LOCK = new ReentrantLock();
    private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();
    private int integer;
    private Queue<Task> buffer = new LinkedList<>();
    private int cap;

    public LockConditionModel(int cap){
        this.cap = cap;
    }

    @Override
    public Runnable newRunnableConsumer() {
        return new ConsumerImpl();
    }

    @Override
    public Runnable newRunnableProducer() {
        return new ProducerImpl();
    }

    public class ProducerImpl extends AbstractProducer implements Producer, Runnable{

        @Override
        public void produce() throws InterruptedException {
            Thread.sleep((long) (500 + (Math.random() * 1000)));
            BUFFER_LOCK.lockInterruptibly();
            try{
                while (buffer.size() == cap){
                    BUFFER_COND.await();
                }
                Task task = new Task(integer++);
                buffer.offer(task);
                System.out.println("produce: "+ task.getNo());
                BUFFER_COND.signalAll();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    public class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable{

        @Override
        public void consume() throws InterruptedException {
            BUFFER_LOCK.lockInterruptibly();
            try{
                while (buffer.size() == 0){
                    BUFFER_COND.await();
                }
                Task task = buffer.poll();
                assert task != null;
                System.out.println("consume: " + task.getNo());
                BUFFER_COND.signalAll();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                BUFFER_LOCK.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel(3);
        for (int i = 0; i < 3; i++) {
            new Thread(model.newRunnableConsumer()).start();
        }

        for (int i = 0; i < 5; i++) {
            new Thread(model.newRunnableProducer()).start();
        }
    }
}