Как прошла SmartData 2025?
Это были 2 дня конфы в Питере - доклады, тусовка крутых ребят, активности и обсуждения. Ниже будут инсайты от оргов, команд и почему я просто обязана поехать в следующем году!
📚 Доклады
В одно время идут сразу 3 доклада, поэтому большинство осталось за кадром. Тут топ из тех, которые я посетила:
1️⃣ Текущее состояние рынка даты
Presto/Trino + K8s + S3, Iceberg, StarRocks, Polars - движется сюда
2️⃣ Куда развивается айсберг
Из продвинутого - мат вьюшки, UDF, интероперабельность вьюшек и т.д.
Поясняю про вьюшки:
Spark создает - Spark читает - ок🤩
Trino создает - Trino читает - ок🤩
Trino создает - Spark читает - не ок🤩 , а должно быть ок (наоборот тоже)
Также есть глобальная проблема: в айсберге фичу могут внедрить, но пройдет много времени, пока движок научится это поддерживать
3️⃣ Про датасеты в Airflow
Все супер понятно, с несколькими кейсами, проблемами и решениями. Я взяла контакт, нам может пригодиться
4️⃣ Spark Connect
Как раз недавно коллега вкидывал идею использовать Spark Connect для одной задачи, надо будет пересмотреть
5️⃣ Self-service для деплоя витрин в Авито
Тут вообще приколдесная штука, они в битбакете в комментах пишут команды dwh test, dwh merge, это проверяют кучи тест-кейсов, есть автоопределение зависимостей, циклических зависимостей. С точки зрения идеи и реализации мне кажется безумно крутым, но вопрос - оно действительно нужно было?
6️⃣ DQ as a Service
Интересные фичи:
⁃ проверки группируются, чтобы не спамить каждую
⁃ проверки на месяц могут ссылаться на проверки по дню, чтобы не пересчитывать заново
⁃ ETL-процесс отправляет свой результат, чтобы проверка его переиспользовала
⁃ некоторые проверки на сэмплах данных могут не отличаться от всего объема
Уходя в первый день, я услышала мнение:
🐱 Нетворкинг
Тут самое прикольное!
Я пообщалась с представителем программного комитета, с ребятами из компаний по поводу их подходов, вживую познакомилась с нашими коллегами, с еще одним автором де канала и его очень крутыми коллегами. Меня даже узнали несколько человек, сказали, что один из адекватных каналов🙂
Чел из программного комитета поделился, что докладов изначально в 3 раза больше, они ценят уникальность (без написанных статей с хабра и повторов с других конф). Если определенных технологий нет - были слабенькие доклады. Иногда программный комитет хочет сходить на несколько докладов и расставляет так, чтобы они не пересекались))
✨ Активности и инсайты
Активности были на стендах партнеров конфы. Я там познакомилась с организатором мероприятий - вы в курсе, что самый базовый стенд без особых наворотов, подиумов и подсветки уже стоит 1 млн??
Игрулек было достаточно, я набрала себе столько мерча, что уже можно продавать)) Из полезного:
🤩 Бросала магнитные дротики. Куда попадешь - такая сложность вопроса, берите на заметку)
Junior:
SQL-запрос для дубликатов?
Зачем нужны индексы в бд?
Middle:
Как вы настроите мониторинг для пайплайна, чтобы знать, если данные не пришли вовремя?
Как обеспечить идемпотентность в пайплайне?
Senior:
Как вы оцените, когда пора переходить от batch-обработки к streaming-архитектуре?
🤩 Собирала архитектуру
Это было самое прикольное! Мы клали в реальную корзину реальные технологии и шли сканировать на кассу. Представьте: я купила айсберг. А если вы не знали, то Магнит выкупил Азбуку вкуса
Пока собирала архитектуру в Х5, ребята рассказали про крутую собственную разработку. У них много инстансов Airflow, и они синкуются через Redis - там хранятся статусы о состояниях дагов, на которых строятся зависимости. Это просто🔥
🍓 На сладенькое
В конце был глобальный розыгрыш сумки с мерчом от партнеров и билета на следующий год. Представляете, это выиграла я!!!!
Это были 2 дня конфы в Питере - доклады, тусовка крутых ребят, активности и обсуждения. Ниже будут инсайты от оргов, команд и почему я просто обязана поехать в следующем году!
В одно время идут сразу 3 доклада, поэтому большинство осталось за кадром. Тут топ из тех, которые я посетила:
Presto/Trino + K8s + S3, Iceberg, StarRocks, Polars - движется сюда
Из продвинутого - мат вьюшки, UDF, интероперабельность вьюшек и т.д.
Поясняю про вьюшки:
Spark создает - Spark читает - ок
Trino создает - Trino читает - ок
Trino создает - Spark читает - не ок
Также есть глобальная проблема: в айсберге фичу могут внедрить, но пройдет много времени, пока движок научится это поддерживать
Все супер понятно, с несколькими кейсами, проблемами и решениями. Я взяла контакт, нам может пригодиться
Как раз недавно коллега вкидывал идею использовать Spark Connect для одной задачи, надо будет пересмотреть
Тут вообще приколдесная штука, они в битбакете в комментах пишут команды dwh test, dwh merge, это проверяют кучи тест-кейсов, есть автоопределение зависимостей, циклических зависимостей. С точки зрения идеи и реализации мне кажется безумно крутым, но вопрос - оно действительно нужно было?
Интересные фичи:
⁃ проверки группируются, чтобы не спамить каждую
⁃ проверки на месяц могут ссылаться на проверки по дню, чтобы не пересчитывать заново
⁃ ETL-процесс отправляет свой результат, чтобы проверка его переиспользовала
⁃ некоторые проверки на сэмплах данных могут не отличаться от всего объема
Уходя в первый день, я услышала мнение:
SmartData - это, значит, очень клевое место, если народ в начале докладов разбегается по залам. Это говорит об уровне конфы
Тут самое прикольное!
Я пообщалась с представителем программного комитета, с ребятами из компаний по поводу их подходов, вживую познакомилась с нашими коллегами, с еще одним автором де канала и его очень крутыми коллегами. Меня даже узнали несколько человек, сказали, что один из адекватных каналов
Чел из программного комитета поделился, что докладов изначально в 3 раза больше, они ценят уникальность (без написанных статей с хабра и повторов с других конф). Если определенных технологий нет - были слабенькие доклады. Иногда программный комитет хочет сходить на несколько докладов и расставляет так, чтобы они не пересекались))
Активности были на стендах партнеров конфы. Я там познакомилась с организатором мероприятий - вы в курсе, что самый базовый стенд без особых наворотов, подиумов и подсветки уже стоит 1 млн??
Игрулек было достаточно, я набрала себе столько мерча, что уже можно продавать)) Из полезного:
Junior:
SQL-запрос для дубликатов?
Зачем нужны индексы в бд?
Middle:
Как вы настроите мониторинг для пайплайна, чтобы знать, если данные не пришли вовремя?
Как обеспечить идемпотентность в пайплайне?
Senior:
Как вы оцените, когда пора переходить от batch-обработки к streaming-архитектуре?
Это было самое прикольное! Мы клали в реальную корзину реальные технологии и шли сканировать на кассу. Представьте: я купила айсберг. А если вы не знали, то Магнит выкупил Азбуку вкуса
Пока собирала архитектуру в Х5, ребята рассказали про крутую собственную разработку. У них много инстансов Airflow, и они синкуются через Redis - там хранятся статусы о состояниях дагов, на которых строятся зависимости. Это просто
В конце был глобальный розыгрыш сумки с мерчом от партнеров и билета на следующий год. Представляете, это выиграла я!!!!
Please open Telegram to view this post
VIEW IN TELEGRAM
1🔥22❤13 11
5. Лидерство в команде
Продолжаю рассказывать про курс команды, и это последний модуль🥲
Четвертый модуль - тут
Описание программы - тут
Темы, которые мы разобрали:
🤩 что такое лидерство
🤩 теории лидерства (личностная, поведенческая, ситуационная, организационно-ролевая)
🤩 работа с приоритетами (матрица Эйзенхауэра, метод ABC)
🤩 способы принятий решений (проблемно-ориентированный, результат-ориентированный, авторский/лидерский, генеративно-критериальный, эволюционный)
🤩 личный бренд руководителя команды
🤩 теория по очень сложным фреймворкам (Human System Dynamics, Warm Data Labs)
Из всех практик мне больше всего понравилась с диаграммой Исикавы. Нам нужно было взять одну проблему и исследовать ее, при этом ничего не решать. Я помню, что в универе нам про это рассказывали, но смысла я тогда не особо уловила
🌧 У одной девушки из подгруппы была необычная проблема: джун начинает фиксить баг, находит несовершенства в архитектуре и начинает все рефакторить. Он делает это хорошо, но задача затягивается на месяц-два. Поэтому ему не поручают хотфиксы, а задачи дают за два-три релиза вперед. В итоге получается, что на оставшуюся часть команды ложится больше нагрузки
В ходе разбора рыбы Исикавы по косточкам мы выяснили, что:
🤩 нет 1:1 и обратной связи
🤩 нет точек контроля
🤩 нет четкой постановки задач
🤩 у задач нет оценки
🤩 у задач нет дедлайна
🤩 джун понимает, что задачи несрочные и неважные
В общем, диаграмма позволяет понять причины, причины причин и т.д. В конце мы обсудили разные сценарии решения и вариант "А что если не решать?" Это тоже как один из подходов к решению
Книги, которые я себе записала:
📒 Маршалл Голдсмит, Марк Рейтер "Триггеры"
📒 Тимоти Феррис "Как работать по 4 часа в неделю и при этом не торчать в офисе «от звонка до звонка», жить где угодно и богатеть"
Ну вот и все, выпускной тоже прошел - пообщались с однокурсниками, преподами и получили дипломы🐸
Продолжаю рассказывать про курс команды, и это последний модуль
Четвертый модуль - тут
Описание программы - тут
Темы, которые мы разобрали:
Нам в лидерах нравится то, что мы в каком-то виде имеем в себе
Бренд руководителя - это истории, которые про нас знают
Из всех практик мне больше всего понравилась с диаграммой Исикавы. Нам нужно было взять одну проблему и исследовать ее, при этом ничего не решать. Я помню, что в универе нам про это рассказывали, но смысла я тогда не особо уловила
В ходе разбора рыбы Исикавы по косточкам мы выяснили, что:
В общем, диаграмма позволяет понять причины, причины причин и т.д. В конце мы обсудили разные сценарии решения и вариант "А что если не решать?" Это тоже как один из подходов к решению
Книги, которые я себе записала:
Ну вот и все, выпускной тоже прошел - пообщались с однокурсниками, преподами и получили дипломы
Please open Telegram to view this post
VIEW IN TELEGRAM
👍17🔥11❤9💅2
Что с нами станет?
На днях зашла в линкедин, и первый пост в ленте был от европейского стартапа. В посте приветствовали нового коллегу - моего контакта (назовем его Паша)
🎹 С Пашей я познакомилась на 2м курсе, когда увидела афишу фортепианного вечера, а он оказался ее оргом. Лайнап артистов уже был собран, но я все-таки напросилась, и мы какое-то время хорошо общались
Я помню Пашу, когда он учился на мехмате, потом наши пути не пересекались. Оказалось, что он получил степень магистра геометрии, а два года назад уехал в Италию учиться на второй маге по ИИ
🍕 Так вот в посте сказано, что он работал бизнес-аналитиком, а потом перешел на AI Engineer/AI Specialist. Я сходила к Паше за краткой пояснялкой этой профессии:
Может, нам тоже надо переквалифицироваться, пока не поздно?
На днях зашла в линкедин, и первый пост в ленте был от европейского стартапа. В посте приветствовали нового коллегу - моего контакта (назовем его Паша)
Я помню Пашу, когда он учился на мехмате, потом наши пути не пересекались. Оказалось, что он получил степень магистра геометрии, а два года назад уехал в Италию учиться на второй маге по ИИ
По существу, разработчик AI архитектур. Соединять, оркестрировать, оценивать перформанс LLM и блоки пре-/пост-процессинга данных для них
Может, нам тоже надо переквалифицироваться, пока не поздно?
Please open Telegram to view this post
VIEW IN TELEGRAM
Lakehouse Podcast
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца😅 А там как раз есть некоторые ответы на вопросы из этого поста. Ниже некоторые моменты, которые мне особенно запомнились
🙂 "На какие бизнесовые метрики влияет лейкхаус?"
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
🙂 Пара интересных метрик
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
🤩 Еще я всегда слышала про Cloud-Native - сервисы, которые изначально пилились под облака
🤩 Но не слышала про Cloud-Ready - изначально не под облако, но легко мигрировать
🤩 А еще гуглинг привел к Cloud-Compatible - не под облако, но работает и с ним
🤩 И к Cloud-Enabled - не под облако, но его как-то адаптировали
🤩 И к Cloud-Agnostic - без использования специфичных сервисов провайдера в отличие от native
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
🙂 Финальная мысль
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца
Основная бизнес-метрика - точность попадания в качество и во время
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
Играть на музыкальном инструменте просто. Нужно вовремя нажимать нужные клавиши
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
Возможно ли, что лейкхаус станет устаревшим раньше, чем большинство компаний успеют его внедрить?
Please open Telegram to view this post
VIEW IN TELEGRAM
YouTube
Lakehouse: почему дом данных всё время на реконструкции
Встречайте 10-й юбилейный выпуск подкаста Pro Данные! Вместе с экспертами из Arenadata и DIS Group обсуждаем Lakehouse. Почему компании сейчас активно присматриваются к этой архитектуре и всем ли стоит туда идти?
В гостях:
Алексей Быков, product owner,…
В гостях:
Алексей Быков, product owner,…
1👍11🔥6❤3 1
Full + Incremental Load
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом❤
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
🌷 Full Load
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
🤩 пользователи ходят в вьюшку
🤩 вьюшка смотрит на table1
🤩 данные пишутся в table2
🤩 table2 подменяется на table1
У нас реально были такие проблемы:
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
🪴 Incremental Load
1⃣ Pattern: Incremental Loader
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается😎
2⃣ Pattern: Change Data Capture
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы🥺
#depatterns
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
У нас реально были такие проблемы:
File does not exist:
hdfs://warehouse/hive/my_db.db/my_table/2
6-01_29_data.0.parq
It is possible the underlying files have been
updated. You can explicitly invalidate the
cache in Spark by running 'REFRESH TABLE
tableName' command in SQL or by recreating
the Dataset/DataFrame involved.
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"'2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
👍29❤8🔥3
Сопутки загрузок
🍔 Data Compaction
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
🧊 Решение - объединить в файлы побольше. В iceberg для этого есть процедура rewrite_data_files. Если таблица тяжелая, компакшн можно встроить в сам процесс загрузки. Или поставить на внерабочие часы, чтобы никому не мешать. Кстати, со стримингами у нас даже дважды компактится: каждый час и каждый день
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
☕️ Data Readiness
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
⛅️ Event Driven
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
🌱 🌱 🌱 🌱 🌱
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
💐 Кстати, мне тут очень понравились несколько выражений из книги, первые два очень поэтичные:
#depatterns
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
Sad but true, but your data engineering life will rarely be a bed of roses.
However, life is not that rosy.
Always expecting the worst is probably not the best way to go through life, but it’s definitely one of the best approaches you can take to your data engineering projects.
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥13❤4👍4
Козырной вопрос
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
Ответы бывали разные:
🤩 нет такого
🤩 нет такого, но есть рабочий чат, в принципе туда можно кидать (но никто так не делает)
🤩 у нас спринты называются мемами
🤩 есть, я даже могу найти последние, но они больше про инфраструктуру у клиента
🤩 есть в корп мессенджере
🤩 есть и в корп, и в тележке
🤩 конечно! есть командный, еще есть для всего департамента
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
А у вас есть мемный чатик?
Ответы бывали разные:
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
Please open Telegram to view this post
VIEW IN TELEGRAM
👍17🔥9💅4❤1
Управляем ошибками. Часть 1
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
📒 Pattern: Dead-Letter
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)🤷♀️
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое🙂
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
🌿 🌿 Duplicated Records
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
.withWatermark("visit_time", "10 minutes")
.dropDuplicates(["visit_id", "visit_time"])
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤13👍7🔥4
Управляем ошибками. Часть 2
📼 Late Data
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
1️⃣ Pattern: Late Data Detector
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них😁
2️⃣ Pattern: Static Late Data Integrator
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили😁 Потом мы шли разбираться с владельцами, и данные заливались, но позднее. Чтобы это учитывать, в моем подходе был такой алгоритм:
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
3. Из меты достаем существующие партиции
4. Находим разницу
5. Итерируемся по потеряшкам
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
3️⃣ Pattern: Dynamic Late Data Integrator
Предлагается завести табличку с 4 полями:
🤩 партиция
🤩 время обработки
🤩 время добавления новых записей
🤩 флаг обработано или нет
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
🤩 Filtering
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
🌳 Fault Tolerance
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
🤩 exactly once - нужны другие паттерны, расскажу, когда дойду
🤩 at least once - чекпоинт после обработки, могут быть дубликаты при перезапуске после падения
🤩 at most once - чекпоинт до обработки, данные потеряются при падении
#depatterns
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
full_range = pd.date_range(start=str(start_dt), end=str(end_dt)).strftime("%Y-%m-%d")
3. Из меты достаем существующие партиции
def get_existing_partitions(table_name):
partitions = (
spark.sql(f"show partitions {table_name}")
.select(F.split(F.col("partition"), "=")[1].alias("dt"))
.collect()
)
return [p[0] for p in partitions]
4. Находим разницу
lost_range = full_range.difference(existing_partitions_pdf)
5. Итерируемся по потеряшкам
for dt in lost_range:
calc_mart(dt)
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
Предлагается завести табличку с 4 полями:
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤10👍7🤔1
Мои заметки
У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные мысли, которые меня зацепили, и иногда пояснения контекста. Разделила по категориям, чтобы это был порядочный хаос
Про жизнь
🟡 Никто не обязан знать все и быть всегда правым
🟡 В большинстве случаев все решаемо
Про работу
🟣 На груминге задачи: «Мы не знаем, как это сделать, но надо как-то сделать»
🟣 Если что-то хорошо работает, оно не должно хорошо работать. Просто вы еще не знаете, что именно работает плохо
Про карьеру
🟡 «Синьор одного проекта» - не имеет достаточного опыта с процессами, с другими командами, с решением кейсов
🟡 Фидбек интервьюера на собесе: «Иногда важно не что отвечает, а как»
Про менеджерство
🔴 Микроменеджмент - недостаток доверия - ревность
На одной конфе сказали левую половину фразы. Я подумала, что ревность тоже возникает от недостатка доверия. Значит ли это, что микроменеджмент = ревность?🤔
🔴 Я не понимаю, почему в играх ты повышаешь уровень, когда все проходишь без ошибок. Ведь ты получаешь новые уровни, когда ошибаешься. Ошибка - ресурс, который конвертируется в результат
🔴 Проджект менеджер - это пастух встреч
Про технику
⭕️ База данных - всего лишь кэшированная версия журнала (т.к. в ней последние значения полей, а в журнале - все)
⭕️ В потоках сортировка слиянием неприменима, т к. сортировка невозможна на бесконечном наборе данных
⭕️ CDC - это хак, журналы низкоуровневые и до определенного момента не использовались
⭕️ В Hadoop используется концепция «складывай как есть, потом решим». Однако на практике выясняется: быстро получить данные, даже представленные в странном, трудном для применения, необработанном формате, зачастую более ценно, чем пытаться заранее выбрать идеальную модель данных
⭕️ Для разных людей по-разному могут работать сайты - для активных пользователей быстрее, для пассивных - дольше (не факт, что это реальная инфа, но мысль интересная)
Про безопасность
🟡 Менее страшно, если взломали того, где ИБ не было. Страшно, если взломали того, где ИБ было
🟡 Самая безопасная система - выключенная
🟡 Большинство не являющихся критически важными для безопасности систем выбирают дешевизну и ненадежность вместо дороговизны и надежности
Про красивое
🟣 Учусь дышать над поломанным смыслом
🟣 Инерция пылинки
Про байки
На одной из конф рассказывали байки, одна из них меня очень зацепила:
🔴 Правда или ложь: если человеку, который купил вещь, в течение недели/месяца не показывать похожие вещи в рекомендациях, то конверсия в продажи вырастет?
❤️ - правда
🤔 - ложь
У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные мысли, которые меня зацепили, и иногда пояснения контекста. Разделила по категориям, чтобы это был порядочный хаос
Про жизнь
Про работу
Про карьеру
Про менеджерство
На одной конфе сказали левую половину фразы. Я подумала, что ревность тоже возникает от недостатка доверия. Значит ли это, что микроменеджмент = ревность?
Про технику
Про безопасность
Про красивое
Про байки
На одной из конф рассказывали байки, одна из них меня очень зацепила:
❤️ - правда
🤔 - ложь
Please open Telegram to view this post
VIEW IN TELEGRAM
❤32🤔8
Возможен ли Sort-Merge Join без шафла?
Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ). Если таблицы одинаково партиционированы и джойнятся по этому ключу, то не приходится перемещать кучу данных. Правда, это работает только в форматах, поддерживающих DataSource API V2. Например, в Iceberg, Hudi, Delta Lake
Что нужно:
🌸 Spark 3.3+
🌸 Условие равенства в джойне
🌸 У таблиц должно быть общее поле-партиция, которое входит в ключ джойна
🎈 Пример
Я взяла 2 таблицы, партицировала, заполнила данными и положила. Потом их поджойнила, и вот какой план запроса у меня сначала получился (картинка 1):
У нас есть Exchange - происходит шафл даных. Потом я накинула несколько конфигов:
План запроса чуть поменялся (картинка 2) - исчез шафл:
В одной из статей говорится, что перформанс увеличился на 45-70% на джойнах. А кто-то уже использовал на практике?
@data_engineerette
Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ). Если таблицы одинаково партиционированы и джойнятся по этому ключу, то не приходится перемещать кучу данных. Правда, это работает только в форматах, поддерживающих DataSource API V2. Например, в Iceberg, Hudi, Delta Lake
Что нужно:
Я взяла 2 таблицы, партицировала, заполнила данными и положила. Потом их поджойнила, и вот какой план запроса у меня сначала получился (картинка 1):
== Physical Plan ==
CollectLimit
+- * Project
+- * SortMergeJoin Inner
:- * Sort
: +- Exchange
: +- * Project
: +- BatchScan spark_catalog.test.table1
+- * Sort
+- Exchange
+- * Project
+- BatchScan spark_catalog.test.table2
У нас есть Exchange - происходит шафл даных. Потом я накинула несколько конфигов:
# включает Storage Partition Join
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
# сохраняет текущее партиционирование при планировании запроса
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
# убирает шафл, когда партиции между таблицами не совпадают по количеству
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
# иначе ключи джойна должны быть такие же, как партиции, в том же порядке
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
# борется с перекосом данных
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
# разрешает включать части партиций в ключ джойна
spark.conf.set("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled", "true")
# на всякий можно отключить broadcast, адаптивку и sortmerge (если это точно нужно)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
План запроса чуть поменялся (картинка 2) - исчез шафл:
== Physical Plan ==
CollectLimit
+- * Project
+- * SortMergeJoin Inner
:- * Sort
: +- * Project
: +- BatchScan spark_catalog.test.table1
+- * Sort
+- * Project
+- BatchScan spark_catalog.test.table2
В одной из статей говорится, что перформанс увеличился на 45-70% на джойнах. А кто-то уже использовал на практике?
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
❤13 7🤔1
Минутка рефлексии
Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску😔 Но мем прям метчится со мной. У меня практически никогда не бывает такого, что я прыгаю от радости или злюсь и агрессирую. Мне просто норм
Вот вы пишете код, sql-запросы, копаетесь в логах ошибок, запускается пайплайны, уточняете требования, ресерчите источники данных, делаете преобразования, нажимаете на кнопочки, стучите по клавиатуре - это вам в кайф? Думаете ли вы: "Уррра, завтра я буду разбираться в проблемах с качеством данных"? "Наконец-то этот поток упал, чтобы я выяснил все проблемы и поднял его"?
Вот в конце июля опубликовали 2025 Stack Overflow Developer Survey, и там всего лишь 25% людей прям счастливы на работе. Остальным 25% вообще не нравится, 50% ок
Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску
Вот вы пишете код, sql-запросы, копаетесь в логах ошибок, запускается пайплайны, уточняете требования, ресерчите источники данных, делаете преобразования, нажимаете на кнопочки, стучите по клавиатуре - это вам в кайф? Думаете ли вы: "Уррра, завтра я буду разбираться в проблемах с качеством данных"? "Наконец-то этот поток упал, чтобы я выяснил все проблемы и поднял его"?
Вот в конце июля опубликовали 2025 Stack Overflow Developer Survey, и там всего лишь 25% людей прям счастливы на работе. Остальным 25% вообще не нравится, 50% ок
Please open Telegram to view this post
VIEW IN TELEGRAM
❤16😭11👍6
Как ускорить ClickHouse?
Обсуждали с одним де перформанс систем и связку Trino + ClickHouse. Моя первая мысль - серьезно? КХ и так быстрый, куда еще к нему Trino накручивать?? Моя вторая мысль - тестим! Дисклеймер - мои тесты не претендуют на супер истину, но дают определенное представление
💻 Я взяла ноут, подняла кх с 1 нодой и трино с кх каталогом. Создала тестовые таблички с 1к, 10к, 100к, 1млн, 10млн строк. Больше уже не отрабатывало
Придумала такой запрос:
Тут есть работа со строками, агрегация, оконка и т.д. - поэтому тестим в какой-то мере тяжелую операцию
Запрос я запустила по 50 раз на каждом размере данных для кх и трино. Потом так же погоняла insert моей cte в таблицу
Изначально сходила в system.query_log и посчитала длительность, строки, байты, память в среднем по всем запускам. Потом поняла, что для трино они будут отражать только кусок работы в кх, поэтому сравнение будет некорректным
По итогу решила сравнить время: query_duration_ms vs elapsedTime
Какие выводы я могу сделать:
1. Трино поверх КХ проигрывает КХ. Так получилось при моих данных и в моих условиях. Хотя есть мнение, что на большом объеме данных он работает быстрее бд (~100гб)
2. При insert трино проигрывает еще значительнее, потому что сначала пишет данные во временную таблицу, потом перекладывает в основную:
3. В query_log по Трино не собирается часть метрик: user_aggregate_functions, used_functions, used_data_type_families. А также отличаются result_rows, query. Если это важно для мониторинга, то использовать не получится
4. Я также хотела отделить разные тесты комментами, чтобы потом удобнее искать в query_log:
Но в трино нет такого функционала, так что многие необходимые кликхаусные штучки просто не будут работать
@data_engineerette
Обсуждали с одним де перформанс систем и связку Trino + ClickHouse. Моя первая мысль - серьезно? КХ и так быстрый, куда еще к нему Trino накручивать?? Моя вторая мысль - тестим! Дисклеймер - мои тесты не претендуют на супер истину, но дают определенное представление
Придумала такой запрос:
with cte as (
select
id,
concat(cast(id, 'String'), '_', cast(id, 'String')),
sum(id),
row_number() over (order by id) as rn
from test.tbl1000
group by 1, 2
)
select * from cte
where rn in (14, 49)
order by rn desc
Тут есть работа со строками, агрегация, оконка и т.д. - поэтому тестим в какой-то мере тяжелую операцию
Запрос я запустила по 50 раз на каждом размере данных для кх и трино. Потом так же погоняла insert моей cte в таблицу
Изначально сходила в system.query_log и посчитала длительность, строки, байты, память в среднем по всем запускам. Потом поняла, что для трино они будут отражать только кусок работы в кх, поэтому сравнение будет некорректным
По итогу решила сравнить время: query_duration_ms vs elapsedTime
Какие выводы я могу сделать:
1. Трино поверх КХ проигрывает КХ. Так получилось при моих данных и в моих условиях. Хотя есть мнение, что на большом объеме данных он работает быстрее бд (~100гб)
2. При insert трино проигрывает еще значительнее, потому что сначала пишет данные во временную таблицу, потом перекладывает в основную:
INSERT INTO "test"."result1000" ("id", "str", "sum_", "rn") SELECT "id", "str", "sum_", "rn" FROM "test"."tmp_trino_c5cb30af" temp_table
3. В query_log по Трино не собирается часть метрик: user_aggregate_functions, used_functions, used_data_type_families. А также отличаются result_rows, query. Если это важно для мониторинга, то использовать не получится
4. Я также хотела отделить разные тесты комментами, чтобы потом удобнее искать в query_log:
SETTINGS log_comment = 'test'
Но в трино нет такого функционала, так что многие необходимые кликхаусные штучки просто не будут работать
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
👍24🌚1
Please open Telegram to view this post
VIEW IN TELEGRAM
😁33🔥14❤4😭2💅1