дата инженеретта – Telegram
дата инженеретта
2.98K subscribers
242 photos
28 videos
4 files
102 links
мелкое — крупно,
в глубоком разговоре
мудрость приходит

по вопросам сюда: @aigul_sea
Download Telegram
💡Ответ💡

💻На скринах ответы ChatGPT 3.5 на английском и русском. В общем, иногда дает правильный ответ, но потом его заносит даже в новом диалоге. На русском получить норм ответ у меня так и не получилось. 4й дает примерно такие же результаты, за исключением нейминга и кучи переносов строк))

Домашку проверила) В целом, у всех плюс-минус похожие варианты❤️

Отметила несколько моментов:

1️⃣ агрегирующую функцию можно совмещать с оконкой

2️⃣ чтобы учесть одинаковые метрики, я бы использовала rank/dense_rank вместо row_number

3️⃣ косячные фильтры на дату
--тут не войдет 31 число
WHERE datetime >= '2024-03-01' AND datetime < '2024-03-31'

--тут за 31.03 войдет только 00:00:00
WHERE datetime between '2024-03-01' AND '2024-03-31'

--поэтому так правильнее
WHERE datetime > '2024-03-01' AND datetime < '2024-04-01'

Ну, и всякие extract/date_part тоже подойдут.
Please open Telegram to view this post
VIEW IN TELEGRAM
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥12👌1
Помогите закрыть таску!😘

💻Исходные данные
У нас есть три вьюшки с id и json-полем. Вьюшки собираются по разным фильтрам.

📌Цель: собрать массив степов, при этом дополнить предыдущие недостающие степы значением default_json.

🎥Пример:
для id = 1: [json]
для id = 2: [default_json, json]
для id = 3: [default_json, default_json, json]
и т.д.

При этом в default_json прокидывается текущий степ, поэтому нам нужно не потерять это название.

🚀Пока идея такая:
- делаем full join
- делаем unpivot
- добавляем флаг is_missed на основе null
- итерируемся с конца в рамках id и удаляем все is_missed = 1, пока не найдём is_missed = 0

Лично мне это кажется слишком громоздкий и малочитаемым, поэтому хочется двинуться в сторону unuon, но тогда теряется текущий степ. Или после джойна в рамках строки как-то найти последнее заполненное поле.

🤩Any ideas?
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥51
👫Про нетворкинг

На этих выходных я сходила на сходку аналитиков, где мы пообщались на разные темы и поделились своими мнениями.

👋Сначала мы все знакомились: удивительно, но знакомство 12 человек друг с другом заняло больше часа) Рассказали, кто где работает, чем занимается, кратенькую историю жизни.

Потом обсудили несколько резюме. На всякий случай я писала заметки себе на будущее, и вот что туда попало:

1️⃣ Работу гораздо проще найти, написав напрямую тимлидам из телеграм-чатиков, чем через hh.

2️⃣ Нужно уметь хорошо списывать) Например, если формат и наполнение резюме всем заходит, то не надо прикручивать велосипеды.

3️⃣ Очень удобно, если есть стек и он сгруппирован по тегам: языки программирования, базы данных и т.д. (мнение от hr)

4️⃣ Ключевая структура:
- что делал
- чем делал (инструменты)
- какая была цель
- какой результат и вэлью
Результатом может быть увеличение хорошей метрики, уменьшение плохой метрики или стабилизация/сохранение какой-то метрики.
Я сформулировала это так: все continuous задачи перевести в done.

5️⃣ Можно рассказывать про свои пет-проекты на собесах (и даже шерить экран!)

6️⃣ Если проходили курсы, то можно их вписать в место работы с пометкой "учебный опыт", но обязательно нужно уйти от шаблонности других выпускников курса. Возможно, докрутить на проекты что-то свое, что будет выгодно отличать вас от остальных.

☕️Обсудили, что у рабочих задач, в отличие от учебных, может не быть ответа, что любая задача становится нерешимой, если провести достаточное количество совещаний, и top management driven development)
Обсудили свои планы, идеи, решения, что я прямо замотивировалась встать и начать делать!

Всего мы просидели 6 часов, но я немного огорчилась, что так мало!
Please open Telegram to view this post
VIEW IN TELEGRAM
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥199❤‍🔥4💯2
Логика с json'ами

На прошлой неделе я вкидывала задачу. Мы ещё раз пообсуждали и вот на чем остановились:

1️⃣Делаем фулл джойн исходных витрин: view1, view1_1, ...
Идею с кросс джойнами мы решили оставить)

2️⃣Делим фулловую таблицу на n вьюшек, где n - это количество шагов. Начинаем с конца (последнего заполненного).

Пример:
when step1_2 is not null => v_step1_2 - новая вьюшка для строк, у которых есть степ1_2:

3 null null json
5 json null json
6 null json json
7 json json json


3️⃣Делаем left anti join фулловой таблицы с созданной вьюшкой, получаем строки, которые не вошли:

1 json null null
2 null json null
4 json json null

Повторяем шаги 2 и 3 n раз.
NB! В следующих вьюшках количество столбцов уменьшается (исключаем уже выделенный степ)

1) when step1_1 is not null => v_step1_1

2 null json
4 json json

После left anti join обновленная фулловая:

1 json null null

На последнем этапе просто берем id и первый столбец.

Итак, у нас 3 вьюшки с разным количеством столбцов.

4️⃣Делаем coalesce(stepn, default_json)

5️⃣Делаем array(всех шагов) => получаем одинаковую структуру для всех вьюшек с полями id и array

6️⃣Union

🕺🕺🕺🕺🕺🕺🕺
Please open Telegram to view this post
VIEW IN TELEGRAM
🤯8🔥3🆒3
Написала статью про планы запросов!

Пару месяцев лежала в бэклоге, месяц собиралась в голове, 3 дня писалась, и теперь она здесь)

Сесть и сделать сподвигла последняя дз по курсу амбассадоров. Там был выбор между статьёй и подкастом. Но до подкастов я как-нибудь тоже доберусь)

https://habr.com/ru/articles/807421/
🔥26❤‍🔥54
Теория или практика?

📖 Наконец-то прочитала книжку "Spark в действии". Очень понравился вот этот рисунок: он супер понятно описывает суть ленивых вычислений) Spark копит все преобразования над данными, пока что-то не заставит его запустить весь пайплайн (например, show).

Отметила для себя пару новых моментов:

1️⃣ Можно создать табличку через sql и сразу перевести её в датафрейм (я думала, что такое работает только для реальных таблиц)

Например:
spark.sql(
'select id from users'
) \
.createOrReplaceTempView('view_users')
df_users = spark.table('view_users')


2️⃣ Есть интересные методы сортировки asc_nulls_first, asc_nulls_last (для desc так же), чтобы обозначить порядок для нуллов

🐰 Ещё позабавили некоторые формулировки)

"Агрегации - ... в известной степени приближение к методам машинного обучения и искусственного интеллекта".


"Во время моей работы со Spark я встречал два типа людей: тех, кто имел опыт работы с Apache Hadoop, и тех, у кого не было такого опыта".


Надеюсь, что это трудности перевода))

🐱 Вообще пришло осознание, что теоретические книжки легко читаются, когда ты уже пощупал это на практике.

А вы как считаете, когда нужно читать умные книжки?

#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥178❤‍🔥4
Репликация табличек

👫Кратко про стек
clickhouse - это база данных))
airflow - оркестратор)
dbt - тулза для удобной работы с трансформацией данных, контроля их качества, ещё позволяет все DDL делать через селекты

💐Кейс
Недавно у нас данные в кликхаусе переехали на новый кластер с поддержкой репликации. А поверх этих данных крутились пайплайны в airflow и запускались проверочки на dbt. И в один момент все упало😐

Что ж, надо фиксить. Самая большая боль - это как поднять упавшие dbt модельки, которые писали данные в таблички. Например, есть такие, которые каждый день собирают количество строк в данных для мониторинга.

Пошел пул проблем:

🔴откуда берутся таблички с суффиксами dbt_backup, dbt_tmp_tables, dbt_new_data и почему они затирают оригинальные

🔴почему инкрементальные модельки (которые дописывают порцию данных) не могут это делать при повторных запусках

🔴как создавать таблички из csv файликов с движками Replicated

🔴почему вдруг я не могу запускать больше 5 параллельных тасок в даге и как это исправить

🔴нет грантов на операцию sync replica

🔴почему при чтении одной и той же таблицы то 100 строк, то 0
...

Задала вопрос в сообществе dbt в слаке, и там один чел прокомментил, что столкнулся с теми же самыми проблемами. Совпадение? (он из WB)

В общем, это всё удалось решить так:

⭐️в dbt обязательно прописывать в конфигах cluster (спец. штука для реплицированных таблиц)

⭐️для csv файликов тоже в конфигах можно прописать дефолтный движок

⭐️для параллельного запуска тасок есть параметр max_active_tasks

⭐️перепроверить, что в коде с модельками и сами таблички заданы через Replicated

Многие проблемы решились последним пунктом, в том числе с 0 строк. Тут суть в том, что когда создается стандартная табличка (с движком без Replicated), то она находится на одной ноде. Но когда мы запросом туда стучимся, то нас могут отправить на другую ноду, где этих данных нет👍

А вы работали с репликацией, есть запомнившаяся мудрость на будущее?

P.S. все, идите отдыхайте 😶🐱🐱
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥11106
🔍Задачка на поиск

С одним из подписчиков обсуждали рабочую задачку, накидали варианты. Сейчас я к ней вернулась, и у меня созрело финальное решение)

💬Описание
На фронте есть табличка с именами и датами, по которой есть поиск. Поиск нужен по столбцу дат. Поисковой запрос идёт на бэк, и грузятся данные из бд.

💫Особенности (как же без них)
Дата в формате mm/dd/yyyy: 05/04/2024 = 04.05.2024. В бд как обычно: 2024-05-04.

Примеры
1️⃣ Ввод: 11/20
Вывод:
🔘mm/11/20 - любой месяц
🔘11/20/yyyy - любой год

2️⃣ Ввод: 1/2
Вывод:
🔘mm/_1/2___ - дни могут быть 01, 11, 21, 31 (если есть), год - начинается на 2
🔘_1/2_/yyyy - месяц 01 или 11, день - с 20 по 29

💐Что нужно?
Предложить наиболее эффективный вариант поиска.

Какие идеи?
Please open Telegram to view this post
VIEW IN TELEGRAM
4🔥2
💡Ответ💡

В комментах предложили навесить функциональный индекс. Выглядит как то, что нужно.

Что это?
Это такой индекс, в котором ключи посчитаны через функцию.

😎Решение №1
Никогда его не использовала, но для нашего случая будет по типу:
CREATE INDEX us_date_idx ON my_table (concat_ws('/', month(dt), day(dt), year(dt)))


Делаем 'concat like входная строка' - применяется наш индекс.

👉Решение №2
Если мы не можем создать индекс, идея такая:

1. Разделить поисковую строку по /
2. Три случая:

▪️1 элемент - день, месяц или год
like '%elem1%'


▪️2 элемента - либо день и год, либо месяц и день

--день и год 
--1/2 - это конец дня и начало года
like 'year%day'

--месяц и день
--1/2 - это конец месяца и начало дня
like '-%month-day%'


▪️3 элемента
--1/29/202 - месяц и год могут быть недописаны 
like 'year%-%month-day'
Please open Telegram to view this post
VIEW IN TELEGRAM
5
dbt_deep_dive.pdf
1.6 MB
💕dbt deep dive💕

У нас в команде есть история с deep dive. Мы выбираем тему, собираем все наши знания и задачи, раскапываем всё. Потом делаем презу и просвещаем коллег.

Эта преза моего дип дайва по dbt. Там рассказывается:
✔️что делает этот инструмент
✔️как создать проект
✔️что в этом проекте есть
✔️как запускать

А вы какие фишки используете?
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥346💯5👌21
Как заруинить джойн в спарке?

С помощью udf!

🧐 Что это?
UDF (user defined function) - это функция, которую мы сами написали и можем применить в спарке (и не только).

🤩 Пример
Самый обычный джойн:
df1.join(df2, ['id'], 'inner')


План запроса - стандартный SortMerge (без деталей):
== Physical Plan ==
+- Project
+- SortMergeJoin
:- Sort
: +- Exchange hashpartitioning
: +- Filter isnotnull
: +- Scan ExistingRDD
+- Sort ...



😭 Перепишем запрос через udf:
compare_udf = F.udf(lambda x, y: x == y, BooleanType()) 

df1.alias('df1') \
.join(
df2.alias('df2'),
compare_udf(
F.col('df1.id'),
F.col('df2.id')
),
'inner'
)


И все - теперь у нас под капотом декартово произведение:
== Physical Plan ==
*(3) Project
+- *(3) Filter pythonUDF0#56: boolean
+- BatchEvalPython [<lambda>(id#0L, id#4L)], [pythonUDF0#56]
+- CartesianProduct
:- *(1) Scan ExistingRDD
+- *(2) Scan ExistingRDD


👀 А все потому, что для спарка udf - это черный ящик, и он не будет заглядывать вовнутрь. Так что схема такая:

SortMerge -> Cartesian
ShuffledHash -> Cartesian
BroadcastHash -> BroadcastNestedLoop


#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥15951👍1
🧳 Рабочий кейс

Недавно с коллегой обсуждали задачу:

Нужно запускать расчеты по разным продуктам с разной начальной датой, чтобы не дублировать уже загруженную инфу.

1️⃣ Первая версия кода была такая - каждой функции по циклу:

for dt in pd.date_range(start_dt1, end_dt):
load1(start_dt1)

for dt in pd.date_range(start_dt2, end_dt):
load2(start_dt2)


И таких функций 10. 20. А Хотелось компактно организовать🎁

2️⃣ Вторая версия - итерируемся от минимальной даты и навешиваем условие перед запуском каждой функции:

min_dt = min(start_dt1, start_dt2)

for dt in pd.date_range(min_dt, end_dt):
if dt >= start_dt1:
load1(dt)
elif dt >= start_dt2:
load2(dt)

Тоже с условиями слишком громоздко🏔

3️⃣ В итоге мы дошли до такой финальной версии:
⭐️создаем функцию для генерации цикла по датам
⭐️создаем список функций с датами
⭐️передаем функции как аргументы

def loop(start_dt, end_dt, func):
for dt in pd.date_range(start_dt, end_dt):
func(dt)


func_list = [(load1, start_dt1), (load2, start_dt2)]

for func, dt in func_list:
loop(dt, end_dt, func)


Чтобы добавить новую функцию, теперь достаточно дописать ее в func_list.
А у вас есть идеи элегантного решения?
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6❤‍🔥441
🔠🔠🔠Unit-тесты для данных
появились в версии dbt 1.8

https://docs.getdbt.com/docs/build/unit-tests

https://github.com/dbt-labs/dbt-core/releases/tag/v1.8.0

Уже успели попробовать?
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥81
Обработка больших данных

Представьте, что у вас есть огромная таблица. Она накапливает по миллиарду строк на каждый день. И вам нужно вытащить агрегаты за последние полгода.

Если считать в лоб, то надо пройтись по ~ 1 млрд * 6 месяцев * 30 дней = 180 млрд строк 🔥

Вы запустили запрос, все зависло, кластер упал, оборудование перегрелось, дата центр сгорел - вас уволили и заставили влезть в кредиты😱😱😬😬 ну, или нет)

Что делать?

Такие таблицы обычно считаются по дням. Посчитали день - положили в датафрейм. Потом новый кусок объединили с предыдущими.
Или так: посчитали - сохранили в таблицу. И каждый новый день по очереди дописывается.

Поэтому семь раз режем, один раз мерим.✂️
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥21👏51
DESCRIBE FORMATTED

Посмотреть пути партиций в hdfs можно командой:
spark.sql("DESCRIBE FORMATTED web.visits partition (visit_date='2024-03-01')")
.where("col_name = 'Location'")
.select("data_type")
.show()


DESCRIBE FORMATTED хранит инфу о полях и типах данных плюс метаинформацию (дату создания, партиции, форматы хранения, локацию) и возвращает примерно такую табличку:
+-----------------------------+ 
|col_name |data_type |comment|
+-----------------------------+
|visit_date|string |Дата |
|# Storage | | |
|Location |hdfs://...| |
+-----------------------------+


И после получения локации мы можем считать файл в датафрейм:
val df = spark.read.parquet("ctrl+c наш location")


#spark
🔥10👌2💯2
Пул мудростей

Собрали с коллегами по несчастью папочку с полезными каналами про данные

Велком, если хотите увеличить площадь вашего погружения) 🐤
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6❤‍🔥2
Логи расчетов

Просто оставлю здесь это искусство🎨
Please open Telegram to view this post
VIEW IN TELEGRAM
72
😊 Розыгрыш! 😊

Разыгрываю книжку "Spark в действии", которую даже сама читала в электронном варианте😁
Готова переслать по России или лично вручить в Мск!

Нужно:
1️⃣ Быть подписанным на канал
2️⃣ Оставить коммент - напишите любое ключевое слово из sql/spark/языков программирование/airflow итд итп

Итоги будут подведены 17.06.2024

Удачи🙂
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥3511
Как ПСИ проходили

🦆 Мы сейчас мигрируем витрины данных на свой кластер и перед каждым релизом проходим ПСИ.

🤓 ПСИ - это приемо-сдаточное испытание, где проверяют, а что за данные, а как, а почему, а с какой целью, а какой атрибутивный состав, а какая логика расчета и тому подобное. Что потоки с витринами успешно запускаются без падений.

👉 И вот во время созвона мы обнаруживаем, что для установки на прод не хватает release notes, которые автоматом проставляются. Т.к. это все для нас впервые, то мы не в курсах, как их быстро добавить.

🏃‍♂️ Пишу коллеге, который нам помогал по этому поводу, - не читает. Звоню(!):

- Как проставить release notes?
- Надо запустить доп. пайплайн в Jenkins
- Где его найти?
- Лежит рядом с основным (хотя бы это знаем)

Иду в Jenkins, как заполнять поля - непонятно, смотрю примеры, делаю по аналогии, запускаю - отработало!!!

👨‍🦯 Лид тайм от реагирования до исполнения: 16 минут.
ПСИ успешно пройдено.

Mic Drop
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥23🏆3🤝2👻1