
[SpringBoot] Integrating Kafka (Docker) and Implementing a Consumer

점이 2023. 1. 11. 19:11

Implementation environment

> Spring Boot 2.7.7

> Kotlin


Kafka environment setup (Docker)

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Start zookeeper / kafka with the command below

docker-compose up -d

Enter the kafka container, either from the Docker dashboard or from the command line, and create the topic with the command below

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic

Kafka configuration

gradle configuration

dependencies {
	implementation("org.springframework.kafka:spring-kafka")
}

application.yml

kafka-consumer:
  config:
    kafka-test:
      bootstrap-servers: localhost:9092
      client-id: kafka-test-consumer
      group-id: kafka-test-consumer
      topic: test-topic

App Config setup

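  • Create an AppConfig that holds a KafkaConsumerProperties object (→ can be skipped if you do not need an app config file)

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication

@ConstructorBinding
@ConfigurationProperties
data class AppConfig(
    val kafkaConsumer: KafkaConsumerProperties
) {
	// More values can be added here later so that this class serves as the app's config file
	companion object {
		const val SERVICE_NAME = "kafka-example"
	}
}

@SpringBootApplication
@EnableConfigurationProperties(AppConfig::class) // add this line!
class KafkaApplication

fun main(args: Array<String>) {
	runApplication<KafkaApplication>(*args)
}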

Kafka Consumer configuration

KafkaConsumerConfig

  • Create the factory used to consume from kafka
  • Set the consumerFactory, plus an errorHandler for error handling

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

@Configuration
class KafkaConsumerConfig(
    private val kafkaConsumerErrorHandler: KafkaConsumerErrorHandler,
) {
    @Bean("testKafkaListenerContainerFactory")
    fun testKafkaListenerContainerFactory(
        appConfig: AppConfig
    ): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            // Consumer settings are looked up by the key used under kafka-consumer.config
            consumerFactory = appConfig.kafkaConsumer.consumerFactory(KAFKA_TEST)
            setCommonErrorHandler(kafkaConsumerErrorHandler)
        }

    companion object {
        const val KAFKA_TEST = "kafka-test"
    }
}
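
The KafkaConsumerErrorHandler injected above is not listed in this post. As a minimal sketch only, assuming spring-kafka 2.8 or later (where DefaultErrorHandler and setCommonErrorHandler are available), it could be a component that extends DefaultErrorHandler. The retry interval, retry count, and log message below are illustrative assumptions, not the implementation from the repository.

```kotlin
import org.springframework.kafka.listener.ConsumerRecordRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.stereotype.Component
import org.springframework.util.backoff.FixedBackOff

// Hypothetical error handler: retries a failed record twice, one second apart,
// then hands it to the recoverer, which only logs it and moves on.
@Component
class KafkaConsumerErrorHandler : DefaultErrorHandler(
    ConsumerRecordRecoverer { record, ex ->
        System.err.println(
            "Giving up on ${record.topic()}-${record.partition()}@${record.offset()}: ${ex.message}"
        )
    },
    FixedBackOff(1000L, 2L) // interval in ms, number of retries (assumed values)
)
```

Because it is a @Component, Spring can pass it to the KafkaConsumerConfig constructor without further wiring.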

KafkaConsumerProperties

 


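import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

data class KafkaConsumerProperties(
    val config: Map<String, KafkaConsumerConfig>,
    val keyDeserializerClass: Class<*> = StringDeserializer::class.java,
    val valueDeserializerClass: Class<*> = StringDeserializer::class.java,
    val autoOffsetReset: String = "earliest" // earliest or latest
) {
    data class KafkaConsumerConfig(
        val bootstrapServers: String,
        val clientId: String,
        val groupId: String,    // use if needed
    )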

    fun propertiesMap(kafkaConsumerConfig: KafkaConsumerConfig): Map<String, Any?> =
        mutableMapOf<String, Any?>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConsumerConfig.bootstrapServers,
            ConsumerConfig.CLIENT_ID_CONFIG to kafkaConsumerConfig.clientId,
            ConsumerConfig.GROUP_ID_CONFIG to kafkaConsumerConfig.groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to keyDeserializerClass,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to valueDeserializerClass,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
        ).toMap()

    fun <V> consumerFactory(key: String): ConsumerFactory<String, V> {
        return DefaultKafkaConsumerFactory(propertiesMap(config[key]!!))
    }
}
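
Note that config[key]!! fails with a bare NullPointerException when the key is missing from application.yml. The following is a small sketch in plain Kotlin of a lookup that fails with a clearer message. ConsumerSettings, settingsFor, and toKafkaProperties are hypothetical stand-ins for the classes above, and the string keys are the literal values behind the ConsumerConfig constants.

```kotlin
// Hypothetical stand-in for the nested KafkaConsumerConfig data class.
data class ConsumerSettings(
    val bootstrapServers: String,
    val clientId: String,
    val groupId: String,
)

// Looks up a named consumer config and fails with a descriptive
// IllegalArgumentException instead of the NullPointerException raised by `!!`.
fun settingsFor(config: Map<String, ConsumerSettings>, key: String): ConsumerSettings =
    requireNotNull(config[key]) {
        "No kafka-consumer.config entry named '$key' (known: ${config.keys})"
    }

// Builds the same kind of map as propertiesMap(), with the property names spelled out.
fun toKafkaProperties(settings: ConsumerSettings): Map<String, Any> = mapOf(
    "bootstrap.servers" to settings.bootstrapServers,
    "client.id" to settings.clientId,
    "group.id" to settings.groupId,
)

fun main() {
    val config = mapOf(
        "kafka-test" to ConsumerSettings(
            bootstrapServers = "localhost:9092",
            clientId = "kafka-test-consumer",
            groupId = "kafka-test-consumer",
        )
    )
    println(toKafkaProperties(settingsFor(config, "kafka-test")))
    try {
        settingsFor(config, "kafka-tset") // misspelled key
    } catch (e: IllegalArgumentException) {
        println(e.message)
    }
}
```

The same requireNotNull call could replace the !! inside consumerFactory.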

Consumer implementation

import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
@KafkaListener(
    // "\$" keeps Kotlin from treating the Spring placeholder as a string template
    topics = ["\${kafka-consumer.config.kafka-test.topic}"],
    clientIdPrefix = "\${kafka-consumer.config.kafka-test.client-id}",
    containerFactory = "testKafkaListenerContainerFactory"
)
class TestConsumer {
    @KafkaHandler
    fun listen(msg: String) {
        println(msg)
    }
}

  • topics is set to read the topic defined in application.yml
  • clientIdPrefix (optional): if it is not set and the same clientId is used elsewhere in the application, an InstanceAlreadyExistsException is raised → to prevent this, give each consumer a different clientIdPrefix
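
To show where that exception comes from: as far as I understand, a Kafka client registers a JMX MBean whose name contains its client id, and the JVM's MBean server rejects a second registration under the same name. Below is a minimal sketch of that mechanism using only the JDK. The example.consumer domain, the ClientInfo interface, and the registerClient helper are hypothetical and are not Kafka's own classes.

```kotlin
import java.lang.management.ManagementFactory
import javax.management.InstanceAlreadyExistsException
import javax.management.ObjectName
import javax.management.StandardMBean

// Hypothetical management interface exposing a single read-only attribute.
interface ClientInfo {
    val clientId: String
}

// Registers an MBean named after the client id.
// Returns false when an MBean with the same name is already registered.
fun registerClient(id: String): Boolean {
    val server = ManagementFactory.getPlatformMBeanServer()
    val name = ObjectName("example.consumer:type=app-info,id=$id")
    val impl: ClientInfo = object : ClientInfo {
        override val clientId: String = id
    }
    return try {
        server.registerMBean(StandardMBean(impl, ClientInfo::class.java), name)
        true
    } catch (e: InstanceAlreadyExistsException) {
        false
    }
}

fun main() {
    println(registerClient("kafka-test-consumer"))   // first consumer with this id
    println(registerClient("kafka-test-consumer"))   // second consumer, same id
    println(registerClient("kafka-test-consumer-2")) // distinct id
}
```

Only the second call collides, which is why giving each listener its own clientIdPrefix avoids the problem.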

Result

Send a message with the produce command and check that it is consumed correctly

kafka-console-producer.sh --topic test-topic --broker-list localhost:9092
> this is a msg from doteloper

https://github.com/jeongum/kafka-test

 


 
