💻현재 상황
websocket과 reactive kafka를 사용
kafkaConfig
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId; // group.id 값 추가
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ReactiveKafkaConsumerTemplate<String, ChatDto> kafkaConsumerTemplate() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
ReceiverOptions<String, ChatDto> receiverOptions = ReceiverOptions.create(configProps);
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaConsumerService
@Service
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics = "test")
public void receiveMessage(String message) {
// 수신된 메시지 처리 로직
log.info("[test] " + message);
}
@KafkaListener(topics = "test2")
public void receiveMessage2(String message) {
log.info("[test2] " + message);
}
}
KafkaMessageListener
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaMessageListener {
private final Map<String, List<RSocketRequester>> participants;
private final ReactiveKafkaConsumerTemplate<String, ChatDto> kafkaConsumerTemplate;
@PostConstruct
public void listen() {
kafkaConsumerTemplate.receiveAutoAck()
.doOnNext(record -> {
ChatDto chatDto = record.value();
log.info("Received Kafka message: {}", chatDto);
String chattingAddress = chatDto.getChattingAddress();
List<RSocketRequester> subscribers = participants.get(chattingAddress);
if (subscribers != null) {
Flux.fromIterable(subscribers)
.flatMap(requester -> requester.route("message").data(chatDto).send())
.subscribe();
}
})
.subscribe();
}
}
KafkaProducerService
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, ChatDto chatDto) {
String message = chatDto.getUsername() + ": " + chatDto.getMessage();
kafkaTemplate.send(topic, message);
}
}
ChatDto
@Getter
@Setter
@Builder
@AllArgsConstructor
public class ChatDto {
private String username;
private String message;
private String chattingAddress;
}
클라이언트 측에서 rsocket을 연결하고, chattingAddress를 test 또는 test2 로 보내면 다음과 같이 로그가 출력된다.
💻문제점
지금은 클라이언트에서 test와 test2로만 보내서 테스트하지만, test3 토픽으로 보내면 로그가 출력되지 않는다. 그렇다고 토픽 개수만큼 메서드를 만들 수도 없는 노릇이다.
📃시도
application.yml
# RSocket 설정
spring:
rsocket:
server:
port: 6565
transport: websocket
mapping-path: /rs
# Kafka 설정
kafka:
bootstrap-servers: localhost:9092
consumer:
topics: test,test2 //띄어쓰기 X
auto-offset-reset: earliest
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumerService
@Service
@Slf4j
public class KafkaConsumerService {
@KafkaListener(id = "dynamic-topic-listener", topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
public void receiveMessage(String message) {
log.info("Received message: " + message);
}
}
@KafkaListener 애노테이션에 topics 속성에 SpEL을 사용하여 동적으로 토픽을 생성했다.
이 토픽들은 application.yml에 정의된 속성이다. 토픽을 콤마(,)로 구분하여 설정했다. 그렇기에 띄어쓰기가 있으면 안된다...ㅎㅎ
그런데 아직 중요한 문제는 해결되지 않았다.
우리 서비스는 회원 가입시, 그 사람 방송의 채팅방 주소 고유 번호가 발급되는데, 동적으로 생성되기 때문에 application.yml에 계속 동적으로 추가할 수 없었고, 때문에 KafkaListener를 지정해서 로그를 출력하기에는 어려움이 있었다.
🔍해결
생각해보니, 이 메서드는 단순히 로그 확인용이라 실제 서비스할 때는 구현될 필요가 없다는 생각이 들었다. 로그만 안 찍히지 실제 토픽에 메시지가 수신되기는 하니까...!
728x90
'TIL' 카테고리의 다른 글
[TIL - 20230614] docker compose (0) | 2023.06.15 |
---|---|
[TIL - 20230612] OBS 방송 시작 관련 문제 해결 (0) | 2023.06.13 |
[TIL - 20230606] Webflux + Websocket 전체 채팅 (0) | 2023.06.07 |
[TIL - 20230605] RSocketRequester Redis 저장 실패 (0) | 2023.06.05 |
[TIL - 20230603] Webflux, Mock 사용한 Channel 도메인, ChannelService 테스트 케이스 작성 (0) | 2023.06.03 |