Коннектор Confluent HTTP Sink
Коннектор HTTP Sink не зависит от формата данных, поэтому не требует схемы Kafka и при этом поддерживает специфичные для ClickHouse типы данных, такие как Map и Array. Эта дополнительная гибкость приводит к небольшому усложнению конфигурации.
Ниже описана простая установка, считывающая сообщения из одного топика Kafka и вставляющая строки в таблицу ClickHouse.
HTTP Connector распространяется по лицензии Confluent Enterprise.
Быстрый старт
1. Соберите параметры подключения
Чтобы подключиться к ClickHouse по HTTP(S), вам потребуется следующая информация:
| Параметр(ы) | Описание |
|---|---|
HOST и PORT | Обычно используется порт 8443 при использовании TLS или 8123 при отсутствии TLS. |
DATABASE NAME | По умолчанию существует база данных default; используйте имя базы данных, к которой вы хотите подключиться. |
USERNAME и PASSWORD | По умолчанию имя пользователя — default. Используйте имя пользователя, соответствующее вашему сценарию. |
Сведения о вашем сервисе ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис и нажмите Connect:

Выберите HTTPS. Параметры подключения отображаются в примере команды curl.

Если вы используете самостоятельное (self-managed) развертывание ClickHouse, параметры подключения задаются администратором ClickHouse.
2. Запустите Kafka Connect и коннектор HTTP Sink
У вас есть два варианта:
-
Самостоятельное развертывание: Загрузите пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, приведённым здесь. Если вы используете метод установки через confluent-hub, ваши локальные файлы конфигурации будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Для этого требуется, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
В следующих примерах используется Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Перед проверкой соединения создадим тестовую таблицу в ClickHouse Cloud, которая будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте топик Kafka и экземпляр HTTP Sink Connector:

Настройте HTTP Sink Connector:
- Укажите имя созданного топика
- Authentication
HTTP Url— URL-адрес ClickHouse Cloud с указанным запросомINSERT<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Примечание: запрос должен быть закодирован.Endpoint Authentication type— BASICAuth username— имя пользователя ClickHouseAuth password— пароль ClickHouse
С этим HTTP Url легко допустить ошибку. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

- Configuration
Input Kafka record value formatзависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. В дальнейших настройках мы предполагаем форматJSON.- В разделе
advanced configurations:HTTP Request Method— установите POSTRequest Body Format— jsonBatch batch size— согласно рекомендациям ClickHouse установите значение не менее 1000.Batch json as array— trueRetry on HTTP codes— 400-500, но адаптируйте при необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Maximum Reties— значение по умолчанию (10) подходит, но вы можете скорректировать его для более надежных повторных попыток.

5. Тестирование подключения
Создайте сообщение в топике, настроенном для вашего HTTP Sink

и убедитесь, что созданное сообщение было записано в ваш экземпляр ClickHouse.
Устранение неполадок
HTTP Sink не объединяет сообщения в батчи
Коннектор HTTP Sink не объединяет запросы для сообщений, содержащих значения заголовков Kafka, которые различаются.
- Убедитесь, что все ваши записи в Kafka имеют один и тот же ключ.
- Когда вы добавляете параметры к URL HTTP API, каждая запись может приводить к формированию уникального URL-адреса. По этой причине пакетная отправка отключена при использовании дополнительных параметров URL.
400 bad request
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink завершает работу с ошибкой со следующим сообщением при вставке JSON-объекта в столбец типа String:
Установите настройку input_format_json_read_objects_as_strings=1 в URL как URL‑кодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузка набора данных GitHub (необязательно)
Обратите внимание, что в этом примере сохраняются поля типа Array набора данных Github. Мы предполагаем, что у вас есть пустой топик github в примерах и что вы используете kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям по настройке Connect в соответствии с типом вашей установки, обращая внимание на различия между автономным и распределённым кластерами. Если вы используете Confluent Cloud, вам подходит распределённая схема.
Наиболее важным параметром является http.api.url. HTTP‑интерфейс для ClickHouse требует, чтобы вы закодировали выражение INSERT как параметр в URL. Оно должно включать формат (в данном случае JSONEachRow) и целевую базу данных. Формат должен соответствовать формату данных в Kafka, которые будут преобразованы в строку в теле HTTP‑запроса. Эти параметры должны быть URL‑кодированы. Пример такого формата для набора данных Github (предполагая, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры относятся к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method- Установите в POST.retry.on.status.codes- Установите в 400–500 для повторных попыток при любых кодах ошибок. Уточните значение на основе ожидаемых ошибок в данных.request.body.format- В большинстве случаев это будет JSON.auth.type- Установите в BASIC, если вы используете базовую аутентификацию в ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.ssl.enabled- установите в true при использовании SSL.connection.user- имя пользователя для ClickHouse.connection.password- пароль для ClickHouse.batch.max.size- Количество строк, отправляемых в одном пакете. Убедитесь, что это значение достаточно велико. Согласно рекомендациям ClickHouse, значение 1000 следует считать минимальным.tasks.max- Коннектор HTTP Sink поддерживает выполнение одной или нескольких задач. Это можно использовать для повышения производительности. Совместно с размером пакета это ваши основные средства улучшения производительности.key.converter- задайте в соответствии с типами ваших ключей.value.converter- задайте исходя из типа данных в вашем топике. Для этих данных не требуется схема. Формат здесь должен быть согласован с FORMAT, указанным в параметреhttp.api.url. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку с помощью конвертера org.apache.kafka.connect.storage.StringConverter, однако это потребует от пользователя извлечения значения в операторе INSERT с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая конфигурацию прокси, повторные попытки и расширенный SSL, можно найти здесь.
Примеры конфигурационных файлов для тестовых данных GitHub можно найти здесь, при условии, что Kafka Connect запущен в автономном режиме (standalone), а Kafka размещена в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица создана. Ниже приведён пример минимального набора данных GitHub, использующего стандартный движок MergeTree.
3. Добавьте данные в Kafka
Отправьте сообщения в Kafka. Ниже мы используем kcat для отправки 10 000 сообщений.
Простое чтение целевой таблицы «Github» должно подтвердить, что данные были вставлены.