Java 9反应流
Java 9 Reactive Streams允许我们实现非阻塞异步流处理。
这是将反应式编程模型应用于核心Java编程的重要一步。
如果您不熟悉反应式编程,请阅读"反应式宣言",并阅读有关"反应式流"的简短说明。
RxJava和Akka Streams已成为反应式流的流行实现。
现在,Java 9通过java.util.concurrent.Flow
API引入了反应流支持。
Java 9反应流
反应性流是关于流的异步处理的,因此应该有一个发布者和一个订阅者。
发布者发布数据流,而订阅者使用数据。
有时我们必须在发布者和订阅者之间转换数据。
处理器是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者可以理解它。
我们可以拥有一连串的处理器。
从上图中可以很清楚地看到,Processor既可以作为订阅者,也可以作为发布者。
Java 9 Flow API
Java 9 Flow API实现了反应式流规范。
Flow API是Iterator和Observer模式的组合。
Iterator在拉模型上工作,其中应用程序从源中拉出项目,而Observer在推模型上工作,并在将项目从源推到应用程序时做出反应。
Java 9 Flow API订阅者可以在订阅发布者的同时请求N个项目。
然后将项目从发布者推送到订阅者,直到没有其他项目可推送或者出现一些错误为止。
Java 9 Flow API类和接口
让我们快速看一下Flow API的类和接口。
java.util.concurrent.Flow
:这是Flow API的主要类。
此类封装了Flow API的所有重要接口。
这是最后一堂课,我们无法扩展。java.util.concurrent.Flow.Publisher
:这是一个功能接口,每个发布者都必须实现它的subscription方法,以添加给定的订阅者来接收消息。java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口。
订户中的方法以严格的顺序调用。
该接口中有四种方法:" onSubscribe":这是在订阅者订阅订阅者以接收发布者的消息时被调用的第一种方法。
通常我们调用subscription.request
来开始从处理器接收项目。onNext:从发布者那里收到项目时,将调用此方法,这是我们实现业务逻辑以处理流然后向发布者请求更多数据的地方。
onError:当发生不可恢复的错误时,将调用此方法,我们可以使用此方法清理任务,例如关闭数据库连接。
onComplete:类似于finally方法,并且在发布者没有生产其他任何商品并且关闭发布者时调用它。
我们可以使用它发送成功处理流的通知。java.util.concurrent.Flow.Subscription
:这用于在发布者和订阅者之间创建异步非阻塞链接。
订户调用其" request"方法以要求发布者提供商品。
它还具有取消方法来取消订阅,即关闭发布者和订阅者之间的链接。java.util.concurrent.Flow.Processor:此接口扩展了发布者和订阅者,用于在发布者和订阅者之间转换消息。
java.util.concurrent.SubmissionPublisher
:一个Publisher实现,它异步地将提交的项目发布给当前的订阅者,直到关闭为止。
它使用Executor框架。
我们将在反应流示例中使用此类来添加订户,然后向其提交项目。
Java 9反应流示例
让我们从一个简单的示例开始,在该示例中,我们将实现Flow API Subscriber接口并使用SubmissionPublisher创建发布者并发送消息。
流数据
假设我们有一个Employee类,该类将用于创建要从发布者发送到订阅者的流消息。
package com.theitroad.reactive.beans; public class Employee { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Employee(int i, String s) { this.id = i; this.name = s; } public Employee() { } @Override public String toString() { return "[id="+id+",name="+name+"]"; } }
我们还有一个实用程序类,可以为我们的示例创建一个雇员列表。
package com.theitroad.reactive_streams; import java.util.ArrayList; import java.util.List; import com.theitroad.reactive.beans.Employee; public class EmpHelper { public static List<Employee> getEmps() { Employee e1 = new Employee(1, "hyman"); Employee e2 = new Employee(2, "David"); Employee e3 = new Employee(3, "Lisa"); Employee e4 = new Employee(4, "Ram"); Employee e5 = new Employee(5, "Anupam"); List<Employee> emps = new ArrayList<>(); emps.add(e1); emps.add(e2); emps.add(e3); emps.add(e4); emps.add(e5); return emps; } }
订户
package com.theitroad.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.theitroad.reactive.beans.Employee; public class MySubscriber implements Subscriber<Employee> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item"); } @Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done"); } public int getCounter() { return counter; } }
Subscription变量用于保留引用,以便可以通过onNext方法发出请求。
使用`counter'变量来保持已处理项目数的计数,请注意,其值在onNext方法中增加了。
在我们的main方法中将使用它来等待执行完成,然后再结束主线程。订阅请求在onSubscribe方法中被调用以开始处理。
另请注意,在处理完该项目后,它再次以onNext方法调用,要求发布者处理下一个项目。" onError"和" onComplete"在这里没有太多内容,但是在现实情况下,当发生错误时应该使用它们来执行纠正措施,或者在处理成功完成时清理资源。
反应式流测试程序
对于示例,我们将使用SubmissionPublisher作为发布者,因此让我们看一下响应流实现的测试程序。
package com.theitroad.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.theitroad.reactive.beans.Employee; public class MyReactiveApp { public static void main(String args[]) throws InterruptedException { //Create Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); //Register Subscriber MySubscriber subs = new MySubscriber(); publisher.subscribe(subs); List<Employee> emps = EmpHelper.getEmps(); //Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); //logic to wait till processing of all messages are over while (emps.size() != subs.getCounter()) { Thread.sleep(10); } //close the Publisher publisher.close(); System.out.println("Exiting the app"); } }
上面的代码中最重要的部分是发布者的"订阅"和"提交"方法调用。
我们应该始终关闭发布者,以避免任何内存泄漏。
当执行上述程序时,我们将得到以下输出。
Subscribed Publishing Items to Subscriber onSubscribe requested 1 item Processing Employee [id=1,name=hyman] Processing Employee [id=2,name=David] Processing Employee [id=3,name=Lisa] Processing Employee [id=4,name=Ram] Processing Employee [id=5,name=Anupam] Exiting the app All Processing Done
请注意,如果我们没有让main方法等待所有项目都经过处理的逻辑,那么我们将得到不想要的结果。
消息转换示例
处理器用于在发布者和订阅者之间转换消息。
假设我们有另一个订户,它希望处理不同类型的消息。
假设此新消息类型为"自由职业者"。
package com.theitroad.reactive.beans; public class Freelancer extends Employee { private int fid; public int getFid() { return fid; } public void setFid(int fid) { this.fid = fid; } public Freelancer(int id, int fid, String name) { super(id, name); this.fid = fid; } @Override public String toString() { return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]"; } }
我们有一个新订阅者可以使用Freelancer流数据。
package com.theitroad.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.theitroad.reactive.beans.Freelancer; public class MyFreelancerSubscriber implements Subscriber<Freelancer> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed for Freelancer"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item for Freelancer"); } @Override public void onNext(Freelancer item) { System.out.println("Processing Freelancer "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened in MyFreelancerSubscriber"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done for MyFreelancerSubscriber"); } public int getCounter() { return counter; } }
处理器
重要的部分是"处理器"接口的实现。
由于我们要使用SubmissionPublisher,因此我们将对其进行扩展并在适用的地方使用它。
package com.theitroad.reactive_streams; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; import com.theitroad.reactive.beans.Employee; import com.theitroad.reactive.beans.Freelancer; public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> { private Subscription subscription; private Function<Employee,Freelancer> function; public MyProcessor(Function<Employee,Freelancer> function) { super(); this.function = function; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Employee emp) { submit((Freelancer) function.apply(emp)); subscription.request(1); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); } }
Function将用于将Employee对象转换为Freelancer对象。
我们将在onNext方法中将传入的Employee消息转换为Freelancer消息,然后使用SubmissionPublisher Submit方法将其发送给订阅者。
由于Processor同时作为订阅者和发布者,因此我们可以在最终发布者和订阅者之间创建处理器链。
邮件转换测试
package com.theitroad.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.theitroad.reactive.beans.Employee; import com.theitroad.reactive.beans.Freelancer; public class MyReactiveAppWithProcessor { public static void main(String[] args) throws InterruptedException { //Create End Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); //Create Processor MyProcessor transformProcessor = new MyProcessor(s -> { return new Freelancer(s.getId(), s.getId() + 100, s.getName()); }); //Create End Subscriber MyFreelancerSubscriber subs = new MyFreelancerSubscriber(); //Create chain of publisher, processor and subscriber publisher.subscribe(transformProcessor); //publisher to processor transformProcessor.subscribe(subs); //processor to subscriber List<Employee> emps = EmpHelper.getEmps(); //Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); //Logic to wait for messages processing to finish while (emps.size() != subs.getCounter()) { Thread.sleep(10); } //Closing publishers publisher.close(); transformProcessor.close(); System.out.println("Exiting the app"); } }
阅读程序中的注释以正确理解它,最重要的变化是创建了生产者-处理器-用户链。
当执行上述程序时,我们将得到以下输出。
Subscribed for Freelancer Publishing Items to Subscriber onSubscribe requested 1 item for Freelancer Processing Freelancer [id=1,name=hyman,fid=101] Processing Freelancer [id=2,name=David,fid=102] Processing Freelancer [id=3,name=Lisa,fid=103] Processing Freelancer [id=4,name=Ram,fid=104] Processing Freelancer [id=5,name=Anupam,fid=105] Exiting the app All Processing Done for MyFreelancerSubscriber Done
取消订阅
我们可以使用订阅取消方法停止在订阅者中接收消息。
请注意,如果我们取消订阅,则订阅者将不会收到onComplete或者onError信号。
这是一个示例代码,其中订阅者仅使用3条消息,然后取消订阅。
@Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; if(counter==3) { this.subscription.cancel(); return; } this.subscription.request(1); }
请注意,在这种情况下,在处理所有消息之前暂停主线程的逻辑将进入无限循环。
我们可以为此场景添加一些其他逻辑,可能是一些全局变量,以查找订户是否已停止处理或者取消了订户。