Python 官方文档:入门教程 => 点击学习
目录一、 LongAdder简介二、LongAdder代码分析(1)LongAdder的结构(2)add方法实现(3)add方法中longAccumulate方法的实现三、总结一、
LongAdder类是JDK1.8新增的一个原子性操作类。上一节说到,AtomicLong通过CAS提供了非阻塞的原子性操作,相比用阻塞算法的synchronized来说性能已经得到了很大提升。在高并发下大量线程会同时竞争更新同一个原子变量,但由于只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS操作,这会白白浪费CPU资源。
为了解决AtomicLong在高并发下的缺点,LongAdder应运而生。LongAdder采用的思路是:既然AtomicLong由于过多线程同时去竞争同一个变量的更新而产生性能瓶颈,那么把一个变量分解为多个变量,让同样多的线程去竞争多个资源,就解决了性能问题。
如图4-1:
使用AtomicLong时,是多个线程同时竞争同一个原子变量。
如图4-2:
使用LongAdder时,则是内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取在其他Cell原子变量上进行CAS尝试,这增加了当前线程重试CAS成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base值返回的。
LongAdder维护了一个Cells数组和一个基值变量base,Cells数组的特点如下:
Cell 类是AtomicLong的一个改进,用来减少缓存的争用,即解决伪共享问题。对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子操作都是无规律地分散在内存中进行的,多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,故数组内的多个元素能经常共享缓存行(伪共享),因此Cell类使用了@sun.misc.Contended注解进行字节填充,这是为了防止数组中多个元素共享一个缓存行,从而提升性能。
为了解决高并发下多线程对一个变量 CAS 争夺失败后进行无限自旋而造成的降低并发性能的问题,LongAdder在内部维护了一个动态的Cell数组来分担对单个变量进行争夺的开销。
这一节我们来围绕以下话题对LongAdder的实现进行分析:
如图,LongAdder类继承自Striped64类:
先混个眼熟,Striped64类是一个高并发累加的工具类。其特点为:
Striped64类内部维护三个重要的变量:
上文中出现频率很高的Cell长啥样?下面我们来揭秘一下。
Cell结构如下:
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
简单分析一下:
从add方法的代码中,我们可以找到开头里很多问题的答案。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {//(1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //(2)
(a = as[getProbe() & m]) == null ||//(3)
!(uncontended = a.cas(v = a.value, v + x)))//(4)
longAccumulate(x, null, uncontended);//(5)
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
总结来看就是,(2)(3)(4)处的代码就是获取当前线程应该访问的Cells数组中的Cell元素,然后进行CAS更新操作,在获取期间如果有条件不满足就会跳到代码(5)执行。
另外,当前线程应该访问Cells数组的哪一个Cell元素是通过getProbe() & m
进行计算的,其中m是当前Cells数组元素个数-1,getProbe()
则用于获取当前线程中变量threadLocalRandomProbe
的值(默认为0,代码(5)将其初始化),并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素更新的原子性。到此,我们解决了问题当前线程应该访问Cells数组里的哪一个Cell元素和如何保证线程操作被分配的Cell元素的原子性。
longAccumulate
方法是cells数组被初始化和扩容的地方。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;//(6)
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {//(7)
if ((a = as[(n - 1) & h]) == null) {//(8)
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//(9)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//(10)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//(11)
else if (!collide)
collide = true;
//(12)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//(12.1)
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//(12.2)
cellsBusy = 0;
}
//(12.3)
collide = false;
continue; // Retry with expanded table
}
//(13)为了能找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数。
h = advanceProbe(h);
}
//(14)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//(14.1)
Cell[] rs = new Cell[2];
//(14.2)
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//(14.3)
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
如果Cells数组不存在时,会发生什么?
casCellsBusy
方法。Cells数组扩容是怎么实现的?
线程访问所分配的Cell元素有冲突后如何处理?
最后再问一下自己,看完了源码分析后,能回答出这六个问题吗?如果能,那么恭喜你,LongAdder的原理已经掌握得差不多了。
由于篇幅有限,下一期我们再来看看LongAccumulator类的原理,毕竟这期的内容有点多了,讲太多也不好消化。这期就到这吧~更多关于java高并发AtomicLong LongAdder的资料请关注编程网其它相关文章!
--结束END--
本文标题: java高并发下解决AtomicLong性能瓶颈方案LongAdder
本文链接: https://www.lsjlt.com/news/175442.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0