⚡️Clickhouse-notes
.
Развиваем наши знания по матвьюхам:
- в первых сериях мы настроили парсинг данных (из "сырого" json выбираем нужные поля и складываем в отдельную таблицу cdc.openweathermap)
.
Теперь собираем дневную статистику:
- мин\макс\сред для температуры, давления, влажности + кол-во записей.
Решаем задачу аналогичным путем: используем materialized view, только теперь будут нюансы:
- приемной таблицы не будет (но это не точно 😉), то есть нет секции TO ....
- определим параметры MV при её создании (Engine, PARTITION BY, ....)
- используем ключевое слово POPULATE для применения MV ко всем существующим данным ( ☝️будьте аккуратны с ним, если в вашей таблице 100500млн строк - может быть ресурсозатратно).
.
Составляем первую часть запроса - создание и параметры MV:
.
🔹ENGINE = SummingMergeTree - специальный движок для агрегаций (count\max\min\avg)
На самом деле существует только один общий движок
🔹PARTITION BY toStartOfWeek(dated_at) - партиционирование MV по неделям
🔹POPULATE - данных в таблице не так много -> применим MV ко всем существующим данным
.
Вторая часть DDL MV это запрос SELECT с нужной нам агрегацией данных, например, он мог бы быть таким:
.
Но так получится сделать только для завершенных дней, но для дней данные по которым приходят наш пайплайн должен обновлять статистику, чтобы этого достичь необходимо хранить промежуточные данные (например, для вычисления среднего необходимо хранить сумму и кол-во значений) - такое состояние называется
.
Для получения данных запрос выглядит так:
- запрашиваем данные из MV
- выполняем теже агрегации
- постфикс Merge
И вишенка на торте 🍒, чтобы скрыть от пользователя данную логику, необходимо обернуть Merge запрос в VIEW, например,
🧐 А где же хранятся промежуточные данные, вы могли бы спросить? MV не хранит данные, при таком DDL под капотом создаётся внутренняя таблицы, в нашем случае
#clickhouse #mv
MATERIALIZED VIEW - 2.
Развиваем наши знания по матвьюхам:
- в первых сериях мы настроили парсинг данных (из "сырого" json выбираем нужные поля и складываем в отдельную таблицу cdc.openweathermap)
.
Теперь собираем дневную статистику:
- мин\макс\сред для температуры, давления, влажности + кол-во записей.
Решаем задачу аналогичным путем: используем materialized view, только теперь будут нюансы:
- приемной таблицы не будет (но это не точно 😉), то есть нет секции TO ....
- определим параметры MV при её создании (Engine, PARTITION BY, ....)
- используем ключевое слово POPULATE для применения MV ко всем существующим данным ( ☝️будьте аккуратны с ним, если в вашей таблице 100500млн строк - может быть ресурсозатратно).
.
Составляем первую часть запроса - создание и параметры MV:
CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
...
.
🔹ENGINE = SummingMergeTree - специальный движок для агрегаций (count\max\min\avg)
На самом деле существует только один общий движок
AggregatingMergeTree - это мы увидим в DDL который сохранится в базе, просто его синтаксис чуть сложнее и для простый агрегаций есть такой алиас SummingMergeTree🔹PARTITION BY toStartOfWeek(dated_at) - партиционирование MV по неделям
🔹POPULATE - данных в таблице не так много -> применим MV ко всем существующим данным
.
Вторая часть DDL MV это запрос SELECT с нужной нам агрегацией данных, например, он мог бы быть таким:
SELECT toDate(record_timestamp) AS dated_at,
count(1) AS record_counts,
min(temp) AS min_temp,
max(temp) AS max_temp,
avg(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at
.
Но так получится сделать только для завершенных дней, но для дней данные по которым приходят наш пайплайн должен обновлять статистику, чтобы этого достичь необходимо хранить промежуточные данные (например, для вычисления среднего необходимо хранить сумму и кол-во значений) - такое состояние называется
State и определяется как: фукнция агрегации + постфикс State, например, avg -> avgState. Данные состояния и хранятся в базе, а в момент запроса пользователя необходимо рассчитать конечный результат ("смержить"), для этого используется постфикс Merge:avg -> avgState -> avgMerge. Merge функция применяется уже к полям MV (или таблицы, в которую складывается результат). Итого запрос DDL для MV выглядит так:CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
SELECT toDate(record_timestamp) AS dated_at,
countState(1) AS record_counts,
minState(temp) AS min_temp,
maxState(temp) AS max_temp,
avgState(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at
.
Для получения данных запрос выглядит так:
- запрашиваем данные из MV
- выполняем теже агрегации
- постфикс Merge
SELECT dated_at,
countMerge(record_counts) AS record_counts,
minMerge(min_temp) AS min_temp,
maxMerge(max_temp) AS max_temp,
avgMerge(avg_temp) AS avg_temp,
...
FROM odm.openweathermap_daily_stats_mv
GROUP BY dated_at
И вишенка на торте 🍒, чтобы скрыть от пользователя данную логику, необходимо обернуть Merge запрос в VIEW, например,
odm.openweathermap_daily_stats_v. Полный код найдете в репо.🧐 А где же хранятся промежуточные данные, вы могли бы спросить? MV не хранит данные, при таком DDL под капотом создаётся внутренняя таблицы, в нашем случае
.inner_id.5bacf24b-9e5a-4874-95c4-5ad93eaacbde, в ней и лежат предагрегаты (скрины в репо и в комментах).#clickhouse #mv
🔥1
✅ O - Optimization
.
В далеком посте , который рассказывал про оптимизацию с помощью условий
всё уже было сказано и вот второй раз наступил на грабли (пока еще нет, но мог бы 🔨).
.
Также недавно прошел курс по GreenPlum, конспект можно найти тут. В числе прочего была рассмотрена тема оптимизации, где отмечено:
.
Тк много работаю с Clickhouse, я думал он не такой, он же быстрый🚀, но не тут-то было. В работе использую планировщик
-
В одном из запросов была такая конструкция:
.
Кажется, слишком много букв, переписал так:
И запрос начал выполняться чуть дольше, стоит отметить что таблица партицинирована по месяцам.
Вывод
Для второго
Итог: используйте планировщик, много интересного он может рассказать - ClickHouse Explain
#optimization #clickhouse
.
В далеком посте , который рассказывал про оптимизацию с помощью условий
WHEREвсё уже было сказано и вот второй раз наступил на грабли (пока еще нет, но мог бы 🔨).
.
Также недавно прошел курс по GreenPlum, конспект можно найти тут. В числе прочего была рассмотрена тема оптимизации, где отмечено:
Признаки неоптимального SQL-запроса:
- лишние сортировки
- distinct
- union вместо union all
- Условия join, содержащие неравенства или OR (тк будет использован Nested Loop )
- Неоптимальные фильтры по ключу партицирования (явное использование ключа без преобразований)
- Неоптимальные фильтры по полям, входящим в индексы:
-В Greenplum версии 6 индексы B-tree, построенные на значениях функции от значений полей, используются в запросах с фильтрами только при построении плана Postgres.
-При построении плана запроса GPORCA использование в фильтрах любых функций от полей, по которым построены индексы на таблицу, исключает использование индексов и приводит к неоптимальному выполнению запроса.
.
Тк много работаю с Clickhouse, я думал он не такой, он же быстрый🚀, но не тут-то было. В работе использую планировщик
explain у него есть ряд опций, например:-
estimate показывает сколько будет прочитано строк, засечек индекса и тдВ одном из запросов была такая конструкция:
WHERE datetime >= '2024-01-15' and datetime < toDate('2024-01-15') + INTERVAL '1 day'
.
Кажется, слишком много букв, переписал так:
WHERE toDate(datetime) = '2024-01-15'
И запрос начал выполняться чуть дольше, стоит отметить что таблица партицинирована по месяцам.
Вывод
explain estimate для первого случая rows = 24064519Для второго
rows = 312864901 (будет прочитана вся таблица 😱)Итог: используйте планировщик, много интересного он может рассказать - ClickHouse Explain
#optimization #clickhouse
🔥2
📔 Executable Jupyter Notebooks
.
Не редко (особенно в аналитических целях) разработка ведется в Jupyter Notebooks и несколько
больно переносить наработки в py-модули, в хотелось бы запустить что-то:
или
.
Может и нам так можно🧐
.
Возможностей не так много, но они есть:
- сохранить jupyter как скрипт
- библиотека papermill - вот оно наше сокровище, позволяет запускть jupyter notebooks, а также их параметризовать, ну все как мы любим
.
Для создания параметризированного notebook необходимо:
- указать ячейку с параметрами, например, в ячейке может быть код:
- указать, что ячейка = ячейка с параметрами: необходимо добавить cell tag = parameters к той ячейке, где располагаются переменные-параметры. papermill парсит их и устанавливает переданные вами значения. Где это находится смотри на скрине в комментариях или в репо.
- описать обработку параметров в коде (необязательно =))
- запустить jupyter notebook
.
Все артефакты найдете в репо
#papermill #jupyter #runasnoscript
.
Не редко (особенно в аналитических целях) разработка ведется в Jupyter Notebooks и несколько
больно переносить наработки в py-модули, в хотелось бы запустить что-то:
./my-notebook.ipynb или
run my-notebook.ipyb. Например, в Databricks так и сделали, notebook является единицей job\task и исполняется под капотом..
Может и нам так можно🧐
.
Возможностей не так много, но они есть:
- сохранить jupyter как скрипт
.py . При таком сохранении часть кода специфичная для notebooks может быть не выполнена, напримерdisplay или отдельно стоящий вывод: df.tail()- библиотека papermill - вот оно наше сокровище, позволяет запускть jupyter notebooks, а также их параметризовать, ну все как мы любим
.
Для создания параметризированного notebook необходимо:
- указать ячейку с параметрами, например, в ячейке может быть код:
# external params
period = "day"
output_file = "tmp.txt"
- указать, что ячейка = ячейка с параметрами: необходимо добавить cell tag = parameters к той ячейке, где располагаются переменные-параметры. papermill парсит их и устанавливает переданные вами значения. Где это находится смотри на скрине в комментариях или в репо.
- описать обработку параметров в коде (необязательно =))
- запустить jupyter notebook
papermill extract.ipynb -p period "week" -p output_file "weather-forecast.json" executed.ipynb --log-output --log-level INFO
.
Все артефакты найдете в репо
#papermill #jupyter #runasnoscript
🔥2
☝️ Последнее слово (но это не точно)
.
В предыдущих сериях по clickhouse-notes мы создали:
- таблицу сырья
- таблицу распарсенных данных и матвью, её наполняющую
- матвью с подневной статистикой и вьюху над ней
.
Дополним сет, данными последней точки: последние полученные данные. Например, нам пригодится для отображения
на дашборде последних значений.
.
Варианты есть различные, мы поступим так:
- создаём таблицу с движком ReplacingMergeTree,
- создаем MV для поиска этой последней записи,
.
Структура таблицы:
-
-
.
Полный код таблицы и MV в репе. А пока посмотрим на возможные артефакты:
- от СClickhouse у нас есть гарантия 100% схлопывания дублей
- это может произойти в любой момент времени (хоть сразу, после вставки дубля, хоть через полдня)
.
Что в такой ситуации делать:
- нативный метод - схлопнуть дубли в момент запроса, используя ключевое слово
.
Другие способы найдете в репе (сортировка и выборка 1 строки, использование
🫶 Скоро зафиналим какой-нибудь клевый дашборд
#clickhouse #mv #replacingmergetree
.
В предыдущих сериях по clickhouse-notes мы создали:
- таблицу сырья
- таблицу распарсенных данных и матвью, её наполняющую
- матвью с подневной статистикой и вьюху над ней
.
Дополним сет, данными последней точки: последние полученные данные. Например, нам пригодится для отображения
на дашборде последних значений.
.
Варианты есть различные, мы поступим так:
- создаём таблицу с движком ReplacingMergeTree,
odm.openweathermap_last_point - чтобы хранить только последнюю запись (но не всё так просто)- создаем MV для поиска этой последней записи,
odm.openweathermap_last_point_weather_mv.
Структура таблицы:
CREATE TABLE odm.openweathermap_last_point
(
last_ts DateTime,
temp Float64,
pressure Float64,
humidity Float64,
metric String
)
ENGINE = ReplacingMergeTree(last_ts)
ORDER BY metric
SETTINGS index_granularity = 8192
-
ReplacingMergeTree(last_ts) - оставляем только последнюю запись-
ORDER BY metric - синтетическое поле, по его совпадению и "схлопываются" дубли.
Полный код таблицы и MV в репе. А пока посмотрим на возможные артефакты:
- от СClickhouse у нас есть гарантия 100% схлопывания дублей
- это может произойти в любой момент времени (хоть сразу, после вставки дубля, хоть через полдня)
.
Что в такой ситуации делать:
- нативный метод - схлопнуть дубли в момент запроса, используя ключевое слово
FINALSELECT *
FROM odm.openweathermap_last_point
FINAL
FINAL - использование может быть очень накладно, если нужно схлопнуть довольно большой объем данных (не наш случай, поэтому мы им и воспользуемся). Чтобы скрыть от пользователя особенности технической реализации необходимо создать, например, odm.openweathermap_last_point_weather_v.
Другие способы найдете в репе (сортировка и выборка 1 строки, использование
argMax).🫶 Скоро зафиналим какой-нибудь клевый дашборд
#clickhouse #mv #replacingmergetree
👍2🔥2
🔊 Митап, митап, митап
.
С частью команды DWH, во-первых встретились, во-вторых, встречу провели с пользой: сходили на митап по Greenplum от ЯCloud.
.
1️⃣ В первой части Дмитрий Немчин из Tinkoff поведал тайны обслуживания GP. Презантация была скорее обзорной, тк интереснее было послушать про то как устроены их небольшие фреймворки по обслуживанию, а названия у них очень забавные:
- отстрел
- управление вьюхами и ролевой моделью gpACL, gpViews
- управление жизненным циклом данных (удаление, охлаждение, партиционирование) Guillotine
- мониторинг запросов, очередей, нагрузок - Inquisitor
.
Понравился процесс работы с песочницей: данные в ней живут 21 день, после дропаются без разговоров - очень хочется внедрить такой процесс у нас, так sandbox - маленькое болотце. Есть и другой подход, который базируется не на времени, а на пропорции между кол-вом запросов в песочницу и в прод, если доля песочницы первыщает то идем разбираться.
.
2️⃣ Во второй части, Кирилл рассказал о расширении Yezzey - позиционируется как охлаждение данных в s3, для организации гибридного хранения. Но по результатам тестом выглядит так, будто можно все данных хранить в s3, а мощности GP только для Compute использовать, ну и диски будут только для spill файлов😁
Эта часть нас и заинтересовала, тк есть большая потребность в охлаждении данных с последующим их использованием.
.
Ну и конечно, не обошлось без технических шуток. Номинация приз вечера получила шутка: Вы всё еще используете HEAP таблицы 😉
#gp #greenplum #охлаждение #meetup
.
С частью команды DWH, во-первых встретились, во-вторых, встречу провели с пользой: сходили на митап по Greenplum от ЯCloud.
.
1️⃣ В первой части Дмитрий Немчин из Tinkoff поведал тайны обслуживания GP. Презантация была скорее обзорной, тк интереснее было послушать про то как устроены их небольшие фреймворки по обслуживанию, а названия у них очень забавные:
- отстрел
плохишей по различным правилам - Raskolnokov- управление вьюхами и ролевой моделью gpACL, gpViews
- управление жизненным циклом данных (удаление, охлаждение, партиционирование) Guillotine
- мониторинг запросов, очередей, нагрузок - Inquisitor
.
Понравился процесс работы с песочницей: данные в ней живут 21 день, после дропаются без разговоров - очень хочется внедрить такой процесс у нас, так sandbox - маленькое болотце. Есть и другой подход, который базируется не на времени, а на пропорции между кол-вом запросов в песочницу и в прод, если доля песочницы первыщает то идем разбираться.
.
2️⃣ Во второй части, Кирилл рассказал о расширении Yezzey - позиционируется как охлаждение данных в s3, для организации гибридного хранения. Но по результатам тестом выглядит так, будто можно все данных хранить в s3, а мощности GP только для Compute использовать, ну и диски будут только для spill файлов😁
Эта часть нас и заинтересовала, тк есть большая потребность в охлаждении данных с последующим их использованием.
.
Ну и конечно, не обошлось без технических шуток. Номинация приз вечера получила шутка: Вы всё еще используете HEAP таблицы 😉
#gp #greenplum #охлаждение #meetup
👍3🔥2
⭐ Это заслуживает 5 звёзд ⭐
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:
.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:
1 с
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:
CASE WHEN (json_answer ->> 'answer') LIKE '%{%' THEN json_object_keys((json_answer ->> 'answer')::json)
ELSE Null END
.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:
1 с
трока, 1 строка из 2млн, Карл
#de #work #буднидатаинженера😢1
Как много мы пишем boilerplate code🤨
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора
.
Рекомендую ознакомиться с
- шаблоны при веб-разработке (
-
- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора
jinja2 😍.
Рекомендую ознакомиться с
jinja2, мне отлично помог краткий курс, тк шаблонизация встречается всё чаще и чаще:- шаблоны при веб-разработке (
Flask\Django)-
dbt = SQL over jinja2- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
❤1
Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
Можно написать:
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например,
1️⃣ Генерим несколько дат и выполняем etl за каждую
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
#airflow #taskflow #mapping
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
def etl():
pass
with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)
Можно написать:
@dag(....)
def create_dag():
@task
def etl():
pass
create_dag()
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:
@task
def get_data_from_api() -> List[]:
...
return data
api_data = get_data_from_api()
@task
def etl(data: List[]) -> None:
# transfrom and save data
...
etl = etl(data=api_data)
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например,
partial, expand, что это нам дает:1️⃣ Генерим несколько дат и выполняем etl за каждую
@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']
@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at
dates = get_dates()
# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]
files = find_s3_files()
# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
#airflow #taskflow #mapping
🔥4👍1
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍4
💪 Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
-
-
-
5️⃣ Модуль functools - магия функционального программирования
-
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
import pandas as pd
df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
import numpy as np
import pandas as pd
def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df
df.pipe(calculate, thr=5, columns=["value_1", "value_2"])
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
from collections import defaultdict
res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список
for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)
res["value"] # это список =)
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
-
batched открыл для себя, когда сам сначала написал такую штуку (немножко изобрел 🚲)-
combinations - итератор комбинаций из элементов списка-
zip_longest - "длинный" брат zip (если не знал, то zip проходится по самому короткому из списков)5️⃣ Модуль functools - магия функционального программирования
-
partial - кажется, это самая часто используемая функцию из модуля (ну может после lru_cache())import typing
import pandas as pd
import numpy as np
def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"
for col in columns:
df[f"{col}_rolling"] = df[col].rolling(window_width, min_periods=2).apply(lambda window: method(window))
return df
func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)
# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣
reduce - применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значениеl = [0, 1, -5, 7, 8]
# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)
print(res) # 11
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
👍4🔥1
🤘 Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
🔥7
Выходим из затишья
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault) ✊
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud
- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault) ✊
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud
- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
🔥4👍3
FrameWorkStory1.0
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница🙈. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше😉
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное😜
.
#framework #automate
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница🙈. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше😉
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное😜
.
#framework #automate
👍1🔥1
WALLE
Заботливо дадим красивое имя фреймворку
- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво 😍 (не называть же его YetAnotherDagGenerator или FrameWork №Х)
К предыдущему посту вопросиков накидали:
1.
Автоматизируем 80% рутинной работы и оставим место нетривиальным кейсам (чтобы инженеру было где поразмять извилины 🤯)
2.
я такого не видел. Даже с точки зрения устаревания какого-то процесса, при автоматизации удобнее: убрал yml в архив и всё.
В общем тут посыл такой: если есть любой код - ему нужен рефактор
3.
Очень не нравиться копаться в чужом коде, разбирая, что же там сломалось (хех, можно заметить, что если автоматизацию писал не ты, то это тоже чудой код =))
4.
Честно, очень бы хотелось, но пока тестирование только в dev среде или на персональном стенде.
Общий посыл автоматизации такой:
#walle #framework #automate
Заботливо дадим красивое имя фреймворку
Walle, почему так:- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво 😍 (не называть же его YetAnotherDagGenerator или FrameWork №Х)
К предыдущему посту вопросиков накидали:
А проблемы при этом возникают, что твоя автоматизация не умеет все, что надо, её надо дописывать,
тестить, рефакторить, иногда не работает… и это ради 10 строчек кода?
1.
автоматизация не умеет всё - справедливо, но всё и не надо. Применяем правило Парето: Автоматизируем 80% рутинной работы и оставим место нетривиальным кейсам (чтобы инженеру было где поразмять извилины 🤯)
2.
рефакторить - Отвечу вопросом на вопрос: а неавтоматизированные даги рефакторить не надо? Написал один раз и забыл, нетя такого не видел. Даже с точки зрения устаревания какого-то процесса, при автоматизации удобнее: убрал yml в архив и всё.
В общем тут посыл такой: если есть любой код - ему нужен рефактор
3.
иногда не работает - справедливо, сделаю отсылку к п2: любой код может не работать =). Также вижу пользу: если не работает то, для всего - нет специфических проблем.Очень не нравиться копаться в чужом коде, разбирая, что же там сломалось (хех, можно заметить, что если автоматизацию писал не ты, то это тоже чудой код =))
4.
тестить - пффф, какие тесты. *уяк - *уяк и в продакшенЧестно, очень бы хотелось, но пока тестирование только в dev среде или на персональном стенде.
Общий посыл автоматизации такой:
Нет стремления покрыть 100% кейсов, есть желание написать код, в котором в одних и тех местах вызываются коннекшены, они имеют
одинаковые названия, код переиспользуется (для алертинга используется 1 и только 1 функция или стандартный набор, а не 8 различных фукнций 🤦♂️),
таски\даги - имеют одинаковый нейминг и одинаковую структуру и тд...
#walle #framework #automate
❤2👍2🔥1
WALLE - Готовим yml
Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.
Реализуем базу:
- все конфиги храним в директории
- читаем директорию
- данные из yml храним в переменной, например:
-
Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\denoscription)
С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:
ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)
Сохраняем наш первый yml 🥳
.
Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку
Добавил обработку ошибок
Проверяем работу реализованной функции (скрин в репе):
- реализуем функцию
- объект даг нужно положить в
- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end
Мои маленькие дата инженеры - это подлежит самостоятельной реализации 😉. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории
У меня получилась такая структура папок:
Загружаем в AirFlow, наслаждаемся картиной (она в скринах)
На этом самая простая часть закончилась🙃
#walle #framework #automate
Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.
Реализуем базу:
- все конфиги храним в директории
metadata- читаем директорию
metadata- данные из yml храним в переменной, например:
integration_meta-
integration_meta подается на вход функции создания дага create_dag(integration_meta)Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\denoscription)
С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:
version: 2
models:
- name: reddit # имя интеграции, оно же dag_id
denoscription: Топ реддитов за последний час # описание интеграции, оно же dag_denoscription
dag:
# dag_id: "" # можно переопределить dag_id != name
schedule_interval: 0 * * * *
start_date: '2024-08-01'
# end_date: '2024-08-31'
catchup: False
alerting_chat_id: -987654321
alerting_secret_name: alerting_bot_token
owner: dwh
tags:
- dwh
- api_integration
ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)
Сохраняем наш первый yml 🥳
.
Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку
pyyaml. Немного шаблонного кода:import yaml
def read_yaml(path_to_file: str):
with open(path_to_file, "r") as f_yaml:
try:
content = yaml.safe_load(f_yaml)
return content
except yaml.YAMLError as e:
print(e)
Добавил обработку ошибок
try\except - чуть-чуть защиты от сломанных файлов yml.Проверяем работу реализованной функции (скрин в репе):
{'models': [{'dag': {'alerting_chat_id': -987654321,
'alerting_secret_name': 'alerting_bot_token',
'catchup': False,
'owner': 'dwh',
'schedule_interval': '0 * * * *',
'start_date': '2024-08-01',
'tags': ['dwh', 'reddit', 'api_integration']},
'denoscription': 'Топ реддитов за последний час',
'name': 'reddit'}],
'version': 2}yaml.safe_load возвращает данные в виде python-словаря. Еще немного шаблонного кода для создания AirFlow дага и мы на финишной прямой:- реализуем функцию
create_dag, она должна возвращать объект DAG (from airflow import DAG)- объект даг нужно положить в
globals(), чтобы AirFlow смог узнать про наш даг- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end
Мои маленькие дата инженеры - это подлежит самостоятельной реализации 😉. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории
metadata (тут в помощь модуль из стандартной либы python os) и формировать даги. Дерзаем💪У меня получилась такая структура папок:
- src:
- /common:
- func.py
- /metadata:
- reddit.yml
- /yaml_reader:
- reader.py
- walle.py
Загружаем в AirFlow, наслаждаемся картиной (она в скринах)
На этом самая простая часть закончилась🙃
#walle #framework #automate
❤🔥2🔥1👏1
🧯 По следам бременских рабочих кейсов или ещё раз про ClickHouse
.
Помните решали вопрос с курсорами, чтобы никого не упасть. Упёрлись в другой нюанс, таска выполняется настолько долго, что heartbeat 💓не может прилететь, airflow со спокойной душой шлётвас fail_alert.
.
Тут было два варианта:
- сменить executor (был кубер)
- изменить интеграцию
.
Решено было идти вторым путём. Вариантов, если cdc интеграция невозможна, кот наплакал: к именованным курсорам уже перешли ( загрузка была стабильной, но долгой, ~2часов по самой большой сущности, да, сущность прибавила в весе х2, наречем её
.
Хотелки инженера:
- перейти к хранению снепшотов на s3 и соответственно загрузки из s3
- dbt
- единообразие
.
Из этого уже имеется: рабочий фреймворк для запуска dbt моделей -> единообразие, осталось складировать снепшоты на s3. Был стандартный путь: читаем табличку батчами, сохраняем в файлы на s3, но хотелось чего-то другого и простого (скажем честно, хотелось экспериментов 😎).
Так у нас же ClickHouse, подумал я, и почему бы не использовать его, как compute🧐
Входные данные:
- бек postgres
- табличная функция postgresql
- табличная функция s3 (можно не только читать но и писать)
.
Получается примерно так:
.
Очень просто и со вкусом, а что самое интересное работает это как молния ⚡ Для сущности
.
Далее дело за малым: сделать вьюху над файлом, модельку dbt... 👉
.
Ну чем не кайфота. Такую интеграцию нельзя назвать полноценной ( тк имеем только снепшот на текущий момент, нет делитов, апдейтов и др), но если это закрывает потребность заказчика, то почему нет.
.
Будут у вас вопросики к такому подходу?
.
#work #cases #clickhouse
.
Помните решали вопрос с курсорами, чтобы никого не упасть. Упёрлись в другой нюанс, таска выполняется настолько долго, что heartbeat 💓не может прилететь, airflow со спокойной душой шлёт
.
Тут было два варианта:
- сменить executor (был кубер)
- изменить интеграцию
.
Решено было идти вторым путём. Вариантов, если cdc интеграция невозможна, кот наплакал: к именованным курсорам уже перешли ( загрузка была стабильной, но долгой, ~2часов по самой большой сущности, да, сущность прибавила в весе х2, наречем её
L)..
Хотелки инженера:
- перейти к хранению снепшотов на s3 и соответственно загрузки из s3
- dbt
- единообразие
.
Из этого уже имеется: рабочий фреймворк для запуска dbt моделей -> единообразие, осталось складировать снепшоты на s3. Был стандартный путь: читаем табличку батчами, сохраняем в файлы на s3, но хотелось чего-то другого и простого (скажем честно, хотелось экспериментов 😎).
Так у нас же ClickHouse, подумал я, и почему бы не использовать его, как compute🧐
Входные данные:
- бек postgres
- табличная функция postgresql
- табличная функция s3 (можно не только читать но и писать)
.
Получается примерно так:
INSERT INTO s3('.../bucket/entity/date=2024-10-20/data.parquet')
SELECT * FROM postgresql(host, db, table, ...).
Очень просто и со вкусом, а что самое интересное работает это как молния ⚡ Для сущности
L время отработки запроса составляет ~15минут 🔥.
Далее дело за малым: сделать вьюху над файлом, модельку dbt... 👉
.
Ну чем не кайфота. Такую интеграцию нельзя назвать полноценной ( тк имеем только снепшот на текущий момент, нет делитов, апдейтов и др), но если это закрывает потребность заказчика, то почему нет.
.
Будут у вас вопросики к такому подходу?
.
#work #cases #clickhouse
👍4🔥2❤1