并发编程是指在多线程环境下运行代码,以便多个任务能够同时执行。在多核处理器时代,并发编程能够显著提高应用程序的性能和响应能力。然而,并发编程也带来了数据竞争、死锁、资源共享等复杂问题。如果处理不当,这些问题可能导致程序行为不确定、数据损坏,甚至系统崩溃。
1. 并发问题的基本概念
1.1 数据竞争
数据竞争发生在多个线程同时读写同一内存位置,而至少一个操作是写操作。如果没有适当的同步机制,不同线程的操作可能会相互覆盖,导致数据不一致。
1.2 死锁
死锁是指两个或多个线程相互等待对方持有的资源,导致这些线程无法继续执行,从而使程序进入僵局。死锁通常发生在多个线程同时竞争多个资源时。
1.3 饥饿和活锁
饥饿:某个线程长时间得不到所需的资源,导致它无法继续执行。活锁:线程不断变换状态,但无法推进,类似于死锁,但不同的是活锁中的线程总是在做出进展的尝试。
2. 解决并发问题的常见技术
2.1 使用锁机制
锁机制是解决数据竞争的最常见方法。锁通过确保在任一时刻,只有一个线程能够访问共享资源,从而避免数据不一致问题。
互斥锁(Mutex):在Java中,synchronized关键字和ReentrantLock类是实现互斥锁的主要方式。synchronized用于保护代码块或方法,确保在同一时间只有一个线程执行代码块或方法。ReentrantLock提供了更灵活的锁定机制。
public class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized (lock) {
count++;
}
}
public int getCount() {
synchronized (lock) {
return count;
}
}
}
读写锁(ReadWriteLock):ReadWriteLock允许多个线程同时读取共享资源,但在写入时,只有一个线程能够持有写锁,其他线程都被阻塞。这种锁机制提高了读操作多、写操作少的场景下的并发性能。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteCounter {
private int count = 0;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public void increment() {
lock.writeLock().lock();
try {
count++;
} finally {
lock.writeLock().unlock();
}
}
public int getCount() {
lock.readLock().lock();
try {
return count;
} finally {
lock.readLock().unlock();
}
}
}
2.2 无锁编程(Lock-Free Programming)
无锁编程使用原子操作来管理并发,避免了锁竞争,提高了并发性能。在Java中,java.util.concurrent.atomic包提供了一些原子类,如AtomicInteger、AtomicLong、AtomicReference等,用于无锁的线程安全操作。
原子操作:原子操作是不可分割的操作,即使在多线程环境下也能够保证操作的完整性。例如,AtomicInteger类提供了incrementAndGet()方法,可以在没有锁的情况下安全地递增整数值。
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
2.3 线程协调(Thread Coordination)
线程之间的协调是指控制线程之间的执行顺序或依赖关系,以避免数据竞争和死锁。
条件变量(Condition Variables):Condition接口配合ReentrantLock使用,用于实现线程之间的等待/通知机制。它比传统的Object.wait()和Object.notify()更灵活。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items = new Object[100];
private int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
信号量(Semaphore):Semaphore用于限制同时访问某一资源的线程数量。例如,它可以用于实现连接池,限制同时访问的连接数。
import java.util.concurrent.Semaphore;
public class ConnectionPool {
private final Semaphore semaphore;
public ConnectionPool(int poolSize) {
semaphore = new Semaphore(poolSize);
}
public void connect() throws InterruptedException {
semaphore.acquire(); // 获取一个许可
try {
// 执行连接操作
} finally {
semaphore.release(); // 释放许可
}
}
}
栅栏(Barrier):CyclicBarrier允许一组线程等待彼此到达某个公共的屏障点后再继续执行。这在需要同步启动多个线程或协调阶段性任务时非常有用。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierExample {
private static final int THREAD_COUNT = 3;
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT,
() -> System.out.println("All threads reached the barrier"));
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Task()).start();
}
}
static class Task implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");
barrier.await();
System.out.println(Thread.currentThread().getName() + " passed the barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
2.4 不变性(Immutability)
不可变对象是指一旦创建后,其状态就不能再改变的对象。在并发环境中使用不可变对象,可以避免数据竞争,因为多个线程可以安全地共享不可变对象而无需同步。
Java中的不可变类:例如String、Integer等Java标准类都是不可变的。你可以通过final关键字和只读属性来创建自己的不可变类。
public final class ImmutablePoint {
private final int x;
private final int y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() {
return x;
}
public int getY() {
return y;
}
}
2.5 并发容器(Concurrent Collections)
Java 提供了一组线程安全的集合类,这些集合类在内部实现了高效的并发控制,减少了开发者手动管理同步的复杂性。
ConcurrentHashMap:一个线程安全的哈希表,适用于多线程环境下的键值对存储。
CopyOnWriteArrayList:一个线程安全的List,在写操作时会复制底层数组,因此适用于读操作多、写操作少的场景。
BlockingQueue:线程安全的队列,提供阻塞的put()和take()操作,用于生产者-消费者模型。
3. 总结
加锁:使用锁机制可以限制同一时间只有一个线程访问共享资源,可以使用互斥锁、读写锁、自旋锁等不同类型的锁来控制并发访问。
使用线程池:线程池可以有效控制并发线程的数量,通过设置合适的线程池大小来平衡资源占用和并发性能。
使用原子操作:原子操作是不可被中断的操作,保证了操作的完整性,可以避免多个线程同时对同一变量进行修改造成的并发问题。
使用并发容器:Java中提供了一些并发容器,如ConcurrentHashMap、ConcurrentLinkedQueue等,它们可以在并发环境下安全地进行读写操作。
避免共享资源:尽量避免多个线程共享同一资源,如果无法避免,可以通过复制资源、分片资源等方式来减少并发访问的冲突。
合理设计数据结构:选择合适的数据结构可以减少并发访问的冲突,如使用线程安全的数据结构或使用无锁的数据结构等。
合理设置并发级别:根据具体的应用场景,合理设置并发级别,避免过高的并发级别导致性能下降或过低的并发级别导致资源浪费。