ProducerConsumer---生产者消费者问题

1 用wait()/notify()实现

class EmailMessageQueueClass {
    Queue<String> q = new LinkedList<String>();
    private int queueMaxSize = 10;

    synchronized void retrieve() throws InterruptedException {
        while (true) {
            //条件检查可以避免missed signal
            //while循环可以避免supurious wakeup(伪唤醒)
            while (q.isEmpty()) {
                wait();
            }
            q.remove();
            notifyAll();
        }
    }

    synchronized void adds(String emailMessage) throws InterruptedException {
        while (q.size() == queueMaxSize) {
            wait();
        }
        q.add(emailMessage);
        notify();
    }

    public static void main(String... s) throws InterruptedException {
        EmailMessageQueueClass messageQueue = new EmailMessageQueueClass();
        new Thread(new Consumer(messageQueue)).start();
        for (int i = 0; i < 1000; ++i) {
            new Thread(new Producer("Email Message " + i, messageQueue))
                    .start();
        }
    }
}


class Consumer implements Runnable {
    EmailMessageQueueClass messageQueue;

    public Consumer(EmailMessageQueueClass messageQueue) {
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            messageQueue.retrieve();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Producer implements Runnable {
    EmailMessageQueueClass messageQueue;
    String emailMessageContent;

    public Producer(String message, EmailMessageQueueClass messageQueue) {
        emailMessageContent = message;
        this.messageQueue = messageQueue;
    }

    public void run() {
        try {
            messageQueue.adds(emailMessageContent);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}

2 用Semaphore实现

public class ConsumerProducer{

    public static void main(String[] args) {

           Semaphore semaphoreProducer=new Semaphore(1);
           Semaphore semaphoreConsumer=new Semaphore(0);
           System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");

       Producer producer=new Producer(semaphoreProducer,semaphoreConsumer);
       Consumer consumer=new Consumer(semaphoreConsumer,semaphoreProducer);

        Thread producerThread = new Thread(producer, "ProducerThread");
        Thread consumerThread = new Thread(consumer, "ConsumerThread");

        producerThread.start();
        consumerThread.start();

    }
}

class Producer implements Runnable{

    Semaphore semaphoreProducer;
    Semaphore semaphoreConsumer;


    public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
           this.semaphoreProducer=semaphoreProducer;
           this.semaphoreConsumer=semaphoreConsumer;
    }

    public void run() {
           for(int i=1;i<=5;i++){
                  try {
                      semaphoreProducer.acquire();
                      System.out.println("Produced : "+i);
                      semaphoreConsumer.release();

                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }          
    }
}

class Consumer implements Runnable{

    Semaphore semaphoreConsumer;
    Semaphore semaphoreProducer;

    public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
           this.semaphoreConsumer=semaphoreConsumer;
           this.semaphoreProducer=semaphoreProducer;
    }

    public void run() {

           for(int i=1;i<=5;i++){
                  try {
                      semaphoreConsumer.acquire();
                      System.out.println("Consumed : "+i);
                      semaphoreProducer.release();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }
    }

}

results matching ""

    No results matching ""