歡迎光臨
每天分享高質量文章

Java 實現生產者 : 消費者模型

(點選上方公眾號,可快速關註)


來源:猴子007 ,

monkeysayhi.github.io/2017/10/08/Java實現生產者-消費者模型/

考查Java的併發程式設計時,手寫“生產者-消費者模型”是一個經典問題。有如下幾個考點:

  • 對Java併發模型的理解

  • 對Java併發程式設計介面的熟練程度

  • bug free

  • coding style

本文主要歸納了4種寫法,閱讀後,最好在白板上練習幾遍,檢查自己是否掌握。這4種寫法或者程式設計介面不同,或者併發粒度不同,但本質是相同的——都是在使用或實現BlockingQueue。

生產者-消費者模型

網上有很多生產者-消費者模型的定義和實現。本文研究最常用的有界生產者-消費者模型,簡單概括如下:

  • 生產者持續生產,直到緩衝區滿,阻塞;緩衝區不滿後,繼續生產

  • 消費者持續消費,直到緩衝區空,阻塞;緩衝區不空後,繼續消費

  • 生產者可以有多個,消費者也可以有多個

可透過如下條件驗證模型實現的正確性:

  • 同一產品的消費行為一定發生在生產行為之後

  • 任意時刻,緩衝區大小不小於0,不大於限制容量

該模型的應用和變種非常多,不贅述。

幾種寫法

準備

面試時可語言說明以下準備程式碼。關鍵部分需要實現,如AbstractConsumer。

下麵會涉及多種生產者-消費者模型的實現,可以先抽象出關鍵的介面,並實現一些抽象類:

public interface Consumer {

  void consume() throws InterruptedException;

}

public interface Producer {

  void produce() throws InterruptedException;

}

abstract class AbstractConsumer implements Consumer, Runnable {

  @Override

  public void run() {

    while (true) {

      try {

        consume();

      } catch (InterruptedException e) {

        e.printStackTrace();

        break;

      }

    }

  }

}

abstract class AbstractProducer implements Producer, Runnable {

  @Override

  public void run() {

    while (true) {

      try {

        produce();

      } catch (InterruptedException e) {

        e.printStackTrace();

        break;

      }

    }

  }

}

不同的模型實現中,生產者、消費者的具體實現也不同,所以需要為模型定義抽象工廠方法:

public interface Model {

  Runnable newRunnableConsumer();

  Runnable newRunnableProducer();

}

我們將Task作為生產和消費的單位:

public class Task {

  public int no;

  public Task(int no) {

    this.no = no;

  }

}

如果需求還不明確(這符合大部分工程工作的實際情況),建議邊實現邊抽象,不要“面向未來程式設計”。

實現一:BlockingQueue

BlockingQueue的寫法最簡單。核心思想是,把併發和容量控制封裝在緩衝區中。而BlockingQueue的性質天生滿足這個要求。

public class BlockingQueueModel implements Model {

  private final BlockingQueue queue;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public BlockingQueueModel(int cap) {

    // LinkedBlockingQueue 的佇列是 lazy-init 的,但 ArrayBlockingQueue 在建立時就已經 init

    this.queue = new LinkedBlockingQueue<>(cap);

  }

  @Override

  public Runnable newRunnableConsumer() {

    return new ConsumerImpl();

  }

  @Override

  public Runnable newRunnableProducer() {

    return new ProducerImpl();

  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {

    @Override

    public void consume() throws InterruptedException {

      Task task = queue.take();

      // 固定時間範圍的消費,模擬相對穩定的伺服器處理過程

      Thread.sleep(500 + (long) (Math.random() * 500));

      System.out.println(“consume: ” + task.no);

    }

  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {

    @Override

    public void produce() throws InterruptedException {

      // 不定期生產,模擬隨機的使用者請求

      Thread.sleep((long) (Math.random() * 1000));

      Task task = new Task(increTaskNo.getAndIncrement());

      queue.put(task);

      System.out.println(“produce: ” + task.no);

    }

  }

  public static void main(String[] args) {

    Model model = new BlockingQueueModel(3);

    for (int i = 0; i < 2; i++) {

      new Thread(model.newRunnableConsumer()).start();

    }

    for (int i = 0; i < 5; i++) {

      new Thread(model.newRunnableProducer()).start();

    }

  }

}

擷取前面的一部分輸出:

produce: 0

produce: 4

produce: 2

produce: 3

produce: 5

consume: 0

produce: 1

consume: 4

produce: 7

consume: 2

produce: 8

consume: 3

produce: 6

consume: 5

produce: 9

consume: 1

produce: 10

consume: 7

由於操作“出隊/入隊+日誌輸出”不是原子的,所以上述日誌的絕對順序與實際的出隊/入隊順序有出入,但對於同一個任務號task.no,其consume日誌一定出現在其produce日誌之後,即:同一任務的消費行為一定發生在生產行為之後。緩衝區的容量留給讀者驗證。符合兩個驗證條件。

BlockingQueue寫法的核心只有兩行程式碼,併發和容量控制都封裝在了BlockingQueue中,正確性由BlockingQueue保證。面試中首選該寫法,自然美觀簡單。

實現二:wait && notify

如果不能將併發與容量控制都封裝在緩衝區中,就只能由消費者與生產者完成。最簡單的方案是使用樸素的wait && notify機制。

public class WaitNotifyModel implements Model {

  private final Object BUFFER_LOCK = new Object();

  private final Queue buffer = new LinkedList<>();

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public WaitNotifyModel(int cap) {

    this.cap = cap;

  }

  @Override

  public Runnable newRunnableConsumer() {

    return new ConsumerImpl();

  }

  @Override

  public Runnable newRunnableProducer() {

    return new ProducerImpl();

  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {

    @Override

    public void consume() throws InterruptedException {

      synchronized (BUFFER_LOCK) {

        while (buffer.size() == 0) {

          BUFFER_LOCK.wait();

        }

        Task task = buffer.poll();

        assert task != null;

        // 固定時間範圍的消費,模擬相對穩定的伺服器處理過程

        Thread.sleep(500 + (long) (Math.random() * 500));

        System.out.println(“consume: ” + task.no);

        BUFFER_LOCK.notifyAll();

      }

    }

  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {

    @Override

    public void produce() throws InterruptedException {

      // 不定期生產,模擬隨機的使用者請求

      Thread.sleep((long) (Math.random() * 1000));

      synchronized (BUFFER_LOCK) {

        while (buffer.size() == cap) {

          BUFFER_LOCK.wait();

        }

        Task task = new Task(increTaskNo.getAndIncrement());

        buffer.offer(task);

        System.out.println(“produce: ” + task.no);

        BUFFER_LOCK.notifyAll();

      }

    }

  }

  public static void main(String[] args) {

    Model model = new WaitNotifyModel(3);

    for (int i = 0; i < 2; i++) {

      new Thread(model.newRunnableConsumer()).start();

    }

    for (int i = 0; i < 5; i++) {

      new Thread(model.newRunnableProducer()).start();

    }

  }

}

驗證方法同上。

樸素的wait && notify機制不那麼靈活,但足夠簡單。synchronized、wait、notifyAll的用法可參考【Java併發程式設計】之十:使用wait/notify/notifyAll實現執行緒間通訊的幾點重要說明,著重理解喚醒與鎖競爭的區別。

http://blog.csdn.net/ns_code/article/details/17225469

實現三:簡單的Lock && Condition

我們要保證理解wait && notify機制。實現時可以使用Object類提供的wait()方法與notifyAll()方法,但更推薦的方式是使用java.util.concurrent包提供的Lock && Condition。

public class LockConditionModel1 implements Model {

  private final Lock BUFFER_LOCK = new ReentrantLock();

  private final Condition BUFFER_COND = BUFFER_LOCK.newCondition();

  private final Queue buffer = new LinkedList<>();

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel1(int cap) {

    this.cap = cap;

  }

  @Override

  public Runnable newRunnableConsumer() {

    return new ConsumerImpl();

  }

  @Override

  public Runnable newRunnableProducer() {

    return new ProducerImpl();

  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {

    @Override

    public void consume() throws InterruptedException {

      BUFFER_LOCK.lockInterruptibly();

      try {

        while (buffer.size() == 0) {

          BUFFER_COND.await();

        }

        Task task = buffer.poll();

        assert task != null;

        // 固定時間範圍的消費,模擬相對穩定的伺服器處理過程

        Thread.sleep(500 + (long) (Math.random() * 500));

        System.out.println(“consume: ” + task.no);

        BUFFER_COND.signalAll();

      } finally {

        BUFFER_LOCK.unlock();

      }

    }

  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {

    @Override

    public void produce() throws InterruptedException {

      // 不定期生產,模擬隨機的使用者請求

      Thread.sleep((long) (Math.random() * 1000));

      BUFFER_LOCK.lockInterruptibly();

      try {

        while (buffer.size() == cap) {

          BUFFER_COND.await();

        }

        Task task = new Task(increTaskNo.getAndIncrement());

        buffer.offer(task);

        System.out.println(“produce: ” + task.no);

        BUFFER_COND.signalAll();

      } finally {

        BUFFER_LOCK.unlock();

      }

    }

  }

  public static void main(String[] args) {

    Model model = new LockConditionModel1(3);

    for (int i = 0; i < 2; i++) {

      new Thread(model.newRunnableConsumer()).start();

    }

    for (int i = 0; i < 5; i++) {

      new Thread(model.newRunnableProducer()).start();

    }

  }

}

該寫法的思路與實現二的思路完全相同,僅僅將鎖與條件變數換成了Lock和Condition。

實現四:更高併發效能的Lock && Condition

現在,如果做一些實驗,你會發現,實現一的併發效能高於實現二、三。暫且不關心BlockingQueue的具體實現,來分析看如何最佳化實現三(與實現二的思路相同,效能相當)的效能。

分析實現三的瓶頸

最好的查證方法是記錄方法執行時間,這樣可以直接定位到真正的瓶頸。但此問題較簡單,我們直接用“瞪眼法”分析。

實現三的併發瓶頸很明顯,因為在鎖 BUFFER_LOCK 看來,任何消費者執行緒與生產者執行緒都是一樣的。換句話說,同一時刻,最多隻允許有一個執行緒(生產者或消費者,二選一)操作緩衝區 buffer。

而實際上,如果緩衝區是一個佇列的話,“生產者將產品入隊”與“消費者將產品出隊”兩個操作之間沒有同步關係,可以在隊首出隊的同時,在隊尾入隊。理想效能可提升至實現三的兩倍。

去掉這個瓶頸

那麼思路就簡單了:需要兩個鎖 CONSUME_LOCK與PRODUCE_LOCK,CONSUME_LOCK控制消費者執行緒併發出隊,PRODUCE_LOCK控制生產者執行緒併發入隊;相應需要兩個條件變數NOT_EMPTY與NOT_FULL,NOT_EMPTY負責控制消費者執行緒的狀態(阻塞、執行),NOT_FULL負責控制生產者執行緒的狀態(阻塞、執行)。以此讓最佳化消費者與消費者(或生產者與生產者)之間是序列的;消費者與生產者之間是並行的。

public class LockConditionModel2 implements Model {

  private final Lock CONSUME_LOCK = new ReentrantLock();

  private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition();

  private final Lock PRODUCE_LOCK = new ReentrantLock();

  private final Condition NOT_FULL = PRODUCE_LOCK.newCondition();

  private final Buffer buffer = new Buffer<>();

  private AtomicInteger bufLen = new AtomicInteger(0);

  private final int cap;

  private final AtomicInteger increTaskNo = new AtomicInteger(0);

  public LockConditionModel2(int cap) {

    this.cap = cap;

  }

  @Override

  public Runnable newRunnableConsumer() {

    return new ConsumerImpl();

  }

  @Override

  public Runnable newRunnableProducer() {

    return new ProducerImpl();

  }

  private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {

    @Override

    public void consume() throws InterruptedException {

      int newBufSize = -1;

      CONSUME_LOCK.lockInterruptibly();

      try {

        while (bufLen.get() == 0) {

          System.out.println(“buffer is empty…”);

          NOT_EMPTY.await();

        }

        Task task = buffer.poll();

        newBufSize = bufLen.decrementAndGet();

        assert task != null;

        // 固定時間範圍的消費,模擬相對穩定的伺服器處理過程

        Thread.sleep(500 + (long) (Math.random() * 500));

        System.out.println(“consume: ” + task.no);

        if (newBufSize > 0) {

          NOT_EMPTY.signalAll();

        }

      } finally {

        CONSUME_LOCK.unlock();

      }

      if (newBufSize < cap) {

        PRODUCE_LOCK.lockInterruptibly();

        try {

          NOT_FULL.signalAll();

        } finally {

          PRODUCE_LOCK.unlock();

        }

      }

    }

  }

  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {

    @Override

    public void produce() throws InterruptedException {

      // 不定期生產,模擬隨機的使用者請求

      Thread.sleep((long) (Math.random() * 1000));

      int newBufSize = -1;

      PRODUCE_LOCK.lockInterruptibly();

      try {

        while (bufLen.get() == cap) {

          System.out.println(“buffer is full…”);

          NOT_FULL.await();

        }

        Task task = new Task(increTaskNo.getAndIncrement());

        buffer.offer(task);

        newBufSize = bufLen.incrementAndGet();

        System.out.println(“produce: ” + task.no);

        if (newBufSize < cap) {

          NOT_FULL.signalAll();

        }

      } finally {

        PRODUCE_LOCK.unlock();

      }

      if (newBufSize > 0) {

        CONSUME_LOCK.lockInterruptibly();

        try {

          NOT_EMPTY.signalAll();

        } finally {

          CONSUME_LOCK.unlock();

        }

      }

    }

  }

  private static class Buffer {

    private Node head;

    private Node tail;

    Buffer() {

      // dummy node

      head = tail = new Node(null);

    }

    public void offer(E e) {

      tail.next = new Node(e);

      tail = tail.next;

    }

    public E poll() {

      head = head.next;

      E e = head.item;

      head.item = null;

      return e;

    }

    private class Node {

      E item;

      Node next;

      Node(E item) {

        this.item = item;

      }

    }

  }

  public static void main(String[] args) {

    Model model = new LockConditionModel2(3);

    for (int i = 0; i < 2; i++) {

      new Thread(model.newRunnableConsumer()).start();

    }

    for (int i = 0; i < 5; i++) {

      new Thread(model.newRunnableProducer()).start();

    }

  }

需要註意的是,由於需要同時在UnThreadSafe的緩衝區 buffer 上進行消費與生產,我們不能使用實現二、三中使用的佇列了,需要自己實現一個簡單的緩衝區 Buffer。Buffer要滿足以下條件:

  • 在頭部出隊,尾部入隊

  • 在poll()方法中只操作head

  • 在offer()方法中只操作tail

還能進一步最佳化嗎

我們已經最佳化掉了消費者與生產者之間的瓶頸,還能進一步最佳化嗎?

如果可以,必然是繼續最佳化消費者與消費者(或生產者與生產者)之間的併發效能。然而,消費者與消費者之間必須是序列的,因此,併發模型上已經沒有地方可以繼續優化了。

不過在具體的業務場景中,一般還能夠繼續最佳化。如:

  • 併發規模中等,可考慮使用CAS代替重入鎖

  • 模型上不能最佳化,但一個消費行為或許可以進一步拆解、最佳化,從而降低消費的延遲

  • 一個佇列的併發效能達到了極限,可採用“多個佇列”(如分散式訊息佇列等)

4種實現的本質

文章開頭說:這4種寫法的本質相同——都是在使用或實現BlockingQueue。實現一直接使用BlockingQueue,實現四實現了簡單的BlockingQueue,而實現二、三則實現了退化版的BlockingQueue(效能降低一半)。

實現一使用的BlockingQueue實現類是LinkedBlockingQueue,給出其原始碼閱讀對照,寫的不難:

public class LinkedBlockingQueue extends AbstractQueue

        implements BlockingQueue, java.io.Serializable {

/** Lock held by take, poll, etc */

    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */

    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */

    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */

    private final Condition notFull = putLock.newCondition();

    /**

     * Signals a waiting take. Called only from put/offer (which do not

     * otherwise ordinarily lock takeLock.)

     */

    private void signalNotEmpty() {

        final ReentrantLock takeLock = this.takeLock;

        takeLock.lock();

        try {

            notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

    }

    /**

     * Signals a waiting put. Called only from take/poll.

     */

    private void signalNotFull() {

        final ReentrantLock putLock = this.putLock;

        putLock.lock();

        try {

            notFull.signal();

        } finally {

            putLock.unlock();

        }

    }

    /**

     * Links node at end of queue.

     *

     * @param node the node

     */

    private void enqueue(Node node) {

        // assert putLock.isHeldByCurrentThread();

        // assert last.next == null;

        last = last.next = node;

    }

    /**

     * Removes a node from head of queue.

     *

     * @return the node

     */

    private E dequeue() {

        // assert takeLock.isHeldByCurrentThread();

        // assert head.item == null;

        Node h = head;

        Node first = h.next;

        h.next = h; // help GC

        head = first;

        E x = first.item;

        first.item = null;

        return x;

    }

    /**

     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.

     *

     * @param capacity the capacity of this queue

     * @throws IllegalArgumentException if {@code capacity} is not greater

     *         than zero

     */

    public LinkedBlockingQueue(int capacity) {

        if (capacity <= 0) throw new IllegalArgumentException();

        this.capacity = capacity;

        last = head = new Node(null);

    }

    /**

     * Inserts the specified element at the tail of this queue, waiting if

     * necessary for space to become available.

     *

     * @throws InterruptedException {@inheritDoc}

     * @throws NullPointerException {@inheritDoc}

     */

    public void put(E e) throws InterruptedException {

        if (e == null) throw new NullPointerException();

        // Note: convention in all put/take/etc is to preset local var

        // holding count negative to indicate failure unless set.

        int c = -1;

        Node node = new Node(e);

        final ReentrantLock putLock = this.putLock;

        final AtomicInteger count = this.count;

        putLock.lockInterruptibly();

        try {

            /*

             * Note that count is used in wait guard even though it is

             * not protected by lock. This works because count can

             * only decrease at this point (all other puts are shut

             * out by lock), and we (or some other waiting put) are

             * signalled if it ever changes from capacity. Similarly

             * for all other uses of count in other wait guards.

             */

            while (count.get() == capacity) {

                notFull.await();

            }

            enqueue(node);

            c = count.getAndIncrement();

            if (c + 1 < capacity)

                notFull.signal();

        } finally {

            putLock.unlock();

        }

        if (c == 0)

            signalNotEmpty();

    }

    public E take() throws InterruptedException {

        E x;

        int c = -1;

        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;

        takeLock.lockInterruptibly();

        try {

            while (count.get() == 0) {

                notEmpty.await();

            }

            x = dequeue();

            c = count.getAndDecrement();

            if (c > 1)

                notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

        if (c == capacity)

            signalNotFull();

        return x;

    }

}

還存在非常多的寫法,如訊號量Semaphore,也很常見(本科作業系統教材中的生產者-消費者模型就是用訊號量實現的)。不過追究過多了就好像在糾結茴香豆的寫法一樣,本文不繼續探討。

總結

實現一必須掌握,實現四至少要能清楚表述;實現二、三掌握一個即可。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂