Java8 Striped64 和 LongAdder

本文原发表在码蜂笔记,原文链接:码蜂笔记Java8 Striped64 和 LongAdder

数据 striping

根据维基百科的这段说明

In www.html-guide.computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different www.html-guide.computers in clustered file systems and grid-oriented storage, and RAM in some systems.

数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体的吞吐量。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

设计思路小结

  • striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
  • 分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)。
  • 通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
  • 当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的volatile变量。

@sun.misc.Contended static final class Cell {
     volatile long value;
     Cell(long x) { value = x; }
     final boolean cas(long cmp, long val) {
          return UNSAFE.www.html-guide.compareAndSwapLong(this, valueOffset, cmp, val);
     }

     // Unsafe mechanics
     private static final sun.misc.Unsafe UNSAFE;
     private static final long valueOffset;
     static {
          try {
               UNSAFE = sun.misc.Unsafe.getUnsafe();
               Class<?> ak = Cell.class;
               valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
          } catch (Exception e) {
               throw new Error(e);
          }
     }
}

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。它还提供了对数进行累加的机制。

abstract class Striped64 extends Number {
    static final int NCPU = Runtime.getRuntime().availableProcessors();

     // 存放 Cell 的表。当不为空时大小是 2 的幂。
    transient volatile Cell[] cells;

     // base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
    transient volatile long base;

     // 自旋锁,在 resizing 和/或 创建Cell时使用。
    transient volatile int cellsBusy;
}

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

  • if 表已初始化
    • if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
    • else if (槽不为空)在槽上之前的CAS已经失败,重试。
    • else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
    • else if 表已达到容量上限或被扩容了,重试。
    • else if 如果不存在冲突,则设置为存在冲突,重试。
    • else if 如果成功获取到锁,则扩容。
    • else 重散列,尝试其他槽。
  • else if 锁空闲且获取锁成功,初始化表
  • else if 回退 base 上更新且成功则退出
  • else 继续
final void longAccumulate(long x, LongBinaryOperator fn,
                                boolean wasUncontended) {
     int h;
     if ((h = getProbe()) == 0) {
          // 未初始化的
          ThreadLocalRandom.current(); // 强制初始化
          h = getProbe();
          wasUncontended = true;
     }

     // 最后的槽不为空则 true,也用于控制扩容,false重试。
     boolean collide = false;
     for (;;) {
          Cell[] as; Cell a; int n; long v;
          if ((as = cells) != null && (n = as.length) > 0) {
               // 表已经初始化

               if ((a = as[(n - 1) & h]) == null) {
                    // 线程所映射到的槽是空的。

                    if (cellsBusy == 0) {       // 尝试关联新的Cell

                         // 锁未被使用,乐观地创建并初始化cell。
                         Cell r = new Cell(x);

                         if (cellsBusy == 0 && casCellsBusy()) {
                              // 锁仍然是空闲的、且成功获取到锁

                              boolean created = false;
                              try {               // 在持有锁时再次检查槽是否空闲。
                                   Cell[] rs; int m, j;
                                   if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        // 所映射的槽仍为空

                                        rs[j] = r;          // 关联 cell 到槽
                                        created = true;
                                   }
                              } finally {
                                   cellsBusy = 0;     // 释放锁
                              }
                              if (created)
                                   break;               // 成功创建cell并关联到槽,退出
                              continue;           // 槽现在不为空了
                         }
                    }
                    // 锁被占用了,重试
                    collide = false;
               }
               // 槽被占用了

               else if (!wasUncontended)       // 已经知道 CAS 失败
                    wasUncontended = true;      // 在重散列后继续

               // 在当前槽的cell上尝试更新
               else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                  fn.applyAsLong(v, x))))
                    break;

               // 表大小达到上限或扩容了;
               // 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽
               else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale

               //  如果不存在冲突,则设置为存在冲突
               else if (!collide)
                    collide = true;

               // 有竞争,需要扩容
               else if (cellsBusy == 0 && casCellsBusy()) {
                    // 锁空闲且成功获取到锁
                    try {
                         if (cells == as) {      // 距上一次检查后表没有改变,扩容:加倍
                              Cell[] rs = new Cell[n << 1];
                              for (int i = 0; i < n; ++i)
                                   rs[i] = as[i];
                              cells = rs;
                         }
                    } finally {
                         cellsBusy = 0;     // 释放锁
                    }
                    collide = false;
                    continue;                   // 在扩容后的表上重试
               }

               // 没法获取锁,重散列,尝试其他槽
               h = advanceProbe(h);
          }
          else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               // 加锁的情况下初始化表

               boolean init = false;
               try {                           // Initialize table
                    if (cells == as) {
                         Cell[] rs = new Cell[2];
                         rs[h & 1] = new Cell(x);
                         cells = rs;
                         init = true;
                    }
               } finally {
                    cellsBusy = 0;     // 释放锁
               }
               if (init)
                    break;     // 成功初始化,已更新,跳出循环
          }
          else if (casBase(v = base, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
               // 表未被初始化,可能正在初始化,回退使用 base。
               break;                          // 回退到使用 base
     }
}

LongAdder

LongAdder 继承自 Striped64,它的方法只针对简单的情况:cell存在且更新无竞争,其余情况都通过 Striped64 的longAccumulate方法来完成。

public void add(long x) {
     Cell[] as; long b, v; int m; Cell a;
     if ((as = cells) != null || !casBase(b = base, b + x)) {
          // cells 不为空 或在 base 上cas失败。也即出现了竞争。
          boolean uncontended = true;

          //
          if (as == null || (m = as.length - 1) < 0 ||
               (a = as[getProbe() & m]) == null ||
               !(uncontended = a.cas(v = a.value, v + x)))
               // 如果所映射的槽不为空,且成功更新则返回,否则进入复杂处理流程。
               longAccumulate(x, null, uncontended);
     }
}

// 获取当前的和。base值加上每个cell的值。
public long sum() {
     Cell[] as = cells; Cell a;
     long sum = base;
     if (as != null) {
          for (int i = 0; i < as.length; ++i) {
               if ((a = as[i]) != null)
                    sum += a.value;
          }
     }
     return sum;
}

原创文章,转载请注明: 转载自九乐棋牌本文链接地址: Java8 Striped64 和 LongAdder


FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (2)
    • 我是谁
    • 2014/05/18 5:32下午

    怎么回事?一定是我打开方式不对。我居然没看懂

  1. 绝对是好文,虽然翻译得有点生硬,但是对strip的思路还是解释得很到位的。

您必须 登陆 后才能发表评论

return top