Интеграция ClickHouse с Kafka с использованием именованных коллекций
Введение
В этом руководстве мы рассмотрим, как подключить ClickHouse к Kafka с использованием именованных коллекций. Использование файла конфигурации для именованных коллекций дает несколько преимуществ:
- Централизованное и более простое управление настройками.
- Изменения настроек можно вносить без изменения SQL-определений таблиц.
- Более удобный просмотр и отладка конфигурации за счет анализа одного файла конфигурации.
Это руководство было проверено на Apache Kafka 3.4.1 и ClickHouse 24.5.1.
Предварительные условия
В этом документе предполагается, что у вас уже есть:
- Работоспособный кластер Kafka.
- Развернутый и запущенный кластер ClickHouse.
- Базовые знания SQL и опыт работы с конфигурациями ClickHouse и Kafka.
Предварительные требования
Убедитесь, что пользователь, создающий именованную коллекцию, обладает необходимыми правами доступа:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>
См. руководство User Management Guide для получения подробной информации о включении управления доступом.
Конфигурация
Добавьте следующий раздел в файл конфигурации ClickHouse config.xml:
<!-- Именованные коллекции для интеграции Kafka -->
<named_collections>
<cluster_1>
<!-- Параметры движка ClickHouse Kafka -->
<kafka_broker_list>c1-kafka-1:9094,c1-kafka-2:9094,c1-kafka-3:9094</kafka_broker_list>
<kafka_topic_list>cluster_1_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_1_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Расширенная конфигурация Kafka -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword1</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_1>
<cluster_2>
<!-- Параметры движка ClickHouse Kafka -->
<kafka_broker_list>c2-kafka-1:29094,c2-kafka-2:29094,c2-kafka-3:29094</kafka_broker_list>
<kafka_topic_list>cluster_2_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_2_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Расширенная конфигурация Kafka -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword2</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_2>
</named_collections>
Примечания по конфигурации
- Настройте адреса Kafka и связанные параметры в соответствии с конфигурацией вашего кластера Kafka.
- Раздел перед
<kafka> содержит параметры движка Kafka в ClickHouse. Полный список параметров смотрите в разделе параметры движка Kafka.
- Раздел внутри
<kafka> содержит расширенные параметры конфигурации Kafka. Дополнительные параметры смотрите в документации по конфигурации librdkafka.
- В этом примере используется протокол безопасности
SASL_SSL и механизм PLAIN. При необходимости скорректируйте эти параметры в соответствии с конфигурацией вашего кластера Kafka.
Создание таблиц и баз данных
Создайте необходимые базы данных и таблицы в вашем кластере ClickHouse. Если вы запускаете ClickHouse на отдельном узле, опустите часть SQL-команды, относящуюся к кластеру, и используйте любой другой движок вместо ReplicatedMergeTree.
Создание базы данных
CREATE DATABASE kafka_testing ON CLUSTER LAB_CLICKHOUSE_CLUSTER;
Создайте таблицы Kafka
Создайте первую таблицу Kafka для первого кластера Kafka:
CREATE TABLE kafka_testing.first_kafka_table ON CLUSTER LAB_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_1);
Создайте вторую таблицу Kafka для второго кластера Kafka:
CREATE TABLE kafka_testing.second_kafka_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_2);
Создание реплицируемых таблиц
Создайте первую таблицу Kafka:
CREATE TABLE kafka_testing.first_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
Создайте вторую таблицу Kafka:
CREATE TABLE kafka_testing.second_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
Создание материализованных представлений
Создайте материализованное представление, которое будет вставлять данные из первой таблицы Kafka в первую реплицируемую таблицу:
CREATE MATERIALIZED VIEW kafka_testing.cluster_1_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO first_replicated_table AS
SELECT
id,
first_name,
last_name
FROM first_kafka_table;
Создайте материализованное представление, которое будет вставлять данные из второй таблицы Kafka во вторую реплицированную таблицу:
CREATE MATERIALIZED VIEW kafka_testing.cluster_2_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO second_replicated_table AS
SELECT
id,
first_name,
last_name
FROM second_kafka_table;
Проверка настройки
Теперь в ваших кластерах Kafka должны появиться соответствующие группы потребителей:
cluster_1_clickhouse_consumer в cluster_1
cluster_2_clickhouse_consumer в cluster_2
Выполните следующие запросы на любом из узлов ClickHouse, чтобы увидеть данные в обеих таблицах:
SELECT * FROM first_replicated_table LIMIT 10;
SELECT * FROM second_replicated_table LIMIT 10;
Примечание
В этом руководстве данные, поступающие в оба топика Kafka, одинаковы. В вашем случае они будут различаться. Вы можете добавить столько кластеров Kafka, сколько потребуется.
Пример вывода:
┌─id─┬─first_name─┬─last_name─┐
│ 0 │ FirstName0 │ LastName0 │
│ 1 │ FirstName1 │ LastName1 │
│ 2 │ FirstName2 │ LastName2 │
└────┴────────────┴───────────┘
На этом настройка интеграции ClickHouse с Kafka с использованием именованных коллекций завершена. Вынеся конфигурацию Kafka в файл config.xml ClickHouse, вы сможете проще управлять настройками и корректировать их по мере необходимости, обеспечивая более удобную и эффективную интеграцию.