非阻塞算法

时间:2020-01-09 10:35:51  来源:igfitidea点击:

并发上下文中的非阻塞算法是允许线程访问共享状态(或者以其他方式进行协作或者通信)而不会阻塞所涉及线程的算法。更笼统地说,如果一个线程的挂起不能导致该算法中涉及的其他线程的挂起,则该算法被称为非阻塞。为了更好地理解阻塞和非阻塞并发算法之间的区别,我将从解释阻塞算法开始,然后继续非阻塞算法。

阻塞并发算法

阻塞并发算法是以下一种算法:

  • 答:执行线程请求的操作-或者
  • B:阻塞线程,直到可以安全地执行操作为止

许多类型的算法和并发数据结构正在阻塞。例如,java.util.concurrent.BlockingQueue接口的不同实现都是阻塞数据结构。如果线程试图将元素插入到" BlockingQueue"中,并且队列没有空间,则插入线程将被阻塞(挂起),直到" BlockingQueue"具有用于新元素的空间。

下图说明了保护共享数据结构的阻塞算法的行为:

非阻塞并发算法

非阻塞并发算法是以下一种算法:

  • 答:执行线程请求的操作-或者
  • B:通知请求线程无法执行该操作

Java也包含几个非阻塞数据结构。 AtomicBoolean,AtomicInteger,AtomicLong和AtomicReference都是非阻塞数据结构的示例。

下图说明了保护共享数据结构的非阻塞算法的行为:

非阻塞与阻塞算法

阻塞和非阻塞算法之间的主要区别在于行为的第二步,如以上两节所述。换句话说,区别在于当无法执行请求的操作时,阻塞算法和非阻塞算法将执行以下操作:

阻塞算法将阻塞线程,直到可以执行请求的操作为止。非阻塞算法会通知请求该操作的线程该操作无法执行。

使用阻塞算法,线程可能会阻塞,直到可以执行请求的操作为止。通常,另一个线程的操作将使第一个线程执行请求的操作成为可能。如果由于某种原因其他线程被挂起(阻塞)在应用程序中的其他位置,从而无法执行使第一个线程请求的操作成为可能的动作,则第一个线程将无限期保持阻塞状态,或者直到另一个线程最终执行必要的操作行动。

例如,如果一个线程试图将一个元素插入完整的" BlockingQueue"中,则该线程将阻塞,直到另一个线程从" BlockingQueue"中获取了一个元素。如果由于某种原因,应该从" BlockingQueue"中获取元素的线程在应用程序中的其他位置被阻塞(挂起),则尝试插入新元素的线程将无限期地阻塞,或者直到线程被元素最终占用来自" BlockingQueue"的元素。

非阻塞并发数据结构

在多线程系统中,线程通常通过某种数据结构进行通信。这样的数据结构可以是任何类型,从简单的变量到更高级的数据结构(例如队列,映射,堆栈等)。为了促进多个线程正确并发地访问数据结构,必须通过某种并发算法来保护数据结构。保护算法使数据结构成为并发数据结构。

如果保护并发数据结构的算法正在阻塞(使用线程挂起),则称其为阻塞算法。因此,该数据结构被称为阻塞的并发数据结构。

如果保护并发数据结构的算法是非阻塞的,则称其为非阻塞算法。因此,该数据结构被称为非阻塞并发数据结构。

每个并发数据结构旨在支持某种通信方法。因此,可以使用哪种并发数据结构取决于通信需求。在以下各节中,我将介绍一些非阻塞的并发数据结构,并说明在什么情况下可以使用它们。这些非阻塞数据结构如何工作的解释应该使我们对如何设计和实现非阻塞数据结构有所了解。

易变变量

Java volatile变量是始终直接从主内存读取的变量。当将新值分配给易失性变量时,该值总是立即写入主存储器。这样可以保证volatile变量的最新值始终对在其他CPU上运行的其他线程可见。其他线程每次都会从主存储器而不是例如从主存储器读取volatile的值。线程正在运行的CPU的CPU缓存。

易变变量是非阻塞的。将值写入易失性变量是原子操作。它不能被打断。但是,对易失性变量执行的读取-更新-写入序列不是原子的。因此,如果由多个线程执行,则此代码仍可能导致争用情况:

volatile myVar = 0;

...
int temp = myVar;
temp++;
myVar = temp;

首先,易失变量" myVar"的值从主存储器中读取到临时变量中。然后将temp变量加1. 然后将temp变量的值分配给易失性myVar变量,这意味着它将被写回到主存储器中。

如果有两个线程执行此代码,并且两个线程都读取了" myVar"的值,然后将其加一个并将其写回主内存,则我们冒着风险,而不是将2而不是被添加到" myVar"变量中,被加(例如,两个线程都读取值19,递增到20,然后写回20)。

我们可能认为我们不会像上面那样编写代码,但是实际上上面的代码与此等效:

myVar++;

执行时,将" myVar"的值读入CPU寄存器或者本地CPU高速缓存,将其相加,然后将来自CPU寄存器或者CPU高速缓存的值写回到主存储器。

单写入情况

在某些情况下,只有一个线程写入共享变量,而多个线程读取该变量的值。当只有一个线程正在更新变量时,无论有多少线程正在读取变量,都不会发生竞争条件。因此,只要我们只有一个共享变量的编写者,就可以使用volatile变量。

当多个线程对共享变量执行读-更新-写操作序列时,就会发生争用条件。如果只有一个线程执行读-更新-写操作序列,而所有其他线程仅执行读操作,则没有竞争条件。

这是一个不使用同步但仍处于并发状态的单个编写器计数器:

public class SingleWriterCounter {

    private volatile long count = 0;

    /**
     * Only one thread Jan ever call this method,
     * or it will lead to race conditions.
     */
    public void inc() {
        this.count++;
    }

    /**
     * Many reading threads Jan call this method
     * @return
     */
    public long count() {
        return this.count;
    }
}

只要只有一个线程调用inc(),多个线程就可以访问该计数器的同一实例。我的意思不是一次线程。我的意思是,只有相同的单线程才允许调用inc()。多个线程可以调用count()。这不会导致任何比赛情况。

下图说明了线程将如何访问volatilecount变量:

基于易变变量的更高级的数据结构

可以创建使用易失性变量组合的数据结构,其中每个易失性变量仅由单个线程写入,并由多个线程读取。每个volatile变量可以由不同的线程(但只能是一个线程)写入。使用这样的数据结构,多个线程可能能够使用易失性变量以非阻塞的方式相互发送信息。

这是一个简单的double writer计数器类,显示了它的外观:

public class DoubleWriterCounter {

    private volatile long countA = 0;
    private volatile long countB = 0;

    /**
     * Only one (and the same from thereon) thread Jan ever call this method,
     * or it will lead to race conditions.
     */
    public void incA() { this.countA++;  }

    /**
     * Only one (and the same from thereon) thread Jan ever call this method,
     * or it will lead to race conditions.
     */
    public void incB() { this.countB++;  }

    /**
     * Many reading threads Jan call this method
     */
    public long countA() { return this.countA; }

    /**
     * Many reading threads Jan call this method
     */
    public long countB() { return this.countB; }
}

如我们所见," DoubleWriterCounter"现在包含两个volatile变量,以及两对增量和读取方法。只有单个线程可以调用incA(),并且只有单个线程可以调用incB()。它可以是调用incA()incB()的不同线程。许多线程都可以调用countA()countB()。这不会引起比赛条件。

DoubleWriterCounter可用于例如两个线程进行通信。这两个计数可以是产生的任务和消耗的任务。该图显示了两个线程通过类似于上面的数据结构进行通信:

精明的读者将认识到,通过使用两个SingleWriterCounter实例,我们可能已经达到了DoubleWriterCounter的效果。如果需要,我们甚至可以使用更多线程和SingleWriterCounter实例。

比较和交换的乐观锁定

如果我们确实需要多个线程来写入相同的共享变量,那么volatile变量将不够。我们将需要某种对该变量的排他访问。这是使用Java中的同步块可以看到这种互斥访问的方式:

public class SynchronizedCounter {
    long count = 0;

    public void inc() {
        synchronized(this) {
            count++;
        }
    }

    public long count() {
        synchronized(this) {
            return this.count;
        }
    }
}

注意" inc()"和" count()"方法如何都包含一个同步块。这就是我们要避免同步块和wait()``notify()调用等的原因。

除了两个同步块,我们还可以使用Java的原子变量之一。在这种情况下,是" AtomicLong"。这是使用AtomicLong代替相同的计数器类的样子:

import java.util.concurrent.atomic.AtomicLong;

public class AtomicCounter {
    private AtomicLong count = new AtomicLong(0);

    public void inc() {
        boolean updated = false;
        while(!updated){
            long prevCount = this.count.get();
            updated = this.count.compareAndSet(prevCount, prevCount + 1);
        }
    }

    public long count() {
        return this.count.get();
    }
}

此版本与以前的版本一样具有线程安全性。这个版本有趣的是inc()方法的实现。 inc()方法不再包含同步块。相反,它包含以下几行:

boolean updated = false;
while(!updated){
    long prevCount = this.count.get();
    updated = this.count.compareAndSet(prevCount, prevCount + 1);
}

这些行不是原子操作。这意味着,两个不同的线程可以调用inc()方法并执行long prevCount = this.count.get()语句,从而都可以获取该计数器的先前计数。但是,以上代码不包含任何竞争条件。

秘密是在while循环内的两行中的第二行。 compareAndSet()方法调用是一个原子操作。它将" AtomicLong"的内部值与期望值进行比较,如果两个值相等,则为" AtomicLong"设置一个新的内部值。比较和交换指令通常直接在CPU中支持compareAndSet()方法。因此,不需要同步,也不需要线程挂起。这样可以节省线程挂起的开销。

想象一下," AtomicLong"的内部值为20。然后有两个线程读取该值,并且两个线程都试图调用" compareAndSet(20,20 + 1)"。由于compareAndSet()是一个原子操作,因此线程将顺序执行此方法(一次执行一次)。

第一个线程会将期望值20(计数器的先前值)与" AtomicLong"的内部值进行比较。由于两个值相等,因此" AtomicLong"会将其内部值更新为21(20 +1)。 " updated"变量将被设置为" true",而" while"循环将停止。

现在第二个线程调用compareAndSet(20,20 + 1)。由于" AtomicLong"的内部值不再为20,因此该调用将失败。 " AtomicLong"的内部值将不会设置为21. " updated"变量将设置为" false",并且线程将在" while"循环中再旋转一次。这次它将读取值21并尝试将其更新为22. 如果在此期间,没有其他线程调用inc(),则第二次迭代将成功将AtomicLong更新为22.

为什么称之为乐观锁?

上一节中显示的代码称为乐观锁定。乐观锁不同于传统锁,有时也称为悲观锁。传统锁定使用同步块或者某种锁来阻止对共享内存的访问。同步的块或者锁可能导致线程被挂起。

乐观锁定允许所有线程创建共享内存的副本而没有任何阻塞。然后,线程可以对其副本进行修改,并尝试将其修改后的版本写回到共享内存中。如果没有其他线程对共享内存进行任何修改,则比较和交换操作将允许该线程将其更改写入共享内存。如果另一个线程已经更改了共享内存,则该线程将必须获取新副本,进行更改,然后尝试再次将它们写入共享内存。

之所以称为"乐观锁定",是因为线程在乐观的假设(即没有其他线程会同时对共享内存进行更改)下获得要更改的数据的副本并应用更改。当这种乐观假设成立时,线程就可以在不锁定的情况下更新共享内存。如果此假设为假,则浪费了工作,但仍未应用锁定。

乐观锁定通常在共享内存争用较低或者中等的情况下效果最好。如果共享内存上的内容过多,线程将浪费大量的CPU周期来复制和修改共享内存,以至于无法将更改写回到共享内存中。但是,如果共享内存上有很多内容,则无论如何都应考虑重新设计代码以减少争用。

乐观锁是不阻塞的

我在这里显示的乐观锁定机制是非阻塞的。如果某个线程获取了共享内存的副本并在尝试对其进行修改时(无论出于何种原因)被阻止,则不会阻止其他线程访问共享内存。

使用传统的锁/解锁范例,当线程锁定锁时,该锁将对所有其他线程保持锁定状态,直到拥有该锁的线程再次将其解锁。如果锁定该锁的线程在其他地方被阻塞,则该锁定将保持锁定状态很长时间,甚至可能是无限期的。

不可交换的数据结构

简单的比较交换乐观锁定适用于共享数据结构,其中整个数据结构可以在单个比较交换操作中与新的数据结构交换(交换)。但是,使用修改后的副本交换整个数据结构并非总是可能或者可行的。

想象一下,如果共享数据结构是一个队列。每个试图从队列中插入或者从队列中获取元素的线程都必须复制整个队列,并对复制进行所需的修改。这可以通过" AtomicReference"来实现。复制引用,复制并修改队列,然后尝试将" AtomicReference"中指向的引用交换到新创建的队列。

但是,大数据结构可能需要大量内存和CPU周期才能进行复制。这将使应用程序花费更多的内存,并在复制上浪费大量时间。这将影响应用程序的性能,尤其是在数据结构上的争用较高的情况下。此外,线程复制和修改数据结构所花费的时间越长,则其他某个线程将在其间修改数据结构的可能性就越大。如我们所知,如果另一个线程自复制以来已经修改了共享数据结构,则所有其他线程必须重新启动其复制-修改操作。这将进一步增加对性能和内存消耗的影响。

下一节将说明一种实现非阻塞数据结构的方法,该结构可以并发更新,而不仅仅是复制和修改。

共享预期的修改

线程可以共享对共享数据结构的预期修改,而不是复制和修改整个共享数据结构。想要对共享数据结构进行修改的线程的过程将变为:

  • 检查另一个线程是否已提交对数据结构的预期修改。
  • 如果没有其他线程提交预期的修改,请创建预期的修改(由一个对象表示),然后将该预期的修改提交到数据结构中(使用比较和交换操作)。
  • 进行共享数据结构的修改。
  • 删除对预期修改的引用,以向其他线程发出已进行预期修改的信号。

如我们所见,第二步可以阻止其他线程提交预期的修改。因此,第二步有效地充当了共享数据结构的锁定。如果一个线程成功提交了预期的修改,则在执行第一个预期的修改之前,没有其他线程可以提交预期的修改。

如果线程提交了预期的修改,然后又被阻止执行其他工作,则共享数据结构将被有效锁定。共享数据结构不会使用该数据结构直接阻止其他线程。其他线程可以检测到它们无法提交预期的修改并决定执行其他操作。显然,我们需要解决此问题。

可完成的预期修改

为了避免提交的预期修改可以锁定共享数据结构,提交的预期修改对象必须包含足够的信息,以便另一个线程可以完成修改。因此,如果提交预期修改的线程从未完成修改,则另一个线程可以代表它完成修改,并使共享数据结构可供其他线程使用。

这是说明上述非阻塞算法的蓝图的图:

修改必须作为一项或者多项比较和交换操作来执行。因此,如果两个线程试图完成预期的修改,则只有一个线程将能够执行任何比较和交换操作。一旦完成比较交换操作,完成该比较交换操作的进一步尝试将失败。

A-B-A问题

上面说明的算法可能遭受A-B-A问题。 A-B-A问题是指变量从A更改为B,然后再次变回A的情况。因此,对于另一个线程,不可能检测到该变量确实已更改。

如果线程A检查正在进行的更新,复制数据并被线程调度程序挂起,则线程B可能同时能够访问共享数据结构。如果线程B执行了数据结构的完整更新并删除了其预期的修改,则它将看起来像线程A,因为自从它复制数据结构以来就没有进行任何修改。但是,确实进行了修改。当线程A根据其现在已过期的数据结构副本继续执行其更新时,该数据结构将撤消线程B的修改。

下图从上述情况说明了A-B-A问题:

A-B-A解决方案

解决A-B-A问题的常见方法是不仅交换指向预期修改对象的指针,而且将指针与计数器组合在一起,并使用单个比较交换操作交换指针+计数器。在支持C和C ++之类的指针的语言中,这是可能的。因此,即使当前修改指针被设置回指向"无正在进行的修改",指针+计数器的计数器部分也将被递增,从而使更新对其他线程可见。

在Java中,我们不能将引用和计数器合并到一个变量中。取而代之的是,Java提供了" AtomicStampedReference"类,该类可以使用"比较并交换"操作原子地交换引用和标记。

非阻塞算法模板

下面是一个代码模板,旨在为我们提供有关如何实现非阻塞算法的想法。该模板基于本教程前面给出的描述。

注意:我不是非阻塞算法专家,所以下面的模板可能有一些错误。不要在我的模板上建立自己的非阻塞算法实现。该模板仅用于使我们了解非阻塞算法的代码外观。如果要实现自己的非阻塞算法,请首先研究一些实际的,有效的非阻塞算法实现,以了解有关如何在实践中实现它们的更多信息。

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;

public class NonblockingTemplate {

    public static class IntendedModification {
        public AtomicBoolean completed =
                new AtomicBoolean(false);
    }

    private AtomicStampedReference<IntendedModification>
        ongoingMod =
            new AtomicStampedReference<IntendedModification>(null, 0);

    //declare the state of the data structure here.

    public void modify() {
        while(!attemptModifyASR());
    }

    public boolean attemptModifyASR(){

        boolean modified = false;
    
        IntendedModification currentlyOngoingMod =
        ongoingMod.getReference();
        int stamp = ongoingMod.getStamp();
    
        if(currentlyOngoingMod == null){
            //copy data structure state - for use
            //in intended modification
        
            //prepare intended modification
            IntendedModification newMod =
            new IntendedModification();
        
            boolean modSubmitted = 
                ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);
        
            if(modSubmitted){
            
                //complete modification via a series of compare-and-swap operations.
                //note: other threads Jan assist in completing the compare-and-swap
                // operations, so some CAS Jan fail
            
                modified = true;
            }
    
        } else {
            //attempt to complete ongoing modification, so the data structure is freed up
            //to allow access from this thread.
        
            modified = false;
        }
    
        return modified;
    }
}

非阻塞算法难以实现

非阻塞算法很难正确设计和实现。在尝试实现自己的非阻塞算法之前,请查看是否没有人已经开发出可以满足我们需求的非阻塞算法。

Java已经提供了一些非阻塞的实现(例如,ConcurrentLinkedQueue),并且在未来的Java版本中很可能会获得更多的非阻塞算法的实现。

除了Java的内置非阻塞数据结构外,我们还可以使用一些开源的非阻塞数据结构。例如,LMAX Disrupter(类似于队列的数据结构)和来自Cliff Click的无阻塞HashMap。请参阅我的Java并发引用页面,以获取更多资源的链接。

非阻塞算法的好处

与阻塞算法相比,非阻塞算法有几个好处。本节将描述这些好处。

选择

非阻塞算法的第一个好处是,在无法执行其请求的操作时,可以选择对线程进行处理。请求线程可以选择做什么,而不仅仅是被阻止。有时,线程无能为力。在这种情况下,它可以选择阻止或者等待自身,从而将CPU释放给其他任务。但是至少给请求线程一个选择。

在单个CPU系统上,挂起一个无法执行所需动作的线程,然后让其他可以执行其工作的线程在CPU上运行也许是有意义的。但是,即使在单个CPU系统上,阻塞算法也可能导致死锁,饥饿和其他并发问题。

无死锁

非阻塞算法的第二个好处是,一个线程的挂起不会导致其他线程的挂起。这意味着不会发生死锁。不能阻止两个线程互相等待以释放所需的锁。由于线程无法执行其请求的操作时不会被阻塞,因此无法阻塞彼此之间的等待。非阻塞算法可能仍会导致活动锁定,在此情况下,两个线程将继续尝试某些操作,但始终被告知不可能(由于另一个线程的操作)。

没有线程悬浮

挂起和重新激活线程的成本很高。是的,随着操作系统和线程库变得更加高效,挂起和重新激活的成本已随着时间的推移而下降。但是,线程暂停和重新激活仍然需要付出高昂的代价。

每当线程被阻塞时,它就会被挂起,从而导致线程挂起和重新激活的开销。由于线程不会被非阻塞算法挂起,因此不会发生这种开销。这意味着CPU可能会花费更多的时间来执行实际的业务逻辑,而不是进行上下文切换。

在多CPU系统上,阻塞算法可能会对整体性能产生更大的影响。可以阻止CPU A上运行的线程等待CPU B上运行的线程。这降低了应用程序能够实现的并行度。当然,CPU A可以安排另一个线程运行,但是挂起和激活线程(上下文切换)非常昂贵。需要挂起的线程越少越好。

减少线程延迟

在这种情况下,延迟意味着请求的操作之间的时间变为可能,线程实际执行该操作。由于线程未在非阻塞算法中挂起,因此它们不必支付昂贵且缓慢的重新激活开销。这意味着当请求的动作变为可能时,线程可以更快地响应,从而减少其响应等待时间。

非阻塞算法通常通过忙于等待直到请求的动作变为可能而获得较低的延迟。当然,在非阻塞数据结构上具有高线程争用的系统中,CPU可能会在这些繁忙的等待期间消耗大量的周期。这是要记住的事情。如果数据结构具有高线程争用,则非阻塞算法可能不是最好的。但是,通常有很多方法可以重新设计应用程序以减少线程争用。