Java高并发四:无锁的实际应用-创新互联
无 锁 算法 详 解
无 锁 的Vector 实现:
参照着JDK中的 Vector 源码
1、Vector中的 add 方法的实现,它是一个同步方法,所以保证了每一次只能又一个值对数组 elementData 进行操作。
protected Object[] elementData; 通过数据来实现存储
protected int elementCount; 记录对这个Vector的操作数
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);//这边是做越界判断
elementData[elementCount++] = e;
return true;
}
private void ensureCapacityHelper(int minCapacity) {
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);//如果没有越界
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
capacityIncrement : oldCapacity);
//如果初始化的时候不指定增量capacityIncrement,那么就是将oldCapacity+oldCapacity赋值给新的长度,如果指定增量那么就是oldCapacity+capacityIncrement
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
//最后将老的元素和新的一起加入到Vector中
}
public static
return (T[]) copyOf(original, newLength, original.getClass());
}
值得注意的一点就是如果指定增量,那么可以减少空间的浪费。
JDK阅读只是为未无锁的Vector做铺垫
无锁的Vector的实现
/**
- @author Allen李
@date
/
public class LockFreeVectorextends AbstractList 2^30-1{
private static final boolean debug = false;
private static final int FIRST_BUCKET_SIZE = 8;
//虽然这边篮子的个数是固定的,但是绝对是够用的因为总共的篮子数就是8
private static final int N_BUCKET = 30;
private final AtomicReferenceArray> buckets;
//利用二维数组来嵌套是为了使一维的AtomicReferenceArray尽量避免修改,在一维数组填充满了时是不会去扩充的,
// 而是往二维数组里面填充,这样就避免了修改一维数组,而且高并发就是避免不必要的写来影响性能
public LockFreeVector(AtomicReferenceArray> buckets) {
this.buckets = buckets;
}
static class WriteDescriptor{
public E oldV;
public E newV;
public AtomicReferenceArrayaddr;
public int addr_ind;public WriteDescriptor( AtomicReferenceArray
addr, int addr_ind,E oldV, E newV) { this.oldV = oldV; this.newV = newV; this.addr = addr; this.addr_ind = addr_ind; } public void doInt(){ addr.compareAndSet(addr_ind,oldV,newV); } }
static class Descriptor{
public int size;
volatile WriteDescriptorwriteOp; public Descriptor(int size, WriteDescriptor
writeOp) { this.size = size; this.writeOp = writeOp; } public void completeWrite(){ writeDescriptor tmpOp = writeOp; if(tmpOp != null){ tmpOp.doInt(); writeOp=null; } } }
private AtomicReference
> descriptor;
private static final int zeroNumFirst = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE);public LockFreeVector(){
buckets = new AtomicReferenceArray>(N_BUCKET);
buckets.set(0,new AtomicReferenceArray(FIRST_BUCKET_SIZE));
descriptor = new AtomicReference>(new Descriptor (0,null));
}public void push_back(E e){
Descriptordesc;
DescriptornewD; do { desc=descriptor.get(); desc.completeWrite(); int pos = desc.size+FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); int bucketInd = zeroNumFirst - zeroNumPos; if(buckets.get(bucketInd)==null){ int newLen = 2 * buckets.get(bucketInd - 1).length(); if (debug) System.out.println("New Length is:"+newLen); buckets.compareAndSet(bucketInd,null,new AtomicReferenceArray
(newLen)); } //0x80000000就是1000000... 总共32位 int idx = (0x80000000>>>zeroNumPos) ^ pos; newD = new Descriptor<>(desc.size + 1,new WriteDescriptor (buckets.get(bucketInd),idx,null, e)); }while (!descriptor.compareAndSet(desc,newD)); descriptor.get().completeWrite(); }
public E pop_back(){
Descriptordesc;
DescriptornewD;
E elem;
do {
desc = descriptor.get();
desc.completeWrite();
int pos = desc.size + FIRST_BUCKET_SIZE - 1;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
elem = buckets.get(bucketInd).get(idx);
newD = new Descriptor(desc.size - 1,null);
}while (!descriptor.compareAndSet(desc,newD));return elem;
}
@Override
public E get(int index){
int pos = index + FIRST_BUCKET_SIZE;
int zeroNumPos = Integer.numberOfLeadingZeros(pos);
int bucketInd = zeroNumFirst - zeroNumPos;
int idx = (0x80000000>>>zeroNumPos) ^ pos;
return buckets.get(bucketInd).get(idx);
}@Override
public E set(int index,E e){
int pos = index + FIRST_BUCKET_SIZE;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
AtomicReferenceArraybucket = buckets.get(bucketInd);
while (true){
E oldV = bucket.get(idx);
if (bucket.compareAndSet(idx,oldV,e))
return oldV;
}
}public void reserve(int newSize){
int size = descriptor.get().size;
int pos = size + FIRST_BUCKET_SIZE - 1;
int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);if (i<1) i = 1; int initialSize = buckets.get(i - 1).length(); while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)){ i++; initialSize *= FIRST_BUCKET_SIZE; buckets.compareAndSet(i, null , new AtomicReferenceArray
(initialSize)); } }
public int size(){
return descriptor.get().size;
}@Override
public boolean add(E obj){
push_back(obj);
return true;
}
}
它的结构是:private final AtomicReferenceArray> buckets;
从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组
变量buckets存放所有的内部元素。从定义上看,它是一个保存着数组的数组,也就是通常的二维数组。特别之处在于这些数组都是使用CAS的原子数组。为什么使用二维数组去实现一个一维的Vector呢?这是为了将来Vector进行动态扩展时可以更加方便。我们知道,AtomicReferenceArray内部使用Object[]来进行实际数据的存储,这使得动态空间增加特别的麻烦,因此使用二维数组的好处就是为将来增加新的元素。
相当于一个二维数组,它的大小可以动态的进行扩展,
为了更有序的读写数组,定义了一个Descriptor的静态内部类。它的作用是使用CAS操作写入新数据。
它定义了
private static final int FIRST_BUCKET_SIZE = 8;
/**
- number of buckets. 30 will allow 8(2^30-1) elements
/
private static final int N_BUCKET = 30;
FIRST_BUCKET_SIZE:为第一个数组的长度
N_BUCKET 整个二维数组大可扩转至30
每次的扩展是成倍的增加,即:第一个数组长度为8,第二个为8<<1,第三个为8<<2 ......第30个为 8<<29
3. push_back
在第23行,使用descriptor将数据真正地写入数组中。这个descriptor写入的数据由20~21行构造的WriteDescriptor决定。
在循环最开始(第5行),使用descriptor先将数据写入数组,是为了防止上一个线程设置完descriptor后(22行),还没来得及执行第23行的写入,因此,做一次预防性的操作。
第8~10行通过当前Vector的大小(desc.size),计算新的元素应该落入哪个数组。这里使用了位运算进行计算。
LockFreeVector每次都会扩容。它的第一个数组长度为8,第2个就是16,第3个就是32,依次类推。它们的二进制表示如下:
它们之和就是整个LockFreeVector的总大小,因此,如果每一个数组都恰好填满,那么总大小应该类似如下的值(以4个数组为例)00000000 00000000 00000000 01111000:4个数组都恰好填满时的大小。
导致这个数字进位的最小条件,就是加上二进制的1000。而这个数字整好是8(FIRST_BUCKET_SIZE就是8)这就是第8行代码的意义。
它可以使得数组大小发生一次二进制进位(如果不进位说明还在第一个数组中),进位后前导零的数量就会发生变化。而元素所在的数组,和pos(第8行定义的比变量)的前导零直接相关。每进行一次数组扩容,它的前导零就会减1。如果从来没有扩容过,它的前导零就是28个。以后,逐级减1。这就是第9行获得pos前导零的原因。第10行,通过pos的前导零可以立即定位使用哪个数组(也就是得到了bucketInd的值)。
第11行,判断这个数组是否存在。如果不存在,则创建这个数组,大小为前一个数组的两倍,并把它设置到buckets中。
接着再看一下元素没有恰好填满的情况:
那么总大小如下:
总个数加上二进制1000后,得到:
显然,通过前导零可以定位到第4个数组。而剩余位,显然就表示元素在当前数组内偏移量(也就是数组下标)。根据这个理论,就可以通过pos计算这个元素应该放在给定数组的哪个位置。通过第19行代码,获得pos的除了第一位数字1以外的其他位的数值。因此,pos的前导零可以表示元素所在的数组,而pos的后面几位,则表示元素所在这个数组中的位置。由此,第19行代码就取得了元素所在位置idx。
代码理解可以参考:
https://blog.csdn.net/netcobol/article/details/79785651
和
http://www.shaoqun.com/a/197387.html
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
本文名称:Java高并发四:无锁的实际应用-创新互联
文章URL:http://pcwzsj.com/article/dsgdse.html