1. 在处理文件时如何协调多个线程
当你处理大型文件或复杂的数据处理时,常常会遇到一个需求:将工作拆分给多个线程。例如,一个线程从文件读取每一行,其他线程——处理这些行(查找单词、统计等)。
问题是什么?
- 如果所有线程都访问同一个资源(例如文件或共享集合),很容易出错:一些线程可能“跑在前面”,从而出现竞态、数据丢失、内存过载。
- 如果线程很多而没有协调——有的会空闲,有的会过载。
我们需要一个解决方案,它:
- 允许在线程间安全地交换数据。
- 在处理速度慢于读取速度时避免内存被填满。
- 当数据用尽时,能轻松结束所有线程。
2. “Producer–Consumer” 模式(生产者–消费者)
Producer–Consumer 是一个经典模式,帮助线程协调工作、互不干扰。
该模式中有两个角色:Producer(生产者)创建数据——例如读取文件行或接收网络消息——并把它们放进共享队列。Consumer(消费者)从队列取出数据并处理:统计单词、保存到数据库或写入另一个文件。
核心思想是这两类线程相互独立地工作。生产者可能比消费者处理得快,或相反——但双方都不会彼此阻塞。它们之间有一个缓冲区——队列,用来平衡工作节奏。
可视化示意
[文件] --(读取)--> [Producer] --(入队)--> [BlockingQueue] --(取出)--> [Consumer] --(处理)
3. 使用 BlockingQueue 的实现
在 Java 中,为实现这样的交换,接口 BlockingQueue(例如实现 ArrayBlockingQueue)非常适合。
什么是 BlockingQueue?
BlockingQueue 是一种线程安全、带边界的队列,它自己处理同步。如果生产者尝试添加元素而队列已满,线程会自动阻塞并等待出现空位。类似地,如果消费者尝试取元素而队列为空,它就会等待,直到有人把东西放进队列。
这种机制能自动解决线程“抢跑”和内存溢出等经典问题:生产者不会向队列塞入过多元素,消费者也不会对“空无一物”进行处理。所有线程都能平稳、协同地完成工作。
创建队列的示例
import java.util.concurrent.*;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // 缓冲区大小为 100 个元素
100 —— 队列中的最大元素数。如果 producer 更快,它会等待,直到 consumer 处理掉一部分数据。
4. 线程协调:backpressure 与结束流程
限制队列大小(backpressure)
Backpressure 是一种机制,当 consumer 处理不过来时,防止 producer “灌爆”内存。
- 如果队列已满,producer 会自动“减速”(方法 put() 会阻塞)。
- 如果队列为空,consumer 会等待(方法 take() 会阻塞)。
这使系统即使在不同线程速度下也能稳定运行。
结束流程:“毒丸” (poison pill)
当 producer 读完文件后,需要告知 consumers 数据已经结束,应该收尾退出。
解决方案:
- 向队列放入一个特殊对象——“毒丸”(例如字符串 "__END__" 或其他特殊值,但不是 null)。
- Consumer 收到“毒丸”后,就明白该退出了。
如果有多个 consumer,需要放入与 consumer 数量相同的“毒丸”!
5. 流水线示例:读取文件并处理行
让我们实现一个简单的流水线:
- 一个线程从文件读取行并把它们放入队列。
- 多个线程从队列取出行,计算单词数量并输出结果。
步骤 1. Producer —— 文件读取器
import java.io.*;
import java.util.concurrent.*;
public class FileProducer implements Runnable {
private final BlockingQueue<String> queue;
private final File file;
private final int consumerCount;
private final String POISON_PILL = "__END__";
public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
this.queue = queue;
this.file = file;
this.consumerCount = consumerCount;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
queue.put(line); // 若队列已满 —— 等待
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
// 为每个 consumer 放入一颗毒丸
try {
for (int i = 0; i < consumerCount; i++) {
queue.put(POISON_PILL);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
步骤 2. Consumer —— 行处理器
import java.util.concurrent.BlockingQueue;
public class LineConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final String POISON_PILL = "__END__";
public LineConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String line = queue.take(); // 若队列为空 —— 等待
if (POISON_PILL.equals(line)) {
break; // 结束工作
}
int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
System.out.println(Thread.currentThread().getName() + ": " + wordCount + " 个词在该行中: " + line);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
步骤 3. 通过 Executors 启动流水线
import java.io.File;
import java.util.concurrent.*;
public class PipelineDemo {
public static void main(String[] args) throws Exception {
int consumerCount = 3;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
File file = new File("input.txt"); // 你的文件
// 启动 producer
Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
producer.start();
// 通过 ExecutorService 启动 consumers
ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
for (int i = 0; i < consumerCount; i++) {
consumers.submit(new LineConsumer(queue));
}
// 等待 producer 结束
producer.join();
// 等待 consumers 结束
consumers.shutdown();
consumers.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("处理完成!");
}
}
6. 可视化:流水线工作示意图
flowchart LR
A[Producer: 读取文件] -- put() --> Q[BlockingQueue]
Q -- take() --> C1[Consumer 1: 统计单词数]
Q -- take() --> C2[Consumer 2: 统计单词数]
Q -- take() --> C3[Consumer 3: 统计单词数]
A -.->|poison pill| Q
Q -.->|poison pill| C1
Q -.->|poison pill| C2
Q -.->|poison pill| C3
7. 有用的细节与最佳实践
- 限制队列大小 —— 这能防止内存溢出。
- 使用“毒丸”来结束 —— 否则 consumers 可能会一直挂起。
- 不要用 null 作为“毒丸”,如果队列中可能出现真实的 null 值。最好使用专用字符串或对象。
- 处理 InterruptedException —— 这对正确结束线程很重要。
- 使用 ExecutorService 管理线程池——比手动创建线程更简单、更安全。
8. 实现流水线时的常见错误
错误 1:无界队列 → OutOfMemoryError。 如果使用未限制大小的 LinkedBlockingQueue,当 consumer 跟不上时,producer 可能会“灌爆”内存。
错误 2:没有放入“毒丸” → consumers 挂起。 如果忘记放入“毒丸”,消费者线程将会一直等待新数据。
错误 3:只放入一个“毒丸”,但有多个 consumers。 每个消费者线程都需要自己的“毒丸”——否则不是所有线程都会结束。
错误 4:未处理 InterruptedException。 如果线程被中断,需要正确收尾(恢复中断标志),否则可能出现“挂死”的线程。
错误 5:在线程之间未同步地共享变量。 不要“造轮子”——使用 BlockingQueue,而不是普通列表或数组。
GO TO FULL VERSION