Vert.x Verticle

时间:2020-01-09 10:47:21  来源:igfitidea点击:

术语verticle是可以部署到Vert.x的组件的名称。在某些方面,Verticle类似于Java EE中的Servlet或者消息驱动的EJB。但是,顶点的工作方式不同,并且顶点,Servlet和EJB执行的并发模型也不同。本文将仔细研究如何在Vert.x中创建和部署顶点,以及顶点如何通过内部事件总线相互通信。

实施Verticle

我们可以通过创建扩展io.vertx.core.AbstractVerticle的类来实现verticle。这是一个示例verticle类:

import io.vertx.core.AbstractVerticle;

public class BasicVerticle extends AbstractVerticle {

}

类BasicVerticle扩展了AbstractVerticle,但没有任何函数。我们将在以下各节中添加它。

start()

AbstractVerticle类包含一个start()方法,我们可以在verticle类中重写该方法。 Vert.x在部署顶点并准备开始时调用start()方法。这是实现start()方法的样子:

public class BasicVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("BasicVerticle started");
    }
}

start()方法是初始化顶点的地方。在start()方法内部,我们通常会创建例如HTTP或者TCP服务器,在事件总线上注册事件处理程序,部署其他verticle,或者ververt进行工作所需的其他任何东西。

AbstracVerticle类还包含start()的另一个版本,该版本将Future用作参数。这个Future可以用来异步地告诉Vert.x Verticle是否被成功部署。

我们将在本教程的后面部分看到如何使用两种版本的start()方法。

stop()

AbstractVerticle类还包含我们可以覆盖的stop()方法。当Vert.x关闭并且顶点需要停止时,将调用stop()方法。这是在我们自己的Verticle中重写stop()方法的示例:

public class BasicVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("BasicVerticle started");
    }

    @Override
    public void stop() throws Exception {
        System.out.println("BasicVerticle stopped");
    }
}

部署Verticle

创建Verticle之后,我们需要将其部署在Vert.x中才能执行。这是我们部署一个Verticle的方法:

public class VertxVerticleMain {

    public static void main(String[] args) throws InterruptedException {
        Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(new BasicVerticle());
    }
}

首先创建一个" Vertx"实例。其次,在" Vertx"实例上调用" deployVerticle()"方法,并以顶点实例(在此示例中为" BasicVerticle")作为参数。

Vert.x现在将在内部部署该Verticle。一旦Vert.x部署了该顶点,该顶点的start()方法就会被调用。

该顶点将异步部署,因此在deployVerticle()方法返回时,该顶点可能尚未部署。如果我们需要确切地知道什么时候完全部署了一个顶点,可以向deployVerticle()提供一个Handler实现。看起来是这样的:

vertx.deployVerticle(new BasicVerticle(), new Handler<AsyncResult<String>>() {
    @Override
    public void handle(AsyncResult<String> stringAsyncResult) {
        System.out.println("BasicVerticle deployment complete");
        }
    });

或者使用Java Lambda表达式:

vertx.deployVerticle(new BasicVerticle(), stringAsyncResult -> {
        System.out.println("BasicVerticle deployment complete");
});

从另一个Verticle部署一个Verticle

可以从另一个Verticle内部部署一个Verticle。这是一个例子:

public class BasicVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("BasicVerticle started");

        vertx.deployVerticle(new SecondVerticle());
    }

    @Override
    public void stop() throws Exception {
        System.out.println("BasicVerticle stopped");
    }
}

使用事件总线

顶点侦听来自事件总线的传入消息,或者通过事件总线将消息写入其他顶点是非常常见的。因此,在以下各节中,我将向我们展示如何进行这两项操作。

接收消息

当一个顶点要侦听来自事件总线的消息时,它会侦听某个地址。地址只是我们可以自由选择的名称(字符串)。

多个顶点可以侦听同一地址上的消息。这意味着一个地址不是单个顶点唯一的。因此,地址更像是我们可以通过其通信的频道的名称。多个顶点可以侦听一个地址上的消息,并且多个顶点可以将消息发送到一个地址。

一个顶点可以通过继承自AbstractVerticle的vertx实例获得对事件总线的引用。

这是在给定地址上侦听消息的样子:

public class EventBusReceiverVerticle extends AbstractVerticle {

    public void start(Future<Void> startFuture) {

        vertx.eventBus().consumer("anAddress", message -> {
            System.out.println("1 received message.body() = "
                + message.body());
        });
    }
}

本示例显示了一个垂直记录,该垂直记录在Vert.x事件总线上记录了消息的使用者(侦听器)。使用者在地址" anAddress"中注册,这意味着它使用了通过事件总线发送到该地址的消息。

使用者是一个包含单个方法的处理程序对象。这就是为什么它在上面使用lambda表达式实现的原因。

传送消息

通过事件总线发送消息可以通过事件总线上的send()或者publish()方法来完成。

"发布"方法将消息发送到在给定地址上侦听的所有顶点。

send()方法仅将消息发送到其中一个侦听顶点。 Vert.x决定接收消息的哪个顶点。在撰写本文时,Vert.x文档说,使用"非严格循环"算法选择了一个顶点。基本上,这意味着Vert.x将尝试在侦听的顶点之间平均分配消息。这对于在多个顶点(例如线程或者CPU)上分配工作负载很有用。

这是一个部署两个事件总线使用者(侦听器)和一个事件总线发送者的示例。发件人将两条消息发送到给定地址。第一条消息是通过" publish()"方法发送的,因此两个使用者都收到了该消息。第二条消息是通过send()方法发送的,因此只有一个使用者会收到该消息。

Vertx vertx = Vertx.vertx();

vertx.deployVerticle(new EventBusReceiverVerticle("R1"));
vertx.deployVerticle(new EventBusReceiverVerticle("R2"));

Thread.sleep(3000);
vertx.deployVerticle(new EventBusSenderVerticle());
public class EventBusSenderVerticle extends AbstractVerticle {

    public void start(Future<Void> startFuture) {
        vertx.eventBus().publish("anAddress", "message 2");
        vertx.eventBus().send   ("anAddress", "message 1");
    }
}
public class EventBusReceiverVerticle extends AbstractVerticle {

    private String name = null;

    public EventBusReceiverVerticle(String name) {
        this.name = name;
    }

    public void start(Future<Void> startFuture) {
        vertx.eventBus().consumer("anAddress", message -> {
            System.out.println(this.name + 
                " received message: " +
                message.body());
        });
    }
}

如果运行此代码,我们将看到两个使用者(R1 + R2)都接收到第一条消息,而第二条消息仅由其中一个使用者接收。