๐ป ๊ฐ๋ฐ ์ผ์ง/SpringBoot
[SpringBoot/Kotlin] Transactional Outbox Pattern ์ ์ฉ (Kafka)
์ ์ด
2025. 1. 11. 20:25
๋ฐ์ํ
Microservices Pattern: Pattern: Transactional outbox
First, write the message/event to a database OUTBOX table as part of the transaction that updates business objects, and then publish it to a message broker.
microservices.io
Context
MSA ํ๊ฒฝ์์๋ ๋๋ฉ์ธ์ ๋ํ ์์ฑ, ์์ , ์ญ์ ์ด๋ฒคํธ๊ฐ ์ผ์ด๋ฌ์ ๋, ์ด๋ฅผ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ฅผ ํตํด์ ๋ค๋ฅธ ๋ถ์ฐ ์๋ฒ์ ์ ์กํด์ผํ๋ ์ผ์ด ๋ฐ์ํ๋ค.
ํ์ง๋ง, ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ฅผ ์์ฐ๋ฅด๋ ์ ํต์ ์ธ ๋ถ์ฐ ํธ๋์ญ์
(2PC, 2-Phase Commit)์ ์ฌ์ฉํ๋ ๊ฒ์ ํ์ค์ ์ผ๋ก ๋ถ๊ฐ๋ฅํ๋ค. ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๊ฐ 2PC๋ฅผ ์ง์ํ์ง ์์ ์ ์์ ๋ฟ๋๋ฌ, ๋ ์ธํ๋ผ๊ฐ ๊ฐ๊ฒฐํฉ ๋๋๊ฒ์ ๋ฐ๋์งํ์ง ์์ ์ ์๋ค.
๊ตฌ์ฒด์ ์ธ ๋ฌธ์ ์ํฉ์ ์๋์ ๊ฐ๋ค.
- ๋ฐ์ดํฐ๋ฒ ์ด์ค ์์
๊ณผ ๋ฉ์ธ์ง ์ ์ก์ด ํ๋์ ํธ๋์ญ์
์ผ๋ก ๋ฌถ์ผ ๊ฒฝ์ฐ: ๋ฉ์ธ์ง ์ ์ก ์์ ์์ธ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ๋ฒ ์ด์ค Rollback์ด ๋ฐ์ํ๋ค. ์ด ๊ฒฝ์ฐ, ๋ฉ์ธ์ง ์ ์ก ์์ฒด๋ ์ฑ๊ณตํ ์ ์๋ค. (๋ฉ์ธ์ง ๋ธ๋ก์ปค ๋ด๋ถ ๋์์ ํ์
ํ์ง ๋ชปํ๊ณ ์์ผ๋ฏ๋ก)
์ฆ, ๊ฐ Consumer์ ์ด๋ฒคํธ๋ ์ ์ก๋์์ผ๋, ์ค์ ์ด๋ฒคํธ ๋ฐํ ์๋น์ค๋ ๋์์ Rollbackํ์ฌ ๋๋ฉ์ธ์ ์์์ฑ์ด ๋ณด์ฅ๋์ง ์๋๋ค. - ๋ฉ์ธ์ง ์ ์ก์ด ํธ๋์ญ์ ์ธ๋ถ์ ์์ ๊ฒฝ์ฐ: ๋ณ๋์ ์ค๋ช ์ด ํ์ํ์ง ์์ ์ ๋๋ก ๋ช ํํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ค. ๋๋ฉ์ธ ์ด๋ฒคํธ์ ๋ํ ์ ์ก์ ์๋ฒฝํ๊ฒ ๋ณด์ฅํ์ง ๋ชปํ๋ค.
Transaction Outbox Pattern
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ ๊ณผ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ก์ ์ ์ก์ ์์์ ์ผ๋ก ์ํํ๋ ค๋ฉด ์ด๋ป๊ฒ ํด์ผํ ๊น?
๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ ์๋น์ค๋ ๋๋ฉ์ธ์ ์
๋ฐ์ดํธํ๋ ํธ๋์ญ์
์ ์ผ๋ถ๋ก ์ ์กํ ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋จผ์ ์ ์ฅํ๋ค.
์ดํ ๋ณ๋์ ํ๋ก์ธ์ค๋ก ๋ฉ์ธ์ง ๋ธ๋ก์ปค์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ค.
์ฆ, ๋ฉ์ธ์ง ์ ์ฅ๊น์ง ํ๋์ ํธ๋์ญ์
์ผ๋ก ๋ฌถ์ด ๋ฉ์ธ์ง ๋ฐํ ์์ฒด๋ฅผ ๋ณด์ฅํ๋๋ก ํ๋๊ฒ์ด๋ค.
Architecture
- Sender: ๋๋ฉ์ธ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋ ์๋น์ค
- Database: ๋๋ฉ์ธ ์ํฐํฐ์ ๋ฉ์ธ์ง Outbox๋ฅผ ์ ์ฅ
- Message Outbox
- Message Relay: ์ ์ฅ๋ ๋ฉ์ธ์ง๋ฅผ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ก ์ ์ก
Message Relay
- Scheduler ํน์ ์ด๋ฒคํธ ๊ธฐ๋ฐ์ผ๋ก ์คํ๋๋ฉฐ, ์์๋ฐ์ค์์ ๋ฉ์ธ์ง๋ฅผ ์ฝ์ด์ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ก ์ ์กํ๋ค.
- ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ก ์ ์ก๋ ํ, ์์๋ฐ์ค์์ ์ญ์ ํ๊ฑฐ๋ ์ํ๋ฅผ ์ ๋ฐ์ดํธํ๋ค.
- ์ ์ก์ด ์คํจํ ๊ฒฝ์ฐ, ์ฌ์๋๊ฐ ๊ฐ๋ฅํ๋ค -> ๋ฉ์ธ์ง ์ ์ก์ด ๋ณด์ฅ๋๋ค.
๊ตฌํ
ํด๋น ํจํด์ ๊ตฌํํ๋ ์ฌ๋ฌ๊ฐ์ง ๋ฐฉ๋ฒ์ด ์๊ฒ ์ง๋ง, ์ด๋ฒ ์์ ์์๋ Scheduler๋ฅผ ํ์ฉํ Relay ๋์
Transaction ์ํ์ ๋ฐ๋ผ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ EventListener๋ฅผ ์ฌ์ฉํ๋๋ก ๊ตฌํํ์๋ค. (์ฐธ๊ณ - 29cm์ ์ ์ฉ ์ฌ๋ก)
Transaction ์ํ์ ๋ฐ๋ผ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ EventListener๋ฅผ ์ฌ์ฉํ๋๋ก ๊ตฌํํ์๋ค. (์ฐธ๊ณ - 29cm์ ์ ์ฉ ์ฌ๋ก)
- User ์ ์ฅ: User ๋๋ฉ์ธ ์ ์ฅ ์์ฒญ
- User Database ์ ์ฅ: 1์์ ๋ง๋ ์ํฐํฐ๋ฅผ Database์ ์ ์ฅ
- User ์์ฑ ์ด๋ฒคํธ ๋ฐํ: `ApplicationEventPublisher` ๋ฅผ ์ฌ์ฉํ์ฌ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ค.
- EventRecord ์ ์ฅ: `UserCreateService`์ `create` ํธ๋์ญ์ ์ด ์ปค๋ฐ๋๊ธฐ ์ง์ Database์ Record ์ ๋ณด๋ฅผ ์ ์ฅํ๋ค.
- Kafka ํ๋ก๋์ฑ: `UserCreateService`์ `create` ํธ๋์ญ์ ์ด ์ปค๋ฐ๋๋ ์งํ kafka์ ์ด๋ฒคํธ๋ฅผ ํ๋ก๋์ฑ ํ๋ค.
- EventRecord ๊ฒฐ๊ณผ ์ ์ฅ: Kafka ํ๋ก๋์ฑ ๊ฒฐ๊ณผ์ ํจ๊ป ๊ธฐ์กด Record๋ฅผ ์ ๋ฐ์ดํธ ํน์ ์ญ์ ํ๋ค.
UserCreateService
@Service
class UserCreateService(
private val userRepository: UserRepository,
private val eventPublisher: ApplicationEventPublisher,
) {
val logger: Logger = LoggerFactory.getLogger(this::class.java)
@Transactional // --- 1
fun create(name: String) {
val user = User(name)
userRepository.save(user) // --- 2
eventPublisher.publishEvent(UserEventRecord(user.id, user.name)) // --- 3
logger.info("Complete User Creation")
}
}
- `@Transactional` : ๋๋ฉ์ธ ๋ก์ง๊ณผ Outbox ์ ์ฅ์ ์ํ ์ด๋ฒคํธ ๋ฐํ์ ํ๋์ ํธ๋์ญ์ ๋ฒ์๋ก ์ก๋๋ค.
- user ๋๋ฉ์ธ ์ ์ฅ: ๋๋ฉ์ธ ๋ก์ง์ ์คํํ๋ค.
- event ๋ฐํ: Spring์์ ์ ๊ณตํ๋ `ApplicationEventPublisher` ๋ฅผ ํ์ฉํ์ฌ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ค.
UserEventListener
@Component
class UserEventListener(
private val userEventRecorder: UserEventRecorder,
private val userEventRepository: UserEventRepository,
) {
val logger: Logger = LoggerFactory.getLogger(this::class.java)
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) // --- 1
fun recordMessageHandler(eventRecord: UserEventRecord) { // ---- 2
userEventRecorder.save(eventRecord)
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) // --- 1
fun sendMessageHandler(eventRecord: UserEventRecord) { // --- 3
val event = UserCreateEvent(eventRecord.id, eventRecord.name)
val status: EventRecordStatus = runCatching {
userEventRepository.publishCreateEvent(event)
}.fold(
onSuccess = {
logger.info("Success to Publish Event")
EventRecordStatus.SUCCESS
}, onFailure = {
logger.info("Fail to Publish Event")
EventRecordStatus.FAIL
}
)
userEventRecorder.save(eventRecord.copy(status = status))
}
}
- `@TransactionalEventListener` : ํธ๋์ญ์
์ํ์ ๋ฐ๋ผ ์ด๋ฒคํธ ์ฒ๋ฆฌ ๋ฉ์๋๋ฅผ ์ง์ ํ ์ ์๋ค.
- `TransactionPhase.BEFORE_COMMIT`: Transaction์ด ์ปค๋ฐ๋๊ธฐ ์ง์ ์ ์คํ -> ํด๋น ๋ก์ง์ด ์ฑ๊ณตํด์ผ Commit
- `TransactionPhase.AFTER_COMMIT` : Transaction์ด ์ปค๋ฐ๋๊ณ ๋ ํ ์คํ
- Commit์ ๊ธฐ์ค์ผ๋ก Before๊ณผ After๋ก ๋๋์ด์ ธ ์์ด, ์์ฐจ์คํ ๋ณด์ฅ์ด ๋๋ฏ๋ก ์ด๋ฅผ ์ด์ฉํ์ฌ Transaction Outbox Pattern์ ๊ตฌํ
- `recordMessageHandler()` : Processing ์ค์ธ EventRecord๋ฅผ ์ต์ด ์ ์ฅํ๋ค.
- `sendMessageHandler()` : userEventRepository(Kafka)๋ฅผ ๋ฐํํ๊ณ , ํด๋น ๊ฒฐ๊ณผ์ ๋ฐ๋ผ Record ์ํ ๋ฅผ ์ ๋ฐ์ดํธํ์ฌ ์ ์ฅํ๋ค.
์ด์
ํด๋น ํจํด์ ๋๋ฉ์ธ ๊ฐ์ฒด์ ์์์ฑ์ ๋ณด์ฅํ๋ค.
ํ์ง๋ง Scheduler๋ฅผ ํ์ฉํ๊ฑฐ๋, ์คํจ ์ผ์ด์ค์ ๋ํด ์ฌ์๋ ์๊ตฌ์ฌํญ์ ๊ฐ์ง ๊ฒฝ์ฐ, ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ก ๋์ผํ ๋ฉ์ธ์ง๊ฐ ์ฌ๋ฌ๋ฒ ๊ฒ์๋ ์ ์๋ค.
๊ฒฐ๊ณผ์ ์ผ๋ก Consumer๋ ๋ฉฑ๋ฑ์ฑ์ ๋ณด์ฅํ ์ ์๋ Idempotent Consumer๋ก ๊ตฌํ๋์ด์ผ ํ๋ค.
์ ๋ฐฉ๋ฒ์ ์ถํ ๊ฒ์๊ธ๋ก ์๊ฐ ํ ์์ ์ด๋ค.
์ ์ฒด ์ฝ๋
https://github.com/jeongum/transactional-outbox-pattern
GitHub - jeongum/transactional-outbox-pattern
Contribute to jeongum/transactional-outbox-pattern development by creating an account on GitHub.
github.com
๋ฐ์ํ