Search

트랜잭션널 아웃박스 패턴 - Kafka & Spring

1. 개념

가. 트랜잭션널 아웃박스 패턴 개념

목적: 분산 시스템에서 메시지 전달의 일관성을 보장합니다.
핵심 원리: Producer는 Outbox 저장소를 활용하여 At-Least Once Delivery 방식으로 메시지를 발행하고 Consumer는 메시지를 중복으로 받더라도 멱등성 있게 처리하여 일관되게 메시지를 처리합니다.
장점: RDB에 커밋됐지만 카프카에 메시지가 전달되지 않는 않는 상황과 RDB에 커밋이 안됐는데 카프카에 메시지가 전달되는 상황을 방지할 수 있습니다.
단점: 추가적인 개발 비용이 발생합니다. Publishing 모듈에는 Outbox 저장 로직과 Message 발행 로직을 추가하고, Consuming 로직에는 메시지 멱등처리 로직을 추가해야 합니다.

나. 주요 동작

비즈니스 트랜잭션 실행: 비즈니스 로직을 수행하고 데이터베이스에 변경 사항을 기록합니다. 이 때, 메시지에 대한 기록을 outbox 테이블에 추가합니다.
outbox 조회 및 메시지 발행: (At-Least Once) 메시지가 성공적으로 발행될 때까지 메시지를 전송합니다. 스케줄링이나 이벤트 트리거를 활용하여 outbox 테이블 내 데이터를 조회하여 메시지를 발행합니다. 발행된 메시지와 그렇지 않은 것을 구분할 수 있도록 처리합니다.
메시지 수신 및 멱등 처리: 메시지를 중복적으로 수신되는 것을 감안하여 멱등 처리를 합니다. 예를 들어, 동일한 메시지 ID의 메시지를 처리하지 않거나 상태를 확인하여 중복 처리를 피해야 합니다.

다. Outbox 엔티티 속성

id: 고유 식별자
type: 메시지의 타입
payload: 실제 데이터
status: 메시지의 발행 여부 또는 메시지 처리 여부
createdAt: 발행 시점 또는 생성 시점

2. 예시 - 적림금 취소 시나리오

가. 비지니스 트랜잭션 로직

@Service class CancelPointAccountService( private val repository: PointAccountRepository, private val transRepository: PointTransactionRepository, private val outboxRepository: PointEventOutboxRepository ) { @Retryable(value = [ObjectOptimisticLockingFailureException::class, DataIntegrityViolationException::class]) @Transactional fun cancelPoint(memberId: String) { val account = repository.find(MemberId(memberId)) ?: throw MemberNotFoundException(MemberId(memberId)) val pointsUsed = findLatestUsedPoints(account) account.addPoints(pointsUsed) outboxRepository.save(PointEvent(account.accountId.toString(), pointsUsed)) } private fun findLatestUsedPoints(account: PointAccount): Int { val useTrans = transRepository.findAll( account.accountId, types = setOf(TransactionType.USE), pageReq = PageRequest.of(0, 1, Sort.by("createdAt").descending()), ) if (useTrans.isEmpty) throw UseTransNotFoundException(account.accountId) return useTrans.first().points } } @Entity class PointEvent( val accountId: String, val amount: Int ) { @Id @TableGenerator(name = "PointEventIdGenerator", table = "sequence", allocationSize = 100) @GeneratedValue(strategy = GenerationType.TABLE, generator = "PointEventIdGenerator") var id: Long? = null }
Kotlin
복사

나. 메시지 퍼블리싱 로직

@Component class MessagePublishingJob( private val service: MessagePublishingService ) { private val logger = LoggerFactory.getLogger(javaClass) @Scheduled(fixedDelay = 1000) fun publishMessages() { logger.info("publishing task is scheduled") service.publish() } } @Service class MessagePublishingService( private val outbox: PointEventOutboxRepository, private val kafkaTemplate: KafkaTemplate<String, PointEvent>, @Value("\${spring.kafka.topic.point.cancel}") private val pointCancelTopic: String, ) { @Transactional fun publish() { val messages = outbox.findAll() messages.forEach { kafkaTemplate.send(pointCancelTopic, it) } outbox.delete(messages.map { it.id }) } }
Kotlin
복사

다. 메시지 컨슈밍 로직

@Component class PointEventListener( private val service: PointTransactionService, private val checkService: CheckDuplicatedMessageService ) { @KafkaListener( topics = ["\${spring.kafka.topic.point.cancel}"], groupId = ["\${spring.kafka.group-id}"], containerFactory = "pointListenerContainerFactory" ) fun onPointCancelled(event: PointEvent) { checkService.check(event) // 멱등성 처리 service.cancel(AccountId(event.accountId), event.amount) } } @Service class PointTransactionService( private val transRepository: PointTransactionRepository, ) { private val transFactory = PointTransactionFactory() @Transactional fun cancel(accountId: AccountId, amount: Int) { val cancelTrans = transFactory.createCancelTrans(accountId, amount) transRepository.save(cancelTrans) } } @Service class CheckDuplicatedMessageService( private val repository: PointEventOutboxRepository, ) { @Transactional fun check(event: PointEvent) { val msg = repository.find(event) if(msg != null) throw DuplicatedMessageException(msg) } }
Kotlin
복사

3. Exactly Once Delivery 구현

가. Exactly Once Delivery 개념 설명

Exactly Once Delivery 방식은 컨슈머가 프로듀서의 트랜잭션 정상 종료를 확인하는 프로세스가 추가되는 방향으로 구현됩니다. 구체적으로 말하자면 프로듀서의 트랜잭션이 정상 종료됨에 따라 LSO(Last Stable Offset)이 업데이트가 됩니다. LSO 구간의 메시지에 한해서 컨슈머가 메시지를 polling할 수 있습니다.

나. Exactly Once Delivery 설정 예시

아래와 같이 ISOLATION_LEVEL_CONFIGREAD_COMMITTED 로 수정하면, LSO 기반의 Exactly Once 방식으로 메시지의 일관성을 보장합니다.
@EnableKafka @Configuration class KafkaConsumerConfig( @Value("\${spring.kafka.bootstrap-servers}") private val bootstrapServers: String ) { @Bean fun kafkaConsumerFactory(): DefaultKafkaConsumerFactory<String, PointEvent> { val configProps = mapOf( // 생략 props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED); ) return DefaultKafkaConsumerFactory(configProps) } }
Kotlin
복사

참고

강남언니, 준, 분산 시스템에서 메시지 안전하게 다루기, https://blog.gangnamunni.com/post/transactional-outbox/
NHN Cloud, 김병부, 분산 시스템에서 데이터를 전달하는 효율적인 방법, https://www.youtube.com/watch?v=uk5fRLUsBfk