๐Ÿ’ป ๊ฐœ๋ฐœ ์ผ์ง€/SpringBoot

[SpringBoot/Kotlin] Transactional Outbox Pattern ์ ์šฉ (Kafka)

์ ์ด 2025. 1. 11. 20:25
๋ฐ˜์‘ํ˜•

 

 

Microservices Pattern: Pattern: Transactional outbox

First, write the message/event to a database OUTBOX table as part of the transaction that updates business objects, and then publish it to a message broker.

microservices.io

Context

MSA ํ™˜๊ฒฝ์—์„œ๋Š” ๋„๋ฉ”์ธ์— ๋Œ€ํ•œ ์ƒ์„ฑ, ์ˆ˜์ •, ์‚ญ์ œ ์ด๋ฒคํŠธ๊ฐ€ ์ผ์–ด๋‚ฌ์„ ๋•Œ, ์ด๋ฅผ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋ฅผ ํ†ตํ•ด์„œ ๋‹ค๋ฅธ ๋ถ„์‚ฐ ์„œ๋ฒ„์— ์ „์†กํ•ด์•ผํ•˜๋Š” ์ผ์ด ๋ฐœ์ƒํ•œ๋‹ค.
ํ•˜์ง€๋งŒ, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋ฅผ ์•„์šฐ๋ฅด๋Š” ์ „ํ†ต์ ์ธ ๋ถ„์‚ฐ ํŠธ๋žœ์žญ์…˜(2PC, 2-Phase Commit)์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์€ ํ˜„์‹ค์ ์œผ๋กœ ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค. ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋‚˜ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๊ฐ€ 2PC๋ฅผ ์ง€์›ํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์„ ๋ฟ๋”๋Ÿฌ, ๋‘ ์ธํ”„๋ผ๊ฐ€ ๊ฐ•๊ฒฐํ•ฉ ๋˜๋Š”๊ฒƒ์€ ๋ฐ”๋žŒ์งํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ๋‹ค.
๊ตฌ์ฒด์ ์ธ ๋ฌธ์ œ ์ƒํ™ฉ์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.
  1. ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ž‘์—…๊ณผ ๋ฉ”์„ธ์ง€ ์ „์†ก์ด ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜์œผ๋กœ ๋ฌถ์ผ ๊ฒฝ์šฐ: ๋ฉ”์„ธ์ง€ ์ „์†ก ์‹œ์— ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒฝ์šฐ, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค Rollback์ด ๋ฐœ์ƒํ•œ๋‹ค. ์ด ๊ฒฝ์šฐ, ๋ฉ”์„ธ์ง€ ์ „์†ก ์ž์ฒด๋Š” ์„ฑ๊ณตํ•  ์ˆ˜ ์žˆ๋‹ค. (๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค ๋‚ด๋ถ€ ๋™์ž‘์„ ํŒŒ์•…ํ•˜์ง€ ๋ชปํ•˜๊ณ  ์žˆ์œผ๋ฏ€๋กœ)
    ์ฆ‰, ๊ฐ Consumer์— ์ด๋ฒคํŠธ๋Š” ์ „์†ก๋˜์—ˆ์œผ๋‚˜, ์‹ค์ œ ์ด๋ฒคํŠธ ๋ฐœํ–‰ ์„œ๋น„์Šค๋Š” ๋™์ž‘์„ Rollbackํ•˜์—ฌ ๋„๋ฉ”์ธ์˜ ์›์ž์„ฑ์ด ๋ณด์žฅ๋˜์ง€ ์•Š๋Š”๋‹ค.
  2. ๋ฉ”์„ธ์ง€ ์ „์†ก์ด ํŠธ๋žœ์žญ์…˜ ์™ธ๋ถ€์— ์žˆ์„ ๊ฒฝ์šฐ: ๋ณ„๋„์˜ ์„ค๋ช…์ด ํ•„์š”ํ•˜์ง€ ์•Š์„ ์ •๋„๋กœ ๋ช…ํ™•ํ•œ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค. ๋„๋ฉ”์ธ ์ด๋ฒคํŠธ์— ๋Œ€ํ•œ ์ „์†ก์„ ์™„๋ฒฝํ•˜๊ฒŒ ๋ณด์žฅํ•˜์ง€ ๋ชปํ•œ๋‹ค.

Transaction Outbox Pattern

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ์ˆ˜์ •๊ณผ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋กœ์˜ ์ „์†ก์„ ์›์ž์ ์œผ๋กœ ์ˆ˜ํ–‰ํ•˜๋ ค๋ฉด ์–ด๋–ป๊ฒŒ ํ•ด์•ผํ• ๊นŒ?

๋ฉ”์„ธ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ์„œ๋น„์Šค๋Š” ๋„๋ฉ”์ธ์„ ์—…๋ฐ์ดํŠธํ•˜๋Š” ํŠธ๋žœ์žญ์…˜์˜ ์ผ๋ถ€๋กœ ์ „์†กํ•  ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋จผ์ € ์ €์žฅํ•œ๋‹ค.
์ดํ›„ ๋ณ„๋„์˜ ํ”„๋กœ์„ธ์Šค๋กœ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค์— ๋ฉ”์„ธ์ง€๋ฅผ ์ „์†กํ•œ๋‹ค.

 

์ฆ‰, ๋ฉ”์„ธ์ง€ ์ €์žฅ๊นŒ์ง€ ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜์œผ๋กœ ๋ฌถ์–ด ๋ฉ”์„ธ์ง€ ๋ฐœํ–‰ ์ž์ฒด๋ฅผ ๋ณด์žฅํ•˜๋„๋ก ํ•˜๋Š”๊ฒƒ์ด๋‹ค.

Architecture

  • Sender: ๋„๋ฉ”์ธ ๋ฉ”์„ธ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ์„œ๋น„์Šค
  • Database: ๋„๋ฉ”์ธ ์—”ํ‹ฐํ‹ฐ์™€ ๋ฉ”์„ธ์ง€ Outbox๋ฅผ ์ €์žฅ
  • Message Outbox
  • Message Relay: ์ €์žฅ๋œ ๋ฉ”์„ธ์ง€๋ฅผ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋กœ ์ „์†ก

Message Relay

  • Scheduler ํ˜น์€ ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ, ์•„์›ƒ๋ฐ•์Šค์—์„œ ๋ฉ”์„ธ์ง€๋ฅผ ์ฝ์–ด์™€ ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋กœ ์ „์†กํ•œ๋‹ค.
  • ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋กœ ์ „์†ก๋œ ํ›„, ์•„์›ƒ๋ฐ•์Šค์—์„œ ์‚ญ์ œํ•˜๊ฑฐ๋‚˜ ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธํ•œ๋‹ค.
  • ์ „์†ก์ด ์‹คํŒจํ•œ ๊ฒฝ์šฐ, ์žฌ์‹œ๋„๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค -> ๋ฉ”์„ธ์ง€ ์ „์†ก์ด ๋ณด์žฅ๋œ๋‹ค.

๊ตฌํ˜„

ํ•ด๋‹น ํŒจํ„ด์„ ๊ตฌํ˜„ํ•˜๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๊ฒ ์ง€๋งŒ, ์ด๋ฒˆ ์˜ˆ์ œ์—์„œ๋Š” Scheduler๋ฅผ ํ™œ์šฉํ•œ Relay ๋Œ€์‹ 
Transaction ์ƒํƒœ์— ๋”ฐ๋ผ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” EventListener๋ฅผ ์‚ฌ์šฉํ•˜๋„๋ก ๊ตฌํ˜„ํ•˜์˜€๋‹ค. (์ฐธ๊ณ  - 29cm์˜ ์ ์šฉ ์‚ฌ๋ก€)

  1. User ์ €์žฅ: User ๋„๋ฉ”์ธ ์ €์žฅ ์š”์ฒญ
  2. User Database ์ €์žฅ: 1์—์„œ ๋งŒ๋“  ์—”ํ‹ฐํ‹ฐ๋ฅผ Database์— ์ €์žฅ
  3. User ์ƒ์„ฑ ์ด๋ฒคํŠธ ๋ฐœํ–‰: `ApplicationEventPublisher` ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค.
  4. EventRecord ์ €์žฅ: `UserCreateService`์˜ `create` ํŠธ๋žœ์žญ์…˜์ด ์ปค๋ฐ‹๋˜๊ธฐ ์ง์ „ Database์— Record ์ •๋ณด๋ฅผ ์ €์žฅํ•œ๋‹ค.
  5. Kafka ํ”„๋กœ๋“€์‹ฑ: `UserCreateService`์˜ `create` ํŠธ๋žœ์žญ์…˜์ด ์ปค๋ฐ‹๋˜๋Š” ์งํ›„ kafka์— ์ด๋ฒคํŠธ๋ฅผ ํ”„๋กœ๋“€์‹ฑ ํ•œ๋‹ค.
  6. EventRecord ๊ฒฐ๊ณผ ์ €์žฅ: Kafka ํ”„๋กœ๋“€์‹ฑ ๊ฒฐ๊ณผ์™€ ํ•จ๊ป˜ ๊ธฐ์กด Record๋ฅผ ์—…๋ฐ์ดํŠธ ํ˜น์€ ์‚ญ์ œ ํ•œ๋‹ค.

 

UserCreateService

@Service
class UserCreateService(
    private val userRepository: UserRepository,
    private val eventPublisher: ApplicationEventPublisher,
) {
    val logger: Logger = LoggerFactory.getLogger(this::class.java)

    @Transactional // --- 1
    fun create(name: String) {
        val user = User(name)

        userRepository.save(user) // --- 2
        eventPublisher.publishEvent(UserEventRecord(user.id, user.name)) // --- 3

        logger.info("Complete User Creation")
    }
}
  1. `@Transactional` : ๋„๋ฉ”์ธ ๋กœ์ง๊ณผ Outbox ์ €์žฅ์„ ์œ„ํ•œ ์ด๋ฒคํŠธ ๋ฐœํ–‰์„ ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜ ๋ฒ”์œ„๋กœ ์žก๋Š”๋‹ค.
  2. user ๋„๋ฉ”์ธ ์ €์žฅ: ๋„๋ฉ”์ธ ๋กœ์ง์„ ์‹คํ–‰ํ•œ๋‹ค.
  3. event ๋ฐœํ–‰: Spring์—์„œ ์ œ๊ณตํ•˜๋Š” `ApplicationEventPublisher` ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค.

 

UserEventListener

@Component
class UserEventListener(
    private val userEventRecorder: UserEventRecorder,
    private val userEventRepository: UserEventRepository,
) {
    val logger: Logger = LoggerFactory.getLogger(this::class.java)

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) // --- 1
    fun recordMessageHandler(eventRecord: UserEventRecord) { // ---- 2
        userEventRecorder.save(eventRecord)
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) // --- 1
    fun sendMessageHandler(eventRecord: UserEventRecord) { // --- 3
        val event = UserCreateEvent(eventRecord.id, eventRecord.name)

        val status: EventRecordStatus = runCatching {
            userEventRepository.publishCreateEvent(event)
        }.fold(
            onSuccess = {
                logger.info("Success to Publish Event")
                EventRecordStatus.SUCCESS
            }, onFailure = {
                logger.info("Fail to Publish Event")
                EventRecordStatus.FAIL
            }
        )

        userEventRecorder.save(eventRecord.copy(status = status))
    }
}
  1. `@TransactionalEventListener` : ํŠธ๋žœ์žญ์…˜ ์ƒํƒœ์— ๋”ฐ๋ผ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.
    1. `TransactionPhase.BEFORE_COMMIT`: Transaction์ด ์ปค๋ฐ‹๋˜๊ธฐ ์ง์ „์— ์‹คํ–‰ -> ํ•ด๋‹น ๋กœ์ง์ด ์„ฑ๊ณตํ•ด์•ผ Commit
    2. `TransactionPhase.AFTER_COMMIT` : Transaction์ด ์ปค๋ฐ‹๋˜๊ณ  ๋‚œ ํ›„ ์‹คํ–‰
    3. Commit์„ ๊ธฐ์ค€์œผ๋กœ Before๊ณผ After๋กœ ๋‚˜๋ˆ„์–ด์ ธ ์žˆ์–ด, ์ˆœ์ฐจ์‹คํ–‰ ๋ณด์žฅ์ด ๋˜๋ฏ€๋กœ ์ด๋ฅผ ์ด์šฉํ•˜์—ฌ Transaction Outbox Pattern์„ ๊ตฌํ˜„
  2. `recordMessageHandler()` : Processing ์ค‘์ธ EventRecord๋ฅผ ์ตœ์ดˆ ์ €์žฅํ•œ๋‹ค.
  3. `sendMessageHandler()` : userEventRepository(Kafka)๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ , ํ•ด๋‹น ๊ฒฐ๊ณผ์— ๋”ฐ๋ผ Record ์ƒํ…Œ๋ฅผ ์—…๋ฐ์ดํŠธํ•˜์—ฌ ์ €์žฅํ•œ๋‹ค.

์ด์Šˆ

ํ•ด๋‹น ํŒจํ„ด์€ ๋„๋ฉ”์ธ ๊ฐ์ฒด์˜ ์›์ž์„ฑ์„ ๋ณด์žฅํ•œ๋‹ค.
ํ•˜์ง€๋งŒ Scheduler๋ฅผ ํ™œ์šฉํ•˜๊ฑฐ๋‚˜, ์‹คํŒจ ์ผ€์ด์Šค์— ๋Œ€ํ•ด ์žฌ์‹œ๋„ ์š”๊ตฌ์‚ฌํ•ญ์„ ๊ฐ€์ง„ ๊ฒฝ์šฐ, ๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค๋กœ ๋™์ผํ•œ ๋ฉ”์„ธ์ง€๊ฐ€ ์—ฌ๋Ÿฌ๋ฒˆ ๊ฒŒ์‹œ๋  ์ˆ˜ ์žˆ๋‹ค.
๊ฒฐ๊ณผ์ ์œผ๋กœ Consumer๋Š” ๋ฉฑ๋“ฑ์„ฑ์„ ๋ณด์žฅํ•  ์ˆ˜ ์žˆ๋Š” Idempotent Consumer๋กœ ๊ตฌํ˜„๋˜์–ด์•ผ ํ•œ๋‹ค.
์œ„ ๋ฐฉ๋ฒ•์€ ์ถ”ํ›„ ๊ฒŒ์‹œ๊ธ€๋กœ ์†Œ๊ฐœ ํ•  ์˜ˆ์ •์ด๋‹ค.

์ „์ฒด ์ฝ”๋“œ 

https://github.com/jeongum/transactional-outbox-pattern

 

GitHub - jeongum/transactional-outbox-pattern

Contribute to jeongum/transactional-outbox-pattern development by creating an account on GitHub.

github.com

๋ฐ˜์‘ํ˜•