Использование Vector с Kafka и ClickHouse
Использование Vector с Kafka и ClickHouse
Vector — это независимый от поставщиков конвейер обработки данных, который может читать из Kafka и отправлять события в ClickHouse.
Руководство по началу работы с Vector и ClickHouse сфокусировано на сценарии с логами и чтении событий из файла. Мы используем пример набора данных GitHub с событиями, размещёнными в топике Kafka.
Vector использует sources для получения данных по push- или pull-модели. Sinks, в свою очередь, являются приёмниками событий. Поэтому мы используем источник Kafka и приёмник ClickHouse. Обратите внимание, что хотя Kafka поддерживается как Sink, источник ClickHouse недоступен. В результате Vector не подходит пользователям, желающим передавать данные из ClickHouse в Kafka.
Vector также поддерживает transformation данных. Это выходит за рамки данного руководства. При необходимости применения трансформаций к своему набору данных пользователю следует обратиться к документации Vector.
Обратите внимание, что текущая реализация приёмника ClickHouse использует HTTP-интерфейс. Приёмник ClickHouse в данный момент не поддерживает использование JSON-схемы. Данные должны публиковаться в Kafka либо в виде обычного JSON-формата, либо в виде строк (Strings).
Лицензия
Vector распространяется по лицензии MPL-2.0
Сбор параметров подключения
Чтобы подключиться к ClickHouse по HTTP(S), вам потребуется следующая информация:
| Параметр(ы) | Описание |
|---|---|
HOST и PORT | Обычно используется порт 8443 при использовании TLS или 8123 при отсутствии TLS. |
DATABASE NAME | По умолчанию существует база данных default; используйте имя базы данных, к которой вы хотите подключиться. |
USERNAME и PASSWORD | По умолчанию имя пользователя — default. Используйте имя пользователя, соответствующее вашему сценарию. |
Сведения о вашем сервисе ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис и нажмите Connect:

Выберите HTTPS. Параметры подключения отображаются в примере команды curl.

Если вы используете самостоятельное (self-managed) развертывание ClickHouse, параметры подключения задаются администратором ClickHouse.
Шаги
- Создайте топик Kafka
githubи вставьте в него набор данных GitHub.
Этот набор данных содержит 200 000 строк из репозитория ClickHouse/ClickHouse.
- Убедитесь, что целевая таблица создана. Ниже мы используем базу данных по умолчанию.
CREATE TABLE github ( file_time DateTime, event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22), actor_login LowCardinality(String), repo_name LowCardinality(String), created_at DateTime, updated_at DateTime, action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20), comment_id UInt64, path String, ref LowCardinality(String), ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4), creator_user_login LowCardinality(String), number UInt32, title String, labels Array(LowCardinality(String)), state Enum('none' = 0, 'open' = 1, 'closed' = 2), assignee LowCardinality(String), assignees Array(LowCardinality(String)), closed_at DateTime, merged_at DateTime, merge_commit_sha String, requested_reviewers Array(LowCardinality(String)), merged_by LowCardinality(String), review_comments UInt32, member_login LowCardinality(String) ) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);
Несколько важных замечаний о данной конфигурации и поведении Vector:
- Этот пример был протестирован с Confluent Cloud. Поэтому параметры безопасности
sasl.*иssl.enabledмогут быть неприменимы в сценариях с самостоятельным управлением кластером. - Префикс протокола не требуется для параметра конфигурации
bootstrap_servers, например:pkc-2396y.us-east-1.aws.confluent.cloud:9092. - Параметр источника
decoding.codec = "json"гарантирует, что сообщение передается в ClickHouse sink как один JSON-объект. При обработке сообщений как строк (String) и использовании значения по умолчаниюbytesсодержимое сообщения будет добавлено в полеmessage. В большинстве случаев это потребует обработки в ClickHouse, как описано в руководстве Vector getting started. - Vector добавляет ряд полей к сообщениям. В нашем примере мы игнорируем эти поля в ClickHouse sink с помощью параметра конфигурации
skip_unknown_fields = true. Это приводит к игнорированию полей, которые не являются частью схемы целевой таблицы. При необходимости скорректируйте схему, чтобы такие мета-поля, какoffset, также добавлялись. - Обратите внимание, как sink ссылается на источник событий через параметр
inputs. - Обратите внимание на поведение ClickHouse sink, описанное здесь. Для оптимальной пропускной способности пользователи могут настроить параметры
buffer.max_events,batch.timeout_secsиbatch.max_bytes. Согласно рекомендациям ClickHouse, значение 1000 следует рассматривать как минимальное количество событий в одном пакете. Для сценариев с равномерно высокой пропускной способностью пользователи могут увеличить параметрbuffer.max_events. При более переменной нагрузке могут потребоваться изменения параметраbatch.timeout_secs. - Параметр
auto_offset_reset = "smallest"заставляет источник Kafka начинать чтение с начала топика, гарантируя, что мы потребляем сообщения, опубликованные на шаге (1). Пользователям может потребоваться иное поведение. См. подробности здесь.
- Запустите Vector
По умолчанию перед началом вставки данных в ClickHouse требуется проверка работоспособности. Это гарантирует, что можно установить соединение и прочитать схему. Добавьте перед командой VECTOR_LOG=debug, чтобы получить более подробные логи, которые могут быть полезны при возникновении проблем.
- Подтвердите вставку данных.
| Количество |
|---|
| 200000 |