加入收藏 | 设为首页 | 会员中心 | 我要投稿 惠州站长网 (https://www.0752zz.com.cn/)- 办公协同、云通信、物联设备、操作系统、高性能计算!
当前位置: 首页 > 建站 > 正文

1小时让你掌握响应式编程,并入门Reactor

发布时间:2019-09-25 18:11:45 所属栏目:建站 来源:Java之高级架构
导读:副标题#e# 我看同步阻塞 你知道什么是同步阻塞吗,当然知道了。那你怎么看它呢,这个。。。 在同步阻塞的世界里,代码执行到哪里,数据就跟到哪里。如果数据很慢跟不上来,代码就停在那里等待数据的到来,然后再带着数据一起往下执行。 可以说是,代码执行

在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。

Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。

设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。

为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。

  1. public static void main(String[] args) { 
  2.  displayCurrTime(1); 
  3.  displayCurrThreadId(1); 
  4.  //创建一个数据源 
  5.  Mono.just(10) 
  6.  //延迟5秒再发射数据 
  7.  .delayElement(Duration.ofSeconds(5)) 
  8.  //在数据上执行一个转换 
  9.  .map(n -> { 
  10.  displayCurrTime(2); 
  11.  displayCurrThreadId(2); 
  12.  displayValue(n); 
  13.  delaySeconds(2); 
  14.  return n + 1; 
  15.  }) 
  16.  //在数据上执行一个过滤 
  17.  .filter(n -> { 
  18.  displayCurrTime(3); 
  19.  displayCurrThreadId(3); 
  20.  displayValue(n); 
  21.  delaySeconds(3); 
  22.  return n % 2 == 0; 
  23.  }) 
  24.  //如果数据没了就用默认值 
  25.  .defaultIfEmpty(9) 
  26.  //订阅一个消费者把数据消费了 
  27.  .subscribe(n -> { 
  28.  displayCurrTime(4); 
  29.  displayCurrThreadId(4); 
  30.  displayValue(n); 
  31.  delaySeconds(2); 
  32.  System.out.println(n + " consumed, worker Thread over, exit."); 
  33.  }); 
  34.  displayCurrTime(5); 
  35.  displayCurrThreadId(5); 
  36.  pause(); 
  37. //显示当前时间 
  38. static void displayCurrTime(int point) { 
  39.  System.out.println(point + " : " + LocalTime.now()); 
  40. //显示当前线程Id 
  41. static void displayCurrThreadId(int point) { 
  42.  System.out.println(point + " : " + Thread.currentThread().getId()); 
  43. //显示当前的数值 
  44. static void displayValue(int n) { 
  45.  System.out.println("input : " + n); 
  46. //延迟若干秒 
  47. static void delaySeconds(int seconds) { 
  48.  try { 
  49.  TimeUnit.SECONDS.sleep(seconds); 
  50.  } catch (InterruptedException e) { 
  51.  e.printStackTrace(); 
  52.  } 
  53. //主线程暂停 
  54. static void pause() { 
  55.  try { 
  56.  System.out.println("main Thread over, paused."); 
  57.  System.in.read(); 
  58.  } catch (IOException e) { 
  59.  e.printStackTrace(); 
  60.  } 

(编辑:惠州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读