Chef.Yeon
Code Cook
Chef.Yeon
전체 방문자
오늘
어제
  • 분류 전체보기 (230)
    • 게임 개발 (1)
      • Unity (1)
    • Android (27)
      • Kotlin (19)
      • 우아한테크코스 5기 (4)
    • Language (11)
      • 파이썬 (3)
      • Java (7)
    • DB (2)
      • SQL (16)
    • Spring (25)
    • 코딩테스트 (56)
    • Git (1)
    • TIL (85)
    • DevOps (6)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • java
  • 에라토스테네스의 체
  • rsocket
  • 프리코스
  • kibana
  • 안드로이드
  • grafana
  • Wil
  • 파이썬
  • 내림차순
  • 코틀린
  • spring
  • 코딩테스트
  • elasticsearch
  • 다이나믹 프로그래밍
  • Docker
  • enum
  • SQL
  • 우아한테크코스
  • ec2
  • 레포지토리
  • MariaDB
  • Android
  • 코틀린 인 액션
  • 문자열
  • til
  • 백준
  • kotlin
  • webflux
  • 프로그래머스

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Chef.Yeon

Code Cook

[TIL - 20230608] Webflux + Websocket + Kafka
TIL

[TIL - 20230608] Webflux + Websocket + Kafka

2023. 6. 8. 00:15

 

💻현재 상황

websocket과 reactive kafka를 사용

 

kafkaConfig

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId; // group.id 값 추가

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, ChatDto> kafkaConsumerTemplate() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        ReceiverOptions<String, ChatDto> receiverOptions = ReceiverOptions.create(configProps);

        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

KafkaConsumerService

@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "test")
    public void receiveMessage(String message) {
        // 수신된 메시지 처리 로직
        log.info("[test] " + message);
    }

    @KafkaListener(topics = "test2")
    public void receiveMessage2(String message) {
        log.info("[test2] " + message);
    }
}

 

KafkaMessageListener

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaMessageListener {

    private final Map<String, List<RSocketRequester>> participants;
    private final ReactiveKafkaConsumerTemplate<String, ChatDto> kafkaConsumerTemplate;

    @PostConstruct
    public void listen() {
        kafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(record -> {
                    ChatDto chatDto = record.value();
                    log.info("Received Kafka message: {}", chatDto);

                    String chattingAddress = chatDto.getChattingAddress();
                    List<RSocketRequester> subscribers = participants.get(chattingAddress);
                    if (subscribers != null) {
                        Flux.fromIterable(subscribers)
                                .flatMap(requester -> requester.route("message").data(chatDto).send())
                                .subscribe();
                    }
                })
                .subscribe();
    }
}

 

KafkaProducerService

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, ChatDto chatDto) {
        String message = chatDto.getUsername() + ": " + chatDto.getMessage();
        kafkaTemplate.send(topic, message);
    }
}

 

ChatDto

@Getter
@Setter
@Builder
@AllArgsConstructor
public class ChatDto {
    private String username;
    private String message;
    private String chattingAddress;
}

 

클라이언트 측에서 rsocket을 연결하고, chattingAddress를 test 또는 test2 로 보내면 다음과 같이 로그가 출력된다.


💻문제점

지금은 클라이언트에서 test와 test2로만 보내서 테스트하지만, test3 토픽으로 보내면 로그가 출력되지 않는다. 그렇다고 토픽 개수만큼 메서드를 만들 수도 없는 노릇이다.


📃시도

application.yml

# RSocket 설정
spring:
  rsocket:
    server:
      port: 6565
      transport: websocket
      mapping-path: /rs
  
# Kafka 설정
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      topics: test,test2 //띄어쓰기 X
      auto-offset-reset: earliest
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

KafkaConsumerService

@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(id = "dynamic-topic-listener", topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void receiveMessage(String message) {
        log.info("Received message: " + message);
    }
}

 

@KafkaListener 애노테이션에 topics 속성에 SpEL을 사용하여 동적으로 토픽을 생성했다.

이 토픽들은 application.yml에 정의된 속성이다. 토픽을 콤마(,)로 구분하여 설정했다. 그렇기에 띄어쓰기가 있으면 안된다...ㅎㅎ

 

 

그런데 아직 중요한 문제는 해결되지 않았다.

우리 서비스는 회원 가입시, 그 사람 방송의 채팅방 주소 고유 번호가 발급되는데, 동적으로 생성되기 때문에 application.yml에 계속 동적으로 추가할 수 없었고, 때문에 KafkaListener를 지정해서 로그를 출력하기에는 어려움이 있었다. 


🔍해결

생각해보니, 이 메서드는 단순히 로그 확인용이라 실제 서비스할 때는 구현될 필요가 없다는 생각이 들었다. 로그만 안 찍히지 실제 토픽에 메시지가 수신되기는 하니까...!

728x90

'TIL' 카테고리의 다른 글

[TIL - 20230614] docker compose  (0) 2023.06.15
[TIL - 20230612] OBS 방송 시작 관련 문제 해결  (0) 2023.06.13
[TIL - 20230606] Webflux + Websocket 전체 채팅  (0) 2023.06.07
[TIL - 20230605] RSocketRequester Redis 저장 실패  (0) 2023.06.05
[TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성  (0) 2023.06.03
    'TIL' 카테고리의 다른 글
    • [TIL - 20230614] docker compose
    • [TIL - 20230612] OBS 방송 시작 관련 문제 해결
    • [TIL - 20230606] Webflux + Websocket 전체 채팅
    • [TIL - 20230605] RSocketRequester Redis 저장 실패
    Chef.Yeon
    Chef.Yeon
    보기 좋고 깔끔한 코드를 요리하기 위해 노력하고 있습니다.

    티스토리툴바