原子类

转自彤哥

原子操作

原子操作是指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何线程上下文切换。

原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中的一部分,将整个操作视作一个整体是原子性的核心特征。

我们这里说的原子操作与数据库ACID中的原子性,笔者认为最大区别在于,数据库中的原子性主要运用在事务中,一个事务之内的所有更新操作要么都成功,要么都失败,事务是有回滚机制的,而我们这里说的原子操作是没有回滚的,这是最大的区别。

AtomicInteger

(1)AtomicInteger中维护了一个使用volatile修饰的变量value,保证可见性;

(2)AtomicInteger中的主要方法最终几乎都会调用到Unsafe的compareAndSwapInt()方法保证对变量修改的原子性。

(3)存在ABA问题

主要属性

// 获取Unsafe的实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 标识value字段的偏移量
private static final long valueOffset;
// 静态代码块,通过unsafe获取value的偏移量
static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
// 存储int类型值的地方,使用volatile修饰
private volatile int value;

getAndIncrement()

// Unsafe中的方法
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
    return var5;
}

AtomicStampedReference

AtomicStampedReference是java并发包下提供的一个原子类,它能解决其它原子类无法解决的ABA问题

ABA

ABA问题发生在多线程环境中,当某线程连续读取同一块内存地址两次,两次得到的值一样,它简单地认为“此内存地址的值并没有被修改过”,然而,同时可能存在另一个线程在这两次读取之间把这个内存地址的值从A修改成了B又修改回了A,这时还简单地认为“没有修改过”显然是错误的。

ABA的危害

(1)举个例子,一个栈,top节点放在AtomicReference原子类中

(2)最开始时栈为top->3->2->1,A线程想要pop(),但此时还没有修改

(3)同一时间,B线程pop了3,pop了2,最后又push了3

(4)在A线程看来,top的值是3没有变化,但Node对象发生了改变

ABA的解决方式:

(1)版本号

比如,上面的栈结构增加一个版本号用于控制,每次CAS的同时检查版本号有没有变过。

还有一些数据结构喜欢使用高位存储一个邮戳来保证CAS的安全。

(2)不重复使用节点的引用

比如,上面的栈结构在线程2执行push()入栈操作的时候新建一个节点传入,而不是复用节点1的引用;

(3)直接操作元素而不是节点

比如,上面的栈结构push()方法不应该传入一个节点(Node),而是传入元素值(int的value)。

AtomicStampedReference主要属性

private static class Pair<T> {
    final T reference;  //用来存值
    final int stamp;    //用来存版本号
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

AtomicStampedReference修改值的源码

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    // 获取当前的(元素值,版本号)对
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&  // 引用没变
        expectedStamp == current.stamp &&  // 版本号没变
        ((newReference == current.reference &&
          newStamp == current.stamp) || // 新引用等于旧引用并且新版本号等于旧版本号
         casPair(current, Pair.of(newReference, newStamp))); // 构造新的Pair对象并CAS更新
}

(1)如果元素值和版本号都没有变化,并且和新的也相同,返回true;

(2)如果元素值和版本号都没有变化,并且和新的不完全相同,就构造一个新的Pair对象并执行CAS更新pair。

LongAdder

LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。

主要内部类Cell

LongAdder继承自Striped64抽象类,Striped64中定义了Cell内部类和各重要属性。

// Striped64中的内部类,使用@sun.misc.Contended注解,说明里面的值消除伪共享
@sun.misc.Contended static final class Cell {
    // 存储元素的值,使用volatile修饰保证可见性
    volatile long value;
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe实例
    private static final sun.misc.Unsafe UNSAFE;
    // value字段的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

主要属性

// 这三个属性都在Striped64中
// cells数组,存储各个段的值
transient volatile Cell[] cells;
// 最初无竞争时使用的,也算一个特殊的段
transient volatile long base;
// 标记当前是否有线程在创建或扩容cells,或者在创建Cell
// 通过CAS更新该值,相当于是一个锁
transient volatile int cellsBusy;

最初无竞争或有其它线程在创建cells数组时使用base更新值,有过竞争时使用cells更新值。

最初无竞争是指一开始没有线程之间的竞争,但也有可能是多线程在操作,只是这些线程没有同时去更新base的值。

有过竞争是指只要出现过竞争不管后面有没有竞争都使用cells更新值,规则是不同的线程hash到不同的cell上去更新,减少竞争。

add方法

(1)最初无竞争时只更新base;

(2)直到更新base失败时,创建cells数组;

(3)当多个线程竞争同一个Cell比较激烈时,可能要扩容;

扩容longAccumulate()方法

(1)如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组;

(2)如果当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回;

(3)通过线程的probe值找到当前线程应该更新cells数组中的哪个Cell;

(4)如果当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置创建一个Cell;

(5)尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突;

(5)当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次;

(6)如果在重试的时候还是更新失败,就扩容;

(7)扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;

(8)cellsBusy在创建cells数组、创建Cell、扩容cells数组三个地方用到;

求和sum()方法

可以看到sum()方法是把base和所有段的值相加得到,那么,这里有一个问题,如果前面已经累加到sum上的Cell的value有修改,不是就没法计算到了么?

答案确实如此,所以LongAdder可以说不是强一致性的,它是最终一致性的

在longAccumulate()方法中有个条件是 n>=NCPU就不会走到扩容逻辑了,而n是2的倍数,那是不是代表cells数组最大只能达到大于等于NCPU的最小2次方?

答案是明确的。因为同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不同的核心更新了同一个Cell,这时会重新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大概率会发生改变,如果运行的时间足够长,最终会出现同一个核心的所有线程都会hash到同一个Cell(大概率,但不一定全在一个Cell上)上去更新,所以,这里cells数组中长度并不需要太长,达到CPU核心数足够了。

比如,笔者的电脑是8核的,所以这里cells的数组最大只会到8,达到8就不会扩容了。

原子类分类

原子更新基本类型或引用类型

  • AtomicBoolean:原子更新布尔类型,内部使用int类型的value存储1和0表示true和false,底层也是对int类型的原子操作。

  • AtomicInteger:原子更新int类型。

原子更新int类型。

  • AtomicLong:原子更新long类型。

原子更新long类型。

  • AtomicReference:原子更新引用类型,通过泛型指定要操作的类。

原子更新引用类型,通过泛型指定要操作的类。

  • AtomicMarkableReference:原子更新引用类型,内部使用Pair承载引用对象及是否被更新过的标记,避免了ABA问题。

原子更新引用类型,内部使用Pair承载引用对象及是否被更新过的标记,避免了ABA问题。

  • AtomicStampedReference:原子更新引用类型,内部使用Pair承载引用对象及更新的邮戳,避免了ABA问题。

原子更新数组中的元素

  • AtomicIntegerArray:原子更新int数组中的元素。

  • AtomicLongArray:原子更新long数组中的元素。

  • AtomicReferenceArray:原子更新Object数组中的元素。

    原子更新Object数组中的元素。

原子更新对象中的字段

  • AtomicIntegerFieldUpdater:原子更新对象中的int类型字段。

原子更新对象中的int类型字段。

  • AtomicLongFieldUpdater:原子更新对象中的long类型字段。

原子更新对象中的long类型字段。

  • AtomicReferenceFieldUpdater:原子更新对象中的引用类型字段。

原子更新对象中的引用类型字段。

高性能原子类

高性能原子类,是java8中增加的原子类,它们使用分段的思想,把不同的线程hash到不同的段上去更新,最后再把这些段的值相加得到最终的值。

  • Striped64:下面四个类的父类。

下面四个类的父类。

  • LongAccumulator:long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等。

long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等。

  • LongAdder:long类型的累加器,LongAccumulator的特例,只能用来计算加法,且从0开始计算。

long类型的累加器,LongAccumulator的特例,只能用来计算加法,且从0开始计算。

  • DoubleAccumulator:double类型的聚合器,需要传入一个double类型的二元操作,可以用来计算各种聚合操作,包括加乘等。

double类型的聚合器,需要传入一个double类型的二元操作,可以用来计算各种聚合操作,包括加乘等。

  • DoubleAdder:double类型的累加器,DoubleAccumulator的特例,只能用来计算加法,且从0开始计算。

double类型的累加器,DoubleAccumulator的特例,只能用来计算加法,且从0开始计算。


hhhhh