并发数据结构

一.BlockingDeque阻塞双端队列(线程安全):

注意ArrayDeque和LinkedList仅仅扩展了Deque,是非阻塞类型的双端队列。

BlockingQueue单向队列,其内部基于ReentrantLock + Condition来控制同步和"阻塞"/"唤醒"的时机;有如下几个实现类:

  1. ArrayBlockingQueue: “浮动相对游标”的数组,来实现有界的阻塞队列。
  2. DelayQueue:支持“可延迟”的队列,DelayQueue还只接受Delayed类型的元素,Delayed接口继承自Compare接口并提供了一个long getDelay(TimeUnit),来获取指定时间到now的时间剩余量。DelayQueue底层就是使用PriorityQueue作为支撑的。
  3. PriorityBlockingQueue:有权重的队列,此队列时可以根据指定的comparator进行排序的。
  4. SynchronousQueue://
  5. LinkedBlockingDeque:有界或者无界阻塞队列

PriorityQueue为非线程安全非阻塞,有权重的队列,其权重需要根据特定的compartor来产生。

 

二.ConcurrentMap(接口):支持并发的map,支持多线程环境中安全的访问。

其提供了几个独特的方法:

  • V putIfAbsent(K,V):如果map中不存在此key,则put,否则返回现有的此key关联的值。此过程有Lock同步:

 

Java代码  

  1. //等价于:  
  2. if (!map.containsKey(key))   
  3.       return map.put(key, value);  
  4.   else  
  5. return map.get(key);  

 

Java代码  

  1. Map<String,Object> map = new ConcurrentHashMap<String, Object>();  
  2. if(map.containsKey("key")){  
  3.     map.put("key", new Object());  
  4. }  
  5. //注意,concurrentHashMap并不保证contains方法和put方法直接保持"原子性",即有可能contains方法返回false之后,在put之前,可能其他线程已经put成功,即在当前线程put时,此时数据已经不一致了.建议采用putIfAbsent  

 

 

  • boolean remove(Object key,Object value):比较并删除指定的key和value。
  • boolean replace(K,V oldValue,V newValue):比较并替换。

目前实现ConcurrentMap的类有ConcurrentHashMap,一种基于锁分段技术实现的并发hashMap,锁采取了ReentrantLock。

 

三.ConcurrentLinkedQueue:

基于单向链表实现的,线程安全的并发队列,无界非阻塞队列,当队列需要在多线程环境中被使用,可以考虑使用它。记住,这是个非阻塞队列不过支持阻塞的队列,貌似都是线程安全的

此队列的size不是时间固定的,它的iterator也会被不断调整。ConcurrentLinkedQueue并没有使用Lock,而是采用了CAS的方式,对tail.next进行赋值操作。因为tail.next永远是null,且队列不接受null的元素。

 

注意,非并发集合(list,queue,set)的iterator以及forEach循环在并发环境中是不能正常工作的,如果原始集合被外部修改(其他线程的add,remove),将会导致异常。对于并发集合的iterator,没有做相关的size校验。

 

Lock(锁)是控制操作(action)的,可以让一个操作或者一个子操作被串行的处理。。。CAS其实只是对内存数据的变更时使用,如果想知道数据变更在并发环境中是否符合预期,才会使用到CAS。

 

四.ConcurrentSkipListMap/ConcurrentSkipListSet

两个基于SkipList(跳跃表)方式实现的、支持并发访问的数据结构。依据跳跃表的思想,可以提高数据访问的效率。其中ConcurrentSkipListSet底层使用ConcurrentSkipListMap支撑。

ConcurrentSkipListMap也是ConcurrentNavigableMap的实现类,对于SkipList,其内部元素,必须是可排序的。

 

跳跃表是一个很简单的表,(参见跳跃表概念),对底层的线性存储结构,加入类似“多级索引”的概念,“索引”的添加时基于随即化。一个跳跃表,整体设计上(设计思路很多)分为表左端head索引,右端tail索引(边界),底端存储层(排序的线性链表),和一个随机化、散列化的不同高度的多级索引“指针”。head索引是高度最高的索引,它是整个链表中值最小的元素锁产生的索引;右端为边界索引,索引指向null或者任意设计的边界值(bound).

 

跳跃表的底端是一个和普通的链表没啥区别,单向或者双向的均可,前提是必须是排序的。索引节点,就是一个有向路径的标,每个索引节点,都分别有right、down指向,对于双向跳跃表,就具有left、right、up、down四个方向指针;指针就是为了方便寻路。每个新增元素时,首先会导致底层链表的改动;根据自定义的随即算法,来决定此元素的索引高度,如果高度不为0,则依次建立相应层高的索引,并调整各个层高的所以指向。

 

跳跃表之所以这么设计,实事上就是在做一件事情:基于简单的设计思路和算法,来实现较为高效的查询策略。相对于二分查找有一定的优势.

 

五.CopyOnWriteArrayList/CopyOnWriteArraySet:

均是CopyOnWrite思想,在数据修改时(happen-before),对数据进行Copy(),read操作可以在原数据结构上继续进行,待write操作结束后,调整数据结构指针。基于这种设计思路的数据结构,通常是read操作频率远大于write操作,可以在并发环境中,支撑较高的吞吐量;避免了因为同步而带来的瓶颈,同时也能确保数据安全操作。同时需要注意,copy操作将会带来多余的空间消耗。注意,此API时支持并发的,多个线程add操作(即CopyOnWrite)将会被队列化,内部采取了ReentrantLock机制来控制.

  • CopyOnWriteArrayList底层基于数组实现,在进行write操作时(add,remove),将会导致Arrays.copy操作,创建一个新的数组;待write操作成功后,将原数组的指针替换成新数组指针.
  • CopyOnWriteArraySet底层直接使用CopyOnWriteArrayList作为支撑,只不过在add操作时会遍历整个数组结构并进行equals比较(确保具有Set的特性),只有发现此新元素不存在时才会"替换指针".

    java中这两个API,支持并发操作时,仍然可以进行遍历而无需额外的同步;即不会抛出ConcurrentModificationException。事实上,迭代器所持有的数组只是一个"创建iterator时底层数组的引用",所以在遍历期间,即使CopyOnWriteArrayList已经新增或者删除了某些元素,仍不会发生冲突,因为iterator持有的是旧数组的引用,而CopyOnWriteArrayList持有的是Copy操作时创建的新数组引用..由此可见,iterator也无法反应实时的数组变化(遍历期间,实际数组的添加、删除),但是原始数组中对象内容发生改变还是可以在迭代器中反应出来。CopyOnWrite的遍历器的remove/add/set操作不被支持,这区别于ArrayList.

    CopyOnWriteArrayList、CopyOnWriteArraySet,底层基于数组实现,采取ReentrantLock来同步add/remove/clear等操作。并采取了snapshot的简单手段:

 

 

Java代码  

  1. //例如add:  
  2.   
  3. public boolean add(E e) {  
  4.     final ReentrantLock lock = this.lock;  
  5.     lock.lock();  
  6.     try {  
  7.        Object[] elements = getArray();  
  8.        int len = elements.length;  
  9.         //数组copy  
  10.        Object[] newElements = Arrays.copyOf(elements, len + 1);  
  11.        newElements[len] = e;  
  12.         //修改结束后,指针转换  
  13.        setArray(newElements);  
  14.        return true;  
  15.     } finally {  
  16.        lock.unlock();  
  17.     }  
  18. }  

 

 

 

六.CountDownLatch:

同步类,用于多个线程协调工作。共享锁,当锁计数器较少到0时,将释放等待的线程。使用场景是,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。当CountDownLatch的锁计数器为1时,可以作为一种“开关”来使用。计数器无法被重置,如果需要重复计数,可以使用CyclicBarrier。

CountDownLatch内部基于AQS来控制线程访问。这个API很简单,只有2个核心方法:

  • void await():如果计数器不为0,则获取锁失败,加入同步队列;即线程阻塞。
  • void countDown():释放锁,导致计数器递减,如果此时计数器为0,则表示锁释放成功,AQS会帮助“发射”因为await阻塞的线程(组)。

Java代码  

  1. public class CountDownLatchTestMain {  
  2.   
  3.     /** 
  4.  
  5.     * @param args 
  6.  
  7.     */  
  8.   
  9.     public static void main(String[] args) throws Exception{  
  10.         System.out.println("Begin");  
  11.         CountDownLatch latch = new CountDownLatch(2);  
  12.         for(int i=0;i<4;i++){  
  13.             CThread c = new CThread(i,latch);  
  14.             c.start();  
  15.             //Thread.sleep(500);  
  16.         }  
  17.         Thread.sleep(1000);  
  18.         System.out.println("End");  
  19.     }  
  20.   
  21.     static class CThread extends Thread{  
  22.         CountDownLatch latch;  
  23.         int count;  
  24.         CThread(int count,CountDownLatch latch){  
  25.             this.count = count;  
  26.             this.latch = latch;  
  27.         }  
  28.         @Override  
  29.         public void run(){  
  30.             try{  
  31.                 System.out.println("---"+count);  
  32.             if(count % 2 == 0){  
  33.                 latch.await();  
  34.                 System.out.println("//" + count + "await--!");  
  35.             }else{  
  36.                 latch.countDown();  
  37.                 System.out.println("//" + count + "down!");  
  38.             }  
  39.             }catch(Exception e){  
  40.                 e.printStackTrace();  
  41.             }  
  42.   
  43.         }  
  44.   
  45.     }  
  46.   
  47. }  
  48.   
  49.    

 

七.CyclicBarrier:

循环屏障,它允许一组线程互相等待,直到到达某个公共屏障点;线程(组)数量固定,线程之间需要不时的互相等待,CyclicBarrier和CountDownLatch相比,它可以在释放等待线程后被再次“重用”,所以称为循环屏障。它提供了类似“关卡”的功能。对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

 

  • CyclicBarrier(int parties):指定参与者个数
  • CyclicBarrier(int parties,Runnable barrierAction):指定一个屏障操作,此操作将会有最后一个进入barrier的线程执行。
  • int await():在所有的线程达到barrier之前,一直等待。此方法可以抛出InterrutedExeception(此线程被中断),可以抛出BrokenBarrierExeception(当其他参与者在wait期间中断,导致屏障完整性被破坏),在方法被await时,如果抛出上述异常,需要做补救的相应操作。此方法返回当前线程到达屏障时的索引。(第一个到达的,为0,最后一个为getParties() - 1);根据返回值的不同可以做一些操作,比如最先/最后达到的做一些前置、后置操作等。
  • boolean isBroken():屏障是否处于损坏状态。
  • void reset():重置屏障为其初始状态;如果此时有线程在await,其线程将会抛出BrokenBarrierExeception。对于reset操作,需要线程的执行方法有相应的配合(比如支持操作轮训等),否则重置会带来一些不必要的麻烦。。。如果你需要重置,尚不如重新建一个CyclicBarrier。

底层基于ReentrantLock实现。线程阻塞基于Condition方式(注意Condition底层也是通过AQS框架实现);大概伪代码:

Java代码  

  1. ReentrantLock lock = new ReentrantLock();  
  2. Condition trip = lock.newCondition();  
  3. ////await方法:  
  4. if(count!=0){  
  5.     trip.await();//AQS:当前线程队列化,LockSupport.park  
  6.     count--;  
  7. }else{  
  8.     trip.signalAll();  
  9. }  

 

 

Java代码  

  1. //////////////////代码实例  
  2.   
  3. public class CyclicBarrierTestMain {  
  4.   
  5.    
  6.   
  7.     /** 
  8.  
  9.     * @param args 
  10.  
  11.     */  
  12.   
  13.     public static void main(String[] args) throws Exception{  
  14.         CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {  
  15.             @Override  
  16.             public void run() {  
  17.                 System.out.println("Barrier action!!");  
  18.             }  
  19.             });  
  20.         for(int i=0;i<5;i++){  
  21.             CThread c = new CThread(barrier);  
  22.             c.start();  
  23.         }  
  24.         Thread.sleep(1000);  
  25.     }  
  26.   
  27.     static class CThread extends Thread{  
  28.         CyclicBarrier barrier;  
  29.         CThread(CyclicBarrier barrier){  
  30.             this.barrier = barrier;  
  31.         }  
  32.         @Override  
  33.         public void run(){  
  34.             int count = 0;  
  35.             while(true){  
  36.                 try{  
  37.                     System.out.print("---" + count);  
  38.                     int index = barrier.await();  
  39.                     System.out.println("+++" + count);  
  40.                     count++;  
  41.                     if(index == barrier.getParties() - 1){  
  42.                         //barrier.reset();  
  43.                     }  
  44.                 }catch(Exception e){  
  45.                     e.printStackTrace();  
  46.                     break;  
  47.                 }  
  48.             }  
  49.         }  
  50.     }  
  51. }  

 

 八.Exchanger

Exchanger:同步交换器,2个互相匹配(协调的对象),互相交换数据。2个线程需要把相同类型的数据,以互相等待的方式交换。比如线程1将数据A交换给B,此时线程1等待直到线程B将数据交换出去。Exchanger有一个方法,就是exchange(V x):其作用就是等待另一个线程到达交换点,然后将数据传递给线程。

如果没有其他线程到达交换点,处于调度的目的,禁用当前线程,直到某个线程到达或者某个线程中断。

伪代码:

Java代码  

  1. void exchange(V item){  
  2.     //如果有线程已经到达  
  3.     for(;;){  
  4.         Node e = get();  
  5.         if(e != null){  
  6.             V i = e.getItem();  
  7.             CAS(e,i,null);//将等待匹配者移除  
  8.             Thread t = e.waiter;  
  9.             LockSupport.unpark(t);  
  10.             //  
  11.             Node ne = new Node(currentThread,ne);  
  12.             set();//将当前需要交换的数据加入,当其他线程unpart之后,可以get,并获取数据  
  13.             return i;//返回需要交换的数据  
  14.         }else{  
  15.             Node e = new Now(currentThread,item);  
  16.             set(node);  
  17.             LockSupport.park(currentThread);  
  18.         //重新回到顶层for循环,并获取交换数据  
  19.         }  
  20.     }  
  21. }  

 

如下的例子是基于一个简单的Productor和Consumer模式,一个线程负责生产数据,当数据满时,交换给consumer消费;当consumer消费完时,也申请交换。

 

 

Java代码  

  1. import java.util.ArrayDeque;  
  2. import java.util.Queue;  
  3. import java.util.Random;  
  4. import java.util.concurrent.Exchanger;  
  5.   
  6. public class ExchangerTestMain {  
  7.   
  8.     /** 
  9.  
  10.     * @param args 
  11.  
  12.     */  
  13.   
  14.     public static void main(String[] args) throws Exception{  
  15.         Exchanger<Queue<Integer>> exchanger = new Exchanger<Queue<Integer>>();  
  16.         CThread c = new CThread(exchanger);  
  17.         PThread p = new PThread(exchanger);  
  18.         c.start();  
  19.         p.start();  
  20.         Thread.sleep(2000);  
  21.     }  
  22.   
  23.     static class CThread extends Thread{  
  24.         Exchanger<Queue<Integer>> exchanger ;  
  25.         Queue<Integer> current;  
  26.         CThread(Exchanger<Queue<Integer>> exchanger){  
  27.             this.exchanger = exchanger;  
  28.         }  
  29.   
  30.         @Override  
  31.         public void run(){  
  32.             if(current == null){  
  33.                 current = new ArrayDeque<Integer>(10);  
  34.             }  
  35.             try{  
  36.                 while(true){  
  37.                 //productor  
  38.                 if(current.size() == 0){  
  39.                     current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList  
  40.                 }  
  41.                     System.out.println("C:" + current.poll());  
  42.                 }  
  43.             }catch(Exception e){  
  44.                 e.printStackTrace();  
  45.                 return;  
  46.             }  
  47.         }  
  48.     }  
  49.   
  50.     static class PThread extends Thread{  
  51.         Exchanger<Queue<Integer>> exchanger ;  
  52.         Queue<Integer> current;  
  53.         PThread(Exchanger<Queue<Integer>> exchanger){  
  54.             this.exchanger = exchanger;  
  55.         }  
  56.   
  57.         @Override  
  58.         public void run(){  
  59.             Random r = new Random();  
  60.             if(current == null){  
  61.                 current = new ArrayDeque<Integer>(10);  
  62.             }  
  63.             try{  
  64.                 while(true){  
  65.                     //productor  
  66.                     if(current.size() >= 10){  
  67.                         current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList  
  68.                     }  
  69.                     Integer i = r.nextInt(20);  
  70.                     System.out.println("P:" + i);  
  71.                     current.add(i);  
  72.                 }  
  73.             }catch(Exception e){  
  74.                 e.printStackTrace();  
  75.                 return;  
  76.             }  
  77.         }  
  78.     }  
  79. }  

 

 

九.Semaphore:信号量

我们需要把semaphore真的看成“信号量”,它是可以被“增减”的锁引用,“0”是判断信号“过剩”的界限。

我们通常使用semaphore来控制资源访问并发量。它底层使用“共享”模式锁实现,提供了“公平”“非公平”2中策略。当“信号量”大于0时,允许获取锁;否则将阻塞直到信号量恢复。

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

  • Semaphore(int permits, boolean fair):指定信号量,指定公平策略。
  • void acquire():获取一个信号,如果信号量<=0,则阻塞;在非公平模式下,允许闯入。
  • void acquire(int permits).

上面2个方法都会抛出InterruptException,即在等待线程被“中断时”,将会抛出异常而返回。底层基于AQS.acquireSharedInterruptibly()

  • void acquireUninterruptibly():获取一个信号,不支持中断,当线程被中断时,此线程将继续等待,当线程确实从此方法返回后,将设置其中断状态。底层基于AQS.acquireShared();
  • void release():释放一个信号,直接导致信号量++。
  • boolean tryAcquire():获取一个信号,如果获取成功,则返回true。
  • 原文链接:[http://wely.iteye.com/blog/2228720]
时间: 2017-07-03

并发数据结构的相关文章

招募译者翻译并发数据结构

什么是并发数据结构? 引用wiki上的定义 In computer science, a concurrent data structure is a particular way of storing and organizing data for access by multiple computing threads (or processes) on a computer. 简而言之,并发数据结构即允许多线程同时访问(读和写)的数据结构. 并发数据结构中的算法的设计原则主要分两大类,li

并发数据结构-1.1 并发的数据结构的设计

原文链接,译文链接,译者:董明鑫,校对:周可人 随着多个处理器共享同一内存的机器在商业上的广泛使用,并发编程的艺术也产生了巨大的变化.当前的趋势向着低功耗芯片级多线程(CMT)发展,所以这样的机器一定会更加广泛的被使用. 共享内存多处理器是指并发的执行多个线程的系统,这些线程在共享的内存中通过数据结构通讯和同步.这些数据结构的效率对于性能是很关键的,而目前熟练掌握为多处理器机器设计高效数据结构这一技术的人并不多.对大多数人来说,设计并发的数据结构比设计单线程的难多了,因为并发执行的线程可能会多种

并发数据结构- 1.8 优先队列&amp;1.9 总结

原文链接,译文链接,译者:郭振斌,校对:周可人 1.8 优先队列 并发的优先队列是一个可线性化到顺序优先队列的数据结构,能够通过常用的优先队列语义提供insert和delete-min操作. 基于堆的优先队列 许多文献中提到的并发优先队列结构,其实是本书前面提到的可线性化堆结构.再一次的,这种结构的基本思想是在个别堆节点上使用细粒度锁,使线程在并行下也能够尽可能的访问数据结构的不同部分.设计这种并发堆的关键问题,在于传统自底向上的insert和自顶向下的delete-min操作有可能造成死锁.B

并发数据结构-1.5 链表

原文链接,译文链接,译者:huavben,校对:周可人 考虑支持插入,删除和查找操作的并发数据结构实现.如果这些操作只处理键值(译者注:而不处理具体值),这样的数据结构会是一个集合.如果一个数据值与每一个键关联起来,我们就得到了一部数据字典.由于他们都是密切相关的数据结构,一个并发的集合通常能够经过适当修改来实现一部字典.在接下来的三个小节中,我们将专注于利用linked lists,hash tables,和trees这三种不同的数据结构来实现集合. 试想我们使用一个linked list去实

并发数据结构-1.1.4 复杂度测量&amp;1.1.5 正确性

原文链接,译文链接,译者:张军,校对:周可人 1.1.4 复杂度测量 一个被广泛研究的方向是在理想化模型,如并行随机存取机上分析并发数据结构和算法的渐进复杂度[35, 122, 135].然而,很少有将这些数据结构放在一个真实的多处理器上进行建模的.这里有多种原因,大部分原因跟系统硬件架构与线程异步执行的相互作用有关.想想组合树(combining tree)的例子,虽然我们能通过计算(指令数)得到O(P/logP)的加速比,但这无法反映在实证研究中[52, 129].真实世界的行为是被上述其他

并发数据结构-1.1.2 阻塞技术

原文链接,译文链接,译者:周可人,校对:梁海舰 1.1.2 阻塞技术 在很多数据结构中,内存竞争所带来的不良现象和前文所说的顺序瓶颈带来的影响都可以通过使用细粒度锁机制来减小.在细粒度锁机制中,我们用多个粒度较小的锁来保护数据结构中的不同部分.这样做的目的是允许并发操作在它们不访问数据结构的相同部分时并行执行.这种方法也可以用于避免独立内存位置访问的额外竞争.在一些数据结构中,这种现象经常发生:举个例子,在哈希表中,对那些被哈希到不同哈希桶中的值的操作自然访问的是数据结构中的一部分. 对其他的数

并发数据结构-1.1.3 非阻塞技术

原文链接,译文链接,译者:Noodles,校对:周可人 1.1.3 非阻塞技术 正如前面讨论的那样,非阻塞实现主要目的是为了消除由锁带来的相关问题,为了形式化研究这一概念,多种非阻塞演进条件已经在相关文献有所研究了,如wait-freedom演进条件,lock-freedom演进条件,和obstruction-freedom演进条件.满足wait-free演进条件的操作是指在执行自身包含的有限步骤之后,保证操作必须完成,而不用考虑其他操作发生的时序,满足lock-free演进条件的操作是指在执行

并发数据结构:迷人的原子

随着多核CPU成为主流,并行程序设计亦成为研究领域的热门. 要想利用多核/多路CPU带来的强大功能,通常使用多线程来开发应用程序.但是要想拥有良好的硬件 利用率,仅仅简单的在多个线程间分割工作是不够的.还必须确保线程大部分时间在工作,而不是在等待 工作或等待锁定共享数据结构. 在不止一个线程访问共享数据时,所有线程都必须使用同步.如果线程间不进行协调,则没有任务可 以真正并行,更糟糕的是这会给程序带来毁灭性的错误. 现在让我们来看一下在.NET和D语言中的标准同步手段-锁定..NET下我们使用l

并发数据结构:Stack

在叙述并发Stack前,我们先来了解下非线程安全的Stack. Stack是一种线性数据结构,只能访问它的一端来存储或读取数据.Stack很像餐厅中的一叠盘子:将 新盘子堆在最上面,并从最上面取走盘子.最后一个堆在上面的盘子第一个被取走.因此Stack也被称为 后进先出结构(LIFO). Stack有两种实现方式:数组和列表.下面我们分别用这两种方式来实现一个简单的Stack.采用数组 实现代码如下: using System; using System.Collections.Generic;

【转载】并发数据结构

本文转载自http://shift-alt-ctrl.iteye.com/blog/1841084   请首先参考:http://shift-alt-ctrl.iteye.com/blog/1839142 一.BlockingDeque阻塞双端队列(线程安全): 注意ArrayDeque和LinkedList仅仅扩展了Deque,是非阻塞类型的双端队列. BlockingQueue单向队列,其内部基于ReentrantLock + Condition来控制同步和"阻塞"/"唤