SpringBoot异步编程-1
大约 19 分钟
RSocket 简介
微服务调用
- 在当今项目应用环境中,通过 SpringBoot 开发框架可以直接基于 JSON 数据响应形式方便的实现 Rest 处理架构,同时为了减少服务的体积,往往会将一个完整的服务拆分为老干个微服务,每个微服务提供不同的业务逻辑,同时微服务彼此之间也可以互相调用

REST 与 HTTP 协议
- REST 是一个简单并且容易使用的异构处理架构,REST 对于浏览器有着非常友好的支持同时也便于开发者进行测试,然而,当前所采用的 Rest 架构大多都是基于 HTTP/1.1 协议实现的,它存在有如下的问题:
- HTTP/1.1 采用的是重文本传输,在一些时候会给微服务的交互带来巨大的负荷
- HTTP 属于无状态协议,对于一些附加头信息往往只能采用非压缩的方式进行传输;
- HTTP/1.1 协议属于一元操作,所以用户每发送一个请求才可以得到一个响应,在未接收到响应之前不能够发送其它请求;
- HTTP/1.1 基于 TCP 协议完成,所以需要采用三次握手的方式以保证可靠连接,这样的操作会非常耗时,从而影响到微服务的整体设计性能。
RSocket
- 虽然在浏览器和后端之间,使用 Rest 是最佳的做法,但是为了避免以上的问题,所以需要采用一些比 Rest 更好的方式来实现微服务之间的通讯处理。在这样的背景下由 Facebook,Netifi 和 Pivotal 等工程师开发了一个新的 RSocket 通信协议,该协议采用二进制点对点数据传输,主要应用于分布式架构之中,是一种基于 ReactiveStreams 规范标准实现的新的网络通信第七层(应用层)协议,随着响应式编程技术的不断普及 RSocket 协议在网络通信(特别是移动通讯)中会有着非常重要的发展前景。
RSocket 协议特点
- RSocket 协议具有多路复用(Multiplexed)、双向流(Bidirectional Streaming)流控(Flow Control)、连接恢复(Socket Resumption)、异步消息传递(Asynchronous Message Passing)、传输层解耦和(Transport independent)等主要特点
多路复用二进制协议(Multiplexed Binary Protocol)
- 在 HTTP 3.0 标准以前所有的 HTTP 协议都是基于 TCP 协议实现的,所以在 HTTP/1.0 协议版本中每一次用户的请求对服务器端都需要创建有一个新的 TCP 连接(3 次握手与 4 次挥手),而为了解决 TCP 性能的问题,在 HTTP/1.1 协议版本中提出了 TCP 连接复用的支持,但是此时的连接复用在每次只允许有一个用户的请求进行处理,而当该请求处理完成后才允许其他请求继续使用此 TCP 连接进行请求处理,这样来红果菜一个请求的处理现化非觉耗时,则会,致后续這求处理性能下降。所以为二进,艺解决请求处理性能的问题,在 HTTP/2.0 中对连接操作进行了进一步改进,允许的请求处理,这样即便某一个请求操作耗时,但是也不会影响到整体的处理性能,如图所示。但是基于 TCP 协议实现的 HTTP 协议始终会存在有性能问题,所以在 HTTP/3.0 协议版本中使用 QUIC 作为新的传输层协议,QUIC 基于 UDP 协议实现,同时也自带多路复用结构。

RSocket 传输
- 在 HTTP/2.0 协议中重点的问题是解决了 TCP 连接多路复用的问题,但是在 HTTP 协议中一切的数据都是以文本的形式进行传输,所以在实际开发中就会存在有数据传输过大以及传输结构受限的问题,而 RSocket 是一个二进制协议,可以方便的进行各种数据的传输,同时没有数据格式的限制,用户也可以根据自身的需要进行压缩处理。在 RSocket 中将消息体分为数据(data)和元数据(metadata)两个组成部分,这样可以保证在高速数据传输下依然可以对外暴露少量元数据给其他服务使用。

双向流(Bidirectional Streaming)
- RSocket 实现了双向流通讯支持利用双向流可以实现服务端与客户端之间的通讯处理客户端可以向服务器端发送请求,这样在请求与响应的处理过程中,服务器端也可以向客户端发送请求

RSocket 四种数据交互模式:
- Request-And-Response:请求/响应,类似于 HTTP 的通信特点,提供异步通信与多路复用支持;
- Request-Response-Stream:请求/流式响应,一个请求对应多个流式的响应,例如:获取视频列表或产品列表;
- Fire-And-Forget:异步触发,不需要响应,可以用于进行日志记录;
- Channel(bi-directional streams):双向异步通讯,消息流在服务端与客户端两个方向上异步流动;
流控(Flow Control)
- 在分布式的项目开发环境之中,如果说生产者生产的数据过快,就会导致消费者无法及时进行处理,肯最终就有可能出现内存与 CPU 的占用率增高,从而出现服务端或客户端无响应的状况,而如果没有进行良好的实现控制,那么就有可能会由于雪崩问题而导致整个应用集群的瘫痪,如图所示。为了避免这样的情况出现,就需要有一套流控机制来协调生产者与消费者之间的处理速度。

在 RSocket 中提供了 Stream Level 流量控制,由于 RSocket 作为一个应用层协议,所以采取的并不是基于字节的网络层实现流控,而是基于应用层帧数的流量控制(控制生产者生产的消息数量)
连接恢复(Socket Resumption)
- 由于移动网络的兴起,所以在网络连接的稳定性上就出现了较大的挑战,当网络出现故障后应及时的进行连接恢复,在 RSocket 中提供有连接恢复(ConnectionResumption)功能,同时为了简化用户的处理操作,在连接恢复成功后用户不会有任何的感知,而在连接恢复失败时才会通过 onError 事件触发相应的回调函数,这样在进行 Stream 时可以保持响应,同时减少重复数据信息的传输,因为在多路复用的结构中如果重复传输则意味着网络压力的增加。
RSocket 连接恢复
- RSocket 中提供的“Socket Resumption”恢复机制,恢复实现的核心原理在于重新建立网络连接后不从头处理用户请求,客户端和服务端需要能够在连接中断后的一段时而在连接恢复后,客户端会将此状态间内自动的保存该 Connection 上的 Stream 状态,信息发送给服务器端,服务器端会进行恢复判断,如果成功恢复则继续之前的 Stream 操作

异步消息传递(Asynchronous Message Passing)
- RSocket 的协议在进行数据传输时采用的是异步消息传递的形式,所传输的内容为 Frame(应用层帧,例如:FrameHeader、RESUME 等),同时在 RSocket 传输中并不像 HTTP 协议那样包含有明确的目标访问路径,所有的访问全部由路由模块负责实现。RSocket 协议在数据传输时消息使用帧来进行封装的,每个帧可能是请求内容、响应内容或与协议相关的数据信息,而一个应用消息可能被切分为多个不同的片段以保存在一个帧中。
传输层解耦和(Transport independent)
- RSocket 协议是一个应用层的面向连接协议,不依赖于传输层协议,所以可以由用户自由的选择不同的应用场景,例如:在进行数据中心构建时可以使用 TCP 处理,而在进行浏览器异步交互时,可以使用 WebSocket 处理,在进行 HTTP 服务时可以使用 HTTP/2.0 处理。
RSocket 基础开发
RSocket 程序开发
- 在 SpringBoot 中对 RSocket 编程提供了良好的支持,开发者直接在项目中引入 spring-boot-starter-rsocket”依赖库,即可实现 RSocket 程序开发

1、
RSocket四种数据交互模式:
Request-And-Response:请求/响应,类似于HTTP的通信特点,提供异步通信与多路复用支持;
Request-Response-Stream:请求/流式响应,一个请求对应多个流式的响应,例如:获取视频列表或产品列表;
Fire-And-Forget:异步触发,不需要响应,可以用于进行日志记录;
Channel(bi-directional streams):双向异步通讯,消息流在服务端与客户端两个方向上异步流动;
2、
// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-rsocket
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-rsocket', version: '2.4.5'
3、
project('microboot-rsocket-base') { // 子模块
dependencies { // 配置子模块依赖
compile('org.springframework.boot:spring-boot-starter-web')// 引入SpringBoot依赖
compile('org.springframework.boot:spring-boot-starter-rsocket')
}
}
4、
package com.yootk.rsocket.server.handler;
import io.rsocket.Payload;
import io.rsocket.RSocket; // 原生的RSocket官方提供
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j // 日志组件
public class MessageRSocketHandler implements RSocket {
@Override
public Mono<Void> fireAndForget(Payload payload) { // 无响应
// 一般这种无响应的操作可以应用于日志记录的模式上,例如:客户端发送一个日志,是不需要等待响应的
// Payload表示所有的附加数据信息,对于RSocket通讯来讲,所有的数据项都通过此结构传输
String message = payload.getDataUtf8(); // 获取数据
log.info("【FireAndForget】接收请求数据:{}", message);
return Mono.empty(); // 返回一个空消息
}
@Override
public Mono<Payload> requestResponse(Payload payload) { // 传统模式,有请求有响应
String message = payload.getDataUtf8(); // 获取数据
log.info("【RequestAndResponse】接收请求数据:{}" , message);
return Mono.just(DefaultPayload.create("【ECHO】" + message)); // 进行数据响应处理
}
@Override
public Flux<Payload> requestStream(Payload payload) { // 处理流数据
String message = payload.getDataUtf8(); // 获取数据
log.info("【RequestStream】接收请求数据:{}" , message);
return Flux.fromStream(message.chars() // 将接收到的字符串转换为一个int流数据
.mapToObj(c -> Character.toUpperCase((char) c)) // 获取里面的每一个字符的编码,并且转大写
.map(Object::toString) // 利用toString()方法将字符转为String
.map(DefaultPayload::create)); // 创建Payload的附加数据
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { // 双向流
return Flux.from(payloads).map(Payload :: getDataUtf8).map(msg ->{
log.info("【RequestChannel】接收请求数据:{}", msg);
return msg; // 直接返回发送的数据内容
}).map(DefaultPayload :: create);
}
}
5、
package com.yootk.rsocket.server.acceptor;
import com.yootk.rsocket.server.handler.MessageRSocketHandler;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import reactor.core.publisher.Mono;
public class MessageRSocketAcceptor implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { // 实现了RSocket连接处理
return Mono.just(new MessageRSocketHandler()); // 配置自己的处理类
}
}
6、
package com.yootk.rsocket.server;
import com.yootk.rsocket.server.acceptor.MessageRSocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.Disposable;
public class MessageServer { // 实现RSocket状态的操作控制
private static Disposable disposable; // 用于释放任务
public static void start() { // 服务启动
RSocketServer rSocketServer = RSocketServer.create(); // 创建RSocket服务端
rSocketServer.acceptor(new MessageRSocketAcceptor()); // 创建连接器
rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY); // 采用零拷贝技术实现
disposable = rSocketServer.bind(TcpServerTransport.create(6565)).subscribe(); // 开启订阅
}
public static void stop() { // 服务启动
disposable.dispose(); // 释放
}
}
7、
package com.yootk.test;
import com.yootk.rsocket.server.MessageServer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.jupiter.api.*;
import reactor.core.publisher.Flux;
import java.time.Duration;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) // 手工配置方法执行顺序
public class TestMessageServer { // 编写测试类
private static RSocket rsocket;
@BeforeAll // 在所有测试开始之前执行
public static void setUpClient() {
MessageServer.start(); // 进行服务启动
rsocket = RSocketConnector.connectWith(TcpClientTransport.create(6565)).block(); // 客户端需要进行连接
}
@Test
public void testFireAndForget() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
private static Flux<Payload> getRequestPayload() { // 传递所有的附加数据内容
return Flux.just("yootk.com", "springboot", "edu.yootk.com", "springcloud", "redis", "netty", "elasticsearch")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload :: create);
}
@AfterAll
public static void testStopServer() {
MessageServer.stop();
}
}
8、
@Test
public void testRequestAndResponse() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.requestResponse(payload))
.doOnNext(response -> System.out.println("【RSocket测试类】接收服务端响应数据:" + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
9、
@Test
public void testRequestStream() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.requestStream(payload))
.doOnNext(response -> System.out.println("【RSocket测试类】接收服务端响应数据:" + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
10、
package com.yootk.test;
import com.yootk.rsocket.server.MessageServer;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.jupiter.api.*;
import reactor.core.publisher.Flux;
import java.time.Duration;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) // 手工配置方法执行顺序
public class TestMessageServer { // 编写测试类
private static RSocket rsocket;
@BeforeAll // 在所有测试开始之前执行
public static void setUpClient() {
MessageServer.start(); // 进行服务启动
rsocket = RSocketConnector.connectWith(TcpClientTransport.create(6565)).block(); // 客户端需要进行连接
}
@Test
public void testFireAndForget() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
@Test
public void testRequestAndResponse() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.requestResponse(payload))
.doOnNext(response -> System.out.println("【RSocket测试类】接收服务端响应数据:" + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
@Test
public void testRequestStream() { // 测试RSocket模式
getRequestPayload().flatMap(payload -> rsocket.requestStream(payload))
.doOnNext(response -> System.out.println("【RSocket测试类】接收服务端响应数据:" + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
@Test
public void testRequestChannel() { // 测试RSocket模式
rsocket.requestChannel(getRequestPayload())
.doOnNext(response -> System.out.println("【RSocket测试类】接收服务端响应数据:" + response.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
private static Flux<Payload> getRequestPayload() { // 传递所有的附加数据内容
return Flux.just("yootk.com", "springboot", "edu.yootk.com", "springcloud", "redis", "netty", "elasticsearch")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload :: create);
}
@AfterAll
public static void testStopServer() {
MessageServer.stop();
}
}
搭建 RSocket 服务端
RSocket 服务端
- 在实际项目开发中一般都需要搭建有专属的 RSocket 服务,每一个服务都是一个控制层处理,需要有与之匹配的业务层提供业务处理支持。而控制层中所提供的方法可以根据需要实现不同的 RSocket 数据交互处理,并且所有的控制层方法必须通过“@MessageMapping”注解进行配置才可以对外提供服务

RSocket 开发模块 = 在本次的项目开发中考虑到项目结构的完整性,会使用三个块整个服务的搭建:microboot-rsocket-common 子模块(RSocket 公共模块)、microboot-rsocket、microboot-rsocket-client(RSocket 客户端)server 子模块(RSocket 服务端)

1、
jar {enabled = true}
javadocTask {enabled = false}
javadocJar {enabled = false}
bootJar {enabled = false}
2、
project('microboot-rsocket-common') { // 子模块
dependencies { // 配置子模块依赖
}
}
3、
project('microboot-rsocket-server') { // 子模块
dependencies { // 配置子模块依赖
compile('org.springframework.boot:spring-boot-starter-web')// 引入SpringBoot依赖
compile('org.springframework.boot:spring-boot-starter-rsocket')
compile(project(':microboot-rsocket-common'))
}
}
4、
package com.yootk.rsocket.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data // 自动生成类结构
@NoArgsConstructor // 无参构造
@AllArgsConstructor // 全参构造
public class Message {
private String title;
private String content;
}
5、
gradle build
6、
package com.yootk.rsocket.server.service;
import com.yootk.rsocket.vo.Message;
import org.springframework.stereotype.Service;
import java.util.List;
// 本次的开发为了节约代码编写的成本,不再创建繁琐的业务接口了,直接进行业务实现类的处理了
@Service
public class MessageService {
public List<Message> list() { // 响应集合数据
return List.of(
new Message("yootk", "沐言优拓:www.yootk.com"),
new Message("muyan", "沐言科技:www.yootk.com"),
new Message("edu", "李兴华高薪就业编程训练营:edu.yootk.com"));
}
public Message get(String title) { // 响应单个数据
return new Message(title, "【" + title + "】www.yootk.com");
}
public Message echo(Message message) { // 响应单个数据
message.setTitle("【ECHO】" + message.getTitle());
message.setContent("【ECHO】" + message.getContent());
return message;
}
}
7、
package com.yootk.rsocket.server.action;
import com.yootk.rsocket.server.service.MessageService;
import com.yootk.rsocket.vo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Controller // 别使用那个RestController
@Slf4j // 日志组件
public class MessageAction {
@Autowired
private MessageService messageService; // 注入业务接口实例
@MessageMapping("message.echo")
public Mono<Message> echoMessage(Mono<Message> message) {
return message.doOnNext(msg -> this.messageService.echo(msg)) // 响应处理
.doOnNext(msg -> log.info("【消息接收】{}", message)); // 日志处理
}
@MessageMapping("message.delete")
public void deleteMessage(Mono<String> title) {
title.doOnNext(msg -> log.info("【消息删除】{}", msg)).subscribe();// 日志输出
}
@MessageMapping("message.list")
public Flux<Message> listMessage() {
return Flux.fromStream(this.messageService.list().stream());
}
@MessageMapping("message.get")
public Flux<Message> getMessage(Flux<String> title) {
return title.doOnNext(t -> log.info("【消息查询】title = {}", t)) // 日志输出
.map(titleInfo -> titleInfo.toLowerCase()) // 数据转小写
.map(this.messageService :: get) // 加载业务数据
.delayElements(Duration.ofSeconds(1)); // 数据延缓一下
}
}
8、
spring:
rsocket: # RSocket配置项
server:
port: 6869 # 服务监听端口
9、
package com.yootk.rsocket.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StartMessageRSocketApplication {
public static void main(String[] args) {
SpringApplication.run(StartMessageRSocketApplication.class, args);
}
}
搭建 RSocket 客户端

1、
project('microboot-rsocket-client') { // 子模块
dependencies { // 配置子模块依赖
compile('org.springframework.boot:spring-boot-starter-web')// 引入SpringBoot依赖
compile('org.springframework.boot:spring-boot-starter-rsocket')
compile(project(':microboot-rsocket-common'))
}
}
2、
package com.yootk.rsocket.client.config;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
@Configuration // 配置类
public class RSocketConfig { // RSocket配置类
@Bean // 注册Bean
public RSocketStrategies getRSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder())) // 定义传输编码器
.decoders(decoders -> decoders.add(new Jackson2CborDecoder())) // 解码器
.build();
}
@Bean
public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder) {
return Mono.just(
builder.rsocketConnector(rSocketConnector -> rSocketConnector.reconnect( // 失败重连
Retry.fixedDelay(2, Duration.ofSeconds(2))))
.dataMimeType(MediaType.APPLICATION_CBOR) // 设置数据的传输类型
.transport(TcpClientTransport.create(6869))); // 设置连接端口
}
}
3、
package com.yootk.rsocket.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StartRSocketClientApplication {
public static void main(String[] args) {
SpringApplication.run(StartRSocketClientApplication.class, args);
}
}
4、
package com.yootk.test;
import com.yootk.rsocket.client.StartRSocketClientApplication;
import com.yootk.rsocket.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = StartRSocketClientApplication.class)
public class TestMessageRSocket { // 编写测试类
@Autowired
private Mono<RSocketRequester> requesterMono; // 来进行服务调用
@Test
public void testEchoMessage() { // 测试服务响应
this.requesterMono.map(r -> r.route("message.echo")
.data(new Message("李兴华", "沐言科技编程讲师"))) // 配置请求数据
.flatMap(r -> r.retrieveMono(Message.class))
.doOnNext(o -> System.out.println(o)).block();
}
}
5、
package com.yootk.test;
import com.yootk.rsocket.client.StartRSocketClientApplication;
import com.yootk.rsocket.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = StartRSocketClientApplication.class)
public class TestMessageRSocket { // 编写测试类
@Autowired
private Mono<RSocketRequester> requesterMono; // 来进行服务调用
@Test
public void testEchoMessage() { // 测试服务响应
this.requesterMono.map(r -> r.route("message.echo")
.data(new Message("李兴华", "沐言科技编程讲师"))) // 配置请求数据
.flatMap(r -> r.retrieveMono(Message.class))
.doOnNext(o -> System.out.println(o)).block();
}
@Test
public void testDeleteMessage() { // 测试服务响应
this.requesterMono.map(r -> r.route("message.delete")
.data("yootk")) // 配置请求数据
.flatMap(RSocketRequester.RetrieveSpec :: send).block(); // 发送不接收数据
}
}
6、
public void testGetMessage() {
Flux<String> titles = Flux.just("muyan", "yootk", "edu"); // 要获取消息的标题
Flux<Message> messageFlux = this.requesterMono.map(r -> r.route("message.get").data(titles))
.flatMapMany(r -> r.retrieveFlux(Message.class))
.doOnNext(o -> System.out.println(o));
messageFlux.blockLast();
}
7、
package com.yootk.test;
import com.yootk.rsocket.client.StartRSocketClientApplication;
import com.yootk.rsocket.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = StartRSocketClientApplication.class)
public class TestMessageRSocket { // 编写测试类
@Autowired
private Mono<RSocketRequester> requesterMono; // 来进行服务调用
@Test
public void testEchoMessage() { // 测试服务响应
this.requesterMono.map(r -> r.route("message.echo")
.data(new Message("李兴华", "沐言科技编程讲师"))) // 配置请求数据
.flatMap(r -> r.retrieveMono(Message.class))
.doOnNext(o -> System.out.println(o)).block();
}
@Test
public void testDeleteMessage() { // 测试服务响应
this.requesterMono.map(r -> r.route("message.delete")
.data("yootk")) // 配置请求数据
.flatMap(RSocketRequester.RetrieveSpec :: send).block(); // 发送不接收数据
}
@Test
public void testListMessage() { // 测试消息列表
this.requesterMono.map(r -> r.route("message.list"))
.flatMapMany(r -> r.retrieveFlux(Message.class))
.doOnNext(o -> System.out.println(o)).blockLast();
}
@Test
public void testGetMessage() {
Flux<String> titles = Flux.just("muyan", "yootk", "edu"); // 要获取消息的标题
Flux<Message> messageFlux = this.requesterMono.map(r -> r.route("message.get").data(titles))
.flatMapMany(r -> r.retrieveFlux(Message.class))
.doOnNext(o -> System.out.println(o));
messageFlux.blockLast();
}
}
RSocket 文件上传
RSocket 文件上传
- RSocket 协议由于本身基于二进制传输,所以也提供了方便的文件上传处理支持。而在进行文件上传时,并不是直接将一个文件整体上传,而是采用了文件块(chunk)的形式进行上传文件的切割,利用 Fux 包裹要上传的一组文件块,而在服务器接收到该文件块之后会通过专属的通道进行文件保存,同时也会将上传的状态发送到 RSocket 客户端

1、
package com.yootk.rsocket.type;
public enum UploadStatus { // 上传状态
CHUNK_COMPLETED, // 文件上传处理之中
COMPLETED, // 文件上传完毕
FAILED; // 文件上传失败
}
2、
package com.yootk.rsocket.constants;
public class UploadConstants { // 上传常量配置
public static final String MIME_FILE_NAME = "message/x.upload.file.name"; // 文件类型
public static final String MIME_FILE_EXTENSION = "message/x.upload.file.extension"; // 文件扩展类型
public static final String FILE_NAME = "file.name"; // 文件名称保存
public static final String FILE_EXT = "file.ext"; // 文件,扩展名
}
3、
package com.yootk.rsocket.server.config;
import com.yootk.rsocket.constants.UploadConstants;
import org.springframework.boot.autoconfigure.integration.IntegrationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeType;
@Configuration
public class RSocketConfig { // RSocket配置类
@Bean // 配置策略
public RSocketStrategies getRSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder())) // 编码器
.decoders(decoders -> decoders.add(new Jackson2CborDecoder())) // 解码器
.metadataExtractorRegistry(metadataExtractorRegistry -> { // 元数据注册处理
metadataExtractorRegistry.metadataToExtract(
MimeType.valueOf(UploadConstants.MIME_FILE_NAME), String.class, UploadConstants.FILE_NAME);
metadataExtractorRegistry.metadataToExtract(
MimeType.valueOf(UploadConstants.MIME_FILE_EXTENSION), String.class, UploadConstants.FILE_EXT);
}).build();
}
}
4、
package com.yootk.rsocket.server.action;
import com.yootk.rsocket.constants.UploadConstants;
import com.yootk.rsocket.type.UploadStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
@Controller
@Slf4j
public class UploadAction {
@Value("${output.file.path:upload}") // 项目目录/upload
private Path outputPath; // 文件保存路径
@MessageMapping("message.upload")
public Flux<UploadStatus> upload( // 文件上传处理
@Headers Map<String, Object> metadata,
@Payload Flux<DataBuffer> content) throws Exception {
log.info("【上后窜路径】outputPath = {}", this.outputPath); // 保存路径
var fileName = metadata.get(UploadConstants.FILE_NAME); // 获取文件名称
var fileExt = metadata.get(UploadConstants.FILE_EXT); // 获取文件后缀
var path = Paths.get(fileName + "." + fileExt); // 获取文件操作路径
log.info("【文件上传】FileName = {}、FileExt = {}、Path = {}", fileName, fileExt, path);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
this.outputPath.resolve(path), // 解析出输出路径
StandardOpenOption.CREATE, // 文件创建
StandardOpenOption.WRITE // 文件写入
); // 异步文件通道
return Flux.concat(DataBufferUtils.write(content, channel)
.map(s -> UploadStatus.CHUNK_COMPLETED), Mono.just(UploadStatus.COMPLETED))
.doOnComplete(() -> {
try {
channel.close(); // 通道关闭
} catch (IOException e) {
e.printStackTrace();
}
})
.onErrorReturn(UploadStatus.FAILED); // 我失败了
}
}
5、
package com.yootk.test;
import com.yootk.rsocket.client.StartRSocketClientApplication;
import com.yootk.rsocket.constants.UploadConstants;
import com.yootk.rsocket.type.UploadStatus;
import com.yootk.rsocket.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.util.MimeType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.UUID;
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = StartRSocketClientApplication.class)
public class TestUploadRSocket { // 编写测试类
@Autowired
private Mono<RSocketRequester> requesterMono; // 来进行服务调用
@Value("classpath:/images/muyan_yootk.jpg") // 资源定位表达式
private Resource resource; // 配置资源
@Test
public void testUpload() { // 测试服务响应
String fileName = "muyan-" + UUID.randomUUID(); // 随机生成上传文件名称
String fileExt = this.resource.getFilename().substring(this.resource.getFilename().lastIndexOf(".") + 1);
Flux<DataBuffer> resourceFlux = DataBufferUtils.read(
this.resource, new DefaultDataBufferFactory(), 1024)
.doOnNext(s -> System.out.println("文件上传:" + s));
Flux<UploadStatus> uploadStatusFlux = this.requesterMono
.map(r -> r.route("message.upload") // 配置路由地址
.metadata(metadataSpec -> {
System.out.println("【上传测试】文件名称:" + fileName + "." + fileExt);
metadataSpec.metadata(fileName, MimeType.valueOf(UploadConstants.MIME_FILE_NAME)); // 设置文件名称
metadataSpec.metadata(fileExt, MimeType.valueOf(UploadConstants.MIME_FILE_EXTENSION)); // 设置文件后缀
}).data(resourceFlux)) // 设置文件上传数据
.flatMapMany(r -> r.retrieveFlux(UploadStatus.class))
.doOnNext(o -> System.out.println("上传进度:" + o));
uploadStatusFlux.blockLast(); // 进行阻塞等待
}
}
基于 RSocket 开发 WebSocket
1、
project('microboot-rsocket-websocket') { // 子模块
dependencies { // 配置子模块依赖
compile('org.springframework.boot:spring-boot-starter-web')// 引入SpringBoot依赖
compile('org.springframework.boot:spring-boot-starter-rsocket')
compile(project(':microboot-rsocket-common'))
}
}
2、
package com.yootk.rsocket.action;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Controller
@Slf4j
public class MessageAction {
@MessageMapping("message.echo")
public Mono<String> echo(Mono<String> messageMono) { // Request-And-Response模式
return messageMono.map(msg -> "【ECHO】" + msg); // 数据响应
}
@MessageMapping("message.repeat")
public Flux<String> repeat(Mono<String> mono) { // Request-Response-Stream
return mono.flatMapMany(message -> Flux.range(0, 3).map(count -> "【ECHO - " + count + "】" + message))
.delayElements(Duration.ofSeconds(1)); // 间隔1秒响应
}
}
3、
spring:
rsocket:
server:
transport: websocket # 设置处理协议
port: 6969 # 监听端口
mapping-path: /websocket # 映射路径
4、
package com.yootk.rsocket;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StartWebSocketApplication {
public static void main(String[] args) {
SpringApplication.run(StartWebSocketApplication.class,args); // 运行SpringBoot程序
}
}
demo
