CodeGym /课程 /JAVA 25 SELF /文件处理流水线:生产者–消费者

文件处理流水线:生产者–消费者

JAVA 25 SELF
第 59 级 , 课程 4
可用

1. 在处理文件时如何协调多个线程

当你处理大型文件或复杂的数据处理时,常常会遇到一个需求:将工作拆分给多个线程。例如,一个线程从文件读取每一行,其他线程——处理这些行(查找单词、统计等)。

问题是什么?

  • 如果所有线程都访问同一个资源(例如文件或共享集合),很容易出错:一些线程可能“跑在前面”,从而出现竞态、数据丢失、内存过载。
  • 如果线程很多而没有协调——有的会空闲,有的会过载。

我们需要一个解决方案,它:

  • 允许在线程间安全地交换数据。
  • 在处理速度慢于读取速度时避免内存被填满。
  • 当数据用尽时,能轻松结束所有线程。

2. “Producer–Consumer” 模式(生产者–消费者)

Producer–Consumer 是一个经典模式,帮助线程协调工作、互不干扰。

该模式中有两个角色:Producer(生产者)创建数据——例如读取文件行或接收网络消息——并把它们放进共享队列。Consumer(消费者)从队列取出数据并处理:统计单词、保存到数据库或写入另一个文件。

核心思想是这两类线程相互独立地工作。生产者可能比消费者处理得快,或相反——但双方都不会彼此阻塞。它们之间有一个缓冲区——队列,用来平衡工作节奏。

可视化示意

[文件] --(读取)--> [Producer] --(入队)--> [BlockingQueue] --(取出)--> [Consumer] --(处理)

3. 使用 BlockingQueue 的实现

在 Java 中,为实现这样的交换,接口 BlockingQueue(例如实现 ArrayBlockingQueue)非常适合。

什么是 BlockingQueue?

BlockingQueue 是一种线程安全、带边界的队列,它自己处理同步。如果生产者尝试添加元素而队列已满,线程会自动阻塞并等待出现空位。类似地,如果消费者尝试取元素而队列为空,它就会等待,直到有人把东西放进队列。

这种机制能自动解决线程“抢跑”和内存溢出等经典问题:生产者不会向队列塞入过多元素,消费者也不会对“空无一物”进行处理。所有线程都能平稳、协同地完成工作。

创建队列的示例

import java.util.concurrent.*;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // 缓冲区大小为 100 个元素

100 —— 队列中的最大元素数。如果 producer 更快,它会等待,直到 consumer 处理掉一部分数据。

4. 线程协调:backpressure 与结束流程

限制队列大小(backpressure)

Backpressure 是一种机制,当 consumer 处理不过来时,防止 producer “灌爆”内存。

  • 如果队列已满,producer 会自动“减速”(方法 put() 会阻塞)。
  • 如果队列为空,consumer 会等待(方法 take() 会阻塞)。

这使系统即使在不同线程速度下也能稳定运行。

结束流程:“毒丸” (poison pill)

当 producer 读完文件后,需要告知 consumers 数据已经结束,应该收尾退出。

解决方案:

  • 向队列放入一个特殊对象——“毒丸”(例如字符串 "__END__" 或其他特殊值,但不是 null)。
  • Consumer 收到“毒丸”后,就明白该退出了。

如果有多个 consumer,需要放入与 consumer 数量相同的“毒丸”!

5. 流水线示例:读取文件并处理行

让我们实现一个简单的流水线:

  • 一个线程从文件读取行并把它们放入队列。
  • 多个线程从队列取出行,计算单词数量并输出结果。

步骤 1. Producer —— 文件读取器

import java.io.*;
import java.util.concurrent.*;

public class FileProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final File file;
    private final int consumerCount;
    private final String POISON_PILL = "__END__";

    public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
        this.queue = queue;
        this.file = file;
        this.consumerCount = consumerCount;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // 若队列已满 —— 等待
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 为每个 consumer 放入一颗毒丸
            try {
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(POISON_PILL);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

步骤 2. Consumer —— 行处理器

import java.util.concurrent.BlockingQueue;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String POISON_PILL = "__END__";

    public LineConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String line = queue.take(); // 若队列为空 —— 等待
                if (POISON_PILL.equals(line)) {
                    break; // 结束工作
                }
                int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
                System.out.println(Thread.currentThread().getName() + ": " + wordCount + " 个词在该行中: " + line);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

步骤 3. 通过 Executors 启动流水线

import java.io.File;
import java.util.concurrent.*;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        int consumerCount = 3;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        File file = new File("input.txt"); // 你的文件

        // 启动 producer
        Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
        producer.start();

        // 通过 ExecutorService 启动 consumers
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(new LineConsumer(queue));
        }

        // 等待 producer 结束
        producer.join();

        // 等待 consumers 结束
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("处理完成!");
    }
}

6. 可视化:流水线工作示意图

flowchart LR
    A[Producer: 读取文件] -- put() --> Q[BlockingQueue]
    Q -- take() --> C1[Consumer 1: 统计单词数]
    Q -- take() --> C2[Consumer 2: 统计单词数]
    Q -- take() --> C3[Consumer 3: 统计单词数]
    A -.->|poison pill| Q
    Q -.->|poison pill| C1
    Q -.->|poison pill| C2
    Q -.->|poison pill| C3

7. 有用的细节与最佳实践

  • 限制队列大小 —— 这能防止内存溢出。
  • 使用“毒丸”来结束 —— 否则 consumers 可能会一直挂起。
  • 不要用 null 作为“毒丸”,如果队列中可能出现真实的 null 值。最好使用专用字符串或对象。
  • 处理 InterruptedException —— 这对正确结束线程很重要。
  • 使用 ExecutorService 管理线程池——比手动创建线程更简单、更安全。

8. 实现流水线时的常见错误

错误 1:无界队列 → OutOfMemoryError。 如果使用未限制大小的 LinkedBlockingQueue,当 consumer 跟不上时,producer 可能会“灌爆”内存。

错误 2:没有放入“毒丸” → consumers 挂起。 如果忘记放入“毒丸”,消费者线程将会一直等待新数据。

错误 3:只放入一个“毒丸”,但有多个 consumers。 每个消费者线程都需要自己的“毒丸”——否则不是所有线程都会结束。

错误 4:未处理 InterruptedException 如果线程被中断,需要正确收尾(恢复中断标志),否则可能出现“挂死”的线程。

错误 5:在线程之间未同步地共享变量。 不要“造轮子”——使用 BlockingQueue,而不是普通列表或数组。

1
调查/小测验
并行处理文件第 59 级,课程 4
不可用
并行处理文件
并行处理文件
评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION