Как мониторить подвисшие сенсоры?
Начнем с того, что в Airflow есть несколько состояний для таски:
⭐️ none - пока отдыхает
⭐️ scheduled - должна быть запущена, все зависимости выполнены
⭐️ queued - ждет свободный воркер
⭐️ running - работает
⭐️ success - успешно завершилась
⭐️ restarting - перезапустили
⭐️ failed - упала
⭐️ skipped - пропущена
⭐️ upstream_failed - упала предыдущая таска, которая нам нужна
⭐️ up_for_retry - упала, но будет перезапущена
⭐️ up_for_reschedule - сенсор будет перезапущен
⭐️ deferred - отложена и ждет триггер
⭐️ removed - удалена из дага после запуска
Подвисшие сенсоры уходят в статус deferred. У нас они имеют такой нейминг - mytask_awaiting_somedag. Я написала себе запрос, который выводит:
⭕️ название дага, на который смотрит сенсор
⭕️ количество сенсоров, которые ждут этот даг
⭕️ общее количество подвисших сенсоров
И так можно сразу понять, на какой даг смотрит наибольшее количество сенсоров, и посмотреть причину
Начнем с того, что в Airflow есть несколько состояний для таски:
Подвисшие сенсоры уходят в статус deferred. У нас они имеют такой нейминг - mytask_awaiting_somedag. Я написала себе запрос, который выводит:
И так можно сразу понять, на какой даг смотрит наибольшее количество сенсоров, и посмотреть причину
with sensored as (
SELECT
substr(
task_id,
strpos(task_id, 'awaiting_') + length('awaiting_')
) as sensor,
dag_id
FROM airflow.public.task_instance
WHERE state = 'deferred'
)
select
sensor,
count(1) over(partition by sensor) as sensor_cnt,
count(1) over() as total_cnt,
dag_id
from sensored
order by 2 desc, sensor, dag_id;
Please open Telegram to view this post
VIEW IN TELEGRAM
1👍22🔥6❤5
Я сделала стикеры😎
Версия 1.0
Давно была мысль придумать свой стикерпак, и вот я ее реализовала🙌 Если бы я знала, сколько знаний и умений это потребует, то не стала бы так возиться))
Пару дней накидывала в заметках кучу идей. А потом пришлось осваиваться в графическом дизайне. Это просто нечто🤯 🤯 Я сидела все выходные и выясняла:
🤩 как работать со слоями?
🤩 как выровнять картинки с текстом?
🤩 как поправить кривые, чтобы они выглядели норм?
🤩 что такое тип узла?
🤩 как сделать изогнутый текст?
🤩 как нарисовать форму вокруг всего текста, а не отдельных буковок?
🤩 как перевести текст в кривые?
🤩 как объединить элементы?
🤩 как сделать обводку?
🤩 как сделать поля наклеек?
🤩 как нарисовать контур реза и какого цвета?
🤩 в каком масштабе сохранять?
🤩 как соотносятся пункты и мм?
🤩 какими делать поля документа?
🤩 Хотелось несколько раз бросить это дело, потому что я никак не могла найти инфу и функционал в проге. Но что-то двигало меня вперед🤩
А еще приходилось гуглить “что такое плоттерная резка”, внимательно читать требования к стикерпаку, вымерять расстояния линеечкой, переделывать готовый стикерпак под нужный масштаб и кучу раз менять расположение, чтобы все поместилось и не порезалось. В итоге 80% идей остались за бортом, некоторые уже реализованные элементы пришлось вырезать на этапе аппрува от типографии😭
Потом забрать цветопробу, заплатить денежку (20 штук - 6к, тоже как бы недешево), забрать финалочку в день отъезда на конфу и вуаля🙌
Версия 1.0
Давно была мысль придумать свой стикерпак, и вот я ее реализовала
Пару дней накидывала в заметках кучу идей. А потом пришлось осваиваться в графическом дизайне. Это просто нечто
А еще приходилось гуглить “что такое плоттерная резка”, внимательно читать требования к стикерпаку, вымерять расстояния линеечкой, переделывать готовый стикерпак под нужный масштаб и кучу раз менять расположение, чтобы все поместилось и не порезалось. В итоге 80% идей остались за бортом, некоторые уже реализованные элементы пришлось вырезать на этапе аппрува от типографии
Потом забрать цветопробу, заплатить денежку (20 штук - 6к, тоже как бы недешево), забрать финалочку в день отъезда на конфу и вуаля
Please open Telegram to view this post
VIEW IN TELEGRAM
❤24 9🔥6👍3
Как прошла SmartData 2025?
Это были 2 дня конфы в Питере - доклады, тусовка крутых ребят, активности и обсуждения. Ниже будут инсайты от оргов, команд и почему я просто обязана поехать в следующем году!
📚 Доклады
В одно время идут сразу 3 доклада, поэтому большинство осталось за кадром. Тут топ из тех, которые я посетила:
1️⃣ Текущее состояние рынка даты
Presto/Trino + K8s + S3, Iceberg, StarRocks, Polars - движется сюда
2️⃣ Куда развивается айсберг
Из продвинутого - мат вьюшки, UDF, интероперабельность вьюшек и т.д.
Поясняю про вьюшки:
Spark создает - Spark читает - ок🤩
Trino создает - Trino читает - ок🤩
Trino создает - Spark читает - не ок🤩 , а должно быть ок (наоборот тоже)
Также есть глобальная проблема: в айсберге фичу могут внедрить, но пройдет много времени, пока движок научится это поддерживать
3️⃣ Про датасеты в Airflow
Все супер понятно, с несколькими кейсами, проблемами и решениями. Я взяла контакт, нам может пригодиться
4️⃣ Spark Connect
Как раз недавно коллега вкидывал идею использовать Spark Connect для одной задачи, надо будет пересмотреть
5️⃣ Self-service для деплоя витрин в Авито
Тут вообще приколдесная штука, они в битбакете в комментах пишут команды dwh test, dwh merge, это проверяют кучи тест-кейсов, есть автоопределение зависимостей, циклических зависимостей. С точки зрения идеи и реализации мне кажется безумно крутым, но вопрос - оно действительно нужно было?
6️⃣ DQ as a Service
Интересные фичи:
⁃ проверки группируются, чтобы не спамить каждую
⁃ проверки на месяц могут ссылаться на проверки по дню, чтобы не пересчитывать заново
⁃ ETL-процесс отправляет свой результат, чтобы проверка его переиспользовала
⁃ некоторые проверки на сэмплах данных могут не отличаться от всего объема
Уходя в первый день, я услышала мнение:
🐱 Нетворкинг
Тут самое прикольное!
Я пообщалась с представителем программного комитета, с ребятами из компаний по поводу их подходов, вживую познакомилась с нашими коллегами, с еще одним автором де канала и его очень крутыми коллегами. Меня даже узнали несколько человек, сказали, что один из адекватных каналов🙂
Чел из программного комитета поделился, что докладов изначально в 3 раза больше, они ценят уникальность (без написанных статей с хабра и повторов с других конф). Если определенных технологий нет - были слабенькие доклады. Иногда программный комитет хочет сходить на несколько докладов и расставляет так, чтобы они не пересекались))
✨ Активности и инсайты
Активности были на стендах партнеров конфы. Я там познакомилась с организатором мероприятий - вы в курсе, что самый базовый стенд без особых наворотов, подиумов и подсветки уже стоит 1 млн??
Игрулек было достаточно, я набрала себе столько мерча, что уже можно продавать)) Из полезного:
🤩 Бросала магнитные дротики. Куда попадешь - такая сложность вопроса, берите на заметку)
Junior:
SQL-запрос для дубликатов?
Зачем нужны индексы в бд?
Middle:
Как вы настроите мониторинг для пайплайна, чтобы знать, если данные не пришли вовремя?
Как обеспечить идемпотентность в пайплайне?
Senior:
Как вы оцените, когда пора переходить от batch-обработки к streaming-архитектуре?
🤩 Собирала архитектуру
Это было самое прикольное! Мы клали в реальную корзину реальные технологии и шли сканировать на кассу. Представьте: я купила айсберг. А если вы не знали, то Магнит выкупил Азбуку вкуса
Пока собирала архитектуру в Х5, ребята рассказали про крутую собственную разработку. У них много инстансов Airflow, и они синкуются через Redis - там хранятся статусы о состояниях дагов, на которых строятся зависимости. Это просто🔥
🍓 На сладенькое
В конце был глобальный розыгрыш сумки с мерчом от партнеров и билета на следующий год. Представляете, это выиграла я!!!!
Это были 2 дня конфы в Питере - доклады, тусовка крутых ребят, активности и обсуждения. Ниже будут инсайты от оргов, команд и почему я просто обязана поехать в следующем году!
В одно время идут сразу 3 доклада, поэтому большинство осталось за кадром. Тут топ из тех, которые я посетила:
Presto/Trino + K8s + S3, Iceberg, StarRocks, Polars - движется сюда
Из продвинутого - мат вьюшки, UDF, интероперабельность вьюшек и т.д.
Поясняю про вьюшки:
Spark создает - Spark читает - ок
Trino создает - Trino читает - ок
Trino создает - Spark читает - не ок
Также есть глобальная проблема: в айсберге фичу могут внедрить, но пройдет много времени, пока движок научится это поддерживать
Все супер понятно, с несколькими кейсами, проблемами и решениями. Я взяла контакт, нам может пригодиться
Как раз недавно коллега вкидывал идею использовать Spark Connect для одной задачи, надо будет пересмотреть
Тут вообще приколдесная штука, они в битбакете в комментах пишут команды dwh test, dwh merge, это проверяют кучи тест-кейсов, есть автоопределение зависимостей, циклических зависимостей. С точки зрения идеи и реализации мне кажется безумно крутым, но вопрос - оно действительно нужно было?
Интересные фичи:
⁃ проверки группируются, чтобы не спамить каждую
⁃ проверки на месяц могут ссылаться на проверки по дню, чтобы не пересчитывать заново
⁃ ETL-процесс отправляет свой результат, чтобы проверка его переиспользовала
⁃ некоторые проверки на сэмплах данных могут не отличаться от всего объема
Уходя в первый день, я услышала мнение:
SmartData - это, значит, очень клевое место, если народ в начале докладов разбегается по залам. Это говорит об уровне конфы
Тут самое прикольное!
Я пообщалась с представителем программного комитета, с ребятами из компаний по поводу их подходов, вживую познакомилась с нашими коллегами, с еще одним автором де канала и его очень крутыми коллегами. Меня даже узнали несколько человек, сказали, что один из адекватных каналов
Чел из программного комитета поделился, что докладов изначально в 3 раза больше, они ценят уникальность (без написанных статей с хабра и повторов с других конф). Если определенных технологий нет - были слабенькие доклады. Иногда программный комитет хочет сходить на несколько докладов и расставляет так, чтобы они не пересекались))
Активности были на стендах партнеров конфы. Я там познакомилась с организатором мероприятий - вы в курсе, что самый базовый стенд без особых наворотов, подиумов и подсветки уже стоит 1 млн??
Игрулек было достаточно, я набрала себе столько мерча, что уже можно продавать)) Из полезного:
Junior:
SQL-запрос для дубликатов?
Зачем нужны индексы в бд?
Middle:
Как вы настроите мониторинг для пайплайна, чтобы знать, если данные не пришли вовремя?
Как обеспечить идемпотентность в пайплайне?
Senior:
Как вы оцените, когда пора переходить от batch-обработки к streaming-архитектуре?
Это было самое прикольное! Мы клали в реальную корзину реальные технологии и шли сканировать на кассу. Представьте: я купила айсберг. А если вы не знали, то Магнит выкупил Азбуку вкуса
Пока собирала архитектуру в Х5, ребята рассказали про крутую собственную разработку. У них много инстансов Airflow, и они синкуются через Redis - там хранятся статусы о состояниях дагов, на которых строятся зависимости. Это просто
В конце был глобальный розыгрыш сумки с мерчом от партнеров и билета на следующий год. Представляете, это выиграла я!!!!
Please open Telegram to view this post
VIEW IN TELEGRAM
1🔥22❤13 11
5. Лидерство в команде
Продолжаю рассказывать про курс команды, и это последний модуль🥲
Четвертый модуль - тут
Описание программы - тут
Темы, которые мы разобрали:
🤩 что такое лидерство
🤩 теории лидерства (личностная, поведенческая, ситуационная, организационно-ролевая)
🤩 работа с приоритетами (матрица Эйзенхауэра, метод ABC)
🤩 способы принятий решений (проблемно-ориентированный, результат-ориентированный, авторский/лидерский, генеративно-критериальный, эволюционный)
🤩 личный бренд руководителя команды
🤩 теория по очень сложным фреймворкам (Human System Dynamics, Warm Data Labs)
Из всех практик мне больше всего понравилась с диаграммой Исикавы. Нам нужно было взять одну проблему и исследовать ее, при этом ничего не решать. Я помню, что в универе нам про это рассказывали, но смысла я тогда не особо уловила
🌧 У одной девушки из подгруппы была необычная проблема: джун начинает фиксить баг, находит несовершенства в архитектуре и начинает все рефакторить. Он делает это хорошо, но задача затягивается на месяц-два. Поэтому ему не поручают хотфиксы, а задачи дают за два-три релиза вперед. В итоге получается, что на оставшуюся часть команды ложится больше нагрузки
В ходе разбора рыбы Исикавы по косточкам мы выяснили, что:
🤩 нет 1:1 и обратной связи
🤩 нет точек контроля
🤩 нет четкой постановки задач
🤩 у задач нет оценки
🤩 у задач нет дедлайна
🤩 джун понимает, что задачи несрочные и неважные
В общем, диаграмма позволяет понять причины, причины причин и т.д. В конце мы обсудили разные сценарии решения и вариант "А что если не решать?" Это тоже как один из подходов к решению
Книги, которые я себе записала:
📒 Маршалл Голдсмит, Марк Рейтер "Триггеры"
📒 Тимоти Феррис "Как работать по 4 часа в неделю и при этом не торчать в офисе «от звонка до звонка», жить где угодно и богатеть"
Ну вот и все, выпускной тоже прошел - пообщались с однокурсниками, преподами и получили дипломы🐸
Продолжаю рассказывать про курс команды, и это последний модуль
Четвертый модуль - тут
Описание программы - тут
Темы, которые мы разобрали:
Нам в лидерах нравится то, что мы в каком-то виде имеем в себе
Бренд руководителя - это истории, которые про нас знают
Из всех практик мне больше всего понравилась с диаграммой Исикавы. Нам нужно было взять одну проблему и исследовать ее, при этом ничего не решать. Я помню, что в универе нам про это рассказывали, но смысла я тогда не особо уловила
В ходе разбора рыбы Исикавы по косточкам мы выяснили, что:
В общем, диаграмма позволяет понять причины, причины причин и т.д. В конце мы обсудили разные сценарии решения и вариант "А что если не решать?" Это тоже как один из подходов к решению
Книги, которые я себе записала:
Ну вот и все, выпускной тоже прошел - пообщались с однокурсниками, преподами и получили дипломы
Please open Telegram to view this post
VIEW IN TELEGRAM
👍17🔥11❤9💅2
Что с нами станет?
На днях зашла в линкедин, и первый пост в ленте был от европейского стартапа. В посте приветствовали нового коллегу - моего контакта (назовем его Паша)
🎹 С Пашей я познакомилась на 2м курсе, когда увидела афишу фортепианного вечера, а он оказался ее оргом. Лайнап артистов уже был собран, но я все-таки напросилась, и мы какое-то время хорошо общались
Я помню Пашу, когда он учился на мехмате, потом наши пути не пересекались. Оказалось, что он получил степень магистра геометрии, а два года назад уехал в Италию учиться на второй маге по ИИ
🍕 Так вот в посте сказано, что он работал бизнес-аналитиком, а потом перешел на AI Engineer/AI Specialist. Я сходила к Паше за краткой пояснялкой этой профессии:
Может, нам тоже надо переквалифицироваться, пока не поздно?
На днях зашла в линкедин, и первый пост в ленте был от европейского стартапа. В посте приветствовали нового коллегу - моего контакта (назовем его Паша)
Я помню Пашу, когда он учился на мехмате, потом наши пути не пересекались. Оказалось, что он получил степень магистра геометрии, а два года назад уехал в Италию учиться на второй маге по ИИ
По существу, разработчик AI архитектур. Соединять, оркестрировать, оценивать перформанс LLM и блоки пре-/пост-процессинга данных для них
Может, нам тоже надо переквалифицироваться, пока не поздно?
Please open Telegram to view this post
VIEW IN TELEGRAM
Lakehouse Podcast
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца😅 А там как раз есть некоторые ответы на вопросы из этого поста. Ниже некоторые моменты, которые мне особенно запомнились
🙂 "На какие бизнесовые метрики влияет лейкхаус?"
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
🙂 Пара интересных метрик
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
🤩 Еще я всегда слышала про Cloud-Native - сервисы, которые изначально пилились под облака
🤩 Но не слышала про Cloud-Ready - изначально не под облако, но легко мигрировать
🤩 А еще гуглинг привел к Cloud-Compatible - не под облако, но работает и с ним
🤩 И к Cloud-Enabled - не под облако, но его как-то адаптировали
🤩 И к Cloud-Agnostic - без использования специфичных сервисов провайдера в отличие от native
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
🙂 Финальная мысль
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца
Основная бизнес-метрика - точность попадания в качество и во время
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
Играть на музыкальном инструменте просто. Нужно вовремя нажимать нужные клавиши
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
Возможно ли, что лейкхаус станет устаревшим раньше, чем большинство компаний успеют его внедрить?
Please open Telegram to view this post
VIEW IN TELEGRAM
YouTube
Lakehouse: почему дом данных всё время на реконструкции
Встречайте 10-й юбилейный выпуск подкаста Pro Данные! Вместе с экспертами из Arenadata и DIS Group обсуждаем Lakehouse. Почему компании сейчас активно присматриваются к этой архитектуре и всем ли стоит туда идти?
В гостях:
Алексей Быков, product owner,…
В гостях:
Алексей Быков, product owner,…
1👍11🔥6❤3 1
Full + Incremental Load
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом❤
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
🌷 Full Load
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
🤩 пользователи ходят в вьюшку
🤩 вьюшка смотрит на table1
🤩 данные пишутся в table2
🤩 table2 подменяется на table1
У нас реально были такие проблемы:
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
🪴 Incremental Load
1⃣ Pattern: Incremental Loader
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается😎
2⃣ Pattern: Change Data Capture
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы🥺
#depatterns
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
У нас реально были такие проблемы:
File does not exist:
hdfs://warehouse/hive/my_db.db/my_table/2
6-01_29_data.0.parq
It is possible the underlying files have been
updated. You can explicitly invalidate the
cache in Spark by running 'REFRESH TABLE
tableName' command in SQL or by recreating
the Dataset/DataFrame involved.
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"'2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
👍29❤8🔥3
Сопутки загрузок
🍔 Data Compaction
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
🧊 Решение - объединить в файлы побольше. В iceberg для этого есть процедура rewrite_data_files. Если таблица тяжелая, компакшн можно встроить в сам процесс загрузки. Или поставить на внерабочие часы, чтобы никому не мешать. Кстати, со стримингами у нас даже дважды компактится: каждый час и каждый день
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
☕️ Data Readiness
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
⛅️ Event Driven
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
🌱 🌱 🌱 🌱 🌱
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
💐 Кстати, мне тут очень понравились несколько выражений из книги, первые два очень поэтичные:
#depatterns
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
Sad but true, but your data engineering life will rarely be a bed of roses.
However, life is not that rosy.
Always expecting the worst is probably not the best way to go through life, but it’s definitely one of the best approaches you can take to your data engineering projects.
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥13❤4👍4
Козырной вопрос
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
Ответы бывали разные:
🤩 нет такого
🤩 нет такого, но есть рабочий чат, в принципе туда можно кидать (но никто так не делает)
🤩 у нас спринты называются мемами
🤩 есть, я даже могу найти последние, но они больше про инфраструктуру у клиента
🤩 есть в корп мессенджере
🤩 есть и в корп, и в тележке
🤩 конечно! есть командный, еще есть для всего департамента
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
А у вас есть мемный чатик?
Ответы бывали разные:
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
Please open Telegram to view this post
VIEW IN TELEGRAM
👍17🔥9💅4❤1
Управляем ошибками. Часть 1
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
📒 Pattern: Dead-Letter
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)🤷♀️
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое🙂
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
🌿 🌿 Duplicated Records
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
.withWatermark("visit_time", "10 minutes")
.dropDuplicates(["visit_id", "visit_time"])
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤13👍7🔥4
Управляем ошибками. Часть 2
📼 Late Data
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
1️⃣ Pattern: Late Data Detector
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них😁
2️⃣ Pattern: Static Late Data Integrator
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили😁 Потом мы шли разбираться с владельцами, и данные заливались, но позднее. Чтобы это учитывать, в моем подходе был такой алгоритм:
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
3. Из меты достаем существующие партиции
4. Находим разницу
5. Итерируемся по потеряшкам
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
3️⃣ Pattern: Dynamic Late Data Integrator
Предлагается завести табличку с 4 полями:
🤩 партиция
🤩 время обработки
🤩 время добавления новых записей
🤩 флаг обработано или нет
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
🤩 Filtering
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
🌳 Fault Tolerance
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
🤩 exactly once - нужны другие паттерны, расскажу, когда дойду
🤩 at least once - чекпоинт после обработки, могут быть дубликаты при перезапуске после падения
🤩 at most once - чекпоинт до обработки, данные потеряются при падении
#depatterns
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
full_range = pd.date_range(start=str(start_dt), end=str(end_dt)).strftime("%Y-%m-%d")
3. Из меты достаем существующие партиции
def get_existing_partitions(table_name):
partitions = (
spark.sql(f"show partitions {table_name}")
.select(F.split(F.col("partition"), "=")[1].alias("dt"))
.collect()
)
return [p[0] for p in partitions]
4. Находим разницу
lost_range = full_range.difference(existing_partitions_pdf)
5. Итерируемся по потеряшкам
for dt in lost_range:
calc_mart(dt)
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
Предлагается завести табличку с 4 полями:
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤10👍7🤔1