在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。
Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。
设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。
为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。
- public static void main(String[] args) {
- displayCurrTime(1);
- displayCurrThreadId(1);
- //创建一个数据源
- Mono.just(10)
- //延迟5秒再发射数据
- .delayElement(Duration.ofSeconds(5))
- //在数据上执行一个转换
- .map(n -> {
- displayCurrTime(2);
- displayCurrThreadId(2);
- displayValue(n);
- delaySeconds(2);
- return n + 1;
- })
- //在数据上执行一个过滤
- .filter(n -> {
- displayCurrTime(3);
- displayCurrThreadId(3);
- displayValue(n);
- delaySeconds(3);
- return n % 2 == 0;
- })
- //如果数据没了就用默认值
- .defaultIfEmpty(9)
- //订阅一个消费者把数据消费了
- .subscribe(n -> {
- displayCurrTime(4);
- displayCurrThreadId(4);
- displayValue(n);
- delaySeconds(2);
- System.out.println(n + " consumed, worker Thread over, exit.");
- });
- displayCurrTime(5);
- displayCurrThreadId(5);
- pause();
- }
- //显示当前时间
- static void displayCurrTime(int point) {
- System.out.println(point + " : " + LocalTime.now());
- }
- //显示当前线程Id
- static void displayCurrThreadId(int point) {
- System.out.println(point + " : " + Thread.currentThread().getId());
- }
- //显示当前的数值
- static void displayValue(int n) {
- System.out.println("input : " + n);
- }
- //延迟若干秒
- static void delaySeconds(int seconds) {
- try {
- TimeUnit.SECONDS.sleep(seconds);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //主线程暂停
- static void pause() {
- try {
- System.out.println("main Thread over, paused.");
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
(编辑:惠州站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|