DE – Telegram
523 subscribers
313 photos
81 videos
15 files
406 links
Data Engineering Technologies.
SQL, Python, Kafka, Spark, Pandas, Airflow, Clickhouse, Greenplum, Postgres, dbt, LLM agentic systems, AI, robots, drones etc.

Boost channel - https://news.1rj.ru/str/boost/data_engi
Download Telegram
Вчера выкатили стабильный Python 3.14.

Коротко: t‑строки (template strings), отложенная оценка аннотаций + annotationlib, подинтерпретаторы в stdlib, удалённая отладка (pdb -p), официальный free‑threaded CPython как вариант сборки, compression.zstd, UUID v6/v7/v8, цветной REPL, экспериментальный JIT в бинарях macOS/Windows, новый Python Install Manager для Windows.

#python #release
9😁1
Forwarded from DataEng
Последние года 4 я использовал Apache Airflow исключительно в облаке, преимущественно в Amazon — Amazon Managed Apache Airflow. И как обычно бывает, в облаках всё так или иначе между собой связано. Логи хранятся в Cloud Watch, воркеры запускаются в изолированной среде (Amazon Fargate). С июля месяца я стал активно использовать self-hosted Airflow на своих серверах (для своих личных целей), и в целях экономии храню всё в файлах. Так уж получилось, что задачу с регулярной "чисткой" я постоянно откладывал и вот настал час X, когда всё легко из-за исчерпания inodes в файловой системе. Для этого случая я написал DAG, который каждый день в полночь чистит папки со старыми логами, делюсь с вами вдруг он пригодится:

import os
import shutil
from datetime import datetime, timedelta

import pendulum
import structlog
from airflow.sdk import DAG, task

logger = structlog.get_logger(__name__)

@task
def cleanup_airflow_logs(days_to_keep):
log_base_path = os.environ.get("AIRFLOW_HOME", "/opt/airflow") + "/logs"
cutoff_date = datetime.now() - timedelta(days=days_to_keep)

for root, dirs, files in os.walk(log_base_path):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
try:
if os.path.getmtime(dir_path) < cutoff_date.timestamp():
logger.info(f"Deleting old log directory: {dir_path}")
shutil.rmtree(dir_path)
except Exception as e:
logger.error(f"Error deleting directory {dir_path}: {e}")


with DAG(
dag_id="airflow_log_cleanup_dag",
start_date=pendulum.datetime(2025, 10, 1, tz="Asia/Almaty"),
schedule="@daily", # Run daily at midnight
catchup=False,
default_args={
"owner": "airflow",
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
max_active_runs=1,
) as dag:
cleanup_airflow_logs(days_to_keep=14)


Здесь учитывается стандартный шаблон именования логов и директорий, включающий дату и время. Я по привычке использую structlog для ведения логов.
11❤‍🔥2
Глаз да глаз нужен за этими затейниками 🙂
Please open Telegram to view this post
VIEW IN TELEGRAM
😁10
Кстати, чтобы написать свой простейший MCP сервер очень просто:


from fastmcp import FastMCP

mcp = FastMCP("Demo 🚀")

@mcp.tool
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b


FastMCP - это небольшая обертка над библиотекой Starlette, которая позволяет легко создавать MCP серверы - тут вы видите и noscript, и denoscription, и inputSchema каждого метода.

#MCP #протоколы #LLM #AI
❤‍🔥4👏21
😁6💯21
12 Factor Agents - принципы разработки агентов

По следам от 12 Factor Apps, Декстер Хорти собрал ключевые принципы по разработке агентов, делюсь!

1) Преобразование естественного языка в вызовы инструментов. Агент интерпретирует команды в формализованные вызовы функций, а исполняет их — детерминированный код
2) Владей своими промптами (как кодом). Промпт — это артефакт. Его надо хранить, версионировать, тестировать и относиться к нему не как к важной части кода.
3) Управляй контекстом агента. Активно контролируй, что попадает в это окно. Убирай уже не нужное, суммаризируй, оставляй важное, выделяй ограниченные слоты под фичи — внимание модели ограниченно.
4) Вызов тулинга — это просто JSON. LLM генерирует название тула и параметры, остальное должен делать ваш детерминированный вход. Модель решает "что делать", а ваше приложение "как делать"
5) Держи тех.состояние и бизнес состояние вместе. Каждый шаг агента — это и логика, и данные. Всё в одном состоянии, чтобы потом не ковыряться по логам в поисках, где оно сломалось
6) API для управление циклом агента. Агент должен иметь простой API для запуска, приостановки и возобновления работы, уметь вставать на паузу и возвращаться к работе спустя время на каждом этапе жизненного цикла
7) Запрос к человеку — как инструмент. Любое обращение в процессе работы к человеку должно быть представлено агенту, как инструмент requestClarification(message:string)
8) Явно контролируйте логику исполнения. Модель лишь "говорит" что, а твой код решает стоит ли выполнять, когда и как.
9) Добавляйте ошибки исполнения инструментов в контекст. Если что-то упало, сократи информацию и добавь это в контекст агента. Пусть модель сама догадается, что дальше: повторить, спросить или изменить стратегию
10) Используйте малых специализированных агентов. Лучше много маленьких агентов, решающих узконаправленные задачи, чем один перегруженный агент всего
11) Доставляй агентов в любые интерфейсы, удобные пользователю. Slack, Telegram, веб, CLI — неважно. Главное — единый API и омниканальность. Пользователь не должен думать, где общаться с агентом. Он просто должен это делать.
12) Агент как Stateless Reducer. На входе — состояние и запрос. На выходе — новое состояние и ответ. Всё как в хорошем Redux. Никакого собственного внутреннего состояния, он должен работать как чистая функция

Бонус:
13) Предзагрузи все необходимое. Агент не должен постоянно дёргать одни и те же данные. Наполняй контекст тем, что почти наверняка пригодится.

Репа: https://github.com/humanlayer/12-factor-agents
Доклад: https://www.youtube.com/watch?v=8kMaTybvDUw
6😁1
😶‍🌫️ Agentic AI от Andrew Ng - как строить ИИ, который не просто отвечает, а действует: планирует шаги, вызывает инструменты и улучшает себя через рефлексию.

Что внутри:

▶️ Паттерны: Reflection, Tool Use, Planning, Multi‑Agent.
▶️ Интеграции: базы данных, API, веб‑поиск, исполнение кода, Model Context Protocol (MCP).
▶️ Практика и продакшн: evals, error analysis, оптимизация и деплой.
▶️ Капстоун: собираем Research Agent, который сам планирует ресёрч и выдаёт развёрнутый отчёт.


Формат: 5 модулей, ~6 ч, self‑paced, уровень Intermediate. По завершении - сертификат.

🔜 Курс: https://deeplearning.ai/courses/agentic-ai/

Если ты делаешь ассистентов, RAG или внутренние сервисы - этот курс закрывает "как" и "почему" агентные пайплайны реально работают.

#edu #course #ai #llm #agents
Please open Telegram to view this post
VIEW IN TELEGRAM
❤‍🔥6👏21
Оказывается, если вы хотите подчеркнуть, что ваш контент "NOT BY AI", то для вас уже сделали специальный trademark - https://notbyai.fyi/

Эти ребята сделали целый бизнес по продаже этих вотермарок (не знаю, насколько успешный). Для индивидуального использования вы можете просто задонатить сколько хотите - и значки отправят вам на почту. Но корпоративные юзер - расчехляйте кошельки🌚

Просто удивительно, на чем только люди не делают деньги
😁8
Forwarded from DataEng
Как я чищу метаданные Apache Airflow 3.x

В блоге Apache Airflow есть скрипт для периодической чистки метаданных в БД Apache Airflow. Но начиная с Airflow 3.0 невозможно обращаться к БД напрямую (через модели SQLAlchemy), в связи с этим скрипт не работает. Даже вызов shell команды airflow db clean через BashOperator выдаст ошибку по типу:

Could not parse SQLAlchemy URL from string 'airflow-db-not-allowed:///': source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"


RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.0


Как решить? Я нашел выход запуска через старый добрый Cron:

0 0 * * * /home/airflow/.airflow/bin/airflow db clean --clean-before-timestamp "$(date -d '7 days ago' +'%Y-%m-%d %H:%M:%S')" --skip-archive -y


В полночь команда запускается и чистит все данные , оставляя только последние 7 дней. Но это работает на моём личном self-hosted Airflow, как быть с MWAA я пока не знаю, т.к. не обновлялся и не знаю как поведёт себя DAG.
11
7😁922❤‍🔥1
🚀 Как не сжечь Airflow при масштабировании Celery до 16 воркеров

Если ты решился расти с 1–2 нод до целого кластера - держи короткий гайд, как не утопить Redis и не ловить OOM-ы.


0⃣ Сначала делай замеры, потом крути ручки

Включи celery events и следи за:

🔘 потреблением RAM / CPU каждого воркера;
🔘 p50/p90/p99 времен выполнения задач;
🔘 latency брокера (redis-cli --latency) - держи p95 < 10–20 мс.


1️⃣ Сделай три тестовых DAG’а

Чтобы понять, где предел, сделай три типа задач:

🔘 чисто CPU (например хеширование);
🔘 I/O (сети, файлы);
🔘 смешанные.

Поднимай concurrency по шагам - смотри, где падает throughput или растёт latency.

2️⃣ Найди конкуренцию на ядро

Идеал - максимум задач без лагов и OOM.
Обычно выходит 0.5–0.7 × число ядер.

Пример: 8 vCPU 🔜 ставь concurrency ~5.


3️⃣ Посчитай ёмкость для 16 воркеров

🔘 Определи конфиг ноды: 8 vCPU, 16–32 ГБ RAM.
🔘 Дальше просто:
per_worker_conc = cores × коэффициент
cluster_conc = per_worker_conc × 16

🔘 Проверяй память:
RAM ≈ per_task_RSS × per_worker_conc × 1.3

🔘 Redis и Postgres - лучше на отдельных нодах, не внутри оркестратора.


4️⃣ Настройки Celery / Airflow

🔘 worker_concurrency = твоя формула;
🔘 worker_prefetch_multiplier = 1–2;
🔘 task_acks_late = True (если идемпотентно);
🔘 таймауты = p99 × 1.5–2;
🔘 тяжёлые DAG’и в отдельные очереди.


5️⃣ Как понять, что всё ок

🔘 Redis p95 < 20 мс;
🔘 задержка queued 🔜 running < 10 с;
🔘 RSS < 85 % лимита;
🔘 CPU БД < 70 %;
🔘 нет OOM и "heartbeat lost".


✔️ Быстрый старт

▶️ Воркеры: 8 vCPU / 16–32 ГБ RAM
▶️ Concurrency ~5
▶️ Prefetch = 1
▶️ 16 нод ➡️ ~80 одновременных тасок

#airflow #celery #redis #config #ha #cluster
Please open Telegram to view this post
VIEW IN TELEGRAM
184
😁122
Agents 2.0: от примитивных циклов к глубокомыслящим агентам

Год назад ИИ-агент был простым while-циклом: промпт → LLM → инструмент → повтор. Решало обычные быстрые задачи типа "какая погода в Токио", но на задачах в 50 шагов, который могут выполняться несколько дней агент терялся, зацикливался, галлюцинировал.

Сейчас архитектура меняется. Deep Agents не просто реагируют, они планируют, делегируют работу специализированным саб-агентам и хранят память вне контекста.

В заметке выделяется четыре ключевые составляющие:
- Явное детальное планирование через цепочку размышлений с сохранением в markdown to-do с отслеживанием прогресса
- Иерархическая оркестрация с распределенением задач между узкими специалистами под-агентами (researcher, coder, writer) и именно они уже вызывают инструменты. Оркестратору возвращается только резальтат
- Внешняя память в файловой системе или векторных БД вместо переполненного контекста. Причём часть под-агентов могут только писать в память, а часть - только читать!
- Промпты на тысячи токенов с детальными инструкциями и указанием структур доступных инструментов и когда стоит подключить человека.

Интересное краткое отражение факта эволюции развития агентских систем.
Полезно и для понимания архитектуры и для презентаций.

#DeepAgents #Orchestrator #SubAgents
———
@tsingular
91
Forwarded from Время Валеры
На днях в open source выпустили распределённую файловую систему, которая рассчитана на эксабайты (тысячи петабайт).

Сделали это чуваки из XTX, мощные трейдеры, которые известны двумя вещами: тем, что у них (по крайней мере недавно) был топ-3 кластер по количеству ГПУ, и тем, что их основатель, Александр Герко, так любит Лондон, что каждый год платит 500+ млн фунтов налогов на доходы как физическое лицо.

Из интересного (они выделили 9 пунктов, но только 5 мне кажутся отличительными)

Has no single point of failure in its metadata services.
Is hardware agnostic and uses TCP/IP to communicate.
Utilizes different types of storage (such as flash vs. hard disks) cost effectively.
Exposes read/write access through its own API over TCP and UDP, and a Linux kernel filesystem module.
Requires no external service and has a minimal set of build dependencies

Начали работы над системой в 2022 году, в середине 2024 мигрировали весь ML

TernFS' metadata is split into 256 logical shards. Shards never communicate with each other. This is a general principle in TernFS: Splitting the metadata into 256 shards from the get-go simplifies the design, given that horizontal scaling of metadata requires no rebalancing, just the addition of more metadata servers.

Ну и заодно свой формат сериализации разработали, чтобы разработчики передвигали не json, thrift, а что-то там свое.

Еще из интересного - обсуждение когда нужно зеркалить файлы, а когда делать Reed-Solomon coding.

Рекомендую почитать
❤‍🔥62
Forwarded from DataEng
The Annual Airflow Survey

Если вы пользуетесь Apache Airflow, то давайте поможем сообществу собрать больше информации об использовании Airflow. До 20 ноября необходимо заполнить опросник на сайте взамен вы получите возможность пройти сертификацию по Airflow от Astronomer бесплатно (я сдавал их экзамен и он хороший). Опросник небольшой, замёт не более 10 минут вашего времени.
10
😁11
Время Валеры
На днях в open source выпустили распределённую файловую систему, которая рассчитана на эксабайты (тысячи петабайт). Сделали это чуваки из XTX, мощные трейдеры, которые известны двумя вещами: тем, что у них (по крайней мере недавно) был топ-3 кластер по количеству…
Новая ФС от алготрейдеров? Почему бы и нет. Говорят, каждый большой техно-бизнес рано или поздно пишет свою файловую систему - вот и XTX Markets не стали исключением. Они выкатили в open source собственную распределённую FS под названием TernFS. Эта система способна переваривать эксабайты данных и рассчитана на миллионы файлов и высокопараллельные нагрузки - то, что надо для их ML-ферм, где обычные NFS/Ceph уже не вывозят.

▶️ Конечно, чудес не бывает. TernFS не любит совсем мелкие файлы (<2 МБ) и не рассчитана на бесконечное создание миллионов отдельных каталогов. Файлы в ней только дописываются, но не изменяются после факта (immutable-дизайн). Своей системы разрешений/аутентификации нет - этим должен заниматься внешний уровень. Проще говоря, TernFS хороша для больших объёмов статичных данных (логов, моделей, бэкапов, результатов расчётов и пр.), но не годится как замена обычной FS в сценариях с частыми правками и мелочью.

▶️▶️ Вывод: XTX проделали тот же трюк, что и Google с Facebook - создали свою распределённую ФС под специфичные задачи, но внезапно поделились ею с миром. Репозиторий открыт (GPLv2) на GitHub - xtxmarkets/ternfs. Так что если однажды придётся укрощать эксабайты данных или просто хочешь разобраться, как устроена современная файловая система такого масштаба 6 почему бы не заглянуть? Всё-таки не каждый день финансисты выкатывают в open source технологии на уровне топовых IT-грандов 😉.

#fs #algotrading #trading #de
Please open Telegram to view this post
VIEW IN TELEGRAM
7😁1
Forwarded from DataEng
Ребята из Qdrant запустили бесплатный семидневный курс про свою векторную базу данных: Qdrant Essentials

Меня в последнее время интересует тема векторного поиска и векторных БД, и Qdrant как раз неплохой кандидат на изучение и реализацию какого-нибудь проекта (спойлер: домашнего векторного поисковика). Курс прям как по заказу!
❤‍🔥6😁1
Lakehouse 1.0 vs 2.0

🚀 Lakehouse 1.0 - попытка починить склад

Первая версия Lakehouse решала боль Data Warehouse, но делала это по-старому:

💾 Закрытая логика: Метрики и схемы жили в силосах BI-систем. Любая миграция - боль.

🔄 Одна сущность - три версии правды: Отчёт маркетинга ≠ отчёт продаж ≠ отчёт ops.

⚙️ Тесно связанные компоненты: compute + storage = монолит.

🔒 Vendor lock: Форматы, метаданные, даже governance принадлежали платформе-вендору.

🧩 Нет нативной семантики: Каждый инструмент крутил свои метрики вручную.

Результат - дорого, фрагментированно и очень трудно масштабировать без потери доверия к данным.

〰️〰️〰️〰️〰️〰️〰️〰️〰️〰️〰️〰️〰️

🏄‍♂️ Lakehouse 2.0 - построить систему, как работают команды

Второе поколение решает ключевую проблему - делает данные живыми, гибкими и нативными для разных стеков:

🧠 Semantic / Metrics Layer: Единый слой правды. Метрики, онтологии и контракты данных живут вместе.

🌐 Децентрализованные операции: Каждая команда (Sales, Mkt, Ops) может обновлять общую истину через свои доменные API.

🧩 Composable Compute: Можно выбирать движок: Spark, Trino, Dremio - без зависимости от формата.

🪶 Open Formats & Metadata: Никаких "запертых" схем. Метаданные - общие, интероперабельные.

🗂️ Catalog beyond data: Каталог объединяет логику, политику, lineage и ресурсы - мультиинструментальная, не моновендорная экосистема.

Lakehouse 1.0 - это инфраструктура.
Lakehouse 2.0 - это культура данных.

#de #dwh #lakehouse
Please open Telegram to view this post
VIEW IN TELEGRAM
6❤‍🔥2👏1