Where is data, Lebowski – Telegram
Where is data, Lebowski
237 subscribers
83 photos
2 videos
83 links
Канал про разное в data-мире:
- от библиотек визуализации до data egineering
- от графиков до элементов разработки
- от .csv до API
Download Telegram
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase

Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍4
​​💪 Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:

import pandas as pd


df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)


2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:

import numpy as np
import pandas as pd


def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df

df.pipe(calculate, thr=5, columns=["value_1", "value_2"])


3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:

from collections import defaultdict

res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список

for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)

res["value"] # это список =)


Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...

4️⃣ Модуль itertools
- batched открыл для себя, когда сам сначала написал такую штуку (немножко изобрел 🚲)
- combinations - итератор комбинаций из элементов списка
- zip_longest - "длинный" брат zip (если не знал, то zip проходится по самому короткому из списков)

5️⃣ Модуль functools - магия функционального программирования
- partial - кажется, это самая часто используемая функцию из модуля (ну может после lru_cache())

import typing
import pandas as pd
import numpy as np


def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"

for col in columns:
df[f"{col}_rolling"] = df[col].rolling(window_width, min_periods=2).apply(lambda window: method(window))

return df

func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)

# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)


Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)

6️⃣ reduce - применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значение

l = [0, 1, -5, 7, 8]

# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)

print(res) # 11

.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
👍4🔥1
🤘 Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
🔥7
Пусть это будет пятничным мемом во вторник😁
.
Когда вы с коллегами решили ввести CI проверку на наличие delete-ов при работа с ClickHouse🤟
🔥2👍1😁1
​​Выходим из затишья
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault)
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud

- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
🔥4👍3
Colorized it!
.
Пост удобства, у коллеги заметил цветовое разделение коннектов к базам в DataGrip - показалось крайне интересным.
Но что, у меня-то Dbeaver, сходу решения не нашлось =(
.
Со второй попытки копания в настройках удалось найти нужную 🥳
.
Маленькая инструкция в репо.
.
#dbeaver #colorize
👍1
​​FrameWorkStory1.0
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница🙈. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше😉
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное😜
.
#framework #automate
👍1🔥1
​​WALLE

Заботливо дадим красивое имя фреймворку Walle, почему так:
- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво 😍 (не называть же его YetAnotherDagGenerator или FrameWork №Х)

К предыдущему посту вопросиков накидали:

А проблемы при этом возникают, что твоя автоматизация не умеет все, что надо, её надо дописывать, 
тестить, рефакторить, иногда не работает… и это ради 10 строчек кода?


1. автоматизация не умеет всё - справедливо, но всё и не надо. Применяем правило Парето:
Автоматизируем 80% рутинной работы и оставим место нетривиальным кейсам (чтобы инженеру было где поразмять извилины 🤯)
2. рефакторить - Отвечу вопросом на вопрос: а неавтоматизированные даги рефакторить не надо? Написал один раз и забыл, нет
я такого не видел. Даже с точки зрения устаревания какого-то процесса, при автоматизации удобнее: убрал yml в архив и всё.
В общем тут посыл такой: если есть любой код - ему нужен рефактор
3. иногда не работает - справедливо, сделаю отсылку к п2: любой код может не работать =). Также вижу пользу: если не работает то, для всего - нет специфических проблем.
Очень не нравиться копаться в чужом коде, разбирая, что же там сломалось (хех, можно заметить, что если автоматизацию писал не ты, то это тоже чудой код =))
4. тестить - пффф, какие тесты. *уяк - *уяк и в продакшен
Честно, очень бы хотелось, но пока тестирование только в dev среде или на персональном стенде.


Общий посыл автоматизации такой:

Нет стремления покрыть 100% кейсов, есть желание написать код, в котором в одних и тех местах вызываются коннекшены, они имеют 
одинаковые названия, код переиспользуется (для алертинга используется 1 и только 1 функция или стандартный набор, а не 8 различных фукнций 🤦‍♂️),
таски\даги - имеют одинаковый нейминг и одинаковую структуру и тд...


#walle #framework #automate
2👍2🔥1
​​WALLE - Готовим yml

Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.

Реализуем базу:
- все конфиги храним в директории metadata
- читаем директорию metadata
- данные из yml храним в переменной, например: integration_meta
- integration_meta подается на вход функции создания дага create_dag(integration_meta)

Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\denoscription)

С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:

version: 2
models:
- name: reddit # имя интеграции, оно же dag_id
denoscription: Топ реддитов за последний час # описание интеграции, оно же dag_denoscription
dag:
# dag_id: "" # можно переопределить dag_id != name
schedule_interval: 0 * * * *
start_date: '2024-08-01'
# end_date: '2024-08-31'
catchup: False
alerting_chat_id: -987654321
alerting_secret_name: alerting_bot_token
owner: dwh
tags:
- dwh
- reddit
- api_integration


ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)

Сохраняем наш первый yml 🥳
.

Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку pyyaml. Немного шаблонного кода:

import yaml

def read_yaml(path_to_file: str):
with open(path_to_file, "r") as f_yaml:
try:
content = yaml.safe_load(f_yaml)
return content
except yaml.YAMLError as e:
print(e)


Добавил обработку ошибок try\except - чуть-чуть защиты от сломанных файлов yml.

Проверяем работу реализованной функции (скрин в репе):


{'models': [{'dag': {'alerting_chat_id': -987654321,
'alerting_secret_name': 'alerting_bot_token',
'catchup': False,
'owner': 'dwh',
'schedule_interval': '0 * * * *',
'start_date': '2024-08-01',
'tags': ['dwh', 'reddit', 'api_integration']},
'denoscription': 'Топ реддитов за последний час',
'name': 'reddit'}],
'version': 2}


yaml.safe_load возвращает данные в виде python-словаря. Еще немного шаблонного кода для создания AirFlow дага и мы на финишной прямой:
- реализуем функцию create_dag, она должна возвращать объект DAG (from airflow import DAG)
- объект даг нужно положить в globals(), чтобы AirFlow смог узнать про наш даг
- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end

Мои маленькие дата инженеры - это подлежит самостоятельной реализации 😉. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории metadata (тут в помощь модуль из стандартной либы python os) и формировать даги. Дерзаем💪

У меня получилась такая структура папок:
- src:
- /common:
- func.py
- /metadata:
- reddit.yml
- /yaml_reader:
- reader.py
- walle.py


Загружаем в AirFlow, наслаждаемся картиной (она в скринах)

На этом самая простая часть закончилась🙃

#walle #framework #automate
❤‍🔥2🔥1👏1
Рабочие мемы:
Что и когда создано, вы говорите...

#work #meme
🤯2
​​🧯 По следам бременских рабочих кейсов или ещё раз про ClickHouse
.
Помните решали вопрос с курсорами, чтобы никого не упасть. Упёрлись в другой нюанс, таска выполняется настолько долго, что heartbeat 💓не может прилететь, airflow со спокойной душой шлёт вас fail_alert.
.
Тут было два варианта:
- сменить executor (был кубер)
- изменить интеграцию
.
Решено было идти вторым путём. Вариантов, если cdc интеграция невозможна, кот наплакал: к именованным курсорам уже перешли ( загрузка была стабильной, но долгой, ~2часов по самой большой сущности, да, сущность прибавила в весе х2, наречем её L).
.
Хотелки инженера:
- перейти к хранению снепшотов на s3 и соответственно загрузки из s3
- dbt
- единообразие
.
Из этого уже имеется: рабочий фреймворк для запуска dbt моделей -> единообразие, осталось складировать снепшоты на s3. Был стандартный путь: читаем табличку батчами, сохраняем в файлы на s3, но хотелось чего-то другого и простого (скажем честно, хотелось экспериментов 😎).
Так у нас же ClickHouse, подумал я, и почему бы не использовать его, как compute🧐
Входные данные:
- бек postgres
- табличная функция postgresql
- табличная функция s3 (можно не только читать но и писать)
.
Получается примерно так:
INSERT INTO s3('.../bucket/entity/date=2024-10-20/data.parquet')
SELECT * FROM postgresql(host, db, table, ...)

.
Очень просто и со вкусом, а что самое интересное работает это как молния Для сущности L время отработки запроса составляет ~15минут 🔥
.
Далее дело за малым: сделать вьюху над файлом, модельку dbt... 👉
.
Ну чем не кайфота. Такую интеграцию нельзя назвать полноценной ( тк имеем только снепшот на текущий момент, нет делитов, апдейтов и др), но если это закрывает потребность заказчика, то почему нет.
.
Будут у вас вопросики к такому подходу?
.
#work #cases #clickhouse
👍4🔥21
​​Превратности панд 🐼
.
Кейс из серии явное лучше неявного. Многие сталкиваются с задачей конвертации строковой даты в собственно дату.
.
Рассмотрим пример в пандах для файлика (в нем ничего кроме даты нет):
import pandas as pd
print(pd.version)

df = pd.read_csv("sample.csv")
df["entry_date"] = pd.to_datetime(df["entry_date"])

.
И тут нас ждут тонкости, связанные с версией pandas:
- если вы используете pandas < 2.0, то вероятнее всего код выполнится без ошибок, но ошибки собирать вы будите дальше и сколько времени уйдет на поиска: часы или дни 🤷‍♂️
.
А что вообще происходит: по дефлоту пандас пытается угадать определить формат даты и преобразовать в дату. Но как выяснилось до версии 2.0 пандас делает это специфически. Открываем наш файл и видим что даты записаны как:
12/01/2018 08:26

Пока сложно: это 1 декабря или 12 января??? Поищем другие цифры и находим:
13/12/2018 09:02

Ага, значит наш формат - %d/%m/%Y %H:%M - супер. А теперь вишенка на торте, смотрим как преобразовал пандас наши строки:
12/01/2018 08:26 -> 2018-12-01 08:26:00
13/12/2018 09:02 -> 2018-12-13 09:02:00

12 января стало 1 декабря, а 13 декабря осталось 13 декабря 🤦‍♂️
.
Версии пандас >= 2.0 кидают ошибку:
ValueError: time data "13/12/2018 09:02" doesn't match format "%m/%d/%Y %H:%M", at position 881

И сразу предлагают воспользоваться аргументом format, в котором требуется указать пандасу с каким форматом даты он имеет дело.
☝️Будьте бдительны при использовании различных инструментов и старайтесь явно прописывать все настройки, не полагаясь на "умноту" этих средств - рано или поздно стрельнет в ногу.

ps: Clickhouse справился с задачей из коробки, только 🤫

SELECT
entry_date,
parseDateTime64BestEffort(entry_date) AS entry_date_dt
FROM
s3('https://storage.yandexcloud.net/public-bucket-6/sandbox/sample.csv',
'CSVWithNames')
WHERE
entry_date IN ('12/01/2018 08:26', '13/12/2018 09:02');

entry_date |entry_date_dt |
----------------+-------------------+
12/01/2018 08:26|2018-01-12 05:26:00|
13/12/2018 09:02|2018-12-13 06:02:00|


#pandas #datetime
👍2🔥2
​​#post

👀 Data Overview
.
Поговорим о том какие инструменты есть в Clickhouse для обзора данных. Кейсы, возникающие в работе:
- на s3 сложили файлы, какая структура, какие типы данных
- хотите выгружать данные с бека (например, Postgres) и хочется узнать в какие типы превратить атрибуты или просто узнать о структуре таблицы (а например, локального доступа нет)
- узнать структуру существующей таблицы
.
Да, в Clickhouse есть аналог information_schema - это база system, таблиц там много, но нам интересны: tables, columns - тут всё как в PostgeSQL, разобраться довольно просто.
А можно такие же, только с перламутровыми пуговицами...
.
Конечно, их есть у меня:
1. DESCRIBE - команду можно использовать не только с существующими таблицами, но и с табличными функциями:

-- local table
DESCRIBE cdc.openweathermap_raw;

name |type |
----------------+-------------+
record_timestamp|DateTime64(6)|
record_value |String |

-- files on s3
describe s3('https://storage.yandexcloud.net/public-bucket-6/reddit/2024-10-26/top_subreddits_2024_10_26_03_00_00.parquet', '', '', 'Parquet');

name |type |
-----------------------+-------+
id |String |
noscript |String |
score |Int64 |
num_comments |Int64 |
author |String |
created_utc |String |
url |String |
upvote_ratio |Float64|
over_18 |UInt8 |
edited |UInt8 |
spoiler |UInt8 |
stickied |UInt8 |
subreddit_name_prefixed|String |

DESCRIBE postgresql('192.168.55.94:5432', 'tmp', 'superstore_v1', 'shpz', '12345', 'stage');

name |type |
--------------+-------------------------+
order_id |Nullable(String) |
order_date |Nullable(Date) |
ship_date |Nullable(Date) |
ship_mode |Nullable(String) |
customer_id |Nullable(String) |
custsomer_name|Nullable(String) |
segment |Nullable(String) |
country |Nullable(String) |
city |Nullable(String) |
state |Nullable(String) |
postal_code |Nullable(Int64) |
region |Nullable(String) |
product_id |Nullable(String) |
category |Nullable(String) |
sub_category |Nullable(String) |
product_name |Nullable(String) |
sales |Nullable(Decimal(38, 19))|
quantity |Nullable(Int64) |
discount |Nullable(Decimal(38, 19))|
profit |Nullable(Decimal(38, 19))|

... and so one. В одном из рабочих кейсов перешли от чтения паркет-файла, для формирования data_schema, к выводу DESCRIBE - 👌
.
2. SHOW CREATE - покажет вам DDL как он есть

SHOW CREATE cdc.openweathermap;

.
3. Изучить метаданные паркетов поможет указание ParquetMetadata вместо формата файла

SELECT * FROM s3('https://storage.yandexcloud.net/public-bucket-6/reddit/2024-10-26/top_subreddits_2024_10_26_03_00_00.parquet', '', '', 'ParquetMetadata')
FORMAT Vertical;
.
num_columns: 13
num_rows: 32
num_row_groups: 1
format_version: 2.6
metadata_size: 2513
total_uncompressed_size: 6182
total_compressed_size: 4068
columns: [('id','id',1,0,'BYTE_ARRAY','String','GZIP',435,254,'41.61%',['PLAIN','RLE','RLE_DICTIONARY']),('noscript','noscript',1,0,'BYTE_ARRAY','String','GZIP',1765,1191,'32.52%',['PLAIN','RLE','RLE_DICTIONARY']),...]
row_groups: [(13,32,6182,4068,[('id','id',254,435,true,(32,0,NULL,'1gcanab','1gcbktk')),...]


.
ps: clickhouse не перестаёт меня удивлять, есть ощущение, что при правильном рецепте это пушка-бомба 💣

#clickhouse #lifehacks
🔥51👍1
​​МАТЕМАРКЕТИНГ - 2024

Запоздалый пост о конфе матемаркетинг-2024. На фото наша банда на стенде

Постояли на стенде Побывали на конференции, впечатления:
- масштабно
- с шиком\блеском
- с Себрантом
- шумно

Инженерная секция пока представлена слабо, согласно программе все рассказывали о том как
построить DataMesh. Не стало исключением и выступление нашего Head of DWH, рассказал:
- о том какие проблемы были и как решали
- как делили апельсин DWH
- где чья ответственность

Интересно было послушать, как это выглядит со стороны, когда сам являешься участником данных перемен🙃
Судя по кол-ву вопросов из зала - выступление получилось топ и заинтересовало многих.

Из интересных докладов можно отметить Рому Бунина о главном качестве аналитика. Это было не просто интересно, это было
визуально приятно, Рома в своем стиле с графиками, динамическим оформлением презы и даже вставками видео рассказал о чем не стоит
забывать аналитика (кажется, не только им):
- харды не главное
- чем выше ваш уровень (middle -> senior) тем важнее для вас софты

Посмотреть на фото Ромы тут

Кажется, это старо как мир: учитесь общаться, договариваться, задавать вопросы, говорить нет,
критически мыслить и ....


#matemarketing
🔥6🏆2
WALLE - Выстраиваем структуру

В прошлом посте собрали фундамент:
- сформировали yml
- научились его читать
- научились создавать даг из yml метадаты

Пока в даге только пустые таски (start\end), сейчас будем исправлять это. Во вступительном посте
очертили архитектуру дага, она повторяет процессы E(xtract)T(ransform)L(Save).
Таски transform, save предлагаю объеднить, тк transformer отдает какие-то данные, а saver их сохраняет, то есть таски обмениваются данные, а делать это через
XComm неблагодарное дело, поэтому наши даги будут состоять из двух тасок (минимум):
- extractor
- transformer_and_saver


Исходные данные (например, выгрузка API или файл на S3) в единственном числе (в смысле, что данные ASIS могут быть только одни) поэтому
extractor всегда 1, но может возвращать несколько объектов (например, N путей до файлов)

А вот обработать данные мы уже можем несколькими способами, поэтому transformers может быть несколько, а saver всегда идет в комплекте к transformer.

Для описания тасок выделим секцию tasks в нашем yml:

version: 2
models:
- name: mock # имя интеграции, оно же dag_id
denoscription: Топ реддитов за последний час # описание интеграции, оно же dag_denoscription
dag:
# dag_id: "" # можно переопределить dag_id != name
schedule_interval: 0 * * * *
start_date: '2024-08-01'
# end_date: '2024-08-31'
catchup: False
owner: dwh
tags:
- mock
- api_integration
tasks:
extractor:
MockExtractor:
transformers:
- MockTransformer:
saver:
MockSaver:
alerting_chat_id: -987654321
alerting_secret_name: alerting_bot_token


Тут важно проверить, что ошибок в структуре нет и yaml_reader успешно читает такой конфиг (самостоятельно).

Структуру описали, теперь разбираемся что же это за MockExtractor, MockTransformer и MockSaver🤔. А этих товарищей нужно реализовать: то есть
это некие Python-классы, которые реализуют некоторый базовый интерфейс. На текущий момент мы знаем, что extractor что-то передает transformer, этот
в свою очередь передаёт уже данные в saver. Условимся называть то чем обмениваются классы ресурсом (Resource). Итого у нас будет 3 ресурса:
- ExtractorResource
- TransformerResource
- SaverResource

Ресурс - это объект Python, будем использовать датаклассы (dataclasses), но до них доберемся чуть позже. А сейчас про функции каждого объекта:
- Extractor (выгружает из API и складывает на S3 (это я называю RAW-слоем = данные ASIS) и отдает далее пути до файлов
или ищет наличие файлов на S3 или FTP и возвращает пути к нужным файлам)
- Transformer (читаем данные по полученным путям, перекодирует согласно нашей логике и отдаёт saver-у набор байтов для сохранения)
- Saver (просто сохраняет байтики на S3 в нужном нам формате\партицировании - это я называю ODS-слой)

Интерфейс у нас будет единообразный, поэтому опишем какие методы должны быть реализованы у каждого класса:
- Extractor - должен иметь метод get_resources - возвращает генератор объетов ExtractResource
- Transformer - должен иметь метод transform, который принимает ExtractResource, возвращает объект TransformResource
- Saver - имеет 1 метод save, который принимает TransformResource и возвращает SaveResource.

Все остальные внутренности каждого класса разрабатываются на усмотрение инженера - творческий процесс однако 😎.

Расчехляем Pycharm и кодируем, ресурсы:

@dataclasses.dataclass
class ExtractorResource:
path: str


@dataclasses.dataclass
class TransformerResource:
path: str
content: io.BytesIO


@dataclasses.dataclass
class SaverResource:
path: str


Для проверки идеи и работоспособности дага создадим Mock классы, для примера MockExtractor:
🔥4
​​
from models import ExtractorResource


class MockExtractor:
def init(self, integration_metadata: dict):
self.integration_metadata = integration_metadata

def get_resources(self):
for idx in range(5):
yield ExtractorResource(path=f'mock_s3_file_{idx}.csv')

ps: можно заматить, экстрактор реализован как генератор (yield) - в конкретном случае не так важно, тк экстрактор не отдает сами данные, а только ссылки (путь на S3 или url и тд)

Остальные классы, реализуем самостоятельно или заглядываем в репу

Каждый из классов имеет единственный аргумент = содержимое yml конфига и ничего более (но это не точно 😉), то есть все необходимые параметры должны быть описаны
в теле самого yml в нужной секции, например, у меня получилось так для экстрактора:

    tasks:
extractor:
MockExtractor:
src_s3_conection_id: reddit_s3_connection_id
src_s3_bucket: raw-public
src_s3_prefix_template: reddit/{dm_date}
src_s3_partition_fmt: '%Y-%m-%d'


Осталось встроить наши классы в даг, используем TaskFlow API, пример для экстратора:

@task
def _extractor(extractor: t.Callable) -> t.List[str]:
extractor_obj = extractor(
intergation_metadata=intergation_metadata
)

return [resource.dict for resource in extractor_obj.get_resources()]

# извлекаем общую структуру тасок
tasks_meta = intergation_metadata.get("tasks", {})

# извлекаем extractor
extractor_name, extractor_params = list(tasks_meta.get("extractor").items())[0]

if not extractor_params:
extractor_params = {}

logging.info([extractor_name, extractor_params])
extractor = globals()[extractor_name]

# возвращаем список объектов
ext_resources = extractor.override(task_id=f"extractor{extractor_name}")(
extractor=extractor)

Комментарии:
- декораторная таска принимает только саму фукнцию или класс в нашем случае
- имя экстратора получаем из ямла
- все экстраторы импортируются from extractors import *
- из словаря globals() получаем объект нужного экстрактора по имени из ямла
- если параметров экстратора нет, то подставляют пустой словарь
- resource.dict - нужно, тк XComm не знает как сериализовать нашу модель ресурса, поэтому воспользуемся атрибутом dict. Соответственно внутри таски transform_and_save обратно создадим ресурс

Логика для трансформера и saver сохраняется, добавляется только обработка ситуации,
когда трансформер сохраняет сам объекты и отдает только пути (пустой TransformerResource.content). Как показала практика: такое нередко
встречается.

Остальное обдумываем сами или заглядываем в репу.

И после загрузки в AirFlow получаем красоту в UI - repo

#walle
#framework
#automate
🔥4👍1
Пост Новогоднего настроения 😎
.
Нравиться ли вам, когда ПО меняет иконки или добавляет новогодних эффектов.
Примеры на скрине:
- dbeaver
- confluence
- polaris (управление умной бытовой техникой)
.
У меня вызывает это детский восторг 😛 + 100500 к силе 💪, приятно что пользовательский опыт становиться положительным.
А как вам такие новогодние изменения? Если есть еще примеры кидайте в комменты...

#ny #2025
🔥2🎄2
Последний рабочий день уходящего года прошёл под девизом песни БГ Этот поезд в огне 🚂
.
А как прошёл ваш день, перетащили всё задачки в Done😉

#work #meme #ny