⭐ Это заслуживает 5 звёзд ⭐
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:
.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:
1 с
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:
CASE WHEN (json_answer ->> 'answer') LIKE '%{%' THEN json_object_keys((json_answer ->> 'answer')::json)
ELSE Null END
.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:
1 с
трока, 1 строка из 2млн, Карл
#de #work #буднидатаинженера😢1
Как много мы пишем boilerplate code🤨
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора
.
Рекомендую ознакомиться с
- шаблоны при веб-разработке (
-
- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора
jinja2 😍.
Рекомендую ознакомиться с
jinja2, мне отлично помог краткий курс, тк шаблонизация встречается всё чаще и чаще:- шаблоны при веб-разработке (
Flask\Django)-
dbt = SQL over jinja2- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
❤1
Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
Можно написать:
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например,
1️⃣ Генерим несколько дат и выполняем etl за каждую
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
#airflow #taskflow #mapping
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
def etl():
pass
with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)
Можно написать:
@dag(....)
def create_dag():
@task
def etl():
pass
create_dag()
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:
@task
def get_data_from_api() -> List[]:
...
return data
api_data = get_data_from_api()
@task
def etl(data: List[]) -> None:
# transfrom and save data
...
etl = etl(data=api_data)
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например,
partial, expand, что это нам дает:1️⃣ Генерим несколько дат и выполняем etl за каждую
@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']
@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at
dates = get_dates()
# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]
files = find_s3_files()
# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
#airflow #taskflow #mapping
🔥4👍1
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍4
💪 Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
-
-
-
5️⃣ Модуль functools - магия функционального программирования
-
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
import pandas as pd
df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
import numpy as np
import pandas as pd
def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df
df.pipe(calculate, thr=5, columns=["value_1", "value_2"])
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
from collections import defaultdict
res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список
for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)
res["value"] # это список =)
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
-
batched открыл для себя, когда сам сначала написал такую штуку (немножко изобрел 🚲)-
combinations - итератор комбинаций из элементов списка-
zip_longest - "длинный" брат zip (если не знал, то zip проходится по самому короткому из списков)5️⃣ Модуль functools - магия функционального программирования
-
partial - кажется, это самая часто используемая функцию из модуля (ну может после lru_cache())import typing
import pandas as pd
import numpy as np
def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"
for col in columns:
df[f"{col}_rolling"] = df[col].rolling(window_width, min_periods=2).apply(lambda window: method(window))
return df
func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)
# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣
reduce - применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значениеl = [0, 1, -5, 7, 8]
# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)
print(res) # 11
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
👍4🔥1
🤘 Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
🔥7
Выходим из затишья
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault) ✊
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud
- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault) ✊
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud
- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
🔥4👍3
FrameWorkStory1.0
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница🙈. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше😉
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное😜
.
#framework #automate
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница🙈. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше😉
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное😜
.
#framework #automate
👍1🔥1
WALLE
Заботливо дадим красивое имя фреймворку
- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво 😍 (не называть же его YetAnotherDagGenerator или FrameWork №Х)
К предыдущему посту вопросиков накидали:
1.
Автоматизируем 80% рутинной работы и оставим место нетривиальным кейсам (чтобы инженеру было где поразмять извилины 🤯)
2.
я такого не видел. Даже с точки зрения устаревания какого-то процесса, при автоматизации удобнее: убрал yml в архив и всё.
В общем тут посыл такой: если есть любой код - ему нужен рефактор
3.
Очень не нравиться копаться в чужом коде, разбирая, что же там сломалось (хех, можно заметить, что если автоматизацию писал не ты, то это тоже чудой код =))
4.
Честно, очень бы хотелось, но пока тестирование только в dev среде или на персональном стенде.
Общий посыл автоматизации такой:
#walle #framework #automate
Заботливо дадим красивое имя фреймворку
Walle, почему так:- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво 😍 (не называть же его YetAnotherDagGenerator или FrameWork №Х)
К предыдущему посту вопросиков накидали:
А проблемы при этом возникают, что твоя автоматизация не умеет все, что надо, её надо дописывать,
тестить, рефакторить, иногда не работает… и это ради 10 строчек кода?
1.
автоматизация не умеет всё - справедливо, но всё и не надо. Применяем правило Парето: Автоматизируем 80% рутинной работы и оставим место нетривиальным кейсам (чтобы инженеру было где поразмять извилины 🤯)
2.
рефакторить - Отвечу вопросом на вопрос: а неавтоматизированные даги рефакторить не надо? Написал один раз и забыл, нетя такого не видел. Даже с точки зрения устаревания какого-то процесса, при автоматизации удобнее: убрал yml в архив и всё.
В общем тут посыл такой: если есть любой код - ему нужен рефактор
3.
иногда не работает - справедливо, сделаю отсылку к п2: любой код может не работать =). Также вижу пользу: если не работает то, для всего - нет специфических проблем.Очень не нравиться копаться в чужом коде, разбирая, что же там сломалось (хех, можно заметить, что если автоматизацию писал не ты, то это тоже чудой код =))
4.
тестить - пффф, какие тесты. *уяк - *уяк и в продакшенЧестно, очень бы хотелось, но пока тестирование только в dev среде или на персональном стенде.
Общий посыл автоматизации такой:
Нет стремления покрыть 100% кейсов, есть желание написать код, в котором в одних и тех местах вызываются коннекшены, они имеют
одинаковые названия, код переиспользуется (для алертинга используется 1 и только 1 функция или стандартный набор, а не 8 различных фукнций 🤦♂️),
таски\даги - имеют одинаковый нейминг и одинаковую структуру и тд...
#walle #framework #automate
❤2👍2🔥1
WALLE - Готовим yml
Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.
Реализуем базу:
- все конфиги храним в директории
- читаем директорию
- данные из yml храним в переменной, например:
-
Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\denoscription)
С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:
ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)
Сохраняем наш первый yml 🥳
.
Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку
Добавил обработку ошибок
Проверяем работу реализованной функции (скрин в репе):
- реализуем функцию
- объект даг нужно положить в
- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end
Мои маленькие дата инженеры - это подлежит самостоятельной реализации 😉. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории
У меня получилась такая структура папок:
Загружаем в AirFlow, наслаждаемся картиной (она в скринах)
На этом самая простая часть закончилась🙃
#walle #framework #automate
Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.
Реализуем базу:
- все конфиги храним в директории
metadata- читаем директорию
metadata- данные из yml храним в переменной, например:
integration_meta-
integration_meta подается на вход функции создания дага create_dag(integration_meta)Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\denoscription)
С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:
version: 2
models:
- name: reddit # имя интеграции, оно же dag_id
denoscription: Топ реддитов за последний час # описание интеграции, оно же dag_denoscription
dag:
# dag_id: "" # можно переопределить dag_id != name
schedule_interval: 0 * * * *
start_date: '2024-08-01'
# end_date: '2024-08-31'
catchup: False
alerting_chat_id: -987654321
alerting_secret_name: alerting_bot_token
owner: dwh
tags:
- dwh
- api_integration
ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)
Сохраняем наш первый yml 🥳
.
Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку
pyyaml. Немного шаблонного кода:import yaml
def read_yaml(path_to_file: str):
with open(path_to_file, "r") as f_yaml:
try:
content = yaml.safe_load(f_yaml)
return content
except yaml.YAMLError as e:
print(e)
Добавил обработку ошибок
try\except - чуть-чуть защиты от сломанных файлов yml.Проверяем работу реализованной функции (скрин в репе):
{'models': [{'dag': {'alerting_chat_id': -987654321,
'alerting_secret_name': 'alerting_bot_token',
'catchup': False,
'owner': 'dwh',
'schedule_interval': '0 * * * *',
'start_date': '2024-08-01',
'tags': ['dwh', 'reddit', 'api_integration']},
'denoscription': 'Топ реддитов за последний час',
'name': 'reddit'}],
'version': 2}yaml.safe_load возвращает данные в виде python-словаря. Еще немного шаблонного кода для создания AirFlow дага и мы на финишной прямой:- реализуем функцию
create_dag, она должна возвращать объект DAG (from airflow import DAG)- объект даг нужно положить в
globals(), чтобы AirFlow смог узнать про наш даг- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end
Мои маленькие дата инженеры - это подлежит самостоятельной реализации 😉. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории
metadata (тут в помощь модуль из стандартной либы python os) и формировать даги. Дерзаем💪У меня получилась такая структура папок:
- src:
- /common:
- func.py
- /metadata:
- reddit.yml
- /yaml_reader:
- reader.py
- walle.py
Загружаем в AirFlow, наслаждаемся картиной (она в скринах)
На этом самая простая часть закончилась🙃
#walle #framework #automate
❤🔥2🔥1👏1
🧯 По следам бременских рабочих кейсов или ещё раз про ClickHouse
.
Помните решали вопрос с курсорами, чтобы никого не упасть. Упёрлись в другой нюанс, таска выполняется настолько долго, что heartbeat 💓не может прилететь, airflow со спокойной душой шлётвас fail_alert.
.
Тут было два варианта:
- сменить executor (был кубер)
- изменить интеграцию
.
Решено было идти вторым путём. Вариантов, если cdc интеграция невозможна, кот наплакал: к именованным курсорам уже перешли ( загрузка была стабильной, но долгой, ~2часов по самой большой сущности, да, сущность прибавила в весе х2, наречем её
.
Хотелки инженера:
- перейти к хранению снепшотов на s3 и соответственно загрузки из s3
- dbt
- единообразие
.
Из этого уже имеется: рабочий фреймворк для запуска dbt моделей -> единообразие, осталось складировать снепшоты на s3. Был стандартный путь: читаем табличку батчами, сохраняем в файлы на s3, но хотелось чего-то другого и простого (скажем честно, хотелось экспериментов 😎).
Так у нас же ClickHouse, подумал я, и почему бы не использовать его, как compute🧐
Входные данные:
- бек postgres
- табличная функция postgresql
- табличная функция s3 (можно не только читать но и писать)
.
Получается примерно так:
.
Очень просто и со вкусом, а что самое интересное работает это как молния ⚡ Для сущности
.
Далее дело за малым: сделать вьюху над файлом, модельку dbt... 👉
.
Ну чем не кайфота. Такую интеграцию нельзя назвать полноценной ( тк имеем только снепшот на текущий момент, нет делитов, апдейтов и др), но если это закрывает потребность заказчика, то почему нет.
.
Будут у вас вопросики к такому подходу?
.
#work #cases #clickhouse
.
Помните решали вопрос с курсорами, чтобы никого не упасть. Упёрлись в другой нюанс, таска выполняется настолько долго, что heartbeat 💓не может прилететь, airflow со спокойной душой шлёт
.
Тут было два варианта:
- сменить executor (был кубер)
- изменить интеграцию
.
Решено было идти вторым путём. Вариантов, если cdc интеграция невозможна, кот наплакал: к именованным курсорам уже перешли ( загрузка была стабильной, но долгой, ~2часов по самой большой сущности, да, сущность прибавила в весе х2, наречем её
L)..
Хотелки инженера:
- перейти к хранению снепшотов на s3 и соответственно загрузки из s3
- dbt
- единообразие
.
Из этого уже имеется: рабочий фреймворк для запуска dbt моделей -> единообразие, осталось складировать снепшоты на s3. Был стандартный путь: читаем табличку батчами, сохраняем в файлы на s3, но хотелось чего-то другого и простого (скажем честно, хотелось экспериментов 😎).
Так у нас же ClickHouse, подумал я, и почему бы не использовать его, как compute🧐
Входные данные:
- бек postgres
- табличная функция postgresql
- табличная функция s3 (можно не только читать но и писать)
.
Получается примерно так:
INSERT INTO s3('.../bucket/entity/date=2024-10-20/data.parquet')
SELECT * FROM postgresql(host, db, table, ...).
Очень просто и со вкусом, а что самое интересное работает это как молния ⚡ Для сущности
L время отработки запроса составляет ~15минут 🔥.
Далее дело за малым: сделать вьюху над файлом, модельку dbt... 👉
.
Ну чем не кайфота. Такую интеграцию нельзя назвать полноценной ( тк имеем только снепшот на текущий момент, нет делитов, апдейтов и др), но если это закрывает потребность заказчика, то почему нет.
.
Будут у вас вопросики к такому подходу?
.
#work #cases #clickhouse
👍4🔥2❤1
Превратности панд 🐼
.
Кейс из серии явное лучше неявного. Многие сталкиваются с задачей конвертации строковой даты в собственно дату.
.
Рассмотрим пример в пандах для файлика (в нем ничего кроме даты нет):
.
И тут нас ждут тонкости, связанные с версией pandas:
- если вы используете pandas < 2.0, то вероятнее всего код выполнится без ошибок, но ошибки собирать вы будите дальше и сколько времени уйдет на поиска: часы или дни 🤷♂️
.
А что вообще происходит: по дефлоту пандас пытаетсяугадать определить формат даты и преобразовать в дату. Но как выяснилось до версии 2.0 пандас делает это специфически. Открываем наш файл и видим что даты записаны как:
Пока сложно: это 1 декабря или 12 января??? Поищем другие цифры и находим:
Ага, значит наш формат -
12 января стало 1 декабря, а 13 декабря осталось 13 декабря 🤦♂️
.
Версии пандас >= 2.0 кидают ошибку:
И сразу предлагают воспользоваться аргументом
☝️Будьте бдительны при использовании различных инструментов и старайтесь явно прописывать все настройки, не полагаясь на "умноту" этих средств - рано или поздно стрельнет в ногу.
ps: Clickhouse справился с задачей из коробки, только 🤫
#pandas #datetime
.
Кейс из серии явное лучше неявного. Многие сталкиваются с задачей конвертации строковой даты в собственно дату.
.
Рассмотрим пример в пандах для файлика (в нем ничего кроме даты нет):
import pandas as pd
print(pd.version)
df = pd.read_csv("sample.csv")
df["entry_date"] = pd.to_datetime(df["entry_date"])
.
И тут нас ждут тонкости, связанные с версией pandas:
- если вы используете pandas < 2.0, то вероятнее всего код выполнится без ошибок, но ошибки собирать вы будите дальше и сколько времени уйдет на поиска: часы или дни 🤷♂️
.
А что вообще происходит: по дефлоту пандас пытается
12/01/2018 08:26
Пока сложно: это 1 декабря или 12 января??? Поищем другие цифры и находим:
13/12/2018 09:02
Ага, значит наш формат -
%d/%m/%Y %H:%M - супер. А теперь вишенка на торте, смотрим как преобразовал пандас наши строки:12/01/2018 08:26 -> 2018-12-01 08:26:00
13/12/2018 09:02 -> 2018-12-13 09:02:00
12 января стало 1 декабря, а 13 декабря осталось 13 декабря 🤦♂️
.
Версии пандас >= 2.0 кидают ошибку:
ValueError: time data "13/12/2018 09:02" doesn't match format "%m/%d/%Y %H:%M", at position 881
И сразу предлагают воспользоваться аргументом
format, в котором требуется указать пандасу с каким форматом даты он имеет дело. ☝️Будьте бдительны при использовании различных инструментов и старайтесь явно прописывать все настройки, не полагаясь на "умноту" этих средств - рано или поздно стрельнет в ногу.
ps: Clickhouse справился с задачей из коробки, только 🤫
SELECT
entry_date,
parseDateTime64BestEffort(entry_date) AS entry_date_dt
FROM
s3('https://storage.yandexcloud.net/public-bucket-6/sandbox/sample.csv',
'CSVWithNames')
WHERE
entry_date IN ('12/01/2018 08:26', '13/12/2018 09:02');
entry_date |entry_date_dt |
----------------+-------------------+
12/01/2018 08:26|2018-01-12 05:26:00|
13/12/2018 09:02|2018-12-13 06:02:00|
#pandas #datetime
👍2🔥2
#post
👀 Data Overview
.
Поговорим о том какие инструменты есть в Clickhouse для обзора данных. Кейсы, возникающие в работе:
- на s3 сложили файлы, какая структура, какие типы данных
- хотите выгружать данные с бека (например, Postgres) и хочется узнать в какие типы превратить атрибуты или просто узнать о структуре таблицы (а например, локального доступа нет)
- узнать структуру существующей таблицы
.
Да, в Clickhouse есть аналог
А можно такие же, только с перламутровыми пуговицами...
.
Конечно, их есть у меня:
1.
... and so one. В одном из рабочих кейсов перешли от чтения паркет-файла, для формирования data_schema, к выводу DESCRIBE - 👌
.
2. SHOW CREATE - покажет вам DDL как он есть
.
3. Изучить метаданные паркетов поможет указание ParquetMetadata вместо формата файла
.
ps: clickhouse не перестаёт меня удивлять, есть ощущение, что при правильном рецепте это пушка-бомба 💣
#clickhouse #lifehacks
👀 Data Overview
.
Поговорим о том какие инструменты есть в Clickhouse для обзора данных. Кейсы, возникающие в работе:
- на s3 сложили файлы, какая структура, какие типы данных
- хотите выгружать данные с бека (например, Postgres) и хочется узнать в какие типы превратить атрибуты или просто узнать о структуре таблицы (а например, локального доступа нет)
- узнать структуру существующей таблицы
.
Да, в Clickhouse есть аналог
information_schema - это база system, таблиц там много, но нам интересны: tables, columns - тут всё как в PostgeSQL, разобраться довольно просто.А можно такие же, только с перламутровыми пуговицами...
.
Конечно, их есть у меня:
1.
DESCRIBE - команду можно использовать не только с существующими таблицами, но и с табличными функциями:-- local table
DESCRIBE cdc.openweathermap_raw;
name |type |
----------------+-------------+
record_timestamp|DateTime64(6)|
record_value |String |
-- files on s3
describe s3('https://storage.yandexcloud.net/public-bucket-6/reddit/2024-10-26/top_subreddits_2024_10_26_03_00_00.parquet', '', '', 'Parquet');
name |type |
-----------------------+-------+
id |String |
noscript |String |
score |Int64 |
num_comments |Int64 |
author |String |
created_utc |String |
url |String |
upvote_ratio |Float64|
over_18 |UInt8 |
edited |UInt8 |
spoiler |UInt8 |
stickied |UInt8 |
subreddit_name_prefixed|String |
DESCRIBE postgresql('192.168.55.94:5432', 'tmp', 'superstore_v1', 'shpz', '12345', 'stage');
name |type |
--------------+-------------------------+
order_id |Nullable(String) |
order_date |Nullable(Date) |
ship_date |Nullable(Date) |
ship_mode |Nullable(String) |
customer_id |Nullable(String) |
custsomer_name|Nullable(String) |
segment |Nullable(String) |
country |Nullable(String) |
city |Nullable(String) |
state |Nullable(String) |
postal_code |Nullable(Int64) |
region |Nullable(String) |
product_id |Nullable(String) |
category |Nullable(String) |
sub_category |Nullable(String) |
product_name |Nullable(String) |
sales |Nullable(Decimal(38, 19))|
quantity |Nullable(Int64) |
discount |Nullable(Decimal(38, 19))|
profit |Nullable(Decimal(38, 19))|
... and so one. В одном из рабочих кейсов перешли от чтения паркет-файла, для формирования data_schema, к выводу DESCRIBE - 👌
.
2. SHOW CREATE - покажет вам DDL как он есть
SHOW CREATE cdc.openweathermap;
.
3. Изучить метаданные паркетов поможет указание ParquetMetadata вместо формата файла
SELECT * FROM s3('https://storage.yandexcloud.net/public-bucket-6/reddit/2024-10-26/top_subreddits_2024_10_26_03_00_00.parquet', '', '', 'ParquetMetadata')
FORMAT Vertical;
.
num_columns: 13
num_rows: 32
num_row_groups: 1
format_version: 2.6
metadata_size: 2513
total_uncompressed_size: 6182
total_compressed_size: 4068
columns: [('id','id',1,0,'BYTE_ARRAY','String','GZIP',435,254,'41.61%',['PLAIN','RLE','RLE_DICTIONARY']),('noscript','noscript',1,0,'BYTE_ARRAY','String','GZIP',1765,1191,'32.52%',['PLAIN','RLE','RLE_DICTIONARY']),...]
row_groups: [(13,32,6182,4068,[('id','id',254,435,true,(32,0,NULL,'1gcanab','1gcbktk')),...].
ps: clickhouse не перестаёт меня удивлять, есть ощущение, что при правильном рецепте это пушка-бомба 💣
#clickhouse #lifehacks
🔥5❤1👍1
МАТЕМАРКЕТИНГ - 2024
Запоздалый пост о конфе матемаркетинг-2024. На фото наша банда на стенде
Постояли на стенде Побывали на конференции, впечатления:
- масштабно
- с шиком\блеском
- с Себрантом
- шумно
Инженерная секция пока представлена слабо, согласно программе все рассказывали о том как
построить DataMesh. Не стало исключением и выступление нашего Head of DWH, рассказал:
- о том какие проблемы были и как решали
- как делилиапельсин DWH
- где чья ответственность
Интересно было послушать, как это выглядит со стороны, когда сам являешься участником данных перемен🙃
Судя по кол-ву вопросов из зала - выступление получилось топ и заинтересовало многих.
Из интересных докладов можно отметить Рому Бунина о главном качестве аналитика. Это было не просто интересно, это было
визуально приятно, Рома в своем стиле с графиками, динамическим оформлением презы и даже вставками видео рассказал о чем не стоит
забывать аналитика (кажется, не только им):
- харды не главное
- чем выше ваш уровень (middle -> senior) тем важнее для вас софты
Посмотреть на фото Ромы тут
Кажется, это старо как мир: учитесь общаться, договариваться, задавать вопросы, говорить нет,
критически мыслить и ....
#matemarketing
Запоздалый пост о конфе матемаркетинг-2024. На фото наша банда на стенде
- масштабно
- с шиком\блеском
- с Себрантом
- шумно
Инженерная секция пока представлена слабо, согласно программе все рассказывали о том как
построить DataMesh. Не стало исключением и выступление нашего Head of DWH, рассказал:
- о том какие проблемы были и как решали
- как делили
- где чья ответственность
Интересно было послушать, как это выглядит со стороны, когда сам являешься участником данных перемен🙃
Судя по кол-ву вопросов из зала - выступление получилось топ и заинтересовало многих.
Из интересных докладов можно отметить Рому Бунина о главном качестве аналитика. Это было не просто интересно, это было
визуально приятно, Рома в своем стиле с графиками, динамическим оформлением презы и даже вставками видео рассказал о чем не стоит
забывать аналитика (кажется, не только им):
- харды не главное
- чем выше ваш уровень (middle -> senior) тем важнее для вас софты
Посмотреть на фото Ромы тут
Кажется, это старо как мир: учитесь общаться, договариваться, задавать вопросы, говорить нет,
критически мыслить и ....
#matemarketing
🔥6🏆2
WALLE - Выстраиваем структуру
В прошлом посте собрали фундамент:
- сформировали yml
- научились его читать
- научились создавать даг из yml метадаты
Пока в даге только пустые таски (
очертили архитектуру дага, она повторяет процессы E(xtract)T(ransform)L(Save).
Таски transform, save предлагаю объеднить, тк transformer отдает какие-то данные, а saver их сохраняет, то есть таски обмениваются данные, а делать это через
XComm неблагодарное дело, поэтому наши даги будут состоять из двух тасок (минимум):
- extractor
- transformer_and_saver
Исходные данные (например, выгрузка API или файл на S3) в единственном числе (в смысле, что данные ASIS могут быть только одни) поэтому
extractor всегда 1, но может возвращать несколько объектов (например, N путей до файлов)
А вот обработать данные мы уже можем несколькими способами, поэтому transformers может быть несколько, а saver всегда идет в комплекте к transformer.
Для описания тасок выделим секцию
Тут важно проверить, что ошибок в структуре нет и yaml_reader успешно читает такой конфиг (самостоятельно).
Структуру описали, теперь разбираемся что же это за
это некие Python-классы, которые реализуют некоторый базовый интерфейс. На текущий момент мы знаем, что extractor что-то передает transformer, этот
в свою очередь передаёт уже данные в saver. Условимся называть то чем обмениваются классы ресурсом (
- ExtractorResource
- TransformerResource
- SaverResource
Ресурс - это объект Python, будем использовать датаклассы (
- Extractor (выгружает из API и складывает на S3 (это я называю RAW-слоем = данные ASIS) и отдает далее пути до файлов
или ищет наличие файлов на S3 или FTP и возвращает пути к нужным файлам)
- Transformer (читаем данные по полученным путям, перекодирует согласно нашей логике и отдаёт saver-у набор байтов для сохранения)
- Saver (просто сохраняет байтики на S3 в нужном нам формате\партицировании - это я называю ODS-слой)
Интерфейс у нас будет единообразный, поэтому опишем какие методы должны быть реализованы у каждого класса:
- Extractor - должен иметь метод
- Transformer - должен иметь метод
- Saver - имеет 1 метод
Все остальные внутренности каждого класса разрабатываются на усмотрение инженера - творческий процесс однако 😎.
Расчехляем Pycharm и кодируем, ресурсы:
Для проверки идеи и работоспособности дага создадим Mock классы, для примера MockExtractor:
В прошлом посте собрали фундамент:
- сформировали yml
- научились его читать
- научились создавать даг из yml метадаты
Пока в даге только пустые таски (
start\end), сейчас будем исправлять это. Во вступительном постеочертили архитектуру дага, она повторяет процессы E(xtract)T(ransform)L(Save).
Таски transform, save предлагаю объеднить, тк transformer отдает какие-то данные, а saver их сохраняет, то есть таски обмениваются данные, а делать это через
XComm неблагодарное дело, поэтому наши даги будут состоять из двух тасок (минимум):
- extractor
- transformer_and_saver
Исходные данные (например, выгрузка API или файл на S3) в единственном числе (в смысле, что данные ASIS могут быть только одни) поэтому
extractor всегда 1, но может возвращать несколько объектов (например, N путей до файлов)
А вот обработать данные мы уже можем несколькими способами, поэтому transformers может быть несколько, а saver всегда идет в комплекте к transformer.
Для описания тасок выделим секцию
tasks в нашем yml:version: 2
models:
- name: mock # имя интеграции, оно же dag_id
denoscription: Топ реддитов за последний час # описание интеграции, оно же dag_denoscription
dag:
# dag_id: "" # можно переопределить dag_id != name
schedule_interval: 0 * * * *
start_date: '2024-08-01'
# end_date: '2024-08-31'
catchup: False
owner: dwh
tags:
- mock
- api_integration
tasks:
extractor:
MockExtractor:
transformers:
- MockTransformer:
saver:
MockSaver:
alerting_chat_id: -987654321
alerting_secret_name: alerting_bot_token
Тут важно проверить, что ошибок в структуре нет и yaml_reader успешно читает такой конфиг (самостоятельно).
Структуру описали, теперь разбираемся что же это за
MockExtractor, MockTransformer и MockSaver🤔. А этих товарищей нужно реализовать: то есть это некие Python-классы, которые реализуют некоторый базовый интерфейс. На текущий момент мы знаем, что extractor что-то передает transformer, этот
в свою очередь передаёт уже данные в saver. Условимся называть то чем обмениваются классы ресурсом (
Resource). Итого у нас будет 3 ресурса:- ExtractorResource
- TransformerResource
- SaverResource
Ресурс - это объект Python, будем использовать датаклассы (
dataclasses), но до них доберемся чуть позже. А сейчас про функции каждого объекта:- Extractor (выгружает из API и складывает на S3 (это я называю RAW-слоем = данные ASIS) и отдает далее пути до файлов
или ищет наличие файлов на S3 или FTP и возвращает пути к нужным файлам)
- Transformer (читаем данные по полученным путям, перекодирует согласно нашей логике и отдаёт saver-у набор байтов для сохранения)
- Saver (просто сохраняет байтики на S3 в нужном нам формате\партицировании - это я называю ODS-слой)
Интерфейс у нас будет единообразный, поэтому опишем какие методы должны быть реализованы у каждого класса:
- Extractor - должен иметь метод
get_resources - возвращает генератор объетов ExtractResource- Transformer - должен иметь метод
transform, который принимает ExtractResource, возвращает объект TransformResource- Saver - имеет 1 метод
save, который принимает TransformResource и возвращает SaveResource.Все остальные внутренности каждого класса разрабатываются на усмотрение инженера - творческий процесс однако 😎.
Расчехляем Pycharm и кодируем, ресурсы:
@dataclasses.dataclass
class ExtractorResource:
path: str
@dataclasses.dataclass
class TransformerResource:
path: str
content: io.BytesIO
@dataclasses.dataclass
class SaverResource:
path: str
Для проверки идеи и работоспособности дага создадим Mock классы, для примера MockExtractor:
🔥4
ps: можно заматить, экстрактор реализован как генератор (yield) - в конкретном случае не так важно, тк экстрактор не отдает сами данные, а только ссылки (путь на S3 или url и тд)
Остальные классы, реализуем самостоятельно или заглядываем в репу
Каждый из классов имеет единственный аргумент = содержимое yml конфига и ничего более (но это не точно 😉), то есть все необходимые параметры должны быть описаны
в теле самого yml в нужной секции, например, у меня получилось так для экстрактора:
Осталось встроить наши классы в даг, используем TaskFlow API, пример для экстратора:
Комментарии:
- декораторная таска принимает только саму фукнцию или класс в нашем случае
- имя экстратора получаем из ямла
- все экстраторы импортируются
- из словаря
- если параметров экстратора нет, то подставляют пустой словарь
-
Логика для трансформера и saver сохраняется, добавляется только обработка ситуации,
когда трансформер сохраняет сам объекты и отдает только пути (пустой
встречается.
Остальное обдумываем сами или заглядываем в репу.
И после загрузки в AirFlow получаем красоту в UI - repo
#walle
#framework
#automate
from models import ExtractorResource
class MockExtractor:
def init(self, integration_metadata: dict):
self.integration_metadata = integration_metadata
def get_resources(self):
for idx in range(5):
yield ExtractorResource(path=f'mock_s3_file_{idx}.csv')
ps: можно заматить, экстрактор реализован как генератор (yield) - в конкретном случае не так важно, тк экстрактор не отдает сами данные, а только ссылки (путь на S3 или url и тд)
Остальные классы, реализуем самостоятельно или заглядываем в репу
Каждый из классов имеет единственный аргумент = содержимое yml конфига и ничего более (но это не точно 😉), то есть все необходимые параметры должны быть описаны
в теле самого yml в нужной секции, например, у меня получилось так для экстрактора:
tasks:
extractor:
MockExtractor:
src_s3_conection_id: reddit_s3_connection_id
src_s3_bucket: raw-public
src_s3_prefix_template: reddit/{dm_date}
src_s3_partition_fmt: '%Y-%m-%d'
Осталось встроить наши классы в даг, используем TaskFlow API, пример для экстратора:
@task
def _extractor(extractor: t.Callable) -> t.List[str]:
extractor_obj = extractor(
intergation_metadata=intergation_metadata
)
return [resource.dict for resource in extractor_obj.get_resources()]
# извлекаем общую структуру тасок
tasks_meta = intergation_metadata.get("tasks", {})
# извлекаем extractor
extractor_name, extractor_params = list(tasks_meta.get("extractor").items())[0]
if not extractor_params:
extractor_params = {}
logging.info([extractor_name, extractor_params])
extractor = globals()[extractor_name]
# возвращаем список объектов
ext_resources = extractor.override(task_id=f"extractor{extractor_name}")(
extractor=extractor)
Комментарии:
- декораторная таска принимает только саму фукнцию или класс в нашем случае
- имя экстратора получаем из ямла
- все экстраторы импортируются
from extractors import *- из словаря
globals() получаем объект нужного экстрактора по имени из ямла- если параметров экстратора нет, то подставляют пустой словарь
-
resource.dict - нужно, тк XComm не знает как сериализовать нашу модель ресурса, поэтому воспользуемся атрибутом dict. Соответственно внутри таски transform_and_save обратно создадим ресурсЛогика для трансформера и saver сохраняется, добавляется только обработка ситуации,
когда трансформер сохраняет сам объекты и отдает только пути (пустой
TransformerResource.content). Как показала практика: такое нередко встречается.
Остальное обдумываем сами или заглядываем в репу.
И после загрузки в AirFlow получаем красоту в UI - repo
#walle
#framework
#automate
🔥4👍1
Пост Новогоднего настроения 😎
.
Нравиться ли вам, когда ПО меняет иконки или добавляет новогодних эффектов.
Примеры на скрине:
- dbeaver
- confluence
- polaris (управление умной бытовой техникой)
.
У меня вызывает это детский восторг 😛 + 100500 к силе 💪, приятно что пользовательский опыт становиться положительным.
А как вам такие новогодние изменения? Если есть еще примеры кидайте в комменты...
#ny #2025
.
Нравиться ли вам, когда ПО меняет иконки или добавляет новогодних эффектов.
Примеры на скрине:
- dbeaver
- confluence
- polaris (управление умной бытовой техникой)
.
У меня вызывает это детский восторг 😛 + 100500 к силе 💪, приятно что пользовательский опыт становиться положительным.
А как вам такие новогодние изменения? Если есть еще примеры кидайте в комменты...
#ny #2025
🔥2🎄2