Chef.Yeon
Code Cook
Chef.Yeon
전체 방문자
오늘
어제
  • 분류 전체보기 (230)
    • 게임 개발 (1)
      • Unity (1)
    • Android (27)
      • Kotlin (19)
      • 우아한테크코스 5기 (4)
    • Language (11)
      • 파이썬 (3)
      • Java (7)
    • DB (2)
      • SQL (16)
    • Spring (25)
    • 코딩테스트 (56)
    • Git (1)
    • TIL (85)
    • DevOps (6)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • Wil
  • enum
  • Android
  • java
  • MariaDB
  • 레포지토리
  • elasticsearch
  • kibana
  • 에라토스테네스의 체
  • 다이나믹 프로그래밍
  • 프로그래머스
  • Docker
  • 문자열
  • 코딩테스트
  • SQL
  • ec2
  • 안드로이드
  • rsocket
  • 코틀린 인 액션
  • 백준
  • 프리코스
  • 파이썬
  • kotlin
  • til
  • spring
  • webflux
  • 내림차순
  • 우아한테크코스
  • 코틀린
  • grafana

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Chef.Yeon

Code Cook

Spring

[Spring] Webflux + RSocket, React 사용한 전체 채팅 구현

2023. 6. 2. 03:43

Webflux에서 RSocket을 사용하여 전체 채팅을 구현해보자.

 

build.gradle

의존성은 다음과 같이 추가했다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

 

application.yml

RSocket을 websocket으로 transport해주고, port와 mapping-path를 설정해준다.

spring:
  rsocket:
    server:
      port: 6565
      transport: websocket
      mapping-path: /rs

 

RSocketConfig

@Configuration
public class RsocketConfig {
    @Bean
    public RSocketRequester getRSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
                .rsocketStrategies(rSocketStrategies)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .tcp("localhost", 6565);
    }
}

 

ChatDto

@Getter
@AllArgsConstructor
public class ChatDto {
    private String username;
    private String message;
}

 

ChatController

- onConnet()

@ConnectMapping을 통해 Rsocket 연결을 시도하면 onConnet() 메서드를 수행하게 된다.

 

- message()

클라이언트에서 서버로 message라는 엔드포인트로 메시지를 전송하면 ChatDto를 수신한다.

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

    private final ChatService chatService;

    @ConnectMapping
    public void onConnect(RSocketRequester requester) {
        chatService.onConnect(requester);
    }

    @MessageMapping("message")
    Mono<ChatDto> message(ChatDto chatDto) {
        return chatService.message(chatDto);
    }

    @MessageMapping("send")
    void sendMessage(ChatDto chatDto) {
        chatService.sendMessage(chatDto);
    }
}

 

ChatService

- onConnet()

연결 시, RSocketRequester를 리스트에 담아 저장하고, 연결이 끊기면 삭제한다.

 

- message()

sendMessage()를 호출한다.

 

- sendMessage()

현재 연결된 모든 requester (CLIENTS에 저장된 모든 RSocketRequester)에게 메시지를 전달한다.

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final List<RSocketRequester> participants = new ArrayList<>();

    public void onConnect(RSocketRequester requester) {
        requester.rsocket()
                .onClose()
                .doFirst(() -> {
                    participants.add(requester);
                })
                .doOnError(error -> {
                })
                .doFinally(consumer -> {
                    participants.remove(requester);
                })
                .subscribe();
    }

    public Mono<ChatDto> message(ChatDto chatDto) {
        this.sendMessage(chatDto);
        return Mono.just(chatDto);
    }

    public void sendMessage(ChatDto chatDto) {
        Flux.fromIterable(participants)
                .doOnNext(ea -> {
                    ea.route("")
                            .data(chatDto)
                            .send()
                            .subscribe();
                })
                .subscribe();
    }
}

 

React 

RSocket을 연결하고, 메시지를 보낸다.

import React, { useState, useEffect } from 'react';
import {
  RSocketClient,
  JsonSerializer,
  IdentitySerializer,
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { EchoResponder } from './responder';

const Chatting = () => {
  const [message, setMessage] = useState('');
  const [socket, setSocket] = useState(null);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    connect();
  }, []);

  const messageReceiver = (payload) => {
    setMessages((prevMessages) => [...prevMessages, payload.data]);
  };
  const responder = new EchoResponder(messageReceiver);

  const send = () => {
    socket
      .requestResponse({
        data: {
          username: 'Superpil',
          message: message,
        },
        metadata: String.fromCharCode('message'.length) + 'message',
      })
      .subscribe({
        onComplete: (com) => {
          console.log('com : ', com);
        },
        onError: (error) => {
          console.log(error);
        },
        onNext: (payload) => {
          console.log(payload.data);
        },
        onSubscribe: (subscription) => {
          console.log('subscription', subscription);
        },
      });
  };

  const connect = () => {
    const client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer,
      },
      setup: {
        // ms btw sending keepalive to server
        keepAlive: 60000,
        // ms timeout if no keepalive response
        lifetime: 180000,
        // format of `data`
        dataMimeType: 'application/json',
        // format of `metadata`
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      responder: responder,
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:6565/rs', 
      }),
    });

    client.connect().subscribe({
      onComplete: (socket) => {
        setSocket(socket);
      },
      onError: (error) => {
        console.log(error);
      },
      onSubscribe: (cancel) => {
        console.log(cancel);
      },
    });
  };

  return (
    <div>
      <h1>Chatting</h1>
      <input
        type="text"
        value={message}
        onChange={(e) => setMessage(e.target.value)}
      />
      <button onClick={send}>전송</button>
      <ul>
        {messages.map((item, index) => (
          <li key={index}>{item.username} : {item.message}</li>
        ))}
      </ul>
    </div>
  );
};

export default Chatting;

 

 

채팅방별로 대화하는 것은 깃허브에서 확인할 수 있다.

https://github.com/O-Wensu/WebfluxRSocket_Chat_with_React

 

GitHub - O-Wensu/WebfluxRSocket_Chat_with_React

Contribute to O-Wensu/WebfluxRSocket_Chat_with_React development by creating an account on GitHub.

github.com

 

728x90

'Spring' 카테고리의 다른 글

[Spring] Junit5 테스트 No tests found for given includes 오류 해결  (0) 2023.06.21
[Spring] 아임포트 사용한 결제 구현 + JavaScript/React 코드  (0) 2023.06.20
[Spring] SpringBoot K6 + Grafana 부하 테스트 및 모니터링  (0) 2023.05.29
[Spring] Jacoco 적용하여 코드 커버리지 확인 및 Codecov사용한 PR에 커버리지 코멘트 추가  (0) 2023.05.27
[Spring] SpringBoot Prometheus, Grafana를 사용한 모니터링(Feat. Docker)  (0) 2023.05.25
    'Spring' 카테고리의 다른 글
    • [Spring] Junit5 테스트 No tests found for given includes 오류 해결
    • [Spring] 아임포트 사용한 결제 구현 + JavaScript/React 코드
    • [Spring] SpringBoot K6 + Grafana 부하 테스트 및 모니터링
    • [Spring] Jacoco 적용하여 코드 커버리지 확인 및 Codecov사용한 PR에 커버리지 코멘트 추가
    Chef.Yeon
    Chef.Yeon
    보기 좋고 깔끔한 코드를 요리하기 위해 노력하고 있습니다.

    티스토리툴바