1. 개요
프로젝트를 진행하면서 예매 서비스와 결제 서비스가 각각 독립적인 마이크로서비스로 운영되는 환경을 구축했습니다.
그런데 문제는 결제가 완료되면 예매 서비스의 상태를 변경해야 한다는 점이었습니다.
두 서비스가 독립적인 애플리케이션으로 구성되어있기 때문에 단순한 API 호출 방식으로 해결하려 하면 서버 간 강한 의존성이 생기고 확장성이 저하되는 문제가 발생할 수 있었습니다. 이 문제를 해결하기 위해 Kafka를 활용한 이벤트 기반 아키텍처를 도입했습니다.
결제 서비스에서 '결제 완료' 이벤트를 Kafka에 발행(Publish) 하면, 예매 서비스가 해당 이벤트를 구독(Subscribe)하여 상태를 변경하는 방식입니다. 이를 통해 서비스 간 결합도를 낮추면서도 비동기적으로 데이터를 안정적으로 전달할 수 있는 구조를 만들 수 있었습니다.
이번 글에서는 Kafka를 활용하여 MSA 환경에서 서비스 간 데이터를 전달하는 방법을 살펴보겠습니다.
1-1 Kafka란?
카프카를 설치하기 전 Kafka에 대해 간단히 알아보도록 하겠습니다.
Apache Kafka는 대규모 데이터 스트림을 처리할 수 있는 분산형 메시지 브로커입니다. 고성능, 확장성, 내구성이 뛰어나 MSA 환경에서 서비스 간 통신을 위한 비동기 이벤트 처리에 널리 사용됩니다.
Kafka는 발행-구독(Pub/Sub) 모델을 기반으로 하며, 주요 개념은 다음과 같습니다:
- Producer(생산자): 데이터를 생성하고 Kafka 토픽(Topic)에 메시지를 발행하는 역할
- Topic(토픽): 메시지를 분류하고 저장하는 공간
- Broker(브로커): Kafka 클러스터에서 메시지를 관리하는 서버
- Consumer(소비자): 특정 토픽의 메시지를 구독하여 처리하는 서비스
- Partition: 토픽을 여러 개의 파티션으로 나누어 병렬로 처리될 수 있도록 지원
- Zookeeper : kafka의 메타데이터 관리 및 클러스터 상태를 유지
1-2 카프카의 장점
kafka를 사용함으로 얻을 수 있는 장점은 다음과 같습니다.
1. 고성능
- 수백만 건의 메시지를 초당 처리가 가능하다.
- 데이터가 여러 개의 파티션에 분산되어 병렬 처리가 가능하다.
2. 안정성
- 브로커가 여러 개로 구성되기 때문에 하나의 브로커가 장애가 발생해도 데이터가 손실되지 않는다.
3. 비동기 이벤트 처리
- 서비스 간 직접적인 API 호출 없이 이벤트를 전달할 수 있어 서비스 간 결합도를 낮출 수 있다.
4. 데이터 재처리
- 메시지가 Kafka에 일정 기간 유지되어 Consumer가 다운되어도 복구 후 다시 메시지를 가져올 수 있다.
지금까지 카프카에 대해 간단히 알아보았습니다. 지금부터는 본격적으로 카프카를 설치하고, 카프카를 Spring에서 사용하는 방법에 대해 알아보겠습니다.
2. 카프카 설치하기
docker-compose.yml 파일을 작성해줍니다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- image: confluentinc/cp-zookeeper:latest
- Confluent에서 제공하는 Zookeeper 최신 버전 이미지를 사용합니다.
- container_name: zookeeper
- 컨테이너의 이름을 zookeeper로 지정합니다.
- environment
- ZOOKEEPER_CLIENT_PORT: 2181
- Zookeeper가 클라이언트 연결을 위한 포트를 2181로 설정합니다.
- ZOOKEEPER_CLIENT_PORT: 2181
- ports
- 2181:2181
- 호스트(로컬)와 컨테이너 간 포트 매핑
- 로컬에서 2181 포트로 접속하면 컨테이너 내부의 2181 포트로 연결됩니다.
- 2181:2181
- image: confluentinc/cp-kafka:latest
- Confluent Kafka 최신 버전 이미지를 사용합니다.
- container_name: kafka
- 컨테이너 이름을 kafka로 지정합니다.
- depends_on: - zookeeper
- Kafka는 Zookeeper가 실행된 후 실행되도록 설정합니다.
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- Kafka가 Zookeeper와 연결될 때, 컨테이너 이름 zookeeper와 포트 2181을 사용하도록 설정합니다.
- KAFKA_BROKER_ID: 1
- Kafka 브로커의 ID를 1로 설정합니다.
- 브로커가 여러 개일 경우 각 브로커는 고유한 ID를 가져야 합니다.
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
- Kafka 클라이언트가 접속할 때 사용할 호스트와 포트 정보를 지정합니다.
- PLAINTEXT://localhost:9092
- 로컬 환경에서 사용할 경우 localhost:9092로 설정합니다.
- Docker 네트워크 내에서 접근할 경우 PLAINTEXT://kafka:9092와 같이 설정할 수도 있습니다.
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- __consumer_offsets 토픽의 복제 개수를 1로 설정합니다.
2-1 카프카 docker-compose.yml 작성하기
docker-compose up -d
위와 같은 명령어를 통해 카프카를 실행해줍니다. (-d 명령어를 사용하면 백그라운드로 실행이 가능합니다.)
2-2 카프카 컨테이너가 정상적으로 생성되었는지 확인하기
정상적으로 컨테이너가 생성되었는지 확인하기 위해 다음과 같은 명령어를 통해 확인해줍니다.
docker ps
3. Kafka가 정상적으로 작동하는지 확인하기
Kafka가 정상적으로 실행되는지 확인하기 위해 Kafka Producer(생산자)와 Consumer(소비자)를 이용해 메시지를 전송하고 수신하는 테스트를 진행해보겠습니다.
3-1. Kafka Topic 생성
docker exec -it kafka kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
exec -it kafka : kafka 컨테이너 내에 접속한다.
kafka-topics : 카프카 토픽에 대한 명령을 실행한다.
--create : 카프카 토픽을 생성한다.
--topic : 생성할 토픽의 이름을 지정해줍니다.
--bootstrap-server : kafak 브로커 서비스를 지정해줍니다.
--replication-factor1: 복제할 계수
--partition: 토픽 내 파티션 개수를 지정해줍니다.
아래 명령어를 통해 토픽이 생성되었는지 확인해줍니다.
docker exec -it kafka kafka-topics --describe --topic test-topic --bootstrap-server localhost:9092
3-2. Consumer 실행 (메시지 수신)
먼저 메시지를 수신할 Consumer를 실행해줍니다.
docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092
3-3 Producer 실행하기(메시지 전송)
docker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server localhost:9092
위와 같은 명령어를 실행하고 메시지를 보내면 Consumer가 메시지를 수신하는 모습을 확인할 수 있습니다.
(왼쪽의 수신 메시지가 많은 이유는 이전 테스트에서 보냈던 메시지입니다.)
4. Spring에서 카프카 사용하기
이제는 본격적으로 Spring에서 카프카를 이용해보도록 하겠습니다.
4-1 kafka 의존성 추가하기
implementation 'org.springframework.kafka:spring-kafka'
4-2 KafkaProducerConfig 설정 추가하기
메시지를 발행하는 PaymentService 애플리케이션에
Kafka Producer를 설정하기 위해 해당 클래스를 만들어줍니다.
@Configuration
public class KafkaProducerConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Kafka의 브로커 주소를 설정해줍니다.
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
Kafka의 메시지는 바이트 배열로 전송되기 때문에 직렬화(Serialization)가 필요합니다. 직렬화를 위한 설정을 추가해줍니다.
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- KEY_SERIALIZER_CLASS_CONFIG: 메시지의 Key 직렬화 설정
- VALUE_SERIALIZER_CLASS_CONFIG: 메시지의 Value 직렬화 설정
Kafka를 전송하는 객체인 KafkaTemplate를 생성하고 빈으로 등록해줍니다.
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
내부적으로 Kafka Producer 인스턴스를 생성하고 관리하는 역할을 하고 kafkaTemplate.send("topic", "message") 형태로 메시지를 발행할 수 있습니다.
4-3 kafka 이벤트 발행을 위한 Producer 클래스 만들기
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentCompletedProducer {
private static final String PAYMENT_COMPLETED_TOPIC = "payment-completed-topic";
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void paymentCompletedEvent(final Long reservationId) throws JsonProcessingException {
PaymentCompletedMessage message = new PaymentCompletedMessage(reservationId);
String paymentCompletedMessage = objectMapper.writeValueAsString(message);
kafkaTemplate.send(PAYMENT_COMPLETED_TOPIC, paymentCompletedMessage);
log.info("Kafka 메시지 전송 완료: {}", paymentCompletedMessage);
}
}
Kafka로 메시지를 발송할 토픽 이름을 설정해줍니다.
private static final String PAYMENT_COMPLETED_TOPIC = "payment-completed-topic";
ObjectMapper를 사용하여 PaymentCompletedMessage 객체를 JSON 문자열로 변환합니다.
String paymentCompletedMessage = objectMapper.writeValueAsString(message);
이후 결제 로직에서 결제가 성공하면 해당 이벤트가 발행되도록 해당 로직을 추가해줍니다.
public PaymentConfirmResponse confirmPayment(final PaymentConfirmRequest paymentConfirmRequest, final Long reservationId, final Long userId) throws JsonProcessingException {
~~결제로직 진행
paymentRepository.save(payment);
paymentCompletedProducer.paymentCompletedEvent(reservationId); //이벤트 발행하기
return paymentConfirmResponse;
}
4-4 KafkaConsumerConfig 설정 추가하기
메시지를 수신하는 reservation-service에 consumer 설정 클래스를 만들어줍니다.
@Configuration
public class KafkaConsumerConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "reservation-service-group";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- BOOTSTRAP_SERVERS: kafka 서버와 통신하기 위한 주소
- GROUP_ID: Kafka Consumer 그룹의 ID, 동일한 그룹에 속한 여러 Consumer가 있을 때 메시지를 로드 밸런싱하여 처리
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Kafka에서 받은 메시지의 key와 value를 어떤 방식으로 역직렬화할지 설정해줍니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
ConcurrentKafkaListenerContainerFactory 객체를 반환하는 빈을 등록합니다. 이 객체는 Kafka 메시지를 비동기적으로 처리하는 데 사용됩니다.
4-5 KafkaListener 설정하기
kafka 이벤트를 받아서 처리하는 Consumer 클래스를 만들어줍니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentCompletedConsumer {
private final ReservationService reservationService;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "payment-completed-topic", groupId = "reservation-service")
public void consumePaymentCompletedEvent(final String reservationId) throws JsonProcessingException {
log.info("Receive message: {}", reservationId);
PaymentCompletedMessage message = objectMapper.readValue(reservationId, PaymentCompletedMessage.class);
reservationService.updateReservationStatusIsBooked(message.getReservationId());
}
}
이벤트를 받아 처리하는 메서드에 해당 어노테이션을 추가해줍니다.
@KafkaListener(topics = "payment-completed-topic", groupId = "reservation-service")
- Kafka에서 payment-completed-topic을 구독하여 토픽에 메시지가 발행되면 해당 메서드가 실행되도록 합니다.
- groupId는 Kafka Consumer 그룹을 정의하며, 동일한 그룹에 여러 Consumer들이 메시지를 나누어 처리할 수 있도록 합니다.
public void consumePaymentCompletedEvent(final String reservationId) throws JsonProcessingException {
log.info("Receive message: {}", reservationId);
PaymentCompletedMessage message = objectMapper.readValue(reservationId, PaymentCompletedMessage.class);
reservationService.updateReservationStatusIsBooked(message.getReservationId());
}
수신한 메시지를 PaymentCompletedMessage 객체로 변환한 후 결제 완료된 예매의 상태를 업데이트하는 비즈니스 로직을 수행합니다.
5. 동작 확인하기
테스트를 통해 동작이 제대로 수행되는지 확인해보겠습니다.
5-1. 결제 진행 전 예매 상태 (PAMENT_WAITING)
5-2. 결제 진행하기
Toss간편결제를 통해 결제를 진행한 후 payment-service 어플리케이션에서 kafka 메시지를 발행한 것을 확인할 수 있습니다.
5-3. 예매의 상태 확인 (BOOKED)
이벤트를 수신한 후 정상적으로 예매 상태가 변경된 것을 확인할 수 있습니다.
6. 남아있는 문제들
지금까지 Kafka를 활용해 예매 상태를 변경하는 작업을 진행했습니다.
하지만 아직 해결해야 할 문제들이 남아있습니다.
- 결제 완료 메시지가 Kafka에 전달되지 않으면?!
- 결제는 성공적으로 이루어졌지만 예매 상태 변경이 실패 했을 경우
위의 경우 같이 분산 트랜잭션 환경에서 발생할 수 있는 문제와 해결방법을 다음 글에서 다뤄보도록 하겠습니다!

'Backend > MSA 전환' 카테고리의 다른 글
[MSA 전환하기 9편] Transactional Outbox Pattern을 사용해 분산시스템에서 메시지 발행 신뢰성 보장하기 (0) | 2025.02.17 |
---|---|
[MSA 전환하기 7편] 서킷 브레이커 적용하기 (Resilence4J) (1) | 2025.01.31 |
[MSA 전환하기 6편] OpenFeign을 활용한 서비스 간 통신하기 (0) | 2025.01.29 |
[MSA 전환하기 5편] Spring Cloud Config 도입하기 (+ Spring Cloud Bus) (0) | 2025.01.20 |
[MSA 전환하기 4편] Spring Cloud Gateway 구현하기 (API Gateway) (0) | 2025.01.15 |