在 Java IO 中,管道(Pipe)是一种用于在同一 JVM 中不同线程之间进行通信的机制,它可以实现线程之间的数据传输。因此,管道也可以是数据的来源或目的地。如下图:

注意:Java IO 中,你不能使用管道与不同 JVM(不同进程)中的线程通信。Java 中的管道概念不同于 Unix / Linux 中的管道概念,在 Unix / Linux 中,运行在不同地址空间的两个进程可以通过管道进行通信。在 Java 中,通信双方必须运行在同一个进程中(即同一个 JVM),而且应该是不同的线程。
使用 Java IO 创建管道是通过 PipedOutputStream 和 PipedInputStream 类完成的。一个 PipedInputStream 应连接到一个 PipedOutputStream。一个线程写入 PipedOutputStream 的数据可由另一个线程从连接的 PipedInputStream 读取。
下面是一个如何将管道输入流连接到管道输出流的简单示例:
package com.hxstrive.java_io.demo01;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* 管道流示例
* @author hxstrive.com
*/
public class PipeDemo1 {
public static void main(String[] args) throws IOException {
// 创建管道流对象
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output);
// 线程1,向管道中写入数据
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
output.write("Hello world, pipe!".getBytes());
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 线程2,从管道中读取数据
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
int data = input.read();
while(data != -1){
System.out.print((char) data);
data = input.read();
}
// 输出:Hello world, pipe!
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 启动线程
thread1.start();
thread2.start();
}
}你还可以使用两个管道流的 connect() 方法将它们连接起来。PipedInputStream 和 PipedOutputStream 都有一个 connect() 方法,可以将其中一个连接到另一个。例如:
// 创建管道流对象 final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(); input.connect(output);
或者
// 创建管道流对象 final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(); output.connect(input);
管道是线程间通信的工具,用于实现不同线程之间的数据传输和协作。
一定要记住,在使用两个连接的管道流时,应将一个流传递给一个线程,另一个流传递给另一个线程。管道流上的 read() 和 write() 调用是阻塞性的,这意味着如果尝试使用同一线程同时调用 read() 和 write(),可能会导致线程自身陷入死锁。
以下是管道和线程的关系:
(1)管道是线程间通信的桥梁:管道允许一个线程(生产者)向另一个线程(消费者)发送数据,实现线程间的同步或异步通信。例如,在生产者-消费者模型中,生产者线程将数据写入管道,消费者线程从管道读取数据,两者通过管道解耦。
(2)传统 I/O 管道与线程的协作:传统管道(PipedInputStream/PipedOutputStream)的读写操作是阻塞的。当管道中无数据时,读操作会阻塞,直到有数据写入。当管道已满时,写操作会阻塞,直到数据被读取。注意:管道本身是线程安全的,但需要确保读写操作由不同线程完成,否则可能导致死锁(如同一线程同时读写)。
(3)NIO 管道与线程的高效协作:NIO 管道(Pipe)通过通道(SinkChannel/SourceChannel)和缓冲区(ByteBuffer)支持非阻塞操作。注意,可与选择器(Selector)配合,在单个线程中管理多个管道的读写,适合高并发场景。NIO 的内容将在 Java NIO 教程详细介绍
在同一个 JVM 中,线程可以通过管道以外的许多其他方式进行通信。例如:
BlockingQueue 是 Java 并发包中的一部分,它能在线程间安全地传递数据。BlockingQueue 提供了阻塞操作,在队列为空时会阻塞取操作,在队列满时会阻塞放操作。
示例代码:
package com.hxstrive.java_io.pipe;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* BlockingQueue示例
* @author hxstrive.com
*/
public class BlockingQueueExample {
public static void main(String[] args) {
// 用于线程间通信
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
// 线程1
new Thread(() -> {
try {
// 发送数据
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 线程2
new Thread(() -> {
try {
// 接收数据
while (true) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}运行结果如下:
Produced: 0 Consumed: 0 Produced: 1 ... Consumed: 8 Produced: 9 Consumed: 9
SynchronousQueue 是一种特殊的 BlockingQueue,它的容量为零,每个插入操作必须等待另一个线程的移除操作,反之亦然。这在需要线程间同步数据交换的场景下很有用。
示例代码:
package com.hxstrive.java_io.pipe;
import java.util.concurrent.SynchronousQueue;
/**
* SynchronousQueue示例
* @author hxstrive.com
*/
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// 线程1
new Thread(() -> {
try {
// 生产数据
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 线程2
new Thread(() -> {
try {
// 消费数据
for (int i = 0; i < 10; i++) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}CompletableFuture 可用于异步编程,能在线程间传递计算结果。
示例代码:
package com.hxstrive.java_io.pipe;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 利用 CompletableFuture 实现了异步任务的执行,并在任务完成后处理其结果。
// CompletableFuture 是 Java 8 引入的一个强大工具,用于简化异步编程。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello, World!";
});
// thenAccept 是 CompletableFuture 的一个方法,用于在异步任务完成后执行一个操作
future.thenAccept(result -> System.out.println("Received: " + result));
// 输出:
// Received: Hello, World!
// 等待任务完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}事实上,线程更经常交换的是完整对象,而不是原始字节数据。但是,如果需要在线程间交换原始字节数据,Java IO 的管道也是一种可能的方式。