Перейти к основному содержанию
Перейти к основному содержанию

Создание роллапа на основе материализованных представлений для быстрой аналитики временных рядов

В этом руководстве показано, как поддерживать предварительно агрегированные сводки (rollup-таблицы) для высоконагруженной таблицы событий с помощью материализованных представлений. Вы создадите три объекта: «сырую» таблицу, rollup-таблицу и материализованное представление, которое будет автоматически записывать данные в rollup-таблицу.

Когда использовать этот шаблон

Используйте этот шаблон, если:

  • У вас есть поток событий с только добавлением (клики, просмотры страниц, IoT, логи).
  • Большинство запросов — это агрегации по временным диапазонам (по минутам/часам/дням).
  • Вам нужны стабильные чтения с задержкой менее секунды без повторного сканирования всех исходных строк.

Создание таблицы необработанных событий

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

Примечания

  • PARTITION BY toYYYYMM(event_time) позволяет создавать небольшие партиции, которые легко удалять.
  • ORDER BY (event_time, user_id) поддерживает ограниченные по времени запросы + вторичную фильтрацию.
  • LowCardinality(String) позволяет экономить память для категориальных измерений.
  • TTL удаляет сырые данные по истечении 90 дней (настройте в соответствии с вашими требованиями к сроку хранения данных).

Проектирование таблицы свёртки (агрегированной)

Мы будем предварительно агрегировать данные с почасовой гранулярностью. Выберите гранулярность в соответствии с наиболее распространённым окном анализа.

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- начало часа
    country       LowCardinality(String),
    event_type    LowCardinality(String),
    users_uniq    AggregateFunction(uniqExact, UInt64),  -- уникальные пользователи
    value_sum     AggregateFunction(sum, Float64),       -- сумма значений
    value_avg     AggregateFunction(avg, Float64),       -- среднее значение
    events_count  AggregateFunction(count)               -- количество событий
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

Мы храним агрегатные состояния (например, AggregateFunction(sum, ...)), которые компактно представляют частичные агрегаты и могут быть объединены или завершены позже.

Создайте материализованное представление для заполнения сводной таблицы

Это материализованное представление срабатывает автоматически при вставке в events_raw и записывает агрегатные состояния в таблицу свёртки.

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

Вставка тестовых данных

Вставьте тестовые данные:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

Запрос агрегированных данных

Вы можете либо объединять состояния при чтении, либо финализировать их:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

Совет

Если вы ожидаете, что запросы всегда будут обращаться к агрегированным данным, можно создать второе материализованное представление, которое записывает финализированные значения в «обычную» таблицу MergeTree с той же детализацией в 1 час. Состояния обеспечивают большую гибкость, тогда как финализированные значения упрощают чтение.

Фильтруйте по полям первичного ключа для оптимальной производительности

Используйте команду EXPLAIN, чтобы увидеть, как индекс применяется для отсечения данных:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
    1.  │ Expression ((Project names + Projection))                                                                                          │
    2.  │   Expression                                                                                                                       │
    3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
    4.  │     Indexes:                                                                                                                       │
    5.  │       MinMax                                                                                                                       │
    6.  │         Keys:                                                                                                                      │
    7.  │           bucket_start                                                                                                             │
    8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
    9.  │         Parts: 1/1                                                                                                                 │
    10. │         Granules: 1/1                                                                                                              │
    11. │       Partition                                                                                                                    │
    12. │         Keys:                                                                                                                      │
    13. │           toYYYYMM(bucket_start)                                                                                                   │
    14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
    15. │         Parts: 1/1                                                                                                                 │
    16. │         Granules: 1/1                                                                                                              │
    17. │       PrimaryKey                                                                                                                   │
    18. │         Keys:                                                                                                                      │
    19. │           bucket_start                                                                                                             │
    20. │           country                                                                                                                  │
    21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
    22. │         Parts: 1/1                                                                                                                 │
    23. │         Granules: 1/1                                                                                                              │
        └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

В приведенном выше плане выполнения запроса используются три типа индексов: индекс MinMax, индекс партиций и индекс первичного ключа. Каждый индекс использует поля, указанные в первичном ключе: (bucket_start, country, event_type). Для достижения максимальной производительности фильтрации убедитесь, что ваши запросы используют поля первичного ключа для отсечения данных.

Распространённые варианты

  • Разные уровни агрегирования: добавьте дневной роллап:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

Затем второе материализованное представление:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • Сжатие: применяйте кодеки к крупным столбцам (например, Codec(ZSTD(3))) в raw-таблице.
  • Контроль затрат: перенесите основную длительность хранения в сырую таблицу и поддерживайте долгоживущие агрегаты.
  • Заполнение задним числом (backfilling): при загрузке исторических данных вставляйте данные в events_raw, материализованное представление автоматически построит агрегаты (roll-ups). Для существующих строк используйте POPULATE при создании материализованного представления, если это уместно, или INSERT SELECT.

Очистка и хранение данных

  • Увеличьте TTL сырых данных (например, до 30/90 дней), но храните агрегированные данные дольше (например, до 1 года).
  • Вы также можете использовать TTL для перемещения старых частей данных в более дешёвое хранилище, если включено многоуровневое хранение.

Устранение неполадок

  • Материализованное представление не обновляется? Проверьте, что вставки выполняются в таблицу events_raw (а не в roll-up-таблицу) и что целевая таблица в определении материализованного представления указана корректно (TO events_rollup_1h).
  • Медленные запросы? Убедитесь, что они выполняются по rollup-таблице (выполните запрос напрямую к таблице rollup) и что временные фильтры соответствуют шагу агрегации этой rollup-таблицы.
  • Несоответствия при бэкфилле? Используйте SYSTEM FLUSH LOGS и проверьте system.query_log / system.parts, чтобы подтвердить, что вставки и слияния выполнены.