模式切换
高级并发编程
Java 内存模型(JMM)
主内存与工作内存
主内存:所有线程共享的内存区域,存储共享变量。
工作内存:每个线程独立的内存区域,存储线程私有的变量和共享变量的副本。
数据同步:
- 线程对共享变量的操作必须在工作内存中进行。
- 修改后的值需要刷新到主内存,其他线程才可能看到。
内存屏障(Memory Barrier)
作用:确保指令的执行顺序,防止指令重排序。
类型:
- Load Barrier:读屏障,确保读取操作之前的所有读取操作已完成。
- Store Barrier:写屏障,确保写入操作之前的所有写入操作已完成。
- Full Barrier:读写屏障,确保屏障之前的所有操作完成后,才能执行屏障之后的操作。
Happens-Before 规则
JMM 定义的一系列规则,用于描述操作之间的可见性和顺序性。
常见规则:
- 程序顺序规则:同一线程中的操作按程序顺序执行。
- 锁规则:解锁操作 Happens-Before 后续的加锁操作。
- volatile 变量规则:对 volatile 变量的写操作 Happens-Before 后续的读操作。
- 线程启动规则:线程的
start()
方法 Happens-Before 线程的run()
方法。 - 线程终止规则:线程的
run()
方法 Happens-Before 线程的join()
方法。
线程间通信
wait()、notify()、notifyAll() 的使用
wait()
:使当前线程进入等待状态,并释放锁。notify()
:唤醒一个等待的线程。notifyAll()
:唤醒所有等待的线程。
java
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean condition = false;
public void waitForCondition() throws InterruptedException {
synchronized (lock) {
while (!condition) {
lock.wait();
}
System.out.println("Condition met");
}
}
public void setCondition() {
synchronized (lock) {
condition = true;
lock.notifyAll();
}
}
}
使用 Condition 实现线程间通信
使用 Condition
与 Lock
配合使用,提供更灵活的线程等待和唤醒机制。
java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
lock.lock();
try {
while (!ready) {
condition.await();
}
System.out.println("Ready");
} finally {
lock.unlock();
}
}
public void setReady() {
lock.lock();
try {
ready = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
Fork/Join 框架
分治算法的实现
分治算法是将一个大任务拆分为多个小人物,分别处理后再合并结果。
分治思想适合处理可以递归分解的任务,如排序、搜索等。
ForkJoinPool 的使用
- ForkJoinPool:专门用于执行分治任务的线程池。
- RecursiveTask:用于有返回值的任务。
- RecursiveAction:用于无返回值的任务。
java
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample extends RecursiveTask<Long> {
private final long[] array;
private final int start, end;
public ForkJoinExample(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= 100) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (start + end) / 2;
ForkJoinExample left = new ForkJoinExample(array, start, mid);
ForkJoinExample right = new ForkJoinExample(array, mid, end);
left.fork();
return right.compute() + left.join();
}
}
public static void main(String[] args) {
long[] array = new long[1000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new ForkJoinExample(array, 0, array.length));
System.out.println("Sum: " + result);
}
}
CompletableFuture
异步编程
CompletableFuture
用于编写异步、非阻塞的代码。
java
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> s + " World")
.thenAcceptAsync(System.out::println)
.join();
}
}
链式调用与组合操作
- 链式调用:通过
thenApply()
、thenAccept()
等方法实现任务链。 - 组合操作:通过
thenCombine()
、allOf()
等方法组合多个CompletableFuture
。
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2)
.thenAccept(System.out::println)
.join();
并发设计模式
生产者-消费者模式
生产者生成数据并放入缓冲区,消费者从缓冲区取出数据。
java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
线程池模式
通过线程池管理线程资源,避免频繁创建和销毁线程。
java
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(() -> System.out.println("Task executed"));
executor.shutdown();
Future 模式
通过 Future
获取异步任务的执行结果。
java
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
System.out.println("Result: " + future.get());
executor.shutdown();
总结
- Java 内存模型(JMM)定义了多线程环境下的内存可见性和顺序性。
- 线程间通信可以通过
wait()
、notify()
或Condition
实现。 Fork/Join
框架用于处理分治任务。CompletableFuture
提供了强大的异步编程能力。- 并发设计模式(如生产者-消费者、线程池、
Future
)是解决常见并发问题的有效方法。