Vert.x Verticle
术语verticle是可以部署到Vert.x的组件的名称。在某些方面,Verticle类似于Java EE中的Servlet或者消息驱动的EJB。但是,顶点的工作方式不同,并且顶点,Servlet和EJB执行的并发模型也不同。本文将仔细研究如何在Vert.x中创建和部署顶点,以及顶点如何通过内部事件总线相互通信。
实施Verticle
我们可以通过创建扩展io.vertx.core.AbstractVerticle的类来实现verticle。这是一个示例verticle类:
import io.vertx.core.AbstractVerticle; public class BasicVerticle extends AbstractVerticle { }
类BasicVerticle扩展了AbstractVerticle,但没有任何函数。我们将在以下各节中添加它。
start()
AbstractVerticle类包含一个start()方法,我们可以在verticle类中重写该方法。 Vert.x在部署顶点并准备开始时调用start()
方法。这是实现start()
方法的样子:
public class BasicVerticle extends AbstractVerticle { @Override public void start() throws Exception { System.out.println("BasicVerticle started"); } }
start()方法是初始化顶点的地方。在start()
方法内部,我们通常会创建例如HTTP或者TCP服务器,在事件总线上注册事件处理程序,部署其他verticle,或者ververt进行工作所需的其他任何东西。
AbstracVerticle类还包含start()的另一个版本,该版本将Future用作参数。这个Future
可以用来异步地告诉Vert.x Verticle是否被成功部署。
我们将在本教程的后面部分看到如何使用两种版本的start()
方法。
stop()
AbstractVerticle
类还包含我们可以覆盖的stop()
方法。当Vert.x关闭并且顶点需要停止时,将调用stop()方法。这是在我们自己的Verticle中重写stop()
方法的示例:
public class BasicVerticle extends AbstractVerticle { @Override public void start() throws Exception { System.out.println("BasicVerticle started"); } @Override public void stop() throws Exception { System.out.println("BasicVerticle stopped"); } }
部署Verticle
创建Verticle之后,我们需要将其部署在Vert.x中才能执行。这是我们部署一个Verticle的方法:
public class VertxVerticleMain { public static void main(String[] args) throws InterruptedException { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new BasicVerticle()); } }
首先创建一个" Vertx"实例。其次,在" Vertx"实例上调用" deployVerticle()"方法,并以顶点实例(在此示例中为" BasicVerticle")作为参数。
Vert.x现在将在内部部署该Verticle。一旦Vert.x部署了该顶点,该顶点的start()
方法就会被调用。
该顶点将异步部署,因此在deployVerticle()
方法返回时,该顶点可能尚未部署。如果我们需要确切地知道什么时候完全部署了一个顶点,可以向deployVerticle()
提供一个Handler
实现。看起来是这样的:
vertx.deployVerticle(new BasicVerticle(), new Handler<AsyncResult<String>>() { @Override public void handle(AsyncResult<String> stringAsyncResult) { System.out.println("BasicVerticle deployment complete"); } });
或者使用Java Lambda表达式:
vertx.deployVerticle(new BasicVerticle(), stringAsyncResult -> { System.out.println("BasicVerticle deployment complete"); });
从另一个Verticle部署一个Verticle
可以从另一个Verticle内部部署一个Verticle。这是一个例子:
public class BasicVerticle extends AbstractVerticle { @Override public void start() throws Exception { System.out.println("BasicVerticle started"); vertx.deployVerticle(new SecondVerticle()); } @Override public void stop() throws Exception { System.out.println("BasicVerticle stopped"); } }
使用事件总线
顶点侦听来自事件总线的传入消息,或者通过事件总线将消息写入其他顶点是非常常见的。因此,在以下各节中,我将向我们展示如何进行这两项操作。
接收消息
当一个顶点要侦听来自事件总线的消息时,它会侦听某个地址。地址只是我们可以自由选择的名称(字符串)。
多个顶点可以侦听同一地址上的消息。这意味着一个地址不是单个顶点唯一的。因此,地址更像是我们可以通过其通信的频道的名称。多个顶点可以侦听一个地址上的消息,并且多个顶点可以将消息发送到一个地址。
一个顶点可以通过继承自AbstractVerticle的vertx实例获得对事件总线的引用。
这是在给定地址上侦听消息的样子:
public class EventBusReceiverVerticle extends AbstractVerticle { public void start(Future<Void> startFuture) { vertx.eventBus().consumer("anAddress", message -> { System.out.println("1 received message.body() = " + message.body()); }); } }
本示例显示了一个垂直记录,该垂直记录在Vert.x事件总线上记录了消息的使用者(侦听器)。使用者在地址" anAddress"中注册,这意味着它使用了通过事件总线发送到该地址的消息。
使用者是一个包含单个方法的处理程序对象。这就是为什么它在上面使用lambda表达式实现的原因。
传送消息
通过事件总线发送消息可以通过事件总线上的send()
或者publish()
方法来完成。
"发布"方法将消息发送到在给定地址上侦听的所有顶点。
send()
方法仅将消息发送到其中一个侦听顶点。 Vert.x决定接收消息的哪个顶点。在撰写本文时,Vert.x文档说,使用"非严格循环"算法选择了一个顶点。基本上,这意味着Vert.x将尝试在侦听的顶点之间平均分配消息。这对于在多个顶点(例如线程或者CPU)上分配工作负载很有用。
这是一个部署两个事件总线使用者(侦听器)和一个事件总线发送者的示例。发件人将两条消息发送到给定地址。第一条消息是通过" publish()"方法发送的,因此两个使用者都收到了该消息。第二条消息是通过send()
方法发送的,因此只有一个使用者会收到该消息。
Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new EventBusReceiverVerticle("R1")); vertx.deployVerticle(new EventBusReceiverVerticle("R2")); Thread.sleep(3000); vertx.deployVerticle(new EventBusSenderVerticle());
public class EventBusSenderVerticle extends AbstractVerticle { public void start(Future<Void> startFuture) { vertx.eventBus().publish("anAddress", "message 2"); vertx.eventBus().send ("anAddress", "message 1"); } }
public class EventBusReceiverVerticle extends AbstractVerticle { private String name = null; public EventBusReceiverVerticle(String name) { this.name = name; } public void start(Future<Void> startFuture) { vertx.eventBus().consumer("anAddress", message -> { System.out.println(this.name + " received message: " + message.body()); }); } }
如果运行此代码,我们将看到两个使用者(R1 + R2)都接收到第一条消息,而第二条消息仅由其中一个使用者接收。