Grid Ops Java-TcpSocketsPort

时间:2020-01-09 10:35:45  来源:igfitidea点击:

要从TcpServer读取来自传入TCP连接的消息,请使用TcpSocketsPort。 TcpSocketsPort负责使用NIO Selector注册进入的连接(SocketChannel实例),从连接中读取数据并将数据分解为消息。

创建一个TcpSocketsPort

我们可以这样创建TcpSocketsPort

TcpPort tcpServer = GridOps.tcpServerBuilder().buildAndStart();

TcpSocketsPort tcpSocketsPort =
        GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build();

读取消息

我们可以通过TcpSocketsPort对象的read()方法读取消息。这是一个例子:

MemoryBlock[] messages = new MemoryBlock[64];

int messageCount = tcpSocketsPort.read(messages);

read()方法将消息读取到MemoryBlock实例(实际上是TcpMessage实例,它是MemoryBlock的子类)。 TcpSocketsPort将把MemoryBlock实例写入到作为参数传递给read()方法的MemoryBlock数组中。从read()方法返回的值是读入MemoryBlock数组的消息数。

我们可以像这样遍历写入MemoryBlock数组中的消息:

int messageCount = tcpSocketsPort.read(messages);

for(int i=0; i<messageCount; i++){
    MemoryBlock message = messages[i];
}

这段代码可以简化为:

for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){
    MemoryBlock message = messages[i];
}

不管我们是否喜欢这种简化,我都会由我们决定。

read()是非阻塞的

read()方法是非阻塞的。它会读取调用read()方法时收到的所有完整消息。一半接收到的消息被缓存在TcpSocketsPort内部,直到接收到完整的消息为止。可以从read()调用中获取0(零)条消息。

TcpMessage

TcpMessage类是MemoryBlock类的子类。 " MemoryBlock"是存储在更大的共享数组中的字节序列。 TcpMessage类添加了一些有关从中读取消息的TCP套接字的信息。当从TCP套接字读取消息时,返回的消息由TcpMessage实例表示。因此,我们可以将读取的消息强制转换为TcpMessage实例,如下所示:

for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){
    TcpMessage message = (TcpMessage) messages[i];
}

写入消息

将消息写入由TcpSocketsPort管理的套接字需要执行以下步骤:

  • 分配一个" TcpMessage"来保存响应。
  • 将数据写入TcpMessage
  • 确定应将TcpMessage发送到哪个TcpSocket。
  • 将TcpMessage与所需的TcpSocket排队。
  • 将排队的消息写入其指定的" TcpSockets"。

完整的示例如下所示。实际的数据写到" TcpMessage"已经被省去了。这将在其他教程中介绍。

TcpServer tcpServer = GridOps.tcpServerBuilder().buildAndStart();

TcpSocketsPort tcpSocketsPort =
        GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build();

MemoryBlock[] messages = new MemoryBlock[64];

for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){
    TcpMessage request  = (TcpMessage) messages[i];
    TcpMessage response = tcpSocketsPort.allocateWriteMemoryBlock(128);

    //write data to response - left out for brevity

    // send response to tcpSocket request was received from
    response.tcpSocket = request.tcpSocket;

    tcpSocketsPort.enqueue(response);
}

// write as much as possible of enqueued messages to the sockets they
// are enqueued for.
tcpSocketsPort.writeToSockets();

注意,在此示例结尾处调用的writeToSockets()方法不能保证将所有排队的消息都写入其指定的套接字。 writeToSockets()方法将向指定的套接字写入尽可能多的数据,但是如果对套接字的写操作未能写入任何字节(写入0字节),则它将不会尝试向该套接字写入更多数据调用writeToSockets()的时间。

上例中从TcpSocketsPort读取消息并回写响应的部分通常应在循环内调用。这样,writeToSockets()方法将在循环中每次迭代被调用一次,因此任何部分写入的消息最终都将写入其指定的套接字。这是一个简单的示例,在while循环中显示了前面的示例:

TcpServer tcpServer = GridOps.tcpServerBuilder().buildAndStart();

TcpSocketsPort tcpSocketsPort =
        GridOps.tcpSocketsPortBuilder().tcpServer(tcpServer).build();

MemoryBlock[] messages = new MemoryBlock[64];

while(true) {
    for(int i=0, n=tcpSocketsPort.read(messages); i<n; i++){
        TcpMessage request  = (TcpMessage) messages[i];
        TcpMessage response = tcpSocketsPort.allocateWriteMemoryBlock(128);

        // write data to response - left out for brevity

        // send response to tcpSocket request was received from
        response.tcpSocket = request.tcpSocket;

        tcpSocketsPort.enqueue(response);
    }

    // write as much as possible of enqueued messages to the sockets they
    // are enqueued for.
    tcpSocketsPort.writeToSockets();
}

在实际的应用程序中,我们可能会有不同的while()子句,该子句对某些停止信号做出反应。本示例仅使用while(true)循环来说明有关重复读取和写入消息的要点。