Where is data, Lebowski – Telegram
Where is data, Lebowski
237 subscribers
83 photos
2 videos
83 links
Канал про разное в data-мире:
- от библиотек визуализации до data egineering
- от графиков до элементов разработки
- от .csv до API
Download Telegram
🦆Утки или 🐍 змеи ?
.
Мир данных изменяется очень быстро, не успел приготовить утку по-duckdb-овски, на повестке дня уже новый схожий инструмент и в твоём любимом питончике - chdb (от создателей Clickhouse).
.
Инструменты уже успели сравнить - тут 🤔
.
Парочку статей для ознакомления:
🦆 Переводная статья о duckdb
🐍 О внутреннем устройстве
🐍 Про chdb от Антона Жиянова (человек, который развивает и много разрабатывает для SQLite)
.
А вы что-то трогали, использовали?
ps: будет серия заметок про duckdb (построим небольшой пайплайн)
#duckdb #chdb
🔥2👍1
⚡️Clickhouse-notes - Оператор EXCHANGE
.
Довольно занятный оператор, с ним познакомил меня коллега. Когда я поэкспериментировал с ним, то был приятно удивлен простоте использования и тем возможностям которые он даёт. Синтаксис маскимально прост:
EXCHANGE TABLES stg.exchange_table AND odm.table;

Для предыдущего поста, использовал его, чтобы сменить ORDER BY для таблицы cdc.openweathermap_raw 😉
.
Как следует из названия, оператор что-то где-то обменивает♻️, а именно:
- атомарно обменивает имена двух таблиц или словарей (читай обмениваются ссылки на данные)
.
Какие +:
🎈 можно атомарно подменять данные разных таблиц, партиций (настолько быстро, насколько возможно и для пользователя downtime -> 0️⃣). Как обычно проиходит батчевая загрузка: загружаем батч в stg, удаляем из целевой таблицы, перегружаем из stg в target. Всё бы ничего, но такой паттерн не для Clickhouse, операции update\delete для него тяжелые и лучше бы обойтись без них. EXCHANGE даёт такую возможность: загрузили всю таблицу (партицию) в stg , обменяли c target и DROP или TRUNCATE. Да, есть минус: необходимо место для хранения дубликата всей таблицы (если перегрузка происходит полностью) - но порой это оправданные затраты
🎈 можно менять DDL таблицы фактически без downtime (ведь если меняются имена таблиц, то можно поменять и их определение). Например, выставили вы неправильную секцию PARTITION BY или ORDER BY или в целом захотели их изменить или начали использовать кодеки для сжатия (кстати, тоже интересная тема, говорят, что узким местом в любом случае является чтение с диска, а разжатие данных происходит очень быстро, поэтому важнее сжать, чтобы меньше читать с диска ☝️).
Паттерн такой: создаете рядом таблицу с нужным DDL и обмениваете их -> старая таблица имеет новый DDL (новые данные сортируются\сжимаются\партицируются по-новому), старые данные нужно только перелить. Или пока данные льются в подменную таблицу, модифируем старый DDL (добавляем сжатия, меняем партиционирование ....) -> обмениваем обратно, новый кусок данных в подменной таблице заливаем.
.
💪На практике так меняли движки таблиц (для стрима аппметрики), добавляли сжатие
.
#clickhouse #exchange
🔥3👍1
⚡️Clickhouse-notes - MATERIALIZED VIEW
.
Для поста про ORDER BY была создана github репа, в ней есть пайплайн который каждый 5 минут получает погодные данные по API и складывает их в raw таблицу.
Глянуть схему 👀
.
MATERIALIZED VIEW (MV) - можно рассматривать как триггер, реагирующий на вставку данных (в отличие от триггеров в реляционках, реагирует на блок вставляемых данных, а не на отдельную строку ☝️). В Clickhouse MV отделено от целевой таблицы, поэтому для себя MV представляю как процесс, запускающийся для вставляемых данных и загружающий их в целевую таблицу.
.
При помощи MV можно решать ряд задач:
▪️парсинг сырых данных
▪️фильтрация сырья
▪️роутинг данных
▪️агрегация данных
.
На примере решим первую задачку - парсинг сырья. В нашем пайплайне исходные данные сохраняются в таблицу cdc.openweathermap_raw:
CREATE TABLE cdc.openweathermap_raw
(
record_timestamp DateTime64(6),
record_value String
)
ENGINE=MergeTree
ORDER BY (record_timestamp)

где record_value - json объект с погодой, который получаем из api. Создадим MV, который будет парсить json и сохранять в таблицу cdc.openweathermap. Посмотреть пример json можно на сайте.
Пишем DDL целевой таблицы:
CREATE OR REPLACE TABLE cdc.openweathermap
(
record_timestamp DateTime64,
dt DateTime,
main String,
denoscription String,
temp Float64,
feels_like Float64,
pressure Float64,
humidity Float64
)
ENGINE=MergeTree
PARTITION BY toStartOfWeek(record_timestamp)
ORDER BY (main, denoscription, record_timestamp)

Вытаскиваем поля: время (оно в UTC), общее описание погоды, детальное описание, температура, ощущаемая температура, давление (ГПаскаль), влажность.
Именно MV и будет парсить и перекладывать данные из сырья в cdc.openweathermap. Напишем запрос, который парсит данные:
SELECT record_timestamp ,
FROM_UNIXTIME(toUInt64(JSONExtractString(record_value, 'dt'))) AS dt,
JSONExtractString(arrayElement(JSONExtractArrayRaw(record_value, 'weather'), 1), 'main') AS main,
JSONExtractString(arrayElement(JSONExtractArrayRaw(record_value, 'weather'), 1), 'denoscription') AS denoscription,
JSONExtractFloat(JSONExtractString(record_value, 'main'), 'temp') AS temp ,
JSONExtractFloat(JSONExtractString(record_value, 'main'), 'feels_like') AS feels_like,
0.750062 * JSONExtractUInt(JSONExtractString(record_value, 'main'), 'pressure') AS pressure,
JSONExtractUInt(JSONExtractString(record_value, 'main'), 'humidity') AS humidity
FROM cdc.openweathermap_raw

Чтобы превратить тыкву в золушку необходимо добавить код создания MV: по классике что создаем, где создаем и для для MV куда пишем данные (этот кусочек ниже):
CREATE MATERIALIZED VIEW cdc.openweathermap_mv
TO cdc.openweathermap
...
🔥1
Полный код найдете в репозитории
.
Наш процесс парсинга готов - теперь только НОВЫЕ поступающие данные будут последовательно проходить стадии: вставка в raw, выполнение MV, вставка в целевую таблицу (сам процесс происходит автоматически в фоне).
Чтобы и старые данные прошли через MV нужно дополнительно присесть:
- можно через обмен таблицами
- можно до создания MV выполнить соответствующий инсерт в целевую таблицу, после создать MV
.
Это не очень удобно, но очень гибко: на практике так парсятся логи аппметрики (в Q1 хотим роутить данные тем же механизмом) + делал кейс, когда N топиков кафки парсятся в одну таблицу событий (просто все MV смотрят в одну целевую + структуру единая).
.
В теме MV существуют проекции (projection) - аналогичный механизм, только процесс создания упрощен и менее гибок, но содержит большой +, все данные имеющиеся в исходной таблице будут пропущены через проекцию (об этом я с ужасом узнал на продовом кластере😱).
Естественно тестить надо на чем-то большом и мощном, выбрал аппметрику (сотни млн событий ежедневно, лог за пару последних лет). В рамках одной из задач требуется оценка кол-ва данных за день, создал проекцию (group by date) и думаю, только новые данные будут обрабатываться. Каково же было моё удивление, когда в результате запрос я обнаружил результаты всей истории (стоит отметить, что получились они за пару секунд🚀, в то время как обычный запрос агрегации будет работать десятки минут, если не упадет по памяти).
.
Для глубокого погружения рекомендую отличное видео 🎥
#clickhouse #materializedview #mv #view
🔥2👍1
♻️ Апдейт к посту про ORDER BY
.
В репе используется Python pipeline для обращения к API (решили и решили). Но в ClickHouse есть табличная функция url, используя которую можно описать альтернативный способ загрузки данных.
.
Например, вот так получаем ответ от API:
SELECT * 
FROM url('https://api.openweathermap.org/data/2.5/weather?lat=55.567586&lon=38.225004&appid=YOUR_APPID&units=metric',
JSONAsString, 'record_value String')

.
Добавляем дополнительное поле с record_timestamp, синтаксис INSERT
и получаем готовый код загрузки сырья из API
Теперь достаточно по расписанию выполнять данный скрипт☝️
.
ps: в работе стараюсь искать наиболее нативный способ (с использованием встроенного функционала) решения задачи - а как вы предпочитаете:
✈️ строить самолет
🎧 или использовать минимум средств

.
#clickhouse #url #alternative
👍2
Никогда не задумывались Как слоники 🐘 превратились в сливу🍈 🤔
.
#meme
👍1😁1
​​⚡️Clickhouse-notes MATERIALIZED VIEW - 2
.
Развиваем наши знания по матвьюхам:
- в первых сериях мы настроили парсинг данных (из "сырого" json выбираем нужные поля и складываем в отдельную таблицу cdc.openweathermap)
.
Теперь собираем дневную статистику:
- мин\макс\сред для температуры, давления, влажности + кол-во записей.
Решаем задачу аналогичным путем: используем materialized view, только теперь будут нюансы:
- приемной таблицы не будет (но это не точно 😉), то есть нет секции TO ....
- определим параметры MV при её создании (Engine, PARTITION BY, ....)
- используем ключевое слово POPULATE для применения MV ко всем существующим данным ( ☝️будьте аккуратны с ним, если в вашей таблице 100500млн строк - может быть ресурсозатратно).
.
Составляем первую часть запроса - создание и параметры MV:
CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
...

.
🔹ENGINE = SummingMergeTree - специальный движок для агрегаций (count\max\min\avg)
На самом деле существует только один общий движок AggregatingMergeTree - это мы увидим в DDL который сохранится в базе, просто его синтаксис чуть сложнее и для простый агрегаций есть такой алиас SummingMergeTree
🔹PARTITION BY toStartOfWeek(dated_at) - партиционирование MV по неделям
🔹POPULATE - данных в таблице не так много -> применим MV ко всем существующим данным
.
Вторая часть DDL MV это запрос SELECT с нужной нам агрегацией данных, например, он мог бы быть таким:
SELECT toDate(record_timestamp) AS dated_at,
count(1) AS record_counts,
min(temp) AS min_temp,
max(temp) AS max_temp,
avg(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at

.
Но так получится сделать только для завершенных дней, но для дней данные по которым приходят наш пайплайн должен обновлять статистику, чтобы этого достичь необходимо хранить промежуточные данные (например, для вычисления среднего необходимо хранить сумму и кол-во значений) - такое состояние называется State и определяется как: фукнция агрегации + постфикс State, например, avg -> avgState. Данные состояния и хранятся в базе, а в момент запроса пользователя необходимо рассчитать конечный результат ("смержить"), для этого используется постфикс Merge:
avg -> avgState -> avgMerge. Merge функция применяется уже к полям MV (или таблицы, в которую складывается результат). Итого запрос DDL для MV выглядит так:
CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
SELECT toDate(record_timestamp) AS dated_at,
countState(1) AS record_counts,
minState(temp) AS min_temp,
maxState(temp) AS max_temp,
avgState(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at

.
Для получения данных запрос выглядит так:
- запрашиваем данные из MV
- выполняем теже агрегации
- постфикс Merge

SELECT dated_at,
countMerge(record_counts) AS record_counts,
minMerge(min_temp) AS min_temp,
maxMerge(max_temp) AS max_temp,
avgMerge(avg_temp) AS avg_temp,
...
FROM odm.openweathermap_daily_stats_mv
GROUP BY dated_at


И вишенка на торте 🍒, чтобы скрыть от пользователя данную логику, необходимо обернуть Merge запрос в VIEW, например, odm.openweathermap_daily_stats_v. Полный код найдете в репо.

🧐 А где же хранятся промежуточные данные, вы могли бы спросить? MV не хранит данные, при таком DDL под капотом создаётся внутренняя таблицы, в нашем случае .inner_id.5bacf24b-9e5a-4874-95c4-5ad93eaacbde, в ней и лежат предагрегаты (скрины в репо и в комментах).

#clickhouse #mv
🔥1
​​ O - Optimization
.
В далеком посте , который рассказывал про оптимизацию с помощью условий WHERE
всё уже было сказано и вот второй раз наступил на грабли (пока еще нет, но мог бы 🔨).
.
Также недавно прошел курс по GreenPlum, конспект можно найти тут. В числе прочего была рассмотрена тема оптимизации, где отмечено:
Признаки неоптимального SQL-запроса:
- лишние сортировки
- distinct
- union вместо union all
- Условия join, содержащие неравенства или OR (тк будет использован Nested Loop )
- Неоптимальные фильтры по ключу партицирования (явное использование ключа без преобразований)
- Неоптимальные фильтры по полям, входящим в индексы:
-В Greenplum версии 6 индексы B-tree, построенные на значениях функции от значений полей, используются в запросах с фильтрами только при построении плана Postgres.
-При построении плана запроса GPORCA использование в фильтрах любых функций от полей, по которым построены индексы на таблицу, исключает использование индексов и приводит к неоптимальному выполнению запроса.

.
Тк много работаю с Clickhouse, я думал он не такой, он же быстрый🚀, но не тут-то было. В работе использую планировщик explain у него есть ряд опций, например:
- estimate показывает сколько будет прочитано строк, засечек индекса и тд

В одном из запросов была такая конструкция:

WHERE datetime >= '2024-01-15' and datetime < toDate('2024-01-15') + INTERVAL '1 day'

.
Кажется, слишком много букв, переписал так:

WHERE toDate(datetime) = '2024-01-15'

И запрос начал выполняться чуть дольше, стоит отметить что таблица партицинирована по месяцам.
Вывод explain estimate для первого случая rows = 24064519

Для второго rows = 312864901 (будет прочитана вся таблица 😱)

Итог: используйте планировщик, много интересного он может рассказать - ClickHouse Explain

#optimization #clickhouse
🔥2
​​📔 Executable Jupyter Notebooks
.
Не редко (особенно в аналитических целях) разработка ведется в Jupyter Notebooks и несколько
больно переносить наработки в py-модули, в хотелось бы запустить что-то: ./my-notebook.ipynb
или run my-notebook.ipyb. Например, в Databricks так и сделали, notebook является единицей job\task и исполняется под капотом.
.
Может и нам так можно🧐
.
Возможностей не так много, но они есть:
- сохранить jupyter как скрипт .py . При таком сохранении часть кода специфичная для notebooks может быть не выполнена, например
display или отдельно стоящий вывод: df.tail()
- библиотека papermill - вот оно наше сокровище, позволяет запускть jupyter notebooks, а также их параметризовать, ну все как мы любим
.
Для создания параметризированного notebook необходимо:
- указать ячейку с параметрами, например, в ячейке может быть код:
# external params
period = "day"
output_file = "tmp.txt"


- указать, что ячейка = ячейка с параметрами: необходимо добавить cell tag = parameters к той ячейке, где располагаются переменные-параметры. papermill парсит их и устанавливает переданные вами значения. Где это находится смотри на скрине в комментариях или в репо.
- описать обработку параметров в коде (необязательно =))
- запустить jupyter notebook

papermill extract.ipynb -p period "week" -p output_file "weather-forecast.json" executed.ipynb --log-output --log-level INFO

.
Все артефакты найдете в репо

#papermill #jupyter #runasnoscript
🔥2
​​☝️ Последнее слово (но это не точно)
.
В предыдущих сериях по clickhouse-notes мы создали:
- таблицу сырья
- таблицу распарсенных данных и матвью, её наполняющую
- матвью с подневной статистикой и вьюху над ней
.
Дополним сет, данными последней точки: последние полученные данные. Например, нам пригодится для отображения
на дашборде последних значений.
.
Варианты есть различные, мы поступим так:
- создаём таблицу с движком ReplacingMergeTree, odm.openweathermap_last_point - чтобы хранить только последнюю запись (но не всё так просто)
- создаем MV для поиска этой последней записи, odm.openweathermap_last_point_weather_mv
.
Структура таблицы:
CREATE TABLE odm.openweathermap_last_point
(
last_ts DateTime,
temp Float64,
pressure Float64,
humidity Float64,
metric String
)
ENGINE = ReplacingMergeTree(last_ts)
ORDER BY metric
SETTINGS index_granularity = 8192

- ReplacingMergeTree(last_ts) - оставляем только последнюю запись
- ORDER BY metric - синтетическое поле, по его совпадению и "схлопываются" дубли
.
Полный код таблицы и MV в репе. А пока посмотрим на возможные артефакты:
- от СClickhouse у нас есть гарантия 100% схлопывания дублей
- это может произойти в любой момент времени (хоть сразу, после вставки дубля, хоть через полдня)
.
Что в такой ситуации делать:
- нативный метод - схлопнуть дубли в момент запроса, используя ключевое слово FINAL
SELECT *
FROM odm.openweathermap_last_point
FINAL


FINAL - использование может быть очень накладно, если нужно схлопнуть довольно большой объем данных (не наш случай, поэтому мы им и воспользуемся). Чтобы скрыть от пользователя особенности технической реализации необходимо создать, например, odm.openweathermap_last_point_weather_v
.
Другие способы найдете в репе (сортировка и выборка 1 строки, использование argMax).

🫶 Скоро зафиналим какой-нибудь клевый дашборд

#clickhouse #mv #replacingmergetree
👍2🔥2
Пятничное радостное
.
Вот такой получился рефакторинг😍

#airflow #dag #рефакторинг
🔥3👍2🏆2🦄1
​​🔊 Митап, митап, митап
.
С частью команды DWH, во-первых встретились, во-вторых, встречу провели с пользой: сходили на митап по Greenplum от ЯCloud.
.
1️⃣ В первой части Дмитрий Немчин из Tinkoff поведал тайны обслуживания GP. Презантация была скорее обзорной, тк интереснее было послушать про то как устроены их небольшие фреймворки по обслуживанию, а названия у них очень забавные:
- отстрел плохишей по различным правилам - Raskolnokov
- управление вьюхами и ролевой моделью gpACL, gpViews
- управление жизненным циклом данных (удаление, охлаждение, партиционирование) Guillotine
- мониторинг запросов, очередей, нагрузок - Inquisitor
.
Понравился процесс работы с песочницей: данные в ней живут 21 день, после дропаются без разговоров - очень хочется внедрить такой процесс у нас, так sandbox - маленькое болотце. Есть и другой подход, который базируется не на времени, а на пропорции между кол-вом запросов в песочницу и в прод, если доля песочницы первыщает то идем разбираться.
.
2️⃣ Во второй части, Кирилл рассказал о расширении Yezzey - позиционируется как охлаждение данных в s3, для организации гибридного хранения. Но по результатам тестом выглядит так, будто можно все данных хранить в s3, а мощности GP только для Compute использовать, ну и диски будут только для spill файлов😁
Эта часть нас и заинтересовала, тк есть большая потребность в охлаждении данных с последующим их использованием.
.
Ну и конечно, не обошлось без технических шуток. Номинация приз вечера получила шутка: Вы всё еще используете HEAP таблицы 😉

#gp #greenplum #охлаждение #meetup
👍3🔥2
Это заслуживает 5 звёзд
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:


CASE WHEN (json_answer ->> 'answer') LIKE '%{%' THEN json_object_keys((json_answer ->> 'answer')::json)
ELSE Null END

.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:

1 строка, 1 строка из 2млн, Карл


#de #work #буднидатаинженера
😢1
​​Как много мы пишем boilerplate code🤨
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора jinja2 😍
.
Рекомендую ознакомиться с jinja2, мне отлично помог краткий курс, тк шаблонизация встречается всё чаще и чаще:
- шаблоны при веб-разработке (Flask\Django)
- dbt = SQL over jinja2
- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
1
​​Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷‍♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:

def etl():
pass

with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)


Можно написать:

@dag(....)
def create_dag():

@task
def etl():
pass

create_dag()

.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:

@task
def get_data_from_api() -> List[]:
...
return data

api_data = get_data_from_api()

@task
def etl(data: List[]) -> None:
# transfrom and save data
...

etl = etl(data=api_data)

.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например, partial, expand, что это нам дает:

1️⃣ Генерим несколько дат и выполняем etl за каждую

@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']

@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at

dates = get_dates()

# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)


2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить

@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]

files = find_s3_files()

# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)

.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥

Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.

🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)

#airflow #taskflow #mapping
🔥4👍1
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase

Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍4
​​💪 Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:

import pandas as pd


df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)


2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:

import numpy as np
import pandas as pd


def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df

df.pipe(calculate, thr=5, columns=["value_1", "value_2"])


3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:

from collections import defaultdict

res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список

for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)

res["value"] # это список =)


Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...

4️⃣ Модуль itertools
- batched открыл для себя, когда сам сначала написал такую штуку (немножко изобрел 🚲)
- combinations - итератор комбинаций из элементов списка
- zip_longest - "длинный" брат zip (если не знал, то zip проходится по самому короткому из списков)

5️⃣ Модуль functools - магия функционального программирования
- partial - кажется, это самая часто используемая функцию из модуля (ну может после lru_cache())

import typing
import pandas as pd
import numpy as np


def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"

for col in columns:
df[f"{col}_rolling"] = df[col].rolling(window_width, min_periods=2).apply(lambda window: method(window))

return df

func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)

# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)


Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)

6️⃣ reduce - применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значение

l = [0, 1, -5, 7, 8]

# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)

print(res) # 11

.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack
👍4🔥1
🤘 Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
🔥7
Пусть это будет пятничным мемом во вторник😁
.
Когда вы с коллегами решили ввести CI проверку на наличие delete-ов при работа с ClickHouse🤟
🔥2👍1😁1
​​Выходим из затишья
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault)
- разобраться в рабочих фреймворках 👌
- написать свои фреймворки 💪
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх 📒 А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud

- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов 😉
.
Вот такие рабочие кейсы, до встречи🖐
.
#work #clickhouse
🔥4👍3
Colorized it!
.
Пост удобства, у коллеги заметил цветовое разделение коннектов к базам в DataGrip - показалось крайне интересным.
Но что, у меня-то Dbeaver, сходу решения не нашлось =(
.
Со второй попытки копания в настройках удалось найти нужную 🥳
.
Маленькая инструкция в репо.
.
#dbeaver #colorize
👍1