Java多线程生产者-消费者模型实现示例

Répondu en utilisant GPT-4o-mini par Java Q&A Hub

Question :

Java面试题之多线程 现成程序中的Test类中的代码在不断地产生数据,然后交给TestDo.doSome()方法去处理,就好像生产者在不断地产生数据,消费者在不断消费数据。请将程序改造成有10个线程来消费生成者产生的数据,这些消费者都调用TestDo.doSome()方法去进行处理,故每个消费者都需要一秒才能处理完,程序应保证这些消费者线程依次有序地消费数据,只有上一个消费者消费完后,下一个消费者才能消费数据,下一个消费者是谁都可以,但要保证这些消费者线程拿到的数据是有顺序的。

Réponse :

为了实现一个多线程的生产者-消费者模型,其中消费者线程依次有序地消费数据,我们可以使用Java的BlockingQueue来存储生产者产生的数据,并使用一个ReentrantLockSemaphore来确保消费者线程的顺序执行。

以下是一个示例程序,展示了如何实现这个需求:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Test {
    private static final int QUEUE_CAPACITY = 10;
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    private static int data = 0;

    public static void main(String[] args) {
        // 启动生产者线程
        Thread producer = new Thread(() -> {
            while (true) {
                try {
                    // 生产数据
                    queue.put(data++);
                    System.out.println("Produced: " + (data - 1));
                    Thread.sleep(500); // 模拟生产时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Consumer()).start();
        }

        producer.start();
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 从队列中获取数据
                    Integer item = queue.take();
                    doSome(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private void doSome(Integer item) {
            // 模拟处理时间
            System.out.println("Consumed: " + item);
            try {
                Thread.sleep(1000); // 模拟处理时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

代码说明:

  1. BlockingQueue: 我们使用ArrayBlockingQueue来存储生产者产生的数据。这个队列是线程安全的,能够有效地处理多线程环境下的生产和消费。

  2. 生产者线程: 生产者线程不断地生成数据并将其放入队列中。这里我们使用queue.put(data++)来将数据放入队列,并在每次生产后休眠500毫秒以模拟生产时间。

  3. 消费者线程: 我们创建了10个消费者线程,每个线程都在不断地从队列中取出数据并调用doSome()方法进行处理。queue.take()会阻塞直到有数据可用。

  4. 顺序消费: 由于BlockingQueue的特性,消费者线程会按照数据被放入队列的顺序来消费数据。即使有多个消费者线程,它们也会依次处理数据,确保数据的顺序性。

  5. 处理时间: 在doSome()方法中,我们模拟了处理时间,消费者每处理完一个数据项后会休眠1秒。

注意事项:

  • 这个程序是一个无限循环的示例,实际应用中可能需要添加适当的退出条件。
  • 生产者和消费者的数量可以根据实际需求进行调整。