Implementation environment
> Spring Boot 2.7.7
> Kotlin
Kafka environment setup (Docker)
docker-compose.yml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Start zookeeper and kafka with the following command:
docker-compose up -d
Open a shell inside the kafka container, either from the Docker dashboard or from the command line, and create the topic with the following command:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic
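As an alternative to the CLI command above, the topic can also be declared from the application side. The following is only a minimal sketch: the class name `KafkaTopicConfig` is hypothetical, and it assumes that Spring Boot's auto-configured `KafkaAdmin` is active and pointed at the broker on `localhost:9092` (its default), which this post does not configure explicitly.

```kotlin
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

// Hypothetical configuration class, not part of the original project.
@Configuration
class KafkaTopicConfig {

    // Same settings as the kafka-topics.sh command: 1 partition, replication factor 1.
    // A KafkaAdmin bean in the context is expected to create the topic at startup if it is missing.
    @Bean
    fun testTopic(): NewTopic =
        TopicBuilder.name("test-topic")
            .partitions(1)
            .replicas(1)
            .build()
}
```

If the topic already exists with these settings, declaring it again this way should be harmless.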
Kafka configuration
Gradle configuration
dependencies {
    implementation("org.springframework.kafka:spring-kafka")
}
application.yml
kafka-consumer:
  config:
    kafka-test:
      bootstrap-servers: localhost:9092
      client-id: kafka-test-consumer
      group-id: kafka-test-consumer
      topic: test-topic
App Config setup
- Create an AppConfig that holds a KafkaConsumerProperties object (→ this step can be skipped if you do not need an app config file)
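import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding

@ConstructorBinding
@ConfigurationProperties
data class AppConfig(
    // Bound from the "kafka-consumer" section of application.yml
    val kafkaConsumer: KafkaConsumerProperties
) {
    // Constants like the one below can be added later so this class doubles as a config file
    companion object {
        const val SERVICE_NAME = "kafka-example"
    }
}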
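import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication

@SpringBootApplication
@EnableConfigurationProperties(AppConfig::class) // Add this line!
class KafkaApplication

fun main(args: Array<String>) {
    runApplication<KafkaApplication>(*args)
}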
Kafka Consumer configuration
KafkaConsumerConfig
- Creates the listener container factory used to consume from Kafka
- Sets both the consumerFactory and the errorHandler used for error handling
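import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

@Configuration
class KafkaConsumerConfig(
    private val kafkaConsumerErrorHandler: KafkaConsumerErrorHandler,
) {
    @Bean("testKafkaListenerContainerFactory")
    fun testKafkaListenerContainerFactory(
        appConfig: AppConfig
    ): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            // Look up the consumer settings registered under the "kafka-test" key
            consumerFactory = appConfig.kafkaConsumer.consumerFactory(KAFKA_TEST)
            setCommonErrorHandler(kafkaConsumerErrorHandler)
        }

    companion object {
        const val KAFKA_TEST = "kafka-test"
    }
}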
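The KafkaConsumerErrorHandler injected above is not shown in this post (the actual implementation is in the linked repository). As a rough idea of what such a bean could look like, here is a minimal sketch that assumes it simply extends Spring Kafka's `DefaultErrorHandler`; the retry interval, retry count, and log message are illustrative assumptions, not the author's values.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.ConsumerRecordRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.stereotype.Component
import org.springframework.util.backoff.FixedBackOff

private val log = LoggerFactory.getLogger("KafkaConsumerErrorHandler")

// Assumed implementation: retry a failed record twice, 1 second apart,
// then log it and move on to the next record.
@Component
class KafkaConsumerErrorHandler : DefaultErrorHandler(
    // Called once the retries are exhausted
    ConsumerRecordRecoverer { record, ex ->
        log.error(
            "Giving up on record from {}-{} at offset {}",
            record.topic(), record.partition(), record.offset(), ex
        )
    },
    // 1000 ms between attempts, at most 2 retries after the first failure
    FixedBackOff(1_000L, 2L)
)
```

Because `DefaultErrorHandler` implements `CommonErrorHandler`, a bean like this can be passed to `setCommonErrorHandler` as in the configuration above.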
KafkaConsumerProperties
- Object that holds the Kafka-related properties
- https://kafka.apache.org/documentation/#consumerconfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

data class KafkaConsumerProperties(
    val config: Map<String, KafkaConsumerConfig>,
    val keyDeserializerClass: Class<*> = StringDeserializer::class.java,
    val valueDeserializerClass: Class<*> = StringDeserializer::class.java,
    val autoOffsetReset: String = "earliest" // "earliest" or "latest"
) {
    data class KafkaConsumerConfig(
        val bootstrapServers: String,
        val clientId: String,
        val groupId: String, // use when needed
    )

    // Translate one named config entry into the property map the Kafka client expects
    fun propertiesMap(kafkaConsumerConfig: KafkaConsumerConfig): Map<String, Any?> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConsumerConfig.bootstrapServers,
            ConsumerConfig.CLIENT_ID_CONFIG to kafkaConsumerConfig.clientId,
            ConsumerConfig.GROUP_ID_CONFIG to kafkaConsumerConfig.groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to keyDeserializerClass,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to valueDeserializerClass,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
        )

    // Fail with a readable message if the key is missing from application.yml
    fun <V> consumerFactory(key: String): ConsumerFactory<String, V> {
        val consumerConfig = requireNotNull(config[key]) { "No Kafka consumer config found for key '$key'" }
        return DefaultKafkaConsumerFactory(propertiesMap(consumerConfig))
    }
}
Consumer implementation
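import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
@KafkaListener(
    // "\$" keeps Kotlin from treating the Spring placeholder as a string template
    topics = ["\${kafka-consumer.config.kafka-test.topic}"],
    clientIdPrefix = "\${kafka-consumer.config.kafka-test.client-id}",
    containerFactory = "testKafkaListenerContainerFactory"
)
class TestConsumer {
    @KafkaHandler
    fun listen(msg: String) {
        println(msg)
    }
}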
- topics is set so that the topic name is read from the value defined in application.yml
- clientIdPrefix (optional): if it is not set and another consumer in the same application uses the same clientId, an InstanceAlreadyExistsException is raised → to prevent this, each consumer needs its own clientIdPrefix
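To illustrate the clientIdPrefix point, here is a minimal sketch of two consumers that share the same container factory but use different prefixes. The class names, the second topic `another-topic`, and the prefix values are all hypothetical and only serve the example.

```kotlin
import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Both listeners reuse the same factory, so without distinct prefixes
// they would end up registering the same client id.
@Component
@KafkaListener(
    topics = ["test-topic"],
    clientIdPrefix = "first-test-consumer", // hypothetical prefix
    containerFactory = "testKafkaListenerContainerFactory"
)
class FirstTestConsumer {
    @KafkaHandler
    fun listen(msg: String) {
        println("first: $msg")
    }
}

@Component
@KafkaListener(
    topics = ["another-topic"], // hypothetical second topic
    clientIdPrefix = "second-test-consumer", // hypothetical prefix, different from the first
    containerFactory = "testKafkaListenerContainerFactory"
)
class SecondTestConsumer {
    @KafkaHandler
    fun listen(msg: String) {
        println("second: $msg")
    }
}
```

The only requirement is that the two prefixes differ; whether they come from literals or from application.yml placeholders is a matter of preference.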
Result
Send a message with the producer command and check that it is consumed correctly:
kafka-console-producer.sh --topic test-topic --broker-list localhost:9092
> this is a msg from doteloper
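If you would rather send the test message from code than from the console producer, something like the following should work. This is a minimal sketch using the plain Kafka client that spring-kafka already pulls in; the helper name `producerProps` is made up for this example, and the broker address assumes the Docker setup from this post.

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

// Hypothetical helper: minimal producer settings for String keys and values.
fun producerProps(bootstrapServers: String): Properties = Properties().apply {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
}

fun main() {
    // use {} closes the producer even if send fails
    KafkaProducer<String, String>(producerProps("localhost:9092")).use { producer ->
        val record = ProducerRecord<String, String>("test-topic", "this is a msg from doteloper")
        // get() blocks until the broker acknowledges the record
        producer.send(record).get()
    }
}
```

With the application running, the message should then show up in the TestConsumer output, the same as with the console producer.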
https://github.com/jeongum/kafka-test