Движок таблицы S3Queue
Этот движок обеспечивает интеграцию с экосистемой Amazon S3 и позволяет выполнять потоковый импорт. Он аналогичен движкам Kafka и RabbitMQ, но предоставляет функции, специфичные для S3.
Важно учитывать следующее примечание из исходного PR по реализации S3Queue: когда к таблице с этим движком присоединяется MATERIALIZED VIEW, движок таблицы S3Queue начинает собирать данные в фоновом режиме.
Создать таблицу
До версии 24.7 требуется использовать префикс s3queue_ для всех настроек, кроме mode, after_processing и keeper_path.
Параметры движка
Параметры S3Queue такие же, как у табличного движка S3. См. раздел «Параметры» здесь.
Пример
Использование коллекций с именами:
Настройки
Чтобы получить список настроек, заданных для таблицы, используйте таблицу system.s3_queue_settings. Доступно, начиная с версии 24.10.
Mode
Возможные значения:
- unordered — В режиме unordered множество всех уже обработанных файлов отслеживается с помощью постоянных узлов в ZooKeeper.
- ordered — В режиме ordered файлы обрабатываются в лексикографическом порядке. Это означает, что если файл с именем
BBBбыл обработан в какой‑то момент, а позже в бакет был добавлен файл с именемAA, он будет проигнорирован. В ZooKeeper сохраняются только максимальное имя (в лексикографическом смысле) успешно обработанного файла и имена файлов, которые будут повторно загружены после неудачной попытки загрузки.
Значение по умолчанию: ordered в версиях до 24.6. Начиная с 24.6 значение по умолчанию отсутствует, настройку требуется указывать вручную. Для таблиц, созданных в более ранних версиях, значение по умолчанию останется ordered для сохранения совместимости.
after_processing
Что делать с файлом после успешной обработки.
Возможные значения:
- keep.
- delete.
- move.
- tag.
Значение по умолчанию: keep.
Для варианта move требуются дополнительные настройки. В случае перемещения в пределах того же бакета необходимо указать новый префикс пути в параметре after_processing_move_prefix.
Перемещение в другой S3‑бакет требует указания URI целевого бакета в параметре after_processing_move_uri, а также учетных данных доступа к S3 в параметрах after_processing_move_access_key_id и after_processing_move_secret_access_key.
Пример:
Для перемещения данных из одного контейнера Azure в другой необходимо указать строку подключения Blob Storage в параметре after_processing_move_connection_string и имя контейнера в параметре after_processing_move_container. См. настройки AzureQueue.
Для добавления тегов необходимо указать ключ и значение тега в параметрах after_processing_tag_key и after_processing_tag_value.
after_processing_retries
Количество повторных попыток выполнения запрошенного действия постобработки перед тем, как прекратить попытки.
Возможные значения:
- Неотрицательное целое число.
Значение по умолчанию: 10.
after_processing_move_access_key_id
ID ключа доступа (Access Key ID) для S3‑бакета, в который нужно переместить успешно обработанные файлы, если целевым местом назначения является другой S3‑бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_prefix
Префикс пути, в который перемещаются успешно обработанные файлы. Применимо как при перемещении в пределах того же бакета, так и в другой бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_secret_access_key
Secret Access Key для S3‑бакета, в который нужно перемещать успешно обработанные файлы, если целевой ресурс — другой S3‑бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_uri
URI S3-бакета, в который следует перемещать успешно обработанные файлы, если местом назначения является другой S3-бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_tag_key
Ключ тега, который будет использоваться для пометки успешно обработанных файлов, если after_processing='tag'.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_tag_value
Значение тега, которое будет присвоено успешно обработанным файлам, если after_processing='tag'.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
keeper_path
Путь в ZooKeeper может быть задан в параметрах движка таблицы, либо путь по умолчанию может быть сформирован из пути, указанного в глобальной конфигурации, и UUID таблицы. Возможные значения:
- строка.
Значение по умолчанию: /.
s3queue_loading_retries
Повторять загрузку файла до указанного количества раз. По умолчанию повторы не выполняются. Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
s3queue_processing_threads_num
Количество потоков обработки. Применяется только в режиме Unordered.
Значение по умолчанию: количество CPU или 16.
s3queue_parallel_inserts
По умолчанию processing_threads_num будет выполнять один INSERT, поэтому файлы будут только загружаться и парситься в несколько потоков.
Но это ограничивает степень параллелизма, поэтому для лучшей пропускной способности используйте parallel_inserts=true — это позволит вставлять данные параллельно (но имейте в виду, что это приведёт к большему количеству создаваемых частей данных для семейства движков MergeTree).
INSERT-запросы будут создаваться с учётом настроек max_process*_before_commit.
Значение по умолчанию: false.
s3queue_enable_logging_to_s3queue_log
Включает логирование в system.s3queue_log.
Значение по умолчанию: 0.
s3queue_polling_min_timeout_ms
Указывает минимальное время в миллисекундах, которое ClickHouse ожидает перед следующей попыткой опроса.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 1000.
s3queue_polling_max_timeout_ms
Определяет максимальное время в миллисекундах, в течение которого ClickHouse ждёт перед запуском следующей попытки опроса.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 10000.
s3queue_polling_backoff_ms
Определяет дополнительное время ожидания, добавляемое к предыдущему интервалу опроса при отсутствии новых файлов. Следующий опрос выполняется после истечения времени, равного сумме предыдущего интервала и этого значения backoff, либо по наступлении максимального интервала — в зависимости от того, что меньше.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
s3queue_tracked_files_limit
Позволяет ограничить количество узлов ZooKeeper при использовании режима unordered; не влияет на режим ordered.
Если лимит достигнут, самые старые обработанные файлы будут удалены из узла ZooKeeper и обработаны повторно.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 1000.
s3queue_tracked_file_ttl_sec
Максимальное время в секундах для хранения обработанных файлов в узле ZooKeeper (по умолчанию хранятся бессрочно) в режиме unordered; не влияет на режим ordered.
По истечении указанного времени файл будет загружен повторно.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
s3queue_cleanup_interval_min_ms
Для режима Ordered. Определяет минимальное значение интервала перепланирования фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и ограничения на их максимальное количество.
Значение по умолчанию: 10000.
s3queue_cleanup_interval_max_ms
Для режима Ordered. Определяет максимальный интервал между переназначениями фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и максимального набора отслеживаемых файлов.
Значение по умолчанию: 30000.
s3queue_buckets
Для режима Ordered. Доступно начиная с версии 24.6. Если существует несколько реплик таблицы S3Queue, каждая из которых работает с одним и тем же каталогом метаданных в keeper, значение s3queue_buckets должно быть не меньше количества реплик. Если также используется настройка s3queue_processing_threads, имеет смысл дополнительно увеличить значение s3queue_buckets, так как она определяет фактический уровень параллелизма обработки S3Queue.
use_persistent_processing_nodes
По умолчанию таблица S3Queue всегда использовала эфемерные узлы обработки, что могло приводить к дублированию данных в случае, если сессия ZooKeeper истекает до того, как S3Queue зафиксирует обработанные файлы в ZooKeeper, но после того, как обработка уже началась. Эта настройка заставляет сервер исключить возможность появления дубликатов при истечении сессии Keeper.
persistent_processing_nodes_ttl_seconds
В случае некорректного завершения работы сервера, если включён use_persistent_processing_nodes, могут остаться неудалённые узлы обработки. Этот параметр определяет период времени, по истечении которого эти узлы обработки могут быть безопасно удалены.
Значение по умолчанию: 3600 (1 час).
Настройки S3
Движок поддерживает все настройки S3. Дополнительную информацию о настройках S3 см. здесь.
Ролевой доступ к S3
S3 Role-Based Access is available in the Scale and Enterprise plans. To upgrade, visit the plans page in the cloud console.
Движок таблицы s3Queue поддерживает ролевую модель доступа. См. документацию здесь с описанием шагов по настройке роли для доступа к вашему бакету.
После настройки роли значение roleARN можно передать через параметр extra_credentials, как показано ниже:
Упорядоченный режим (ordered) для S3Queue
Режим обработки S3Queue позволяет хранить меньше метаданных в ZooKeeper, но имеет ограничение: файлы, добавленные позже по времени, должны иметь имена, которые в алфавитно-цифровом порядке больше имён ранее добавленных файлов.
Режим ordered в S3Queue, так же как и unordered, поддерживает настройку (s3queue_)processing_threads_num (префикс s3queue_ является необязательным), которая позволяет управлять количеством потоков, обрабатывающих файлы S3 локально на сервере.
Кроме того, режим ordered вводит дополнительную настройку (s3queue_)buckets, которая означает «логические потоки». В распределённой конфигурации, когда есть несколько серверов с репликами таблиц S3Queue, эта настройка определяет количество единиц обработки. Например, каждый поток обработки на каждой реплике S3Queue будет пытаться захватить определённый bucket для обработки; каждый bucket сопоставляется определённым файлам по хэшу имени файла. Поэтому в распределённом сценарии настоятельно рекомендуется, чтобы значение настройки (s3queue_)buckets было как минимум равно количеству реплик или больше. Допустимо, если число бакетов больше количества реплик. Оптимальным будет сценарий, когда настройка (s3queue_)buckets равна произведению number_of_replicas и (s3queue_)processing_threads_num.
Использование настройки (s3queue_)processing_threads_num не рекомендуется до версии 24.6.
Настройка (s3queue_)buckets доступна начиная с версии 24.6.
Описание
SELECT мало полезен для потокового импорта (кроме отладки), потому что каждый файл можно импортировать только один раз. Более практично создавать потоки в реальном времени с помощью материализованных представлений. Для этого:
- Используйте этот движок для создания таблицы, которая будет читать данные из указанного пути в S3 и рассматриваться как поток данных.
- Создайте таблицу с требуемой структурой.
- Создайте материализованное представление, которое преобразует данные из этого движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключено к этому движку, оно начинает собирать данные в фоновом режиме.
Пример:
Виртуальные столбцы
_path— Путь к файлу._file— Имя файла._size— Размер файла._time— Время создания файла.
Дополнительную информацию о виртуальных столбцах см. здесь.
Подстановочные символы в path
Аргумент path может задавать несколько файлов, используя подстановочные шаблоны в стиле bash. Чтобы файл был обработан, он должен существовать и полностью соответствовать шаблону пути. Перечень файлов определяется во время выполнения SELECT (а не в момент CREATE).
*— Заменяет любое количество любых символов, кроме/, включая пустую строку.**— Заменяет любое количество любых символов, включая/, включая пустую строку.?— Заменяет ровно один произвольный символ.{some_string,another_string,yet_another_one}— Заменяет любую из строк'some_string', 'another_string', 'yet_another_one'.{N..M}— Заменяет любое число в диапазоне от N до M включительно. N и M могут содержать ведущие нули, например000..078.
Конструкции с {} аналогичны табличной функции remote.
Ограничения
- Дубликаты строк могут возникать в результате:
-
во время парсинга происходит исключение в середине обработки файла, и включены повторные попытки через
s3queue_loading_retries; -
S3Queueнастроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и сессия Keeper завершается до того, как один из серверов успел зафиксировать обработанный файл, что может привести к тому, что другой сервер возьмет в обработку файл, который уже мог быть частично или полностью обработан первым сервером; однако это не актуально, начиная с версии 25.8, еслиuse_persistent_processing_nodes = 1. -
аварийного завершения работы сервера.
- Если
S3Queueнастроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и используется режимOrdered, тоs3queue_loading_retriesне будет работать. Это будет скоро исправлено.
Интроспекция
Для интроспекции используйте неперсистентную таблицу system.s3queue и персистентную таблицу system.s3queue_log.
system.s3queue. Эта таблица неперсистентная и отображает состояниеS3Queueв памяти: какие файлы в данный момент обрабатываются, какие файлы уже обработаны или завершились с ошибкой.
Пример:
system.s3queue_log. Персистентная таблица. Содержит ту же информацию, что иsystem.s3queue, но для файлов со статусамиprocessedиfailed.
Таблица имеет следующую структуру:
Чтобы использовать system.s3queue_log, задайте его конфигурацию в файле конфигурации сервера:
Пример: