TIL

[TIL - 20230605] RSocketRequester Redis 저장 실패

Chef.Yeon 2023. 6. 5. 21:06

 

💻문제점

현재 RSocketRequester를 로컬에서 저장하는데, 로컬에서 관리하지 않으려고 DB에 저장할까 했으나, 삭제와 저장이 빈번하게 일어나고 새로고침이나 뒤로가기 한 번으로 휘발되는 데이터라 굳이? 라는 생각이 들었다. 그래서 레디스에 저장을 하기로 했다.


📃시도

시도1

@Slf4j
@Repository
@RequiredArgsConstructor
public class RSocketRepository {

    private final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate;

    public Flux<RSocketRequester> findAllByChattingAddress(String address) {
        return reactiveRedisTemplate.opsForSet().members(address).cast(RSocketRequester.class);
    }

    public Mono<Boolean> existsByChattingAddress(String address) {
        return reactiveRedisTemplate.hasKey(address);
    }

    public Mono<RSocketRequester> save(String address, RSocketRequester rsocketRequester) {
        log.info("address : " + address + " / rsocket : " + rsocketRequester);
        return reactiveRedisTemplate
                .opsForSet()
                .add(address, rsocketRequester)
                .flatMap(size -> {
                    if (size != null) return Mono.just(rsocketRequester).log();
                    else return Mono.error(new RuntimeException("Redis 저장 오류"));
                });
    }

    public Mono<Long> deleteByChattingAddress(String address, RSocketRequester rsocketRequester) {
        return reactiveRedisTemplate.opsForSet().remove(address, rsocketRequester)
                .flatMap(count -> Mono.just(count));
    }
}

 

    public void sendMessage(ChatDto chatDto) {
        rsocketRepository
                .findAllByChattingAddress(chatDto.getChattingAddress()).log()
                .doOnNext(ea -> {
                    ea.route("")
                            .data(chatDto)
                            .send()
                            .subscribe();
                }).subscribe();
    }

 

Rsocket 연결은 되는데 채팅 출력이 안된다.

 

시도2

key가 address인 쌍의 value를 가져오려고 했는데, 우리가 원하는대로 안 가져와지나 해서 findAllByChattingAddress 메서드를 다음과 같이 Set<RSocketReqeuster> 타입으로 반환했다.

@Slf4j
@Repository
@RequiredArgsConstructor
public class RSocketRepository {

    private final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate;

    public Set<RSocketRequester> findAllByChattingAddress(String address) {
        Set<RSocketRequester> result = new HashSet<>();
        reactiveRedisTemplate.opsForSet()
                .members(address)
                .cast(RSocketRequester.class)
                .subscribe(result::add);
        return result;
    }
//생략
}

 

Set 타입이라 fromIterable를 사용해서 value인 Set<RSocketRequester>에 들어 있는 모든 RSocketRequester에게 메시지를 전송하고자 했다.

    public void sendMessage(ChatDto chatDto) {

        Flux.fromIterable(rsocketRepository.findAllByChattingAddress(chatDto.getChattingAddress()))
                .doOnNext(ea -> {
                    ea.route("")
                            .data(chatDto)
                            .send()
                            .subscribe();
                })
                .subscribe();
    }

 

하지만 여전히 출력은 묵묵부답..

 

시도3

혹시 레디스에 저장이 안되나 싶어 existsByChattingAddress 메서드를 호출해서 key의 존재 여부를 확인해보았다.

    public void sendMessage(ChatDto chatDto) {
        rsocketRepository.existsByChattingAddress(chatDto.getChattingAddress())
                        .doOnNext(exists -> {
                            if (exists) log.info("존재");
                            else log.info("안 존재");
                        }).subscribe();
    }

 

그랬더니 "안 존재"가 출력 된다 ㅎㅎ 애초에 레디스에 저장이 안 됐던 것이다.

 

RSocketRepository에서 사용하는 redisTemplate 는 다음과 같이 생겼다.

처음에는 Object 가 아니라 Set<RSocketRequester> 였는데, 직렬화하는 방법을 찾지 못해서 Object로 두고 했었다.

    @Bean
    public ReactiveRedisTemplate<String, Object> rsocketRequesterDefine(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisTemplate<String, Object>(
                factory,
                RedisSerializationContext.fromSerializer(new Jackson2JsonRedisSerializer(Object.class))
        );
    }

 

gpt에게 물어보니, 역시나 RSocketRequester의 직렬화 문제같다고 한다. Jackson2JsonRedisSerializer를 통해 저장되는 객체가 직렬화 되는데, RSocketRequester가 직렬화 가능한 객체가 아니라서 문제가 되는 듯 하다.

이 객체를 직렬화/역직렬화 할 수 있도록 정의를 해주어야 했다.

 

그런데 더 찾아보니, 애초에 RSocketRequester를 redis로 관리하는 예시는 없었고, redis를 사용한다면, redis의 pub/sub을 사용하는 경우였다.

 

현재도 서버가 3개로 나누어져 있는데, 나중에 채팅 서버도 분리되고, 확장되는 것을 생각한다면 메세지 큐를 사용하는 것이 나을 것 같다. rsocket을 연결하면 해당 채널의 채팅방을 구독 sub 하고, 채팅을 보내면 해당 채팅방을 구독하는 사람들에게 퍼블리시 pub 해줘야 할 것 같다.

728x90