💻문제점
현재 RSocketRequester를 로컬에서 저장하는데, 로컬에서 관리하지 않으려고 DB에 저장할까 했으나, 삭제와 저장이 빈번하게 일어나고 새로고침이나 뒤로가기 한 번으로 휘발되는 데이터라 굳이? 라는 생각이 들었다. 그래서 레디스에 저장을 하기로 했다.
📃시도
시도1
@Slf4j
@Repository
@RequiredArgsConstructor
public class RSocketRepository {
private final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate;
public Flux<RSocketRequester> findAllByChattingAddress(String address) {
return reactiveRedisTemplate.opsForSet().members(address).cast(RSocketRequester.class);
}
public Mono<Boolean> existsByChattingAddress(String address) {
return reactiveRedisTemplate.hasKey(address);
}
public Mono<RSocketRequester> save(String address, RSocketRequester rsocketRequester) {
log.info("address : " + address + " / rsocket : " + rsocketRequester);
return reactiveRedisTemplate
.opsForSet()
.add(address, rsocketRequester)
.flatMap(size -> {
if (size != null) return Mono.just(rsocketRequester).log();
else return Mono.error(new RuntimeException("Redis 저장 오류"));
});
}
public Mono<Long> deleteByChattingAddress(String address, RSocketRequester rsocketRequester) {
return reactiveRedisTemplate.opsForSet().remove(address, rsocketRequester)
.flatMap(count -> Mono.just(count));
}
}
public void sendMessage(ChatDto chatDto) {
rsocketRepository
.findAllByChattingAddress(chatDto.getChattingAddress()).log()
.doOnNext(ea -> {
ea.route("")
.data(chatDto)
.send()
.subscribe();
}).subscribe();
}
Rsocket 연결은 되는데 채팅 출력이 안된다.
시도2
key가 address인 쌍의 value를 가져오려고 했는데, 우리가 원하는대로 안 가져와지나 해서 findAllByChattingAddress 메서드를 다음과 같이 Set<RSocketReqeuster> 타입으로 반환했다.
@Slf4j
@Repository
@RequiredArgsConstructor
public class RSocketRepository {
private final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate;
public Set<RSocketRequester> findAllByChattingAddress(String address) {
Set<RSocketRequester> result = new HashSet<>();
reactiveRedisTemplate.opsForSet()
.members(address)
.cast(RSocketRequester.class)
.subscribe(result::add);
return result;
}
//생략
}
Set 타입이라 fromIterable를 사용해서 value인 Set<RSocketRequester>에 들어 있는 모든 RSocketRequester에게 메시지를 전송하고자 했다.
public void sendMessage(ChatDto chatDto) {
Flux.fromIterable(rsocketRepository.findAllByChattingAddress(chatDto.getChattingAddress()))
.doOnNext(ea -> {
ea.route("")
.data(chatDto)
.send()
.subscribe();
})
.subscribe();
}
하지만 여전히 출력은 묵묵부답..
시도3
혹시 레디스에 저장이 안되나 싶어 existsByChattingAddress 메서드를 호출해서 key의 존재 여부를 확인해보았다.
public void sendMessage(ChatDto chatDto) {
rsocketRepository.existsByChattingAddress(chatDto.getChattingAddress())
.doOnNext(exists -> {
if (exists) log.info("존재");
else log.info("안 존재");
}).subscribe();
}
그랬더니 "안 존재"가 출력 된다 ㅎㅎ 애초에 레디스에 저장이 안 됐던 것이다.
RSocketRepository에서 사용하는 redisTemplate 는 다음과 같이 생겼다.
처음에는 Object 가 아니라 Set<RSocketRequester> 였는데, 직렬화하는 방법을 찾지 못해서 Object로 두고 했었다.
@Bean
public ReactiveRedisTemplate<String, Object> rsocketRequesterDefine(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<String, Object>(
factory,
RedisSerializationContext.fromSerializer(new Jackson2JsonRedisSerializer(Object.class))
);
}
gpt에게 물어보니, 역시나 RSocketRequester의 직렬화 문제같다고 한다. Jackson2JsonRedisSerializer를 통해 저장되는 객체가 직렬화 되는데, RSocketRequester가 직렬화 가능한 객체가 아니라서 문제가 되는 듯 하다.
이 객체를 직렬화/역직렬화 할 수 있도록 정의를 해주어야 했다.
그런데 더 찾아보니, 애초에 RSocketRequester를 redis로 관리하는 예시는 없었고, redis를 사용한다면, redis의 pub/sub을 사용하는 경우였다.
현재도 서버가 3개로 나누어져 있는데, 나중에 채팅 서버도 분리되고, 확장되는 것을 생각한다면 메세지 큐를 사용하는 것이 나을 것 같다. rsocket을 연결하면 해당 채널의 채팅방을 구독 sub 하고, 채팅을 보내면 해당 채팅방을 구독하는 사람들에게 퍼블리시 pub 해줘야 할 것 같다.
'TIL' 카테고리의 다른 글
[TIL - 20230608] Webflux + Websocket + Kafka (0) | 2023.06.08 |
---|---|
[TIL - 20230606] Webflux + Websocket 전체 채팅 (0) | 2023.06.07 |
[TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성 (0) | 2023.06.03 |
[TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출 (0) | 2023.06.03 |
[TIL - 20230531-0601] RSocket을 사용해 채팅방 별 실시간 채팅하기 (2) | 2023.06.02 |