Let's implement a global chat (one room shared by all connected users) using RSocket with WebFlux.
build.gradle
The dependencies were added as follows.
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}
application.yml
RSocket is carried over the WebSocket transport, and the server port and mapping-path are configured here.
spring:
  rsocket:
    server:
      port: 6565
      transport: websocket
      mapping-path: /rs
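The port and mapping-path above together determine the URL that a WebSocket client connects to. As a minimal sketch (the `RSocketUrl` class and `webSocketUri` method are hypothetical helpers for illustration, not part of Spring), the URL can be composed like this:

```java
import java.net.URI;

// Hypothetical helper: builds the WebSocket URL a client would use
// from the server settings in application.yml.
class RSocketUrl {
    static URI webSocketUri(String host, int port, String mappingPath) {
        // Normalize so that both "rs" and "/rs" produce the same path
        String path = mappingPath.startsWith("/") ? mappingPath : "/" + mappingPath;
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) {
        // With the settings above this prints ws://localhost:6565/rs
        System.out.println(webSocketUri("localhost", 6565, "/rs"));
    }
}
```

This is the same address the React client uses later in this post.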
RSocketConfig
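import java.net.URI;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import reactor.util.retry.Retry;

@Configuration
public class RsocketConfig {
    @Bean
    public RSocketRequester getRSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                // Retry the connection up to 10 times, starting from a 500 ms backoff
                .rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
                .rsocketStrategies(rSocketStrategies)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                // The server uses the WebSocket transport, so connect over WebSocket rather than raw TCP
                .websocket(URI.create("ws://localhost:6565/rs"));
    }
}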
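To get a feel for what `Retry.backoff(10, Duration.ofMillis(500))` means in the configuration above, here is a simplified model of an exponential backoff schedule. It is only a sketch: `BackoffSketch` and `nominalDelays` are hypothetical names, the delay is assumed to double on every attempt, and the random jitter that Reactor additionally applies is ignored.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential backoff (no jitter, no upper cap).
// This is not Reactor's implementation, only an illustration of the idea.
class BackoffSketch {
    static List<Duration> nominalDelays(int maxAttempts, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration delay = minBackoff;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            delays.add(delay);
            // Each retry waits twice as long as the previous one
            delay = delay.multipliedBy(2);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Print the nominal delay before each of the 10 retries
        for (Duration d : nominalDelays(10, Duration.ofMillis(500))) {
            System.out.println(d.toMillis() + " ms");
        }
    }
}
```

Under these assumptions the first retry waits 500 ms and the tenth waits 256000 ms, so a server that stays down is not hammered with reconnect attempts.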
ChatDto
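import lombok.AllArgsConstructor;
import lombok.Getter;

// Chat message payload exchanged as JSON between client and server
@Getter
@AllArgsConstructor
public class ChatDto {
    private String username;
    private String message;
}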
ChatController
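- onConnect()
When a client opens an RSocket connection, the method annotated with @ConnectMapping, onConnect(), is invoked.
- message()
When the client sends a message to the server on the `message` route, the server receives it as a ChatDto.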
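import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
    private final ChatService chatService;

    // Called once per client when the RSocket connection is set up
    @ConnectMapping
    public void onConnect(RSocketRequester requester) {
        chatService.onConnect(requester);
    }

    // Request-response: broadcasts the message and replies to the sender
    @MessageMapping("message")
    Mono<ChatDto> message(ChatDto chatDto) {
        return chatService.message(chatDto);
    }

    // Fire-and-forget: broadcasts the message without a reply
    @MessageMapping("send")
    void sendMessage(ChatDto chatDto) {
        chatService.sendMessage(chatDto);
    }
}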
ChatService
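- onConnect()
On connection, the RSocketRequester is stored in a list; when the connection closes, it is removed.
- message()
Calls sendMessage() and returns the received ChatDto to the sender.
- sendMessage()
Delivers the message to every currently connected requester (all RSocketRequesters stored in participants).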
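import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
    // Thread-safe list: clients connect and disconnect while broadcasts iterate over it
    private final List<RSocketRequester> participants = new CopyOnWriteArrayList<>();

    public void onConnect(RSocketRequester requester) {
        requester.rsocket()
                .onClose()
                // Register the client as soon as we start watching its connection
                .doFirst(() -> participants.add(requester))
                .doOnError(error -> log.warn("RSocket connection closed with an error", error))
                // Runs on completion, error, or cancellation, so the client is always removed
                .doFinally(signalType -> participants.remove(requester))
                .subscribe();
    }

    public Mono<ChatDto> message(ChatDto chatDto) {
        this.sendMessage(chatDto);
        // Return the message to the sender as the request-response reply
        return Mono.just(chatDto);
    }

    public void sendMessage(ChatDto chatDto) {
        Flux.fromIterable(participants)
                // Fire-and-forget to each client on the empty route
                .flatMap(requester -> requester.route("")
                        .data(chatDto)
                        .send()
                        // A failure for one client must not stop delivery to the others
                        .onErrorResume(error -> Mono.empty()))
                .subscribe();
    }
}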
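The bookkeeping described for ChatService (add on connect, remove on close, deliver to everyone) can be sketched without Spring. In this illustration `ParticipantRegistry` is a hypothetical class, a `Consumer<String>` stands in for an RSocketRequester, and `CopyOnWriteArrayList` is an assumed choice that keeps iteration safe while other threads add or remove participants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for the participant list: Consumer<String> plays the role of RSocketRequester.
class ParticipantRegistry {
    private final List<Consumer<String>> participants = new CopyOnWriteArrayList<>();

    // Registers a participant and returns the callback to run when its connection closes
    Runnable onConnect(Consumer<String> participant) {
        participants.add(participant);
        return () -> participants.remove(participant);
    }

    // Delivers the message to every participant that is currently registered
    void broadcast(String message) {
        for (Consumer<String> participant : participants) {
            participant.accept(message);
        }
    }

    public static void main(String[] args) {
        ParticipantRegistry registry = new ParticipantRegistry();
        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();
        Runnable closeA = registry.onConnect(inboxA::add);
        registry.onConnect(inboxB::add);

        registry.broadcast("hello"); // both inboxes receive it
        closeA.run();                // A disconnects
        registry.broadcast("bye");   // only B receives it

        System.out.println(inboxA + " " + inboxB);
    }
}
```

A plain `ArrayList` behaves the same in a single-threaded demo like this one, but it can throw `ConcurrentModificationException` if a client disconnects while a broadcast is iterating over the list.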
React
The React client connects to the RSocket server and sends messages.
import React, { useState, useEffect } from 'react';
import {
  RSocketClient,
  JsonSerializer,
  IdentitySerializer,
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { EchoResponder } from './responder';

const Chatting = () => {
  const [message, setMessage] = useState('');
  const [socket, setSocket] = useState(null);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    connect();
  }, []);

  // Appends each message pushed by the server to the list
  const messageReceiver = (payload) => {
    setMessages((prevMessages) => [...prevMessages, payload.data]);
  };
  const responder = new EchoResponder(messageReceiver);

  const send = () => {
    // The connection may not be established yet
    if (!socket) return;
    socket
      .requestResponse({
        data: {
          username: 'Superpil',
          message: message,
        },
        // routing metadata: one length byte followed by the route name
        metadata: String.fromCharCode('message'.length) + 'message',
      })
      .subscribe({
        onComplete: (com) => {
          console.log('com : ', com);
        },
        onError: (error) => {
          console.log(error);
        },
        onNext: (payload) => {
          console.log(payload.data);
        },
        onSubscribe: (subscription) => {
          console.log('subscription', subscription);
        },
      });
  };

  const connect = () => {
    const client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer,
      },
      setup: {
        // ms btw sending keepalive to server
        keepAlive: 60000,
        // ms timeout if no keepalive response
        lifetime: 180000,
        // format of `data`
        dataMimeType: 'application/json',
        // format of `metadata`
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      responder: responder,
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:6565/rs',
      }),
    });
    client.connect().subscribe({
      onComplete: (socket) => {
        setSocket(socket);
      },
      onError: (error) => {
        console.log(error);
      },
      onSubscribe: (cancel) => {
        console.log(cancel);
      },
    });
  };

  return (
    <div>
      <h1>Chatting</h1>
      <input
        type="text"
        value={message}
        onChange={(e) => setMessage(e.target.value)}
      />
      <button onClick={send}>Send</button>
      <ul>
        {messages.map((item, index) => (
          <li key={index}>{item.username} : {item.message}</li>
        ))}
      </ul>
    </div>
  );
};

export default Chatting;
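The client above builds its metadata as `String.fromCharCode('message'.length) + 'message'`. That is the `message/x.rsocket.routing.v0` layout: a single length byte followed by the route name. A minimal Java sketch of the same encoding, where `RoutingMetadata` and `encode` are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;

// Illustrative encoder for a single route in message/x.rsocket.routing.v0 metadata.
class RoutingMetadata {
    static byte[] encode(String route) {
        byte[] routeBytes = route.getBytes(StandardCharsets.UTF_8);
        // The length prefix is a single unsigned byte, so a route is at most 255 bytes
        if (routeBytes.length > 255) {
            throw new IllegalArgumentException("route is longer than 255 bytes");
        }
        byte[] encoded = new byte[routeBytes.length + 1];
        encoded[0] = (byte) routeBytes.length;
        System.arraycopy(routeBytes, 0, encoded, 1, routeBytes.length);
        return encoded;
    }

    public static void main(String[] args) {
        byte[] encoded = encode("message");
        // First byte is the length (7), the remaining bytes spell "message"
        System.out.println(encoded[0] + " " + new String(encoded, 1, encoded[0], StandardCharsets.UTF_8));
    }
}
```

On the server side, Spring decodes this metadata and uses the route name to pick the matching `@MessageMapping` method.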
A version with separate chat rooms is available on GitHub.
https://github.com/O-Wensu/WebfluxRSocket_Chat_with_React