๋ฐ์ํ
    
    
    
  ๊ตฌํ ํ๊ฒฝ
> Spring Boot 2.7.7
> Kotlin
Kafka ํ๊ฒฝ ์ค์น (Docker)
docker-compose.yml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
์๋ ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์ฌ zookeeper / kafka ์คํ
docker-compose up -d
๋์ปค ๋์๋ณด๋ or ๋ช ๋ น์ด๋ฅผ ์ด์ฉํ์ฌ kafka ์ปจํ ์ด๋ ๋ด๋ถ์ ์ง์ ํ์ฌ ์๋ ๋ช ๋ น์ด๋ก ํ ํฝ ์์ฑ
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic
Kafka ์ค์ 
gradle ์ค์ 
dependencies {
	implementation("org.springframework.kafka:spring-kafka")
}
application.yml
kafka-consumer:
  config:
    kafka-test:
      bootstrap-servers: localhost:9092
      client-id: kafka-test-consumer
      group-id: kafka-test-consumer
      topic: test-topic
App Config ์ค์ 
- KafkaConsumerProperties ๊ฐ์ฒด๋ฅผ ๊ฐ๋ AppConfig ์์ฑ (→ app config ํ์ผ์ด ๋ถํ์ํ๋ค๋ฉด ์๋ต ๊ฐ๋ฅ)
 
@ConstructorBinding
@ConfigurationProperties
data class AppConfig(
    val kafkaConsumer: KafkaConsumerProperties
) {
	// ์ถํ ์๋์ ๊ฐ์ด ์ถ๊ฐํ์ฌ config ํ์ผ๋ก ์ฌ์ฉํ  ์ ์์
	companion object {
		const val SERVICE_NAME = "kafka-example"
	}
}
@SpringBootApplication
@EnableConfigurationProperties(AppConfig::class) // ์ด ๋ถ๋ถ ์ถ๊ฐ!
class KafkaApplication
fun main(args: Array<String>) {
	runApplication<KafkaApplication>(*args)
}
Kafka Consumer ๊ด๋ จ ์ค์ 
KafkaConsumerConfig
- kafka consum์ ์ํ factory๋ฅผ ์์ฑ
 - consumerFactory๊ณผ ์๋ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ errorHandler๋ ์ง์ 
 
@Configuration
class KafkaConsumerConfig(
    private val kafkaConsumerErrorHandler: KafkaConsumerErrorHandler,
) {
    @Bean("testKafkaListenerContainerFactory")
    fun testKafkaListenerContainerFactory(
        appConfig: AppConfig
    ): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = appConfig.kafkaConsumer.consumerFactory(KAFKA_TEST)
            setCommonErrorHandler(kafkaConsumerErrorHandler)
        }
    companion object {
        const val KAFKA_TEST= "kafka-test"
    }
}
KafkaConsumerProperties
- kafka ๊ด๋ จ properties๋ฅผ ๊ฐ์ง๊ณ ์๋ ๊ฐ์ฒด
 - https://kafka.apache.org/documentation/#consumerconfigs
 
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
data class KafkaConsumerProperties(
    val config: Map<String, KafkaConsumerConfig>,
    val keyDeserializerClass: Class<*> = StringDeserializer::class.java,
    val valueDeserializerClass: Class<*> = StringDeserializer::class.java,
    val autoOffsetReset: String = "earliest" // earlist or lastest
) {
    data class KafkaConsumerConfig(
        val bootstrapServers: String,
        val clientId: String,
        val groupId: String,    // ํ์์ ์ฌ์ฉ
    )
    fun propertiesMap(kafkaConsumerConfig: KafkaConsumerConfig): Map<String, Any?> =
        mutableMapOf<String, Any?>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConsumerConfig.bootstrapServers,
            ConsumerConfig.CLIENT_ID_CONFIG to kafkaConsumerConfig.clientId,
            ConsumerConfig.GROUP_ID_CONFIG to kafkaConsumerConfig.groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to keyDeserializerClass,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to valueDeserializerClass,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
        ).toMap()
    fun <V> consumerFactory(key: String): ConsumerFactory<String, V> {
        return DefaultKafkaConsumerFactory(propertiesMap(config[key]!!))
    }
}
Consumer ๊ตฌํ
@Component
@KafkaListener(
    topics = ["\\${kafka-consumer.config.kafka-test.topic}"],
    clientIdPrefix = "\\${kafka-consumer.config.kafka-test.client-id}",
    containerFactory = "testKafkaListenerContainerFactory"
)
class TestConsumer {
    @KafkaHandler
    fun listen(msg: String){
        println(msg)
    }
}
- application.yml์ ์ ์ํ topic์ ๊ฐ์ง๊ณ ์ค๋๋ก ์ง์ 
 - clientIfPrefix (ํ์ x) : ์ด๋ฅผ ์ค์ ํ์ง ์์ ๊ฒฝ์ฐ, ์ดํ๋ฆฌ์ผ์ด์  ๋ด ๋์ผํ clientId๋ฅผ ์ฌ์ฉํ๋ ๊ณณ์ด ์๋ค๋ฉด, InstanceAlreadyExistsException ์ ๋ฐ์ → ์ด๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํด ๊ฐ๊ฐ consumer๋ง๋ค clientIdPrefix๋ฅผ ๋ค๋ฅด๊ฒ ์ง์ ํ๋ ๊ฒ์ด ํ์
 
์คํ ๊ฒฐ๊ณผ
Produce ๋ช ๋ น์ด๋ฅผ ๋ ๋ ค์ ์ consumeํ๋์ง ํ์ธ
kafka-console-producer.sh --topic test-topic --broker-list localhost:9092
> this is a msg from doteloper

https://github.com/jeongum/kafka-test
GitHub - jeongum/kafka-test: kafka consumer example (springboot)
kafka consumer example (springboot). Contribute to jeongum/kafka-test development by creating an account on GitHub.
github.com
๋ฐ์ํ