2.41 生产者和消费者
3 minute read
2.41.1 wait() & notify()
/**
 * Producer/consumer demo built on an intrinsic lock with {@code wait()} / {@code notifyAll()}.
 *
 * <p>One producer thread and one consumer thread share {@code count}, which stands in for the
 * number of items in a buffer that holds at most {@code FULL} items. Every access to
 * {@code count} and every {@code wait()} / {@code notifyAll()} call is made while holding
 * {@code lock}.
 */
public class ProducerConsumerDemo {
    private int count = 0;
    private final int FULL = 10;
    private final Object lock = new Object();

    /** Starts one producer thread and one consumer thread on a shared demo instance. */
    public static void main(String[] args) {
        ProducerConsumerDemo pc = new ProducerConsumerDemo();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }

    /** Adds ten items, waiting whenever the buffer is full. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                synchronized (lock) {
                    // Loop rather than "if": the condition must be re-checked after every wakeup.
                    while (count == FULL) {
                        try {
                            // Object#wait() releases the lock immediately while waiting.
                            lock.wait();
                        } catch (InterruptedException e) {
                            // Restore the flag and stop; waiting again would just throw again.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // Must stay inside the synchronized block: notifyAll() requires the monitor,
                    // and count is shared with the consumer.
                    count++;
                    lock.notifyAll();
                }
            }
        }
    }

    /** Removes ten items, waiting whenever the buffer is empty. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                synchronized (lock) {
                    while (count == 0) {
                        try {
                            // Object#wait() releases the lock immediately while waiting.
                            lock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // Must stay inside the synchronized block (see Producer).
                    count--;
                    lock.notifyAll();
                }
            }
        }
    }
}
2.41.2 ReentrantLock
/**
 * Producer/consumer demo built on {@link ReentrantLock} with two {@link Condition} queues.
 *
 * <p>{@code count} stands in for the number of items in a buffer that holds at most
 * {@code FULL} items and is only touched while {@code lock} is held. The producer waits on
 * {@code notFull} and signals {@code notEmpty}; the consumer does the opposite.
 */
public class ReentrantDemo {
    private int count = 0;
    private static final int FULL = 10;
    private final Lock lock = new ReentrantLock();
    // Conditions are created through the Lock#newCondition() factory method.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /** Starts one producer thread and one consumer thread on a shared demo instance. */
    public static void main(String args[]) {
        ReentrantDemo rt = new ReentrantDemo();
        new Thread(rt.new Producer()).start();
        new Thread(rt.new Comsumer()).start();
        // ... (remaining statements elided in the original listing)
    }

    /** Adds ten items, waiting on {@code notFull} whenever the buffer is full. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                lock.lock();
                try {
                    // Loop rather than "if": the condition must be re-checked after every wakeup.
                    while (count == FULL) {
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            // Restore the flag and stop; the finally block still unlocks.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    count++;
                    notEmpty.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Removes ten items, waiting on {@code notEmpty} whenever the buffer is empty.
     *
     * <p>The class name keeps its original spelling so existing references continue to compile.
     */
    class Comsumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    count--;
                    notFull.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
2.41.3 BlockingQueue
/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>{@code put} blocks while the queue is full and {@code take} blocks while it is empty, so
 * the queue itself provides the hand-off. {@code count} is separate bookkeeping shared by both
 * threads, so it is only read and written through synchronized helpers.
 */
public class BlockingQueueDemo {
    private int count = 0;
    // ArrayBlockingQueue has no no-arg constructor; a capacity is mandatory. 10 matches the
    // FULL bound used by the wait()/notify() and ReentrantLock demos in this article.
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    /** Starts one producer thread and one consumer thread on a shared demo instance. */
    public static void main(String args[]) {
        BlockingQueueDemo bqd = new BlockingQueueDemo();
        new Thread(bqd.new Producer()).start();
        new Thread(bqd.new Consumer()).start();
    }

    /** Returns the current counter value under the instance lock. */
    private synchronized int currentCount() {
        return count;
    }

    /** Adds {@code delta} to the counter under the instance lock. */
    private synchronized void adjustCount(int delta) {
        count += delta;
    }

    /** Enqueues the current counter value ten times, incrementing it after each put. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                try {
                    queue.put(currentCount());
                    adjustCount(1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop producing.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Dequeues ten values, decrementing the counter after each take. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                try {
                    queue.take();
                    adjustCount(-1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop consuming.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
2.41.4 信号量 Semaphore
/**
 * Producer/consumer demo built on three {@link Semaphore}s.
 *
 * <ul>
 *   <li>{@code notFull} counts free slots; it starts at 10 because the buffer starts empty.</li>
 *   <li>{@code notEmpty} counts available items; it starts at 0 so the consumer blocks until
 *       the producer has added something.</li>
 *   <li>{@code mutex} is a binary semaphore guarding {@code count}.</li>
 * </ul>
 */
public class SemaphoreDemo {
    private int count = 0;
    final Semaphore notFull = new Semaphore(10);
    final Semaphore notEmpty = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);

    /** Starts one producer thread and one consumer thread on a shared demo instance. */
    public static void main(String args[]) {
        SemaphoreDemo sd = new SemaphoreDemo();
        new Thread(sd.new Producer()).start();
        new Thread(sd.new Consumer()).start();
    }

    /** Adds ten items: takes a free slot, updates {@code count}, then publishes one item. */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                try {
                    notFull.acquire();
                    try {
                        mutex.acquire();
                    } catch (InterruptedException e) {
                        // The slot was reserved but never filled: hand it back.
                        notFull.release();
                        throw e;
                    }
                    try {
                        count++;
                    } finally {
                        mutex.release();
                    }
                    // Only release permits after the matching acquires succeeded; releasing
                    // unconditionally would create permits that were never taken.
                    notEmpty.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Removes ten items: takes one item, updates {@code count}, then frees one slot. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                // ... (per-iteration work elided in the original listing)
                try {
                    notEmpty.acquire();
                    try {
                        mutex.acquire();
                    } catch (InterruptedException e) {
                        // The item was claimed but never consumed: hand it back.
                        notEmpty.release();
                        throw e;
                    }
                    try {
                        count--;
                    } finally {
                        mutex.release();
                    }
                    notFull.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
2.41.5 管道
/**
 * Producer/consumer demo built on a connected {@link PipedOutputStream} /
 * {@link PipedInputStream} pair.
 *
 * <p>The producer writes random byte values into {@code pos}; the consumer reads them from
 * {@code pis}. Whichever side stops first closes both ends of the pipe.
 */
public class PipedDemo {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();

    /** Connects the two pipe ends, then starts one producer and one consumer thread. */
    public static void main(String args[]) {
        PipedDemo pd = new PipedDemo();
        try {
            // main is static, so the instance fields must be reached through pd.
            pd.pis.connect(pd.pos);
        } catch (IOException e) {
            // Without a connected pipe neither thread can do anything useful.
            e.printStackTrace();
            return;
        }
        new Thread(pd.new Producer()).start();
        new Thread(pd.new Consumer()).start();
    }

    /** Closes both pipe ends; a failure on one end does not skip closing the other. */
    private void closeStreams() {
        try {
            pos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes random values in the range 0..254 into the pipe until an I/O error occurs. */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int num = (int) (Math.random() * 255);
                    pos.write(num);
                    pos.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeStreams();
            }
        }
    }

    /** Reads values from the pipe until end of stream or an I/O error occurs. */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int num = pis.read();
                    if (num == -1) {
                        // End of stream: the write end was closed, so stop instead of spinning.
                        break;
                    }
                    // ... (processing of num elided in the original listing)
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeStreams();
            }
        }
    }
}
参考