Java NIO:非阻塞服务器

时间:2020-01-09 10:36:15  来源:igfitidea点击:

即使我们了解Java NIO非阻塞函数的工作方式("选择器","通道","缓冲区"等),设计非阻塞服务器仍然很困难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。

很难找到有关设计非阻塞服务器的良好信息。因此,本教程中提供的解决方案基于我自己的工作和想法。如果我们有其他选择甚至更好的主意,我将很高兴听到它们!我们可以在文章下方写评论,也可以给我发送电子邮件(请参阅"关于"页面),或者在Twitter上吸引我。

本教程中描述的思想是围绕Java NIO设计的。但是,我相信,只要这些想法具有类似"选择器"的构造,它们就可以在其他语言中重用。据我所知,这样的构造是由底层操作系统提供的,因此我们很有可能也可以使用其他语言来访问它。

无阻塞IO管道

无阻塞IO管道是处理无阻塞IO的组件链。这包括以非阻塞方式读取和写入IO。这是简化的无阻塞IO管道的说明:

组件使用选择器检查通道何时有要读取的数据。然后,组件读取输入数据并根据输入生成一些输出。输出再次被写入"通道"。

无阻塞的IO管道不需要读取和写入数据。某些管道可能仅读取数据,而某些管道可能仅写入数据。

上图仅显示了一个组件。无阻塞IO管道可能具有多个组件处理传入数据。无阻塞IO管道的长度取决于管道需要执行的操作。

非阻塞IO管道也可能同时从多个"通道"中读取。例如,从多个" SocketChannel"中读取数据。

上图中的控制流程也得到了简化。它是启动通过"选择器"从"通道"读取数据的组件。即使是上图所示,也不是"通道"将数据推入"选择器"并从那里推入组件。

非阻塞与阻塞IO管道

非阻塞和阻塞IO管道之间的最大区别是如何从底层的"通道"(套接字或者文件)读取数据。

IO管道通常从某个流(从套接字或者文件)读取数据,并将该数据拆分为一致的消息。这类似于将数据流分成令牌以使用令牌生成器进行解析。相反,我们将数据流分解为更大的消息。我将调用将该流分解为消息阅读器的消息的组件。这是消息阅读器将流分成消息的说明:

阻塞的IO管道可以使用类似InputStream的接口,可以一次从底层的Channel读取一个字节,并且可以使用类似InputStream的接口阻塞,直到准备好读取数据为止。这将导致阻塞的消息阅读器实现。

对流使用阻塞的IO接口可以大大简化消息阅读器的实现。阻塞的消息阅读器永远不必处理从流中未读取任何数据,或者仅从流中读取了部分消息并且以后需要恢复消息解析的情况。

同样,阻塞消息编写器(将消息写入流的组件)永远不必处理仅写入消息的一部分且以后必须恢复消息写入的情况。

阻止IO管道的缺点

尽管阻塞消息阅读器更易于实现,但它有一个不幸的缺点,即需要为每个需要拆分为消息的流分配一个单独的线程。之所以需要这样做,是因为每个流的IO接口都会阻塞,直到有一些数据要从中读取为止。这意味着单个线程无法尝试从一个流中读取,如果没有数据,则从另一个流中读取。一旦线程尝试从流中读取数据,线程就会阻塞,直到实际上有一些数据要读取为止。

如果IO管道是必须处理大量并发连接的服务器的一部分,则该服务器将为每个活动的传入连接需要一个线程。如果服务器在任何时候都只有几百个并发连接,那么这可能不是问题。但是,如果服务器具有数百万个并发连接,则这种设计的扩展性就不太好。每个线程将为其堆栈占用320K(32位JVM)和1024K(64位JVM)内存。因此,1.000.000线程将占用1 TB内存!也就是在服务器使用任何内存来处理传入消息之前(例如,为消息处理期间使用的对象分配的内存)。

为了减少线程数量,许多服务器使用一种设计,其中服务器保留一个线程池(例如100),该线程池一次从入站连接中读取消息。入站连接保留在队列中,线程按入站连接放入队列的顺序处理来自每个入站连接的消息。此设计在此处说明:

但是,此设计要求入站连接合理地发送数据。如果入站连接可能长时间处于非活动状态,则大量的非活动连接实际上可能会阻塞线程池中的所有线程。这意味着服务器的响应速度变慢甚至不响应。

一些服务器设计试图通过使线程池中的线程数具有一定的弹性来缓解此问题。例如,如果线程池用完了线程,则线程池可能会启动更多线程来处理负载。此解决方案意味着需要大量的慢速连接才能使服务器无响应。但是请记住,我们可以运行多少个线程仍然存在上限。因此,如果连接速度为1.000.000,连接速度很慢。

基本的无阻塞IO管道设计

无阻塞的IO管道可以使用单个线程从多个流中读取消息。这要求流可以切换到非阻塞模式。在非阻塞模式下,当我们尝试从流中读取数据时,流可能返回0个或者多个字节。如果流中没有要读取的数据,则返回0字节。当流实际有一些数据要读取时,将返回1+字节。

为了避免检查要读取的0字节流,我们使用Java NIO选择器。可以向一个"选择器"注册一个或者多个" SelectableChannel"实例。当我们在Selector上调用select()或者selectNow()时,它仅为我们提供实际上具有要读取的数据的SelectableChannel实例。此设计在此处说明:

阅读部分消息

当我们从" SelectableChannel"中读取一个数据块时,我们不知道该数据块包含的是少于还是多于一条消息。数据块可能包含部分消息(少于一条消息),完整消息或者多于一条消息,例如1.5或者2.5条消息。此处说明了各种部分消息的可能性:

处理部分消息有两个挑战:

  • 检测数据块中是否有完整的消息。
  • 在部分消息到达之前如何处理部分消息。

要检测完整消息,需要消息阅读器查看数据块中的数据,以查看数据中是否至少包含一条完整消息。如果数据块包含一个或者多个完整消息,则可以将这些消息沿管道发送以进行处理。查找完整消息的过程将重复很多,因此该过程必须尽可能快。

每当数据块中有部分消息时,无论是本身还是在一个或者多个完整消息之后,都需要存储该部分消息,直到该消息的其余部分从"通道"到达为止。

消息阅读器负责检测全部消息和存储部分消息。为了避免混合来自不同Channel实例的消息数据,我们将为每个Channel使用一个Message Reader。设计看起来像这样:

在检索到具有要从"选择器"中读取的数据的"通道"实例后,与该"通道"相关联的消息阅读器将读取数据,并尝试将其分解为消息。如果这导致读取任何完整的消息,则可以将这些消息沿读取管道向下传递到需要处理它们的任何组件。

消息阅读器当然是特定于协议的。消息阅读器需要知道它试图读取的消息的消息格式。如果我们的服务器实现可跨协议重用,则可能需要通过以某种方式接受消息阅读器工厂作为配置参数来插入消息阅读器实现。

存储部分消息

现在我们已经确定了消息阅读器的职责是存储部分消息,直到收到完整的消息为止,我们需要弄清楚应该如何实现部分消息存储。

我们应考虑两个设计注意事项:

  • 我们希望尽可能少地复制消息数据。复制越多,性能越低。
  • 我们希望完整的消息以连续的字节序列存储,以使解析消息更加容易。

每个消息阅读器的缓冲区

显然,部分消息需要存储在某种缓冲区中。直接的实现是在每个消息阅读器内部简单地具有一个缓冲区。但是,该缓冲区应该有多大?它必须足够大才能存储最大允许的消息。因此,如果允许的最大消息为1MB,则每个消息阅读器中的内部缓冲区至少需要为1MB。

当我们达到数百万个连接时,每个连接使用1MB的空间实际上是行不通的。 1.000.000 x 1MB仍然是1TB内存!如果最大邮件大小为16MB,该怎么办?还是128MB?

可调整大小的缓冲区

另一个选择是实现可调整大小的缓冲区,以供在每个Message Reader中使用。可调整大小的缓冲区将从较小的缓冲区开始,如果消息对于该缓冲区而言太大,则缓冲区将被扩展。这样,每个连接将不一定需要例如1MB缓冲区。每个连接仅占用它们容纳下一条消息所需的内存。

有几种方法可以实现可调整大小的缓冲区。它们都有优点和缺点,因此我将在以下各节中讨论它们。

按副本调整大小

实现可调整大小的缓冲区的第一种方法是从一个较小的缓冲区开始,例如4KB。如果消息无法容纳在4KB缓冲区中,请使用更大的缓冲区,例如可以分配8KB,并将4KB缓冲区中的数据复制到更大的缓冲区中。

"按副本调整大小"缓冲区实现的优点是,一条消息的所有数据都保存在一个连续的字节数组中。这使得解析消息变得更加容易。

"按副本调整大小"缓冲区实现的缺点是,它将导致针对较大消息复制大量数据。

为了减少数据复制,我们可以分析流经系统的消息的大小,以找到可以减少复制量的某些缓冲区大小。例如,我们可能会看到大多数消息都小于4KB,因为它们仅包含很小的请求/响应。这意味着第一个缓冲区大小应为4KB。

然后,我们可能会看到,如果一条消息大于4KB,通常是因为它包含一个文件。然后,我们可能会注意到,流经系统的大多数文件都小于128KB。然后,将第二个缓冲区的大小设置为128KB是有意义的。

最终,我们可能会看到,一旦消息超过128KB,就没有真正的消息大小模式,因此,最终的缓冲区大小应该仅仅是最大消息大小。

根据流经我们系统的消息大小使用这3种缓冲区大小,我们将在某种程度上减少数据复制。 4KB以下的消息将永远不会被复制。对于1.000.000并发连接,导致1.000.000 x 4KB = 4GB,这在今天(2014年)的大多数服务器中都是可能的。 4KB和128KB之间的消息将被复制一次,并且仅4KB数据将需要复制到128KB缓冲区中。 128KB和最大消息大小之间的消息将被复制两次。第一次将复制4KB,第二次将复制128KB,因此对于最大的消息,总共将复制132KB。假设没有太多消息超过128KB,这可能是可以接受的。

一旦消息已被完全处理,分配的内存应再次释放。这样,从同一连接接收到的下一条消息将再次以最小的缓冲区大小开始。必须确保在连接之间可以更有效地共享内存。很有可能并非所有连接都同时需要大缓冲区。

我有一个完整的教程,关于如何实现这样的内存缓冲区,该内存缓冲区在此处支持可调整大小的数组:Resizable Arrays。本教程还包含指向GitHub存储库的链接,其中包含显示有效实施的代码。

通过追加调整大小

调整缓冲区大小的另一种方法是使缓冲区由多个数组组成。当我们需要调整缓冲区的大小时,我们只需分配另一个字节数组,然后将数据写入其中即可。

有两种方法可以增加这种缓冲区。一种方法是分配单独的字节数组,并保留这些字节数组的列表。另一种方法是分配更大的共享字节数组的切片,然后保留分配给缓冲区的切片的列表。就个人而言,我认为切片方法稍好一些,但差异很小。

通过在缓冲区上添加单独的数组或者切片来增加缓冲区的优点是,在写入过程中无需复制任何数据。所有数据都可以直接从套接字(" Channel")直接复制到数组或者切片中。

以这种方式增长缓冲区的缺点是数据不会存储在单个连续的数组中。这使消息解析更加困难,因为解析器需要同时查找每个单个数组的末尾和所有数组的末尾。由于我们需要在书面数据中查找消息的结尾,因此使用此模型不太容易。

TLV编码的消息

某些协议消息格式使用TLV格式(类型,长度,值)进行编码。这意味着,当消息到达时,消息的总长度将存储在消息的开头。这样,我们立即知道要为整个消息分配多少内存。

TLV编码使内存管理更加容易。我们立即知道为该消息分配多少内存。在仅部分使用的缓冲区的末尾不会浪费任何内存。

TLV编码的一个缺点是,我们必须在消息的所有数据到达之前为消息分配所有内存。因此,一些发送大消息的速度较慢的连接可以分配我们所有可用的内存,从而使服务器无响应。

解决此问题的方法是使用一种消息格式,其中包含多个TLV字段。因此,将为每个字段分配内存,而不是为整个消息分配内存,并且仅在字段到达时才分配内存。尽管如此,大字段对大容量消息的影响仍与大消息相同。

另一个解决方法是超时,例如,在该时间内未接收到的消息。 10-15秒。这可以使服务器从同时出现的许多大消息中恢复过来,但仍然会使服务器在一段时间内无响应。此外,故意的DoS(拒绝服务)攻击仍可能导致为服务器完全分配内存。

TLV编码存在不同的变体。确切地使用了多少字节,因此指定字段的类型和长度取决于每个单独的TLV编码。也有TLV编码将字段的长度放在首位,然后是类型,然后是值(LTV编码)。尽管字段的顺序不同,但这仍然是TLV的变体。

TLV编码使内存管理更容易的事实是HTTP 1.1如此糟糕的协议的原因之一。这是他们试图在HTTP 2.0中解决的问题之一,在HTTP 2.0中,数据是以LTV编码的帧进行传输的。这也是为什么我们为使用TLV编码的VStack.co项目设计了自己的网络协议的原因。

编写部分消息

在无阻塞的IO管道中,写入数据也是一个挑战。当我们在非阻塞模式下在Channel上调用write(ByteBuffer)时,无法保证在ByteBuffer中有多少字节被写入。 write(ByteBuffer)方法返回已写入的字节数,因此可以跟踪已写入的字节数。这就是挑战:跟踪部分写入的消息,以便最后发送一条消息的所有字节。

为了管理将部分消息写入"通道",我们将创建一个消息编写器。就像使用消息阅读器一样,我们需要在每个将消息写入到的"频道"中使用消息编写器。在每个Message Writer中,我们都精确跟踪它当前正在写入的消息已写入多少字节。

如果到达消息编写器的消息超过了直接写入"通道"的消息数量,则需要在消息编写器内部将消息排队。然后,消息编写器将消息尽快写入"通道"。

这是显示到目前为止如何设计部分消息编写的图:

为了使Message Writer能够发送仅部分发送的较早消息,需要不时调用Message Writer,以便它可以发送更多数据。

如果我们有很多连接,则将有很多Message Writer实例。检查例如一百万个Message Writer实例查看它们是否可以写入任何数据很慢。首先,许多Message Writer实例很多没有任何要发送的消息。我们不想检查那些Message Writer实例。其次,并非所有的" Channel"实例都可以准备向其写入数据。我们不想浪费时间尝试将数据写入到仍然无法接受任何数据的"通道"中。

要检查"通道"是否已准备好写,我们可以使用"选择器"注册该通道。但是,我们不希望向Selector注册所有Channel实例。想象一下,如果我们有1.000.000个连接,其中大多数都是空闲的,而所有1.000.000个连接都已在"选择器"中注册。然后,当我们调用select()时,大多数这些Channel实例将是可写的(它们大多是空闲的,还记得吗?)。然后,我们必须检查所有这些连接的消息编写器,以查看它们是否有任何数据要写入。

为了避免检查所有Message Writer实例中的消息以及所有始终没有任何消息要发送给它们的Channel实例,我们使用了以下两步方法:

  • 将消息写入消息编写器时,消息编写器将其关联的"通道"注册到"选择器"(如果尚未注册)。
  • 当服务器有时间时,它将检查"选择器"以查看哪些已注册的"通道"实例已准备好进行写入。对于每个可写的"通道",要求其关联的消息编写器将数据写入"通道"。如果消息编写器将其所有消息写入其"通道",则将再次从"选择器"中取消注册"通道"。

这个小小的两步方法可以确保只有将要写入消息的Channel实例实际上是在Selector中注册的。

放在一起

如我们所见,非阻塞服务器需要不时检查传入数据,以查看是否接收到任何新的完整消息。服务器可能需要多次检查,直到收到一条或者多条完整消息为止。仅检查一次是不够的。

同样,非阻塞服务器需要不时检查是否有任何数据要写入。如果是,则服务器需要检查是否有任何相应的连接已准备好将数据写入其中。仅在消息第一次排队时进行检查是不够的,因为消息可能会被部分写入。

总而言之,一个非阻塞服务器最终需要定期执行以下三个"管道":

  • 读取管道,用于检查来自打开的连接的新传入数据。
  • 处理接收到的所有完整消息的处理管道。
  • 用于检查是否可以将任何传出消息写入任何打开的连接的写管道。

这三个管道在一个循环中重复执行。我们也许可以在某种程度上优化执行。例如,如果没有消息排队,则可以跳过写管道。或者,如果没有收到新的完整消息,则可以跳过流程管道。

服务器线程模型

GitHub存储库中的非阻塞服务器实现使用具有2个线程的线程模型。第一个线程接受来自ServerSocketChannel的传入连接。第二个线程处理接受的连接,这意味着读取消息,处理消息并将响应写回到连接。这2个线程模型在这里说明:

上一节中说明的服务器处理循环由处理线程执行。