💻현재 상황
채널까지 생성했으니, 이제 해당 채널에 있는 사람이 채팅을 보낼 때 같은 채널에 있는 사람들에게만 채팅이 전달되어야 한다. 현재 진행된 채팅 기능은 채팅을 보내면 모든 사람이 채팅을 받게 되어있는데, 채팅방에 따라 RSocketRequester를 저장해주지 않고, 그냥 리스트에 모두 저장했기 때문이다.
일단 진행했던 빈 프로젝트에서 만든 RSocket 채팅을 기존에 진행하던 프로젝트로 옮겨왔다.
이전에 개발한 내용은 다음 링크를 참고하면 된다.
https://yeon-dev.tistory.com/207
[Spring] Webflux + RSocket, React 사용한 전체 채팅 구현
Webflux에서 RSocket을 사용하여 전체 채팅을 구현해보자. build.gradle 의존성은 다음과 같이 추가했다. dependencies { implementation 'org.springframework.boot:spring-boot-starter-rsocket' implementation 'org.springframework.boot:s
yeon-dev.tistory.com
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
public void onConnect(RSocketRequester requester, String chattingAddress) {
requester.rsocket()
.onClose()
.doFirst(() -> {
CLIENTS.add(requester);
})
.doOnError(error -> {
log.info(error.getMessage());
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
})
.subscribe();
}
// 생략
}
그래서 List가 아닌 Map을 사용해서 key는 스트리밍 중인 채널의 채팅방 주소가 들어가고, value로 RSocketRequester타입의 List로 저장하기로 했다.
private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();
💻문제점1
클라이언트에서 RSocket 연결을 할 때 chattingAdress를 서버 측에 넘겨줘야 했다.
어떻게 넘겨줄 수 있을지 부터가 문제였다.
React는 다루지 않다보니 클라이언트 코드를 고치는 것도 매우 어려웠다.
📃문제점1-시도
시도1
onConnet()에서 Payload로 클라이언트에서 보낸 chattingAddress를 받도록 했다.
ChatController
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
private final ChatService chatService;
@ConnectMapping
public void onConnect(@Payload String chattingAddress, RSocketRequester requester) {
chatService.onConnect(chattingAddress, requester);
}
//생략
}
클라이언트
클라이언트에서 connect() 할 때 chattingAddress를 담아서 보내주도록 했다.
const connect = () => {
const client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer,
},
setup: {
...
metadataMimeType: 'message/x.rsocket.routing.v0',
},
responder: responder,
transport: new RSocketWebSocketClient({
url: 'ws://localhost:6565/rs',
}),
});
client.connect({
data: { chattingAddress: chattingAddress },
})
.subscribe({
...
});
};
이렇게 변경하고 수행했더니 client.connect() is not function 이라는 오류가 발생했다.
시도2
클라이언트에서 RSocketClient를 생성할 때 chattingAddress를 넣어줄 수는 없을까?
서버에서는 인자로 RSocketRequester만 받고 그 안에 들어있는 chattingAddress를 꺼낼 수 있으면 좋을 것이다🤔
클라이언트
socket
.requestResponse({
data: {
chattingAddress: chattingAddress,
nickname: '',
message: message,
},
metadata: String.fromCharCode(message.length) + message,
})
...
이렇게 작성하고 생각해보니, 이 부분은 RSocket은 이미 연결되고, 채팅 메시지를 보내는 부분이어서 여기서 chattingAdress를 보내는 것은 현재로서는 의미가 없었다.
시도3
webflux+RSocket 를 구현한 사람을 찾아보려 했지만 일단 React로 한 사람들이 많이 보이지 않았다. (내가 못 찾는걸까?)
그러던 중 찾은 것이 RSocketClient를 만들 때 setup 프레임을 사용하여 데이터를 전달하는 것이었다.
서버에서는 @Payload로 데이터를 받도록 했다.
클라이언트
const client = new RSocketClient({
setup: {
data: {
chattingAddress: chattingAddress,
},
// ... 기타 설정
},
transport: new RSocketWebSocketClient({
url: 'ws://localhost:8080',
}),
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer,
},
});
client.connect().then(...);
서버
@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload String chattingAddress) {
chatService.onConnect(requester, chattingAddress);
}
onConnect() 자체가 수행되지 않는다...!
🔍문제점1-해결
이후에도 나와 정아님이 서로 여러 시도를 해봤지만 chattingAddress를 받아올 수 없었다.
마침 내가 수요일에 깃허브에 올리려고 빈 프로젝트에 Rsocket 전체 채팅을 다시 만들어둔게 있어서, 거기서 다시 해보기로 했다.
클라이언트에서 RSocketClient를 만들 때 setup 프레임의 payload 안에 데이터를 넣어주는 방식으로 계속 시도해보았다.
클라이언트
const connect = (chattingAddress) => {
const client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer,
},
setup: {
payload: {
data: chattingAddress
},
keepAlive: 60000,
lifetime: 180000,
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
},
responder: responder,
transport: new RSocketWebSocketClient({
url: 'ws://localhost:6565/rs',
})
});
서버에서는 기존에 시도하던 방식과 다르게, payload를 받을 때 String 타입이 아닌 Object 타입으로 받도록 변경했다.
서버
@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload Object chattingAddress) {
log.info("address: " + (String)chattingAddress);
chatService.onConnect(requester, (String)chattingAddress);
}
chattingAddress를 무사히 전달 받고, RSocket 연결도 수행되었다!😆
원인은 JSON 객체를 바로 String으로 변환하려고 했기 때문이었다.
서버 측에서 Payload를 Object 타입으로 역직렬화 한 뒤, 'String'으로 캐스팅 하여 사용하니 값을 받을 수 있었다.
이후 ChatService의 onConnect() 메서드를 수정했다.
ChatService
map에 key가 채팅방 주소인 것이 있다면 List<RSocketRequester>에 추가해주고, 없다면 requester를 추가한 ArrayList를 만들어서 map에 put 해주었다.
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();
public void onConnect(RSocketRequester requester, String chattingAddress) {
requester.rsocket()
.onClose()
.doFirst(() -> {
if(participants.containsKey(chattingAddress))
participants.get(chattingAddress).add(requester);
else participants.put(chattingAddress, new ArrayList(Arrays.asList(requester)));
})
.doOnError(error -> {
log.info(error.getMessage());
})
.doFinally(consumer -> {
participants.get(chattingAddress).remove(requester);
})
.subscribe();
}
//생략
}
채널에 들어갔을 때 해당 채널의 채팅방 별로 RSocketRequester가 잘 저장된다!!
💻문제점2
서버 측에서는 클라이언트가 메시지를 보낼 때 어느 채팅방으로 보낼건지 알아야 했다.
🔍문제점2-해결
클라이언트에서 메시지를 보낼 때 chattingAddress를 함께 보내도록 변경했다.
ChatDto에도 chattingAddress 속성을 추가했다.
ChatService의 sendMessage 메서드에서는 같은 채팅방에 속한 사람들에게만 메시지를 보내줄 수 있도록 했다.
클라이언트
const send = () => {
socket
.requestResponse({
data: {
username: 'Superpil',
message: message,
chattingAddress : chattingAddress,
},
metadata: String.fromCharCode('message'.length) + 'message',
})
.subscribe({
onComplete: (com) => {
console.log('com : ', com);
},
onError: (error) => {
console.log(error);
},
onNext: (payload) => {
console.log(payload.data);
},
onSubscribe: (subscription) => {
console.log('subscription', subscription);
},
});
};
ChatDto
@Getter
@Setter
@Builder
@AllArgsConstructor
public class ChatDto {
private String username;
private String message;
private String chattingAddress;
}
ChatController
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
private final ChatService chatService;
@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload Object chattingAddress) {
chatService.onConnect(requester, (String)chattingAddress);
}
@MessageMapping("message")
Mono<ChatDto> message(ChatDto chatDto) {
return chatService.message(chatDto);
}
@MessageMapping("send")
void sendMessage(ChatDto chatDto) {
chatService.sendMessage(chatDto);
}
}
ChatService
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();
//생략
public Mono<ChatDto> message(ChatDto chatDto) {
this.sendMessage(chatDto);
return Mono.just(chatDto);
}
public void sendMessage(ChatDto chatDto) {
Flux.fromIterable(participants.get(chatDto.getChattingAddress()))
.doOnNext(ea -> {
ea.route("")
.data(chatDto)
.send()
.subscribe();
})
.subscribe();
}
}
같은 방송의 채팅방에 들어간 사람들끼리 채팅이 잘 된다!
💻문제점3
이제 이걸 다시 기존 프로젝트에 합쳐보자. 그런데 분명 똑같은 코드인데 여전히 onConnect()가 수행이 안됐다.
생각해보자 빈프로젝트와 기존 프로젝트의 차이점을...
그것은 바로 본 프로젝트에는 webflux security가 적용되어있다는 점이다!
예전에 Web MVC에서 WebSocket으로 채팅을 구현할 때 securityConfig에서 /ws/** 경로를 허용해주었던 기억이 났다.
🔍문제점3-해결
클라이언트에서는 connect()를 시도할 때 ws://localhost:6565/rs 로 연결을 시도하게 되어있다.
const connect = (chattingAddress)=> {
const client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer,
},
setup: {
payload: {
data: chattingAddress
},
keepAlive: 60000,
lifetime: 180000,
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
},
responder:responder,
transport: new RSocketWebSocketClient({
url: 'ws://localhost:6565/rs',
})
});
그래서 /ws/** 경로는 통과하도록 허용해주었더니...!
WebfluxSecurityConfiguration
@Configuration
@EnableWebFluxSecurity
@RequiredArgsConstructor
public class WebfluxSecurityConfiguration {
private final AuthenticationManager authenticationManager;
private final SecurityContextRepository securityContextRepository;
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity serverHttpSecurity) {
serverHttpSecurity
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.formLogin(ServerHttpSecurity.FormLoginSpec::disable)
.httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/members/signup", "/members/login", "ws/**").permitAll()
.pathMatchers(HttpMethod.GET,"/broadcasts/**").permitAll()
.anyExchange().authenticated())
.securityContextRepository(securityContextRepository)
.authenticationManager(authenticationManager)
.exceptionHandling(exceptionHandlingSpec -> exceptionHandlingSpec
.accessDeniedHandler((swe, e) ->
Mono.fromRunnable(() -> swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
.authenticationEntryPoint((swe, e) ->
Mono.fromRunnable(() -> swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED))));
return serverHttpSecurity.build();
}
}
같은 채널에 입장한 사람들끼리 채팅이 잘 된다!ヽ(✿゚▽゚)ノ
다음 내용
https://yeon-dev.tistory.com/208
[TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출
💻문제점1 스트리밍 중인 채널에 접속하여 같은 채널에 있는 사람들끼리 채팅을 하도록 구현했다. 하지만 현재는 로그인 하지 않은 유저도 채팅이 가능하기 때문에, 로그인한 유저만 채팅을 보
yeon-dev.tistory.com
'TIL' 카테고리의 다른 글
[TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성 (0) | 2023.06.03 |
---|---|
[TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출 (0) | 2023.06.03 |
[TIL - 20230531] webflux 채널 생성/조회 (0) | 2023.06.01 |
[TIL - 20230530] Webflux + RSocket 채팅 (0) | 2023.06.01 |
[TIL - 20230526] Github Actions Jacoco&Codecov (0) | 2023.05.27 |