Chef.Yeon
Code Cook
Chef.Yeon
전체 방문자
오늘
어제
  • 분류 전체보기 (230)
    • 게임 개발 (1)
      • Unity (1)
    • Android (27)
      • Kotlin (19)
      • 우아한테크코스 5기 (4)
    • Language (11)
      • 파이썬 (3)
      • Java (7)
    • DB (2)
      • SQL (16)
    • Spring (25)
    • 코딩테스트 (56)
    • Git (1)
    • TIL (85)
    • DevOps (6)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • Wil
  • elasticsearch
  • MariaDB
  • 백준
  • 다이나믹 프로그래밍
  • kibana
  • SQL
  • ec2
  • rsocket
  • Android
  • java
  • 코틀린 인 액션
  • 문자열
  • 파이썬
  • Docker
  • til
  • spring
  • 프로그래머스
  • 우아한테크코스
  • enum
  • grafana
  • 레포지토리
  • 코틀린
  • 프리코스
  • 에라토스테네스의 체
  • 코딩테스트
  • 안드로이드
  • kotlin
  • 내림차순
  • webflux

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Chef.Yeon

Code Cook

[TIL - 20230531-0601] RSocket을 사용해 채팅방 별 실시간 채팅하기
TIL

[TIL - 20230531-0601] RSocket을 사용해 채팅방 별 실시간 채팅하기

2023. 6. 2. 03:32

 

💻현재 상황

채널까지 생성했으니, 이제 해당 채널에 있는 사람이 채팅을 보낼 때 같은 채널에 있는 사람들에게만 채팅이 전달되어야 한다. 현재 진행된 채팅 기능은 채팅을 보내면 모든 사람이 채팅을 받게 되어있는데, 채팅방에 따라 RSocketRequester를 저장해주지 않고, 그냥 리스트에 모두 저장했기 때문이다.

 

일단 진행했던 빈 프로젝트에서 만든 RSocket 채팅을 기존에 진행하던 프로젝트로 옮겨왔다.

이전에 개발한 내용은 다음 링크를 참고하면 된다.

https://yeon-dev.tistory.com/207

 

[Spring] Webflux + RSocket, React 사용한 전체 채팅 구현

Webflux에서 RSocket을 사용하여 전체 채팅을 구현해보자. build.gradle 의존성은 다음과 같이 추가했다. dependencies { implementation 'org.springframework.boot:spring-boot-starter-rsocket' implementation 'org.springframework.boot:s

yeon-dev.tistory.com

 

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final List<RSocketRequester> CLIENTS = new ArrayList<>();

    public void onConnect(RSocketRequester requester, String chattingAddress) {
        requester.rsocket()
                .onClose()
                .doFirst(() -> {
                    CLIENTS.add(requester);
                })
                .doOnError(error -> {
                    log.info(error.getMessage());
                })
                .doFinally(consumer -> {
                    CLIENTS.remove(requester);
                })
                .subscribe();
    }
// 생략
}

 

그래서 List가 아닌 Map을 사용해서 key는 스트리밍 중인 채널의 채팅방 주소가 들어가고, value로 RSocketRequester타입의 List로 저장하기로 했다.

    private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();

💻문제점1

클라이언트에서 RSocket 연결을 할 때 chattingAdress를 서버 측에 넘겨줘야 했다.

어떻게 넘겨줄 수 있을지 부터가 문제였다.

React는 다루지 않다보니 클라이언트 코드를 고치는 것도 매우 어려웠다.


📃문제점1-시도

시도1 

onConnet()에서 Payload로 클라이언트에서 보낸 chattingAddress를 받도록 했다.

 

ChatController

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

    private final ChatService chatService;

    @ConnectMapping
    public void onConnect(@Payload String chattingAddress, RSocketRequester requester) {
        chatService.onConnect(chattingAddress, requester);
    }
	//생략
}

 

클라이언트

클라이언트에서 connect() 할 때 chattingAddress를 담아서 보내주도록 했다.

const connect = () => {
  const client = new RSocketClient({
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer,
    },
    setup: {
      ...
      metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    responder: responder,
    transport: new RSocketWebSocketClient({
      url: 'ws://localhost:6565/rs',
    }),
  });

  client.connect({
    data: { chattingAddress: chattingAddress },
  })
  .subscribe({
    ...
  });
};

 

이렇게 변경하고 수행했더니 client.connect() is not function 이라는 오류가 발생했다.

 

시도2

클라이언트에서 RSocketClient를 생성할 때 chattingAddress를 넣어줄 수는 없을까?

서버에서는 인자로 RSocketRequester만 받고 그 안에 들어있는 chattingAddress를 꺼낼 수 있으면 좋을 것이다🤔

 

클라이언트

socket
  .requestResponse({
    data: {
      chattingAddress: chattingAddress,
      nickname: '',
      message: message,
    },
    metadata: String.fromCharCode(message.length) + message,
  })
  ...

 

이렇게 작성하고 생각해보니, 이 부분은 RSocket은 이미 연결되고, 채팅 메시지를 보내는 부분이어서 여기서 chattingAdress를 보내는 것은 현재로서는 의미가 없었다.

 

시도3

webflux+RSocket 를 구현한 사람을 찾아보려 했지만 일단 React로 한 사람들이 많이 보이지 않았다. (내가 못 찾는걸까?)

그러던 중 찾은 것이 RSocketClient를 만들 때 setup 프레임을 사용하여 데이터를 전달하는 것이었다.

서버에서는 @Payload로 데이터를 받도록 했다.

 

클라이언트

const client = new RSocketClient({
  setup: {
    data: {
      chattingAddress: chattingAddress,
    },
    // ... 기타 설정
  },
  transport: new RSocketWebSocketClient({
    url: 'ws://localhost:8080',
  }),
  serializers: {
    data: JsonSerializer,
    metadata: IdentitySerializer,
  },
});

client.connect().then(...);

 

서버

@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload String chattingAddress) {
    chatService.onConnect(requester, chattingAddress);
}

 

onConnect() 자체가 수행되지 않는다...!


🔍문제점1-해결

이후에도 나와 정아님이 서로 여러 시도를 해봤지만 chattingAddress를 받아올 수 없었다. 

마침 내가 수요일에 깃허브에 올리려고 빈 프로젝트에 Rsocket 전체 채팅을 다시 만들어둔게 있어서, 거기서 다시 해보기로 했다.

 

클라이언트에서 RSocketClient를 만들 때 setup 프레임의 payload 안에 데이터를 넣어주는 방식으로 계속 시도해보았다.

 

클라이언트

  const connect = (chattingAddress) => {
    const client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer,
      },
      setup: {
        payload: {
          data: chattingAddress
        },
        keepAlive: 60000,
        lifetime: 180000,
        dataMimeType: 'application/json',
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      responder: responder,
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:6565/rs',
      })
    });

 

서버에서는 기존에 시도하던 방식과 다르게, payload를 받을 때 String 타입이 아닌 Object 타입으로 받도록 변경했다.

 

서버

@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload Object chattingAddress) {
    log.info("address: " + (String)chattingAddress);
    chatService.onConnect(requester, (String)chattingAddress);
}

 

chattingAddress를 무사히 전달 받고, RSocket 연결도 수행되었다!😆

 

원인은 JSON 객체를 바로 String으로 변환하려고 했기 때문이었다.

서버 측에서 Payload를 Object 타입으로 역직렬화 한 뒤, 'String'으로 캐스팅 하여 사용하니 값을 받을 수 있었다.

 

이후 ChatService의 onConnect() 메서드를 수정했다.

 

 

ChatService

map에 key가 채팅방 주소인 것이 있다면 List<RSocketRequester>에 추가해주고, 없다면 requester를 추가한 ArrayList를 만들어서 map에 put 해주었다.

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();

    public void onConnect(RSocketRequester requester, String chattingAddress) {
        requester.rsocket()
                .onClose()
                .doFirst(() -> {
                    if(participants.containsKey(chattingAddress))
                        participants.get(chattingAddress).add(requester);
                    else participants.put(chattingAddress, new ArrayList(Arrays.asList(requester)));
                })
                .doOnError(error -> {
                    log.info(error.getMessage());
                })
                .doFinally(consumer -> {
                    participants.get(chattingAddress).remove(requester);
                })
                .subscribe();
    }
//생략
}

 

채널에 들어갔을 때 해당 채널의 채팅방 별로 RSocketRequester가 잘 저장된다!!


💻문제점2

서버 측에서는 클라이언트가 메시지를 보낼 때 어느 채팅방으로 보낼건지 알아야 했다.


🔍문제점2-해결

클라이언트에서 메시지를 보낼 때 chattingAddress를 함께 보내도록 변경했다.

ChatDto에도 chattingAddress 속성을 추가했다.

ChatService의 sendMessage 메서드에서는 같은 채팅방에 속한 사람들에게만 메시지를 보내줄 수 있도록 했다.

 

클라이언트

const send = () => {
    socket
      .requestResponse({
        data: {
          username: 'Superpil',
          message: message,
          chattingAddress : chattingAddress,
        },
        metadata: String.fromCharCode('message'.length) + 'message',
      })
      .subscribe({
        onComplete: (com) => {
          console.log('com : ', com);
        },
        onError: (error) => {
          console.log(error);
        },
        onNext: (payload) => {
          console.log(payload.data);
        },
        onSubscribe: (subscription) => {
          console.log('subscription', subscription);
        },
      });
  };

 

ChatDto

@Getter
@Setter
@Builder
@AllArgsConstructor
public class ChatDto {
    private String username;
    private String message;
    private String chattingAddress;
}

 

ChatController

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

    private final ChatService chatService;

    @ConnectMapping
    public void onConnect(RSocketRequester requester, @Payload Object chattingAddress) {
        chatService.onConnect(requester, (String)chattingAddress);
    }

    @MessageMapping("message")
    Mono<ChatDto> message(ChatDto chatDto) {
        return chatService.message(chatDto);
    }

    @MessageMapping("send")
    void sendMessage(ChatDto chatDto) {
        chatService.sendMessage(chatDto);
    }
}

 

ChatService

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final Map<String, List<RSocketRequester>> participants = new ConcurrentHashMap<>();

	//생략

    public Mono<ChatDto> message(ChatDto chatDto) {
        this.sendMessage(chatDto);
        return Mono.just(chatDto);
    }

    public void sendMessage(ChatDto chatDto) {
        Flux.fromIterable(participants.get(chatDto.getChattingAddress()))
                .doOnNext(ea -> {
                    ea.route("")
                            .data(chatDto)
                            .send()
                            .subscribe();
                })
                .subscribe();
    }
}

 

같은 방송의 채팅방에 들어간 사람들끼리 채팅이 잘 된다!


💻문제점3

이제 이걸 다시 기존 프로젝트에 합쳐보자. 그런데 분명 똑같은 코드인데 여전히 onConnect()가 수행이 안됐다.

생각해보자 빈프로젝트와 기존 프로젝트의 차이점을...

그것은 바로 본 프로젝트에는 webflux security가 적용되어있다는 점이다!

 

예전에 Web MVC에서 WebSocket으로 채팅을 구현할 때 securityConfig에서 /ws/** 경로를 허용해주었던 기억이 났다.


🔍문제점3-해결

클라이언트에서는 connect()를 시도할 때 ws://localhost:6565/rs 로 연결을 시도하게 되어있다.

  const connect = (chattingAddress)=> {
    const client = new RSocketClient({
        serializers: {
            data: JsonSerializer,
            metadata: IdentitySerializer,
        },
        setup: {
            payload: {
                data: chattingAddress
            },
            keepAlive: 60000,
            lifetime: 180000,
            dataMimeType: 'application/json',
            metadataMimeType: 'message/x.rsocket.routing.v0',
        },
        responder:responder,
        transport: new RSocketWebSocketClient({
            url: 'ws://localhost:6565/rs', 
        })
    });

 

그래서 /ws/** 경로는 통과하도록 허용해주었더니...!

 

WebfluxSecurityConfiguration

@Configuration
@EnableWebFluxSecurity
@RequiredArgsConstructor
public class WebfluxSecurityConfiguration {

    private final AuthenticationManager authenticationManager;
    private final SecurityContextRepository securityContextRepository;

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity serverHttpSecurity) {
        serverHttpSecurity
                .csrf(ServerHttpSecurity.CsrfSpec::disable)
                .formLogin(ServerHttpSecurity.FormLoginSpec::disable)
                .httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/members/signup", "/members/login", "ws/**").permitAll()
                        .pathMatchers(HttpMethod.GET,"/broadcasts/**").permitAll()
                        .anyExchange().authenticated())
                .securityContextRepository(securityContextRepository)
                .authenticationManager(authenticationManager)
                .exceptionHandling(exceptionHandlingSpec -> exceptionHandlingSpec
                        .accessDeniedHandler((swe, e) ->
                                Mono.fromRunnable(() -> swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
                        .authenticationEntryPoint((swe, e) ->
                                Mono.fromRunnable(() -> swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED))));

        return serverHttpSecurity.build();
    }
}

 

같은 채널에 입장한 사람들끼리 채팅이 잘 된다!ヽ(✿゚▽゚)ノ

 

다음 내용

https://yeon-dev.tistory.com/208

 

[TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출

💻문제점1 스트리밍 중인 채널에 접속하여 같은 채널에 있는 사람들끼리 채팅을 하도록 구현했다. 하지만 현재는 로그인 하지 않은 유저도 채팅이 가능하기 때문에, 로그인한 유저만 채팅을 보

yeon-dev.tistory.com

 

728x90

'TIL' 카테고리의 다른 글

[TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성  (0) 2023.06.03
[TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출  (0) 2023.06.03
[TIL - 20230531] webflux 채널 생성/조회  (0) 2023.06.01
[TIL - 20230530] Webflux + RSocket 채팅  (0) 2023.06.01
[TIL - 20230526] Github Actions Jacoco&Codecov  (0) 2023.05.27
    'TIL' 카테고리의 다른 글
    • [TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성
    • [TIL - 20230602] RSocket 채팅 전송 시 토큰 검사, 토큰에서 데이터 추출
    • [TIL - 20230531] webflux 채널 생성/조회
    • [TIL - 20230530] Webflux + RSocket 채팅
    Chef.Yeon
    Chef.Yeon
    보기 좋고 깔끔한 코드를 요리하기 위해 노력하고 있습니다.

    티스토리툴바