Работая с BigQuery, мы привыкли, что отрабатывает SQL-запрос почти любой сложности. Самые крупные наши таблицы содержали 70–90 миллионов строк, и с такими объемами СУБД справлялась без проблем. Именно поэтому, не мудрствуя лукаво, мы обновляли свои витрины ежедневно целиком, то есть перерассчитывая заново тонны строк за прошлые периоды.
Перейдя на ClickHouse, мы сразу прочувствовали его отличия от BigQuery. В BQ кеши витрин мы обновляли запросами такого вида:
CREATE OR REPLACE TABLE project_name.dataset_name.datamart_cache
AS
SELECT * FROM project_name.dataset_name.datamart_view
где datamart_cache — кеш витрины данных, datamart_view — представление с логикой витрины.
Эта конструкция обновляет таблицу одномоментно и только после того, как представление успешно отработает.
В ClickHouse эту операцию приходится выполнять в два шага:
- Очистка таблицы кеша
TRUNCATE TABLE database_name.datamart_cache;
- Запись кеша
INSERT INTO database_name.datamart_cache
SELECT * FROM database_name.datamart_view;
Вставка строк производится асинхронно, то есть, пока запрос выполняется, таблица «в реальном времени» увеличивается. Но не блокируется на чтение, а значит, обратившись к ней до завершения вставки, мы увидим неполные данные.
Есть проблемы и с ограничениями по сложности и объему запросов. Джойны больших таблиц валят запросы или растягивают выполнение на часы. Увеличение оперативной памяти кластера (за которой в BQ вообще не нужно следить — там ресурсы на выполнение запроса выделяются автоматически) не всегда помогает и, конечно, стоит денег.
Частичное обновление по партициям
Первое, что приходит на ум для решения проблемы, — обновлять не всю таблицу, а только ту часть, которая изменяется. Самые объемные наши витрины представляют собой временные ряды. Данные в них если и меняются задним числом, то на определенный период в прошлое. Например, в течение месяца может уточняться статистика откруток пользователей на рекламных системах. Более ранние периоды можно считать неизменными, а значит, и незачем их трогать.
Кроме того, часть таблицы требуется обновить синхронно, то есть мгновенно. Для этого используем механизм партиционирования таблиц.
Прежде всего таблицы кешей витрин должны быть партиционированы по полю, которое наиболее часто используется в фильтрах при обращении к данной таблице. Например, месяц или дата в таблице с транзакциями. Код для создания партиционированной таблицы:
CREATE OR REPLACE TABLE database_name.datamart_name
ON CLUSTER '{cluster}'
(
id Int32,
period String,
user_id Int32,
price Float64,
date_paid Date
)
ENGINE = MergeTree()
PARTITION BY period
ORDER BY date_paid;
Далее в представлении соответствующей витрины следует выставить фильтры так, чтобы при обращении к представлению выводились только строки, входящие в требуемую партицию. Например, при партиционировании по месяцу транзакции добавляем в представление условие равенства периода текущему месяцу (вида YYYY-MM):
WHERE period = LEFT(toString(DATE_TRUNC('month', CURRENT_DATE())), 7)
Теперь записываем результат выполнения представления в буферную таблицу. Она будет содержать только текущий месяц и на порядки меньше строк, чем конечный кеш витрины.
TRUNCATE TABLE database_name.datamart_update;
INSERT INTO database_name.datamart_update
SELECT * FROM database_name.datamart_view;
Наконец, с помощью операции REPLACE PARTITION заменим в кеше витрины требующую обновления партицию на вновь вычисленную.
ALTER TABLE database_name.datamart_cache
REPLACE PARTITION '2024-10'
FROM database_name.datamart_update;
К сожалению, за раз можно обновить только одну партицию и при этом нужно явно указать ее имя (то есть значение поля, по которому партиционированы обе таблицы). Если мы хотим обновить несколько партиций, можно организовать выполнение нескольких таких запросов в цикле — например, через даг Airflow. Таск обновления в таком даге может выглядеть примерно так:
ch_load_money_fact = ClickhouseSqlOperator(
task_id='refresh_datamart',
executor_config=config.K8S_EXECUTOR_CONFIG,
clickhouse_conn_id='clickhouse',
sql='dag_refresh/sql/refresh_datamart.sql',
params={
'period': datetime.datetime.now().strftime('%Y-%m'),
},
dag=dag
)
где ClickhouseSqlOperator — оператор выполнения запроса в ClickHouse, а refresh_datamart.sql — файл, содержащий все запросы из вышеописанных шагов, имеющий следующее содержимое:
1. Обновляем буферную таблицу с последним периодом:
TRUNCATE TABLE database_name.datamart_update;
INSERT INTO database_name.datamart_update
SELECT * FROM database_name.datamart_view;
2. Заменяем в конечной таблице партицию последнего месяца:
ALTER TABLE database_name.datamart_cache
REPLACE PARTITION '{{ params['period'] }}'
FROM database_name.datamart_update;
Резюме
Частичное обновление таблиц в ClickHouse позволяет существенно сократить потребление ресурсов и время исполнения. Такое обновление можно делать путем замены текущей партиции на вновь вычисленную. Для этого:
- Рассчитываем обновляемую партицию запросом, ограниченным нужным периодом.
- Сохраняем результат в буферную таблицу.
- Заменяем в целевой таблице обновляемую партицию на такую же из буферной таблицы.