Grid Ops Java-TcpSocketsPort
要从TcpServer读取来自传入TCP连接的消息,请使用TcpSocketsPort。 TcpSocketsPort负责使用NIO Selector注册进入的连接(SocketChannel实例),从连接中读取数据并将数据分解为消息。
创建一个TcpSocketsPort
我们可以这样创建TcpSocketsPort
:
TcpPort tcpServer = GridOps.tcpServerBuilder().buildAndStart(); TcpSocketsPort tcpSocketsPort = GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build();
读取消息
我们可以通过TcpSocketsPort对象的read()方法读取消息。这是一个例子:
MemoryBlock[] messages = new MemoryBlock[64]; int messageCount = tcpSocketsPort.read(messages);
read()方法将消息读取到MemoryBlock实例(实际上是TcpMessage实例,它是MemoryBlock的子类)。 TcpSocketsPort将把MemoryBlock实例写入到作为参数传递给read()方法的MemoryBlock数组中。从read()方法返回的值是读入MemoryBlock数组的消息数。
我们可以像这样遍历写入MemoryBlock数组中的消息:
int messageCount = tcpSocketsPort.read(messages); for(int i=0; i<messageCount; i++){ MemoryBlock message = messages[i]; }
这段代码可以简化为:
for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){ MemoryBlock message = messages[i]; }
不管我们是否喜欢这种简化,我都会由我们决定。
read()是非阻塞的
read()方法是非阻塞的。它会读取调用read()方法时收到的所有完整消息。一半接收到的消息被缓存在TcpSocketsPort内部,直到接收到完整的消息为止。可以从read()调用中获取0(零)条消息。
TcpMessage
TcpMessage类是MemoryBlock类的子类。 " MemoryBlock"是存储在更大的共享数组中的字节序列。 TcpMessage类添加了一些有关从中读取消息的TCP套接字的信息。当从TCP套接字读取消息时,返回的消息由TcpMessage
实例表示。因此,我们可以将读取的消息强制转换为TcpMessage
实例,如下所示:
for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){ TcpMessage message = (TcpMessage) messages[i]; }
写入消息
将消息写入由TcpSocketsPort管理的套接字需要执行以下步骤:
- 分配一个" TcpMessage"来保存响应。
- 将数据写入
TcpMessage
。 - 确定应将TcpMessage发送到哪个TcpSocket。
- 将TcpMessage与所需的TcpSocket排队。
- 将排队的消息写入其指定的" TcpSockets"。
完整的示例如下所示。实际的数据写到" TcpMessage"已经被省去了。这将在其他教程中介绍。
TcpServer tcpServer = GridOps.tcpServerBuilder().buildAndStart(); TcpSocketsPort tcpSocketsPort = GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build(); MemoryBlock[] messages = new MemoryBlock[64]; for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){ TcpMessage request = (TcpMessage) messages[i]; TcpMessage response = tcpSocketsPort.allocateWriteMemoryBlock(128); //write data to response - left out for brevity // send response to tcpSocket request was received from response.tcpSocket = request.tcpSocket; tcpSocketsPort.enqueue(response); } // write as much as possible of enqueued messages to the sockets they // are enqueued for. tcpSocketsPort.writeToSockets();
注意,在此示例结尾处调用的writeToSockets()
方法不能保证将所有排队的消息都写入其指定的套接字。 writeToSockets()方法将向指定的套接字写入尽可能多的数据,但是如果对套接字的写操作未能写入任何字节(写入0字节),则它将不会尝试向该套接字写入更多数据调用writeToSockets()
的时间。
上例中从TcpSocketsPort读取消息并回写响应的部分通常应在循环内调用。这样,writeToSockets()
方法将在循环中每次迭代被调用一次,因此任何部分写入的消息最终都将写入其指定的套接字。这是一个简单的示例,在while
循环中显示了前面的示例:
TcpServer tcpServer = GridOps.tcpServerBuilder().buildAndStart(); TcpSocketsPort tcpSocketsPort = GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build(); MemoryBlock[] messages = new MemoryBlock[64]; while(true) { for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){ TcpMessage request = (TcpMessage) messages[i]; TcpMessage response = tcpSocketsPort.allocateWriteMemoryBlock(128); // write data to response - left out for brevity // send response to tcpSocket request was received from response.tcpSocket = request.tcpSocket; tcpSocketsPort.enqueue(response); } // write as much as possible of enqueued messages to the sockets they // are enqueued for. tcpSocketsPort.writeToSockets(); }
在实际的应用程序中,我们可能会有不同的while()
子句,该子句对某些停止信号做出反应。本示例仅使用while(true)循环来说明有关重复读取和写入消息的要点。