基本概念

同步和异步

用来形容方法调用

  • 同步方法调用:方法调用开始后,调用者必须等到方法返回后才能继续后续的操作
  • 异步方法调用:方法调用开始后,异步方法会在另一个线程中执行,调用者可以继续其他操作。当异步方法有返回值时,会在方法调用结束后返回。

例子:
同步调用就像我们在实体店买东西,我们要自己亲自去实体店,买完东西后我们要亲自把东西带回来,而异步就像网购,我们在完成网上支付时买东西这个事件就已经结束了,我们可以去做其他的一些事情,而卖方会将东西给我们送过来。

并发和并行

  • 并发:多个任务交替执行
  • 并行:多个任务同时执行

临界区

共享数据,可以被多个线程使用,但一次只能有一个线程用。

阻塞和非阻塞

形容多线程间的相互影响。

  • 阻塞:一个线程访问了临界区,其他需要这个临界区资源的线程必须等待。

  • 非阻塞:没有一个线程可以妨碍其他线程的执行,所有线程都会不断的向前执行。

死锁、饥饿和活锁

  • 死锁:两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象 。
  • 饥饿:一个或者多个线程因为种种原因(例如优先级)无法获得所需要的资源,导致一直无法执行。
  • 活锁:获取不到资源就释放已有资源,一直尝试-失败-尝试-失败。
    • 例子: 走路时对面堵着一个人,我向左走想避开他,他向右走想避开我,这样我们就有堵在一起了。

并发级别

  • 阻塞:其他线程释放资源前,当前线程无法执行
  • 无饥饿:线程的调度是公平的,先来先调度,后来后调度,不存在饥饿的现象
  • 无障碍:所有线程无需等待可以一起进入临界区,一旦临界区的数据遭到破坏,所有线程会回滚,保证数据的安全。
  • 无锁:无锁是无障碍的一种,但是无锁保证必然有一个线程能够执行成功
  • 无等待:所有的线程都必须在有限步内完成
    • 例子:RCU(Read-Copy-Update)对数据的读可以不加控制。因此,所有的读线程都是无等待的,它们既不会被锁定等待也不会引起任何冲突。 但在写数据的时候,先取得原始数据的副本,接着只修改副本数据(这就是为什么读可以不加控制),修改完成后,在合适的时机回写数据。

多线程的原子性、可见性和有序性

  • 原子性:一个操作不可中断
  • 可见性:一个线程修改了一个共享变量的值,其他线程能否立即知道这个修改。
    • 缓存优化、硬件优化、指令重排等都可能导致可见性问题
  • 有序性:指令重排可能导致多线程下先写的代码后运行

Happen-Before原则

指令不能重排的原则:

  • 程序顺序原则∶ 一个线程内保证语义的串行性

  • volatile规则∶volatile变量的写,先发生于读,这保证了volatile变量的可见性
  • 锁规则∶解锁(unlock)必然发生在随后的加锁(lock)前
  • 传递性∶ A先于B,B先于C,那么 A 必然先于 C
  • 线程的 start()方法先于它的每一个动作

  • 线程的所有操作先于线程的终结(Thread.join())
  • 线程的中断(interrupt())先于被中断线程的代码
  • 对象的构造函数执行、结束先于 finalize()方法

并行程序基础

线程的状态

Thread中的 State 枚举中定义

1
2
3
4
5
6
7
8
public enum State{
NEW,
RUNNABLE,
BLOCKED,
WAITING, //无限时间等待
TIMED_WAITING, //有限时间等待
TERMINATED;
}

线程的基本操作

新建线程

new一个Thread对象,然后start()就可以新建一个线程了。

1
2
Thread t1=new Thread();
t1.start();

start()之后会调用run()函数,我们需要在run函数中实现自己想要的功能。

  • 继承Thread类(这里是匿名内部类)重载run
1
2
3
4
5
6
7
Thread t1=new Thread() {
@Override
public void run () {
System.out.println("Hello,I am t1");
}
};
t1.start ();
  • 实现Runnable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
public class CreateThread3 implements Runnable{
public static void main(String[] args){
Thread t1=new Thread(new CreateThread3());
t1.start();
}

@Override
public void run (){
System.out.println("Oh,I am Runnable");
}
}


终止线程

用标记变量来决定是否退出:设置个stopme的boolean,为true就break

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class ChangeObjectThread extends Thread{
volatile boolean stopme = false;

public void stopMe() {
stopme = true;
}
@override
public void run () {
while (true){
if (stopme) {
System.out.println("exit by stop me");
break;
}
synchronized (u){
int v=(int)(System.currentTimeMillis()/ 1000);
u.setId(v);
//oh,do sth. else
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
u.setName(String.valueOf(v));
}
Thread.yield();
}
}
}

中断线程

1
2
3
public void Thread.interrupt() //中断线程
public boolean Thread.isInterrupted() //判断线程是否中断
public static boolean Thread.interrupted() //判断是否中断,并清除中断状态

例子:通过Thread.isInterrupted()l来在run函数里写中断时做的操作,这样中断Thread.interrupt()调用时才有作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[]args)throws InterruptedException {
Thread t1=new Thread() {
@override
public void run () {
if(Thread.currentThread().isInterrupted()){
//do something
}
Thread.yield();
}
};
t1.start();
Thread.sleep (2000);
t1.interrupt ();
}

wait和notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

package ch2;

public class SimpleWN {
final static Object object = new Object();

public static class T1 extends Thread {


public void run() {
synchronized (object) {//获取object锁
System.out.println(System.currentTimeMillis() + "T1");
try {
System.out.println(System.currentTimeMillis() + "T1 wait for object");
object.wait();//等待并释放object锁
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + "T1 end");
}
}
}

public static class T2 extends Thread {


public void run() {
synchronized (object) {//获取锁
System.out.println(System.currentTimeMillis() + "T2");
object.notify();//通知T1
System.out.println(System.currentTimeMillis() + "T2 end");
try {
Thread.sleep(2000);//休眠两秒,两秒后才释放锁,T1才能继续操作
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Thread t1 = new T1();
Thread t2 = new T2();
t1.start();
t2.start();
}
}

输出

1
2
3
4
5
T1 start!
T1 wait for object
T2 start! notify one thread
T2 end!
T1 end!

wait会释放掉锁,sleep不会。notify之后T1线程要获得锁才能继续执行

wait是object的方法,sleep是Thread的方法

join和yield

线程a调用线程b的join函数,则a会等待b线程结束才继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package ch2;

public class JoinMain {
public static volatile int i = 0;

public static class AddThread extends Thread{


public void run() {
for(i=0;i<10000000;i++){
}
}
}

public static void main(String[] args) throws InterruptedException {
AddThread at = new AddThread();
at.start();
at.join();
System.out.println(i);
}
}

yield声明了当前线程已经完成生命周期中最重要的部分,可以切换给其他线程来执行。让出CPU重新参与资源竞争。

Daemon

守护线程,当所有用户线程结束时,程序终止,杀死所有守护线程。

volatile

读取volatile类型的变量时总会返回最新写入的值

  • 不会指令重排(保证有序性)
  • 变量不会被缓存到寄存器或其他处理器不可见的地方(保证可见性)

对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到CPU缓存中。如果计算机有多个CPU,每个线程可能在不同的CPU上被处理,这意味着每个线程可以拷贝到不同的 CPU cache 中。而声明变量是 volatile 的,JVM 保证了每次读变量都从内存中读,跳过 CPU cache 这一步

性能:读性能消耗与普通变量几乎相同,但是写操作稍慢,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行

volatile不保证原子性,所以对于十个线程做i++这种操作,结果会不准确:Java中只有对基本类型变量的赋值和读取是原子操作,如i = 1的赋值操作,但是像j = i或者i++这样的操作都不是原子操作,因为他们都进行了多次原子操作,比如先读取i的值,再将i的值赋值给j,两个原子操作加起来就不是原子操作了。

  1. 线程读取i
  2. temp = i + 1
  3. i = temp

当 i=5 的时候,A和B两个线程同时读入了 i 的值, 然后A线程执行了temp = i + 1的操作,要注意,此时的 i 的值还没有变化,然后B线程也执行了temp = i + 1的操作,注意,此时A,B两个线程保存的 i 的值都是5,temp 的值都是6, 然后A线程执行了 i = temp 的操作,此时i的值会立即刷新到主存并通知其他线程保存的 i 值失效, 此时B线程需要重新读取 i 的值。那么此时B线程保存的 i 就是6,同时B线程保存的 temp 还仍然是6, 然后B线程执行 i=temp ,所以导致了计算结果比预期少了1。

volatile和synchronized的区别

  • volatile告诉jvm每次从主存中取值,非阻塞;synchronized加锁,只有当前线程可以访问,阻塞
  • volatile只能变量级,synchronized可以变量和方法
  • volatile不保证原子性,synchronized保证原子性

synchronized

简述一下

synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized 关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。

在Java 早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,Java 的线程是映射到操作系统的原生线程之上的。如果要挂起或者唤醒一个线程, 都需要操作系统帮忙完成,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期的synchronized 效率低的原因。庆幸的是在Java 6之后Java 官方对从JVM层面对synchronized 较大优化,所以现在的synchronized 锁效率也优化得很不错了。JDK1.6对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。

如何使用

synchronized关键字最主要的三种使用方式:

  • 普通方法,只对当前实例对象加锁
  • 静态方法,对类的所有对象加锁
  • 方法块
    • this :对当前实例对象加锁
    • Classname.class:对类的所有对象加锁

具体例子、底层原理看这里

ArrayList线程不安全

  • 添加元素的add方法不是原子性操作

    add方法分成两步:1、加入元素 2、移动标志位

    有A、B两条线程,A线程执行到array[flag] = 1后还未移动标志位,此时线程B执行array[flag] = 2;将线程A添加的1覆盖,此后A、B线程相继执行 flag+=1; A、B线程执行完后数组变为array = {2,0,0} 与预期{1,2}不符

  • 数组扩容可能会发生数组越界

    在调用add()方法前要先判断数组容量,若数组已满则需要扩容,然后再执行添加操作。假设有A、B两个线程,当A线程要往数组大小为10的Arraylist中添加第10个元素时完成判断不需扩容,正要执行添加操作,B线程执行完成判断不需扩容 接着添加元素,移动标志位 flag = 10;A线程继续执行使用标志位flag = 10,从而产生数组越界的错误

1
2
3
4
5
6
//源码
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

解决方案:适用Vector类,是线程安全的,方法加了synchronized关键字。

ConcurrentHashMap实现原理

HashMap和ConcurrentHashMap看这里

AQS

AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制。

AQS的核心思想

  • 资源空闲,加锁
  • 资源不空闲,线程加入CLH队列中等待

CLH队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。

AQS就是基于CLH队列,用volatile修饰共享变量state,线程通过CAS去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

注意:AQS是自旋锁:在等待唤醒的时候,经常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,直到被其他线程获取成功

AQS 定义了两种资源共享方式:

  1. Exclusive:独占,只有一个线程能执行,如ReentrantLock
  2. Share:共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

ReentrantLock为例,(可重入独占式锁):state初始化为0,表示未锁定状态,A线程lock()时,会调用tryAcquire()独占锁并将state+1.之后其他线程再想tryAcquire的时候就会失败,直到A线程unlock()到state=0为止,其他线程才有机会获取该锁。A释放锁之前,自己也是可以重复获取此锁(state累加),这就是可重入的概念。
注意:获取多少次锁就要释放多少次锁,保证state是能回到零态的。

JUC并发包

java.util.concurrent

ReentrantLock和synchronized

java.util.concurrent.locks.ReentrantLock

1
2
3
public static ReentrantLock lock=new ReentrantLock();
lock.lock();
lock.unlock();

中断:为了解决死锁问题,可以中断ReentrantLock。使用lockInterruptibly申请锁,这是一个可以对中断进行响应的锁申请动作。

等待tryLock(num,TimeUnit.SECONDS)可以实现限时等待锁。超过num秒还没得到锁就返回false

可重入:一个线程可以多次申请加锁,只要解锁的时候次数与加锁的次数相等即可。

公平锁:先申请锁的线程先得到锁。构造函数fair可以设定ReentrantLock是公平锁还是非公平锁。

Condition:用来暂停和唤醒锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 public class ReentrantLockCondition implements Runnable{
public static ReentrantLock lock=new ReentrantLock();
public static Condition condition = lock.newCondition();//生成与lock绑定的Condition对象
@Override
public void run () {
try{
lock.lock();
condition.await (); //等待
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace () ;
}finally{
lock. unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLockCondition tl=new ReenterLockCondition();
Thread t1=new Thread(tl);
t1.start();
Thread.sleep (2000);//通知线程 t1 继续执行
lock.lock();
condition.signal(); //唤醒继续执行
lock.unlock();
}
}
  • 实现:synchronized由JVM实现,ReentrantLock由JDK实现
  • 可中断:synchronized不可中断,ReentrantLock可中断
  • 公平锁:synchronized非公平锁,ReentrantLock默认非公平,也可以做公平锁。
  • 等待和唤醒:synchronized使用Object.wait()notify(),ReentrantLock使用Condition.await()signal().另外一个ReentrantLock锁可以绑定多个Condition对象
  • 性能上两者差不多,synchronized做了很多优化
  • 都是可重入锁

信号量Semaphore

synchronized和ReentrantLock同时都只能有一个线程访问一个资源,信号量可以控制多个线程访问一个资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package chapter3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* 信号量
*/
public class SemapDemo implements Runnable {
final Semaphore semp = new Semaphore(5); //控制只有5个线程可以同时访问资源

@Override
public void run() {
try {
semp.acquire(); //申请获得许可
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ":done!");
semp.release(); //释放许可
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 总共20个线程,系统会以5个线程一组为单位,依次执行并输出
*/
public static void main(String args[]) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(demo);
}
}
}

读写锁ReadWriteLock

读读之间不阻塞,读写、写写之间阻塞。提升系统效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package chapter3;

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Created by 13 on 2017/5/5.
*/
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = reentrantReadWriteLock.readLock();
private static Lock writeLock = reentrantReadWriteLock.writeLock();
private int value;

public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);//模拟读操作
System.out.println("读操作:" + value);
return value;
} finally {
lock.unlock();
}
}

public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);//模拟写操作
System.out.println("写操作:" + value);
value = index;
} finally {
lock.unlock();
}
}

public static void main(String args[]) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();

Runnable readRunnable = new Runnable() {
@Override
public void run() {
//分别使用两种锁来运行,性能差别很直观的就体现出来,使用读写锁后读操作可以并行,节省了大量时间
try {
demo.handleRead(readLock);
//demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
};
Runnable writeRunnable = new Runnable() {
@Override
public void run() {
//分别使用两种锁来运行,性能差别很直观的就体现出来
try {
demo.handleWrite(writeLock, new Random().nextInt(100));
//demo.handleWrite(lock, new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
for (int i = 18; i < 20; i++) {
new Thread(writeRunnable).start();
}
}
}

倒计时器CountDownLatch

用一个整数n初始化,当n个线程执行完后才执行主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package chapter3;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);//10个线程完成才执行主线程
static final CountDownLatchDemo demo = new CountDownLatchDemo();

@Override
public void run() {

try {
Thread.sleep(new Random().nextInt(3) * 1000);
System.out.println("check complete");
end.countDown();//完成一个线程就计数-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String args[]) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(demo);
}
//等待检查
end.await();
//发射火箭
System.out.println("Fire!");
executorService.shutdown();
}
}

循环栅栏CyclicBarrier

和倒计时器的区别在于,可以重复使用,而且可以定义多个线程完成后做一个动作。

1
2
3
4
5
6
7
//简单版
CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));//N个线程

private final CyclicBarrier cyclicBarrier;
cyclicBarrier.await();//等待所有线程就绪
doWork();
cyclicBarrier.await();//等待所有线程执行完dowork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package chapter3;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* Created by 13 on 2017/5/4.
*/
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclicBarrier;

public Soldier(CyclicBarrier cyclicBarrier, String soldier) {
this.soldier = soldier;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
cyclicBarrier.await();
doWork();
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}

void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}
}

public static class BarrierRun implements Runnable {

boolean flag;
int N;

public BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}

@Override
public void run() {
if (flag) {
System.out.println("司令:[士兵" + N + "个,任务完成!");
} else {
System.out.println("司令:[士兵" + N + "个,集合完毕!");
flag = true;
}
}
}


public static void main(String args[]) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));
System.out.println("集合队伍!");
for (int i = 0; i < N; i++) {
System.out.println("士兵" + i + "报道!");
allSoldier[i] = new Thread(new Soldier(cyclicBarrier, "士兵" + i));
allSoldier[i].start();
}
}
}

线程池

基本概念

什么是线程池?

就是线程复用。创建线程变为从线程池里拿空闲线程,销毁线程变为向线程池里归还线程

为什么要用线程池?

减少资源消耗,提高响应速度(少了创建线程的时间),方便管理

框架

四种常用构造方法

  • newFixedThreadPool:创建固定线程数量的线程池

  • newSingleThreadExecutor:创建只有一个线程的线程池

  • newCachedThreadPool:创建根据实际情况调整线程数量的线程池

  • newScheduledThreadPool:线程在指定时间执行

    • scheduleAtFixedRate():从上次开始到下次开始

    • scheduleWithFixedDelay()从上次结束到下次开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//newFixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
public static class MyTask implements Runnable {
public void run() {
System.out.println(System.currentTimeMillis() + " Thread id:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 前5个线程执行时间和后5个比差了1秒钟
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
es.submit(task);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//scheduleAtFixedRate
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService es = Executors.newScheduledThreadPool(10);
es.scheduleAtFixedRate(new Runnable() {

public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}

构造方法的内部实现

前三种方法newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool都是由ThreadPoolExcutor构造函数的不同参数来实现的。

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
  • corePoolSize:线程池维护线程的最少数量
  • maximumPoolSize:线程池维护线程的最大数量
  • keepAliveTime:超过corePoolSize的空闲线程的存活时间
  • unit:线程池维护线程所允许的空闲时间的单位
  • workQueue:线程池所使用的缓冲队列
  • handler:线程池对拒绝任务的处理策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

由于newFixedThreadPool采用了LinkedBlockingQueue,这是无界队列,所以实际线程队列永远维持在nThreads,所以maximumPoolSize和keepAliveTime无效。

由于newCachedThreadPool采用了SynchronousQueue,这个阻塞队列没有存储空间,一有请求就使用空闲线程或申请新的线程执行任务

BlockingQueue阻塞队列

四种:直接提交,有界,无界,优先

  • 直接提交队列:SynchronousQueue,有空闲线程就直接提交,没有就创建新线程,超过上限就拒绝
  • 有界任务队列:ArrayBlockingQueue,有一个参数capacity表示队列长度。有新的任务时
    • 实际运行的线程数小于corePoolSize时,直接创建新线程
    • 大于corePoolSize但小于corePoolSize+capacity时,加入等待队列
    • 大于corePoolSize+capacity但小于maximumPoolSize时,直接创建新线程
    • 大于maximumPoolSize时,拒绝
  • 无界任务队列:LinkedBlockingQueue,超过corePoolSize的线程全部进队列等待
  • 优先任务队列:PriorityBlockingQueue:特殊的无界队列,有优先级。

线程池增长策略(任务调度)

  • 实际运行的线程数小于corePoolSize时,直接创建新线程
  • 大于corePoolSize但workQueue未满时,加入等待队列
  • 大于workQueue已满但小于maximumPoolSize时,直接创建新线程
  • 大于maximumPoolSize时,拒绝

拒绝策略

  • AbortPolicy:抛出异常,丢弃任务
  • CallerRunsPolicy:不想放弃但也没资源,就使用调用该excute的线程来执行,性能会下降
  • DiscardOldestPolicy:删除workQueue队列头的任务,再重试执行任务
  • DiscardPolicy:不抛出异常,丢弃任务

并发集合

ConcurrentHashMap

HashMap线程不安全,解决方案:

  • 使用Collections.synchronizedMap()方法包装HashMap,效率不高,不适合高并发

    1
    public static Map m=Collections.synchronizedMap(new HashMap());
  • 使用ConcurrentHashMap,参考这里

CopyOnWriteArrayList

适用读多写少的情况

写的时候,先复制一份副本,修改副本完成后再替换原有数据。这样写操作就不会阻塞读操作。

ConcurrentLinkedQueue

高并发下性能最好的队列

BlockingQueue

BlockingQueue是一个接口,主要有四种实现,参见上一节的BlockingQueue阻塞队列

队列末尾压入元素put(),如果队列已满,会等待;队列头取出元素take(),如果队为空,会等待

例如take(),对于线程而言,如果队列为空,该线程就会等待在notEmpty上,当有新的元素入队的时候,会通知notEmpty的所有线程。

1
2
3
4
//定义的字段
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
1
2
3
4
5
6
7
8
9
10
11
public E take()throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
while (count =- 0)
notEmpty.await();
return extract();
}finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
//新元素来了激活
private void insert(E x){
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

ConcurrentSkipListMap

跳表

  • 分层,最底层链表维护了所有元素
  • 上一层是下一层的子集
  • 链表的元素是排序的
  • 查找时,从顶层链表开始查起,发现大于当前值就去下一层查找
  • 查找时间复杂度是logN

使用示例

1
2
3
4
5
6
7
Map<Integer,Integer>map = new ConcurrentSkipListMap<Integer,Integer>(); 
for(int i=0;i<30;i++) {
map.put(i,i);
}
for(Map.Entry<Integer,Integer>entry:map.entryset()){
System.out.println(entry.getKey());
}

锁的优化

提高锁性能的建议

  1. 减少锁的持有时间
  2. 减少锁粒度(只对需要的一部分进行加锁),相对是锁的粗化。
  3. 读写分离锁(读和读之间不阻塞)
  4. jvm对锁的优化(这是synchronized优化)

ConcurrentHashMap里每个Segment分别加锁就是减少锁粒度。缺点是对于全局操作,资源消耗会很大,因为要给所有段加锁,如size()函数。

锁分离的一个例子是LinkedBlockingQueue,take()函数操作的是队头,put操作的是队尾,因此可以两把锁分别给这两个函数,实现读数据和写数据不冲突。

锁粗化的一个例子是把锁加在循环外而非循环内,这样就不用重复地加锁和释放锁。

1
2
3
4
5
synchronized(lock){
for(int i=0;i<CIRCLE;i++){

}
}

jvm虚拟机对锁的优化

  • 偏向锁:第一个线程请求锁并拿到锁之后,下次再请求锁不需要进行同步操作,直接拿到锁。当有第二个线程竞争的时候,偏向锁失效。
  • 轻量级锁:偏向锁失效后,使用轻量级锁。将对象头部作为指针,指向持有锁的线程堆栈内部,如果成功就获得轻量级锁,如果失败升级为自旋锁。
  • 自旋锁:轻量级锁失效之后,java 虚拟机会假设在不久的时间内,可以拿到锁。所以让线程进行几个空循环,如果可以拿到锁则进入临界区,否则阻塞等待锁。
  • 锁消除:去除不可能存在共享资源竞争的锁。节省不必要的锁请求的时间。

ThreadLocal

存储结构

ThreadLocal是线程的变量副本,每个线程都可以通过set()get()来对这个局部变量进行操作,每个线程隔离。

ThreadLocal一般设为static,这样所有线程都共用一个实例,get的时候通过这个ThreadLocal实例计算hash,然后到当前线程的ThreadLocalMap里去找key,找到后返回value。

  • 每个线程有一个ThreadLocalMap,存储的是entry类型的数组。

  • entry变量的key是ThreadLocal实例,但是是弱引用。

  • 从整体来看,代码里只有一个ThreadLocal变量t1,但是对于每个线程来说,执行t1.get()函数获取到的value是不一样的,这样就实现了t1变量在不同线程里的隔离

一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final SimpleDateFormat sdf=new SimpleDateFormat("yyy-MM-dd HH:mm;ss");
public static class ParseDate implements Runnable{
int i=0;
public ParseDate(int i){this.i=i;}
public void run (){
Date t=sdf.parse("2015-03-29 19:29:"+i%60);
System.out.println(i+":"+t);
}

public static void main(String[] args){
ExecutorService es=Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++){
es.execute(new ParseDate(i));
}
}
}

这个程序是把String类型的时间转成Date类型的时间。由于parse()函数不是线程安全,所以会报错。

使用ThreadLocal为每一个线程都产生一个 SimpleDateformat 对象实例解决线程安全问题。ThreadLocal相当于一个容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalDemo {
private static ThreadLocal<SimpleDateFormat> t1 = new ThreadLocal<SimpleDateFormat>();//相当于创建了容器
public static class ParseDate implements Runnable{
int i=0;
public ParseDate(int i){this.i=i;}

public void run() {
if(t1.get()==null){
t1.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));//为每个线程创建一个SimpleDateFormat对象
}
Date t =t1.get().parse("2015-03-29 19:29:"+i%60);//调用自己的parse函数,每个线程只访问自己的对象
System.out.println(i+":"+t);

}
}
public static void main(String []args){
ExecutorService es = Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++){
es.execute(new ParseDate(i));
}
}
}

set函数

计算hash,如果key不相等说明冲突了,检查下一个

get函数

同上

内存泄漏

如果指向ThreadLocal实例的强引用消失了,那么GC会回收实例,导致ThreadLocalMap就会存在key为null,但value不为null的Entry。

每次使用完线程使用remove函数可以清除所有键值对,帮助解决内存泄漏。

无锁

CAS比较交换

  • 天生免疫死锁,没有锁竞争和线程切换的开销
  • CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值,当V=E时,才会将V值设为N,如果不相等则该线程被告知失败,可以再次尝试
  • 硬件层面现代处理器已经支持原子化的CAS指令

AtomicInteger

  • atomic包中实现了直接使用CAS的线程安全类型
  • AtomicInteger与Integer的区别是它是可变的,也是线程安全的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//AtomicInteger使用示例
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThread implements Runnable{

public void run() {
for(int k=0;k<10000;k++){
i.incrementAndGet();//CAS操作,给自己+1
}
}
}

public static void main(String []args) throws InterruptedException {
Thread []ts = new Thread[10];
for(int k=0;k<10;k++){
ts[k] = new Thread(new AddThread());
}
for(int k=0;k<10;k++){
ts[k].start();
}
for(int k=0;k<10;k++){
ts[k].join();
}
System.out.println(i);
}
}

incrementAndGet()实现原理

1
2
3
4
5
6
7
8
public final int incrementAndGet(){
for (;;){ //死循环就是CAS的失败不断重新尝试
int current = get ();
int next = current + 1;
if (compareAndSet(current,next))
return next;
}
}

其他

单例模式

一个类只产生一个实例对象

好处

  • 减少new花费的时间
  • 减轻GC的压力

写法

  • 构造函数设为private
  • 在本类中(饿汉式)或方法中(懒汉式)创建一个本类对象(private static)
  • 写一个公有静态方法(publiac static)返回创建的对象

饿汉式

1
2
3
4
5
6
7
8
9
public class Singleton {
private Singleton(){
System.out.println("Singleton is created");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance(){
return instance;
}
}

懒汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LazySingleton {
private LazySingleton() {
System.out.println("LazySingleton is created");
}

private static LazySingleton instance = null;

public static synchronized LazySingleton getInstance() {//多线程下需要加锁实现同步
if (instance == null)
instance = new LazySingleton();//方法内创建,方法调用时对象才会创建
return instance;
}
}

使用内部类结合懒汉式和饿汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
public class staticSingleton{
private staticSingleton(){
System.put.printline{"staticSingleton is created."}
}

private static class innerClassHolder{
private static staticSingleton instance=new staticSingleton();
}

public static staticSingleton getInstance(){
return innerClassHolder.instance;
}
}

生产者消费者的实现

五种实现方式,参考这里

  • 用synchronized对存储加锁,然后用object原生的wait() 和 notify()做同步。
  • 用concurrent.locks.Lock,然后用condition的await() 和signal()做同步。
  • 直接使用concurrent.BlockingQueue。
  • 使用PipedInputStream/PipedOutputStream。
  • 使用信号量semaphore。

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.LinkedList;
import java.util.Queue;

public class ProducerAndConsumer {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
synchronized (queue) {
while (queue.size() == MAX_LEN) {
queue.notify();
System.out.println("当前队列满");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
queue.notify();
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
queue.notify();
System.out.println("当前队列为空");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
queue.notify();
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

concurrent.locks.Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* version 1 doesn't use synchronized to improve performance
*/
public class ProducerAndConsumer1 {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
lock.lock();
try {
while (queue.size() == MAX_LEN) {
System.out.println("当前队列满");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
condition.signal();
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("当前队列为空");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
condition.signal();
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class ProducerAndConsumer {
private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10);
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

多线程交替打印1212…

需要创建一个Object对象,然后用synchronized给这个对象加锁。

notify()会激活等待的线程,wait()会释放锁。

  • 线程1先加锁,再激活等待线程(第一次没有等待线程),然后wait()释放锁,线程1休眠等待
  • 线程2先加锁,再激活等待线程(线程1),然后wait()释放锁,线程2休眠等待
  • 线程1被激活且获得锁,打印1,释放锁(synchronized块结束),再加锁,激活等待线程(线程2),然后wait()释放锁,线程1休眠等待
  • 线程2被激活且获得锁,打印2,释放锁(synchronized块结束),再加锁,激活等待线程(线程1),然后wait()释放锁,线程2休眠等待
  • ……
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Main {

public static class Mythread implements Runnable{
private int idx;
private Object o;
private int count=10;

private Mythread(int idxnew,Object onew){
idx=idxnew;
o=onew;
}

@Override
public void run() {
while (true) {

try {
synchronized (o){
o.notify();
o.wait();
System.out.println(idx);
if(count!=1){
count--;
}
else{
o.notify();
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

public static void main(String[] args)throws Exception{
Object o=new Object();
Thread th1=new Thread(new Mythread(1,o));
th1.start();
th1.sleep(10);//保证先打印1
Thread th2=new Thread(new Mythread(2,o));
th2.start();
}
}

线程池参数设置

CPU密集型:N+1

IO密集型:2*N

线程等待时间为w,线程CPU计算时间为c,CPU核心数为n,则线程池大小应设为:(w+c)/c *n 才能达到100%CPU使用率。

对于CPU密集型,w=0,+1是为了如果有线程停下可以快速补上;

对于IO密集型,c=0,