2.41 生产者和消费者

3 minute read

2.41.1 wait() & notify()

  public class ProducerConsumerDemo {
    private int count = 0;
    private final int FULL = 10;
    private Object lock = new Object();

    public static void main(String[] args) {
      ProducerConsumerDemo pc = new ProducerConsumerDemo();
      new Thread(pc.new Producer()).start();
      new Thread(pc.new Consumer()).start();
    }

    class Producer implements Runnable {
      public void run() {
        for (int i = 0; i < 10; i++) {
          ...
          synchronized(lock) {
            while (count == FULL) {
              try {
                // 调用了Object#wait()立马释放锁
                lock.wait();
              } catch (InterruptException e) {
                e.printStackTrace();
              }
            }
          }
          count++;
          lock.notifyAll();
        }
      }
    }

    class Consumer implements Runnable {
      public void run() {
        for (int i = 0; i < 10; i++) {
          ...
          synchronized(lock) {
            while (count == 0) {
              try {
                // 调用了Object#wait()立马释放锁
                lock.wait();
              } catch (InterruptException e) {
                e.printStackTrace();
              }
            }
          }
          count--;
          lock.notifyAll();
        }
      }
    }
  }

2.41.2 ReentrantLock

  public class ReentrantDemo {
    private int count = 0;
    private static final int FULL = 10;
    private Lock lock = new ReentrantLock();
    private final Condition notFull = lock.new Condition();
    private final Condition notEmpty = lock.new Condition();

    public static void main(String args[]) {
      ReentrantDemo rt = new ReentrantDemo();
      new Thread(rt.new Producer()).start();
      new Thread(rt.new Comsumer()).start();
      ...
    }

    class Producer implements Runnable {

      public void run() {
        for (int i = 0; i < 10; i++) {
          ...
          lock.lock();
          try {
            while (count == FULL) {
              try {
                notFull.await();
              } catch(InterruptException e) {
                ...
              }
            }
            count++;
            notEmpty.signal();
          } finally {
            lock.unlock();
          }
        }
      }
    }

    class Comsumer implements Runnable {
      public void run() {
        for (int i = 0; i < 10 ; i++) {
          ...
          lock.lock();
          try {
            while (count == 0) {
              try {
                notEmpty.await();
              } catch(InterruptException e) {
                ...
              }
            }
            count--;
            notFull.signal();
          } finally {
            lock.unlock();
          }
        }
      }
    }
  }

BlockingQueue

  public class BlockingQueueDemo {
    private int count = 0;
    private BlockingQueue<Integer> queue = new ArrayBlockingQueue<>();
    public static void main(String args[]) {
      BlockingQueueDemo bqd = new BlockingQueueDemo();
      new Thread(bqd.new Producer()).start();
      new Thread(bqd.new Consumer()).start();
    }

    class Producer implements Runnable {
      public void run() {
        for (int i = 0; i < 10; i++) {
          ...
          try {
            queue.put(count);
            count++
          } catch (InterruptException e) {
            ...
          }
        }
      }
    }

    class Consumer implements Runnable {
      public void run() {
        for (int i = 0; i < 10; i++) {
          ...
          try {
            queue.take();
            count--;
          } catch(InterruptException e) {
            ...
          }
        }
      }
    }
  }

信号量Semaphore

public class SemaphoreDemo {
  private int count = 0;
  final Semaphore notFull = new Semaphore(10);
  final Semaphore notEmpty = new Semaphore(10);
  final Semaphore mutex = new Semaphore(1);
  public static void main(String args[]) {
    SemaphoreDemo sd = new SemaphoreDemo();
    new Thread(sd.new Producer()).start();
    new Thread(sd.new Consumer()).start();
  }

  class Producer implements Runnable {
    public void run() {
      for (int i = 0 ; i < 10; i++) {
        ...
        try {
          notFull.acquire();
          mutex.acquire();
          count++;
        } catch (InterruptException e) {
          ...
        } finally {
          mutex.release();
          notEmpty.release();
        }
      }
    }
  }

  class Consumer implements Runnable {
    public void run() {
      for (int i = 0; i < 10; i++) {
        ...
        try {
          notEmpty.acquire();
          mutex.acquire();
          count--;
        } catch (InterruptException e) {
          ...
        } finally {
          mutex.release();
          notFull.release();
        }
      }
    }
  }
}

管道

  public class PipedDemo {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();
    public static void main(String args[]) {
      PipedDemo pd = new PipedDemo();
      try {
        pd.pis.connect(pos);
      } catch (IOException e) {
        ...
      }

      new Thread(pd.new Producer()).start();
      new Thread(pd.new Consumer()).start();
    }

    class Producer implements Runnable {
      public void run() {
        try {
          while (true) {
            int num = (int)(Math.random() * 255);
            pos.write(num);
            pos.flush();
          }
        } catch(Exception e) {
          ...
        } finally {
          try {
            pos.close();
            pis.close();
          } catch (IOException e) {
            ...
          }
        }
      }
    }

    class Consumer implements Runnable {
      public void run() {
        try {
          while (true) {
            int num = pis.read();
            ...
          }
        } catch(Exception e) {
          ...
        } finally {
          try {
            pos.close();
            pis.close();
          } catch (IOException e) {
            ...
          }
        }
      }
    }
  }

参考