Where is data, Lebowski – Telegram
Where is data, Lebowski
237 subscribers
83 photos
2 videos
83 links
Канал про разное в data-мире:
- от библиотек визуализации до data egineering
- от графиков до элементов разработки
- от .csv до API
Download Telegram
Clickhouse-notes ⚡️
.
Продолжаем грузить клик
.
 В предыдущих notes рассмотрели загрузку:
- из файла CSV
- из датафрейма
- или сериализованых данных через HTTP интерфейс
.
Но есть и другие варианты: не всегда же источником являются файлы на локальной машине:
- S3 (наиболее частый случай, поставщик данных в S3 нас не интересует)
- Kafka и всё стримоподобное
- другие базы, например,  Postgres\MySQL
,
В каждом из кейсов могут свои оптимальные шаблоны, мы рассмотрим только возможности, имеющиеся в Clickhouse.
.
1️⃣Для загрузки из S3 используется S3 Engine (у нас еще будет задачка с S3 😉).
Тут всё довольно просто, нужны креды (access_key, access_secret_key), сам файл на S3 и табличная фукнция:
SELECT , toDate(now(), 'UTC') as row_inserion_date 
FROM s3('https://storage.yandexcloud.net/public-bucket-6/reddit/.parquet',
'$AWS_ACCESS_KEY',
'$AWS_SECRET_ACCESS_KEY',
'Parquet',
'id String,
noscript String,
score Nullable(Int64),
num_comments Nullable(Int64),
author String,
created_utc DateTime64,
url String,
upvote_ratio Float,
over_18 Bool,
edited Bool,
spoiler Bool,
stickied Bool,
subreddit_name_prefixed String
')
SETTINGS input_format_allow_errors_num = 2000,
max_threads = 16;


Что здесь хитрого:
- бакет public-bucket-6
- папка reddit
- дальше магические звезды ⭐️ (и это не распаковка Python)
- формат файла Parquet
Особенность при загрузке: необходимо указать схему данных (наименование поля и его тип, если схема не совпадает с тем, что есть на самом деле будет ошибка ).
⭐️() - нужны для pattern matching - чтобы загрузить данные из всех файлов в папке reddit - первая * = даты (партиционирование, обычно по датам), вторая * = просто все файлы с расширением .parquet.
.
В процесcе ETL обычно создаётся VIEW над S3 из нее данные загружаются в STG table (чтобы изменения на S3 не влияли на дальнейший процесс прогрузки).
ps: креды на чтение оставлю в комментах, чтобы вы могли попробовать😉.

Настройка, max_threads распараллеливает чтение😎
,
2️⃣ Следующий способ: движки баз данных или движки таблиц, на примере Postgres.
Довольно интересный способ, но в рабочей среде доводилось только экспериментировать. Например, при помощи движка таблицы можно легко получить структуру таблицы и перенести её в Clickhouse, так как он считает нужным:
CREATE TABLE stage.TMP_PG AS  SELECT * FROM postgresql('192.168.55.121:5432', 'tmp', 'kp_search_results', 'shpz', '12345', 'stage') WHERE 1; 


А вообще в команде принятым способом доставки данных из реляционок является Debezium (это уже история про потоковые данные). Но если нет инфры под Debezium и Кафка - движки баз данных и таблиц - хороший выбор👌
.
3️⃣ Последний кейс: потоковые данные из Кафка.
Хотя и в Clickhouse есть специальный движок, но даже экспериментировать не приходилось, тк для этого в команде есть Kafka-Connect.
ETL выглядит так: Kafka -> Kafka-Connect -> raw table in Clickhouse -> MV -> data table:
1. Kafka-Connect формирует собщение простой структуры:
record_timestamp DateTime64 - время записи сообщения
record_value JSON - тело сообщения


2. Структуру выше хранит raw table
3. MV парсит JSON сообщение и записывает их в data table
4. Пользователь ходит уже в data table

Вообще парадигма MV (materialized view)\Projection очень крутая, возможно надо ей уделить пару заметок.

#clickhouse #3 #extract #load
Маленький, но крайне полезный
.
Сегодня для вас маленький хук, при создании таблицы в клике по образу и подобию уже существующей не обязательно переписывать схему, достаточно указать эту аналогичную таблицу:
CREATE TABLE sandbox.table as stg.another_table
ENGINE=MergeTree
ORDER BY .....
SETTINGS ....
Это полезно, например при тестировании различных настроек: кодеки сжатия, партицирование, движки, секция ORDER BY.

#clickhouse #4 #liketable
👍2
С Новым годом, все причастные к миру данных🎄
.
ps: не будет долгих речей, просто будьте счастливы🦄

И напишите в комментах ваш любимый новогодний фильм 🎥
.
#newyear #2023
4
Праздники пришли когда не ждали
.
Раскопал интересный нюанс, в процессе парсинга дат. Попробуйте выполнить такой код:
SELECT 
parseDateTimeBestEffort('1950-01-01');

.
Результатом будите удивлены😳

#clickhouse #причуды
​​⚡️Clickhouse-notes - ответы (просвещаемся на каникулах 🙃)
.
Под одним из постом был комментарий:
Если не ошибаюсь, то стоит помнить, что order by сортирует только в рамках партиции, а не по всей таблице

.
Вероятно, предполагается парт данных, партиции - это про деление данных на части по какому-либо признаку (чаще всего встречается по времени, например, партицией может быть день\неделя\месяц).
Парт данных в контексте семейства Merge Tree - некоторый кусок неслитых данных, например, при вставке - это блок вставляемых данных (условно 1000 или 100000 строк).
.
Проверим применяется ли секция ORDER BY ко всей таблице.
Для экспериментов написал небольшой pipeline (github repo) для получения текущей погоды по API openweathermap.
DDL raw таблицы:
CREATE DATABASE cdc;

CREATE TABLE cdc.openweathermap_raw
(
record_timestamp DateTime64(6),
record_value String
)
ENGINE=MergeTree
ORDER BY (record_timestamp)
;


Pipeline выполняется по cron каждые 5 минут. Секция ORDER BY указывает сортировку по record_timestamp - это говорит, что сам парт будет отсортирован по указанному полю и все слитые парты (то есть таблица) также будут отсортированы по record_timestamp, что подтверждается экспериментально (см скрин 1 в комментах).
.
Какой нюанс стоит учитывать:
только что вставленный парт может не быть отсортирован (см скрин 2): только что вставленная строка оказывается в начале данных, спустя некоторое время она в фоне будет отсортирована.
.
GIthub репу еще будем использовать для разбора других интересных особенностей clickhouse.
#clickhouse #orderby
🦆Утки или 🐍 змеи ?
.
Мир данных изменяется очень быстро, не успел приготовить утку по-duckdb-овски, на повестке дня уже новый схожий инструмент и в твоём любимом питончике - chdb (от создателей Clickhouse).
.
Инструменты уже успели сравнить - тут 🤔
.
Парочку статей для ознакомления:
🦆 Переводная статья о duckdb
🐍 О внутреннем устройстве
🐍 Про chdb от Антона Жиянова (человек, который развивает и много разрабатывает для SQLite)
.
А вы что-то трогали, использовали?
ps: будет серия заметок про duckdb (построим небольшой пайплайн)
#duckdb #chdb
🔥2👍1
⚡️Clickhouse-notes - Оператор EXCHANGE
.
Довольно занятный оператор, с ним познакомил меня коллега. Когда я поэкспериментировал с ним, то был приятно удивлен простоте использования и тем возможностям которые он даёт. Синтаксис маскимально прост:
EXCHANGE TABLES stg.exchange_table AND odm.table;

Для предыдущего поста, использовал его, чтобы сменить ORDER BY для таблицы cdc.openweathermap_raw 😉
.
Как следует из названия, оператор что-то где-то обменивает♻️, а именно:
- атомарно обменивает имена двух таблиц или словарей (читай обмениваются ссылки на данные)
.
Какие +:
🎈 можно атомарно подменять данные разных таблиц, партиций (настолько быстро, насколько возможно и для пользователя downtime -> 0️⃣). Как обычно проиходит батчевая загрузка: загружаем батч в stg, удаляем из целевой таблицы, перегружаем из stg в target. Всё бы ничего, но такой паттерн не для Clickhouse, операции update\delete для него тяжелые и лучше бы обойтись без них. EXCHANGE даёт такую возможность: загрузили всю таблицу (партицию) в stg , обменяли c target и DROP или TRUNCATE. Да, есть минус: необходимо место для хранения дубликата всей таблицы (если перегрузка происходит полностью) - но порой это оправданные затраты
🎈 можно менять DDL таблицы фактически без downtime (ведь если меняются имена таблиц, то можно поменять и их определение). Например, выставили вы неправильную секцию PARTITION BY или ORDER BY или в целом захотели их изменить или начали использовать кодеки для сжатия (кстати, тоже интересная тема, говорят, что узким местом в любом случае является чтение с диска, а разжатие данных происходит очень быстро, поэтому важнее сжать, чтобы меньше читать с диска ☝️).
Паттерн такой: создаете рядом таблицу с нужным DDL и обмениваете их -> старая таблица имеет новый DDL (новые данные сортируются\сжимаются\партицируются по-новому), старые данные нужно только перелить. Или пока данные льются в подменную таблицу, модифируем старый DDL (добавляем сжатия, меняем партиционирование ....) -> обмениваем обратно, новый кусок данных в подменной таблице заливаем.
.
💪На практике так меняли движки таблиц (для стрима аппметрики), добавляли сжатие
.
#clickhouse #exchange
🔥3👍1
⚡️Clickhouse-notes - MATERIALIZED VIEW
.
Для поста про ORDER BY была создана github репа, в ней есть пайплайн который каждый 5 минут получает погодные данные по API и складывает их в raw таблицу.
Глянуть схему 👀
.
MATERIALIZED VIEW (MV) - можно рассматривать как триггер, реагирующий на вставку данных (в отличие от триггеров в реляционках, реагирует на блок вставляемых данных, а не на отдельную строку ☝️). В Clickhouse MV отделено от целевой таблицы, поэтому для себя MV представляю как процесс, запускающийся для вставляемых данных и загружающий их в целевую таблицу.
.
При помощи MV можно решать ряд задач:
▪️парсинг сырых данных
▪️фильтрация сырья
▪️роутинг данных
▪️агрегация данных
.
На примере решим первую задачку - парсинг сырья. В нашем пайплайне исходные данные сохраняются в таблицу cdc.openweathermap_raw:
CREATE TABLE cdc.openweathermap_raw
(
record_timestamp DateTime64(6),
record_value String
)
ENGINE=MergeTree
ORDER BY (record_timestamp)

где record_value - json объект с погодой, который получаем из api. Создадим MV, который будет парсить json и сохранять в таблицу cdc.openweathermap. Посмотреть пример json можно на сайте.
Пишем DDL целевой таблицы:
CREATE OR REPLACE TABLE cdc.openweathermap
(
record_timestamp DateTime64,
dt DateTime,
main String,
denoscription String,
temp Float64,
feels_like Float64,
pressure Float64,
humidity Float64
)
ENGINE=MergeTree
PARTITION BY toStartOfWeek(record_timestamp)
ORDER BY (main, denoscription, record_timestamp)

Вытаскиваем поля: время (оно в UTC), общее описание погоды, детальное описание, температура, ощущаемая температура, давление (ГПаскаль), влажность.
Именно MV и будет парсить и перекладывать данные из сырья в cdc.openweathermap. Напишем запрос, который парсит данные:
SELECT record_timestamp ,
FROM_UNIXTIME(toUInt64(JSONExtractString(record_value, 'dt'))) AS dt,
JSONExtractString(arrayElement(JSONExtractArrayRaw(record_value, 'weather'), 1), 'main') AS main,
JSONExtractString(arrayElement(JSONExtractArrayRaw(record_value, 'weather'), 1), 'denoscription') AS denoscription,
JSONExtractFloat(JSONExtractString(record_value, 'main'), 'temp') AS temp ,
JSONExtractFloat(JSONExtractString(record_value, 'main'), 'feels_like') AS feels_like,
0.750062 * JSONExtractUInt(JSONExtractString(record_value, 'main'), 'pressure') AS pressure,
JSONExtractUInt(JSONExtractString(record_value, 'main'), 'humidity') AS humidity
FROM cdc.openweathermap_raw

Чтобы превратить тыкву в золушку необходимо добавить код создания MV: по классике что создаем, где создаем и для для MV куда пишем данные (этот кусочек ниже):
CREATE MATERIALIZED VIEW cdc.openweathermap_mv
TO cdc.openweathermap
...
🔥1
Полный код найдете в репозитории
.
Наш процесс парсинга готов - теперь только НОВЫЕ поступающие данные будут последовательно проходить стадии: вставка в raw, выполнение MV, вставка в целевую таблицу (сам процесс происходит автоматически в фоне).
Чтобы и старые данные прошли через MV нужно дополнительно присесть:
- можно через обмен таблицами
- можно до создания MV выполнить соответствующий инсерт в целевую таблицу, после создать MV
.
Это не очень удобно, но очень гибко: на практике так парсятся логи аппметрики (в Q1 хотим роутить данные тем же механизмом) + делал кейс, когда N топиков кафки парсятся в одну таблицу событий (просто все MV смотрят в одну целевую + структуру единая).
.
В теме MV существуют проекции (projection) - аналогичный механизм, только процесс создания упрощен и менее гибок, но содержит большой +, все данные имеющиеся в исходной таблице будут пропущены через проекцию (об этом я с ужасом узнал на продовом кластере😱).
Естественно тестить надо на чем-то большом и мощном, выбрал аппметрику (сотни млн событий ежедневно, лог за пару последних лет). В рамках одной из задач требуется оценка кол-ва данных за день, создал проекцию (group by date) и думаю, только новые данные будут обрабатываться. Каково же было моё удивление, когда в результате запрос я обнаружил результаты всей истории (стоит отметить, что получились они за пару секунд🚀, в то время как обычный запрос агрегации будет работать десятки минут, если не упадет по памяти).
.
Для глубокого погружения рекомендую отличное видео 🎥
#clickhouse #materializedview #mv #view
🔥2👍1
♻️ Апдейт к посту про ORDER BY
.
В репе используется Python pipeline для обращения к API (решили и решили). Но в ClickHouse есть табличная функция url, используя которую можно описать альтернативный способ загрузки данных.
.
Например, вот так получаем ответ от API:
SELECT * 
FROM url('https://api.openweathermap.org/data/2.5/weather?lat=55.567586&lon=38.225004&appid=YOUR_APPID&units=metric',
JSONAsString, 'record_value String')

.
Добавляем дополнительное поле с record_timestamp, синтаксис INSERT
и получаем готовый код загрузки сырья из API
Теперь достаточно по расписанию выполнять данный скрипт☝️
.
ps: в работе стараюсь искать наиболее нативный способ (с использованием встроенного функционала) решения задачи - а как вы предпочитаете:
✈️ строить самолет
🎧 или использовать минимум средств

.
#clickhouse #url #alternative
👍2
Никогда не задумывались Как слоники 🐘 превратились в сливу🍈 🤔
.
#meme
👍1😁1
​​⚡️Clickhouse-notes MATERIALIZED VIEW - 2
.
Развиваем наши знания по матвьюхам:
- в первых сериях мы настроили парсинг данных (из "сырого" json выбираем нужные поля и складываем в отдельную таблицу cdc.openweathermap)
.
Теперь собираем дневную статистику:
- мин\макс\сред для температуры, давления, влажности + кол-во записей.
Решаем задачу аналогичным путем: используем materialized view, только теперь будут нюансы:
- приемной таблицы не будет (но это не точно 😉), то есть нет секции TO ....
- определим параметры MV при её создании (Engine, PARTITION BY, ....)
- используем ключевое слово POPULATE для применения MV ко всем существующим данным ( ☝️будьте аккуратны с ним, если в вашей таблице 100500млн строк - может быть ресурсозатратно).
.
Составляем первую часть запроса - создание и параметры MV:
CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
...

.
🔹ENGINE = SummingMergeTree - специальный движок для агрегаций (count\max\min\avg)
На самом деле существует только один общий движок AggregatingMergeTree - это мы увидим в DDL который сохранится в базе, просто его синтаксис чуть сложнее и для простый агрегаций есть такой алиас SummingMergeTree
🔹PARTITION BY toStartOfWeek(dated_at) - партиционирование MV по неделям
🔹POPULATE - данных в таблице не так много -> применим MV ко всем существующим данным
.
Вторая часть DDL MV это запрос SELECT с нужной нам агрегацией данных, например, он мог бы быть таким:
SELECT toDate(record_timestamp) AS dated_at,
count(1) AS record_counts,
min(temp) AS min_temp,
max(temp) AS max_temp,
avg(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at

.
Но так получится сделать только для завершенных дней, но для дней данные по которым приходят наш пайплайн должен обновлять статистику, чтобы этого достичь необходимо хранить промежуточные данные (например, для вычисления среднего необходимо хранить сумму и кол-во значений) - такое состояние называется State и определяется как: фукнция агрегации + постфикс State, например, avg -> avgState. Данные состояния и хранятся в базе, а в момент запроса пользователя необходимо рассчитать конечный результат ("смержить"), для этого используется постфикс Merge:
avg -> avgState -> avgMerge. Merge функция применяется уже к полям MV (или таблицы, в которую складывается результат). Итого запрос DDL для MV выглядит так:
CREATE MATERIALIZED VIEW odm.openweathermap_daily_stats_mv
ENGINE = SummingMergeTree
PARTITION BY toStartOfWeek(dated_at)
ORDER BY (dated_at)
POPULATE
AS
SELECT toDate(record_timestamp) AS dated_at,
countState(1) AS record_counts,
minState(temp) AS min_temp,
maxState(temp) AS max_temp,
avgState(temp) AS avg_temp,
...
FROM cdc.openweathermap o
GROUP BY dated_at

.
Для получения данных запрос выглядит так:
- запрашиваем данные из MV
- выполняем теже агрегации
- постфикс Merge

SELECT dated_at,
countMerge(record_counts) AS record_counts,
minMerge(min_temp) AS min_temp,
maxMerge(max_temp) AS max_temp,
avgMerge(avg_temp) AS avg_temp,
...
FROM odm.openweathermap_daily_stats_mv
GROUP BY dated_at


И вишенка на торте 🍒, чтобы скрыть от пользователя данную логику, необходимо обернуть Merge запрос в VIEW, например, odm.openweathermap_daily_stats_v. Полный код найдете в репо.

🧐 А где же хранятся промежуточные данные, вы могли бы спросить? MV не хранит данные, при таком DDL под капотом создаётся внутренняя таблицы, в нашем случае .inner_id.5bacf24b-9e5a-4874-95c4-5ad93eaacbde, в ней и лежат предагрегаты (скрины в репо и в комментах).

#clickhouse #mv
🔥1
​​ O - Optimization
.
В далеком посте , который рассказывал про оптимизацию с помощью условий WHERE
всё уже было сказано и вот второй раз наступил на грабли (пока еще нет, но мог бы 🔨).
.
Также недавно прошел курс по GreenPlum, конспект можно найти тут. В числе прочего была рассмотрена тема оптимизации, где отмечено:
Признаки неоптимального SQL-запроса:
- лишние сортировки
- distinct
- union вместо union all
- Условия join, содержащие неравенства или OR (тк будет использован Nested Loop )
- Неоптимальные фильтры по ключу партицирования (явное использование ключа без преобразований)
- Неоптимальные фильтры по полям, входящим в индексы:
-В Greenplum версии 6 индексы B-tree, построенные на значениях функции от значений полей, используются в запросах с фильтрами только при построении плана Postgres.
-При построении плана запроса GPORCA использование в фильтрах любых функций от полей, по которым построены индексы на таблицу, исключает использование индексов и приводит к неоптимальному выполнению запроса.

.
Тк много работаю с Clickhouse, я думал он не такой, он же быстрый🚀, но не тут-то было. В работе использую планировщик explain у него есть ряд опций, например:
- estimate показывает сколько будет прочитано строк, засечек индекса и тд

В одном из запросов была такая конструкция:

WHERE datetime >= '2024-01-15' and datetime < toDate('2024-01-15') + INTERVAL '1 day'

.
Кажется, слишком много букв, переписал так:

WHERE toDate(datetime) = '2024-01-15'

И запрос начал выполняться чуть дольше, стоит отметить что таблица партицинирована по месяцам.
Вывод explain estimate для первого случая rows = 24064519

Для второго rows = 312864901 (будет прочитана вся таблица 😱)

Итог: используйте планировщик, много интересного он может рассказать - ClickHouse Explain

#optimization #clickhouse
🔥2
​​📔 Executable Jupyter Notebooks
.
Не редко (особенно в аналитических целях) разработка ведется в Jupyter Notebooks и несколько
больно переносить наработки в py-модули, в хотелось бы запустить что-то: ./my-notebook.ipynb
или run my-notebook.ipyb. Например, в Databricks так и сделали, notebook является единицей job\task и исполняется под капотом.
.
Может и нам так можно🧐
.
Возможностей не так много, но они есть:
- сохранить jupyter как скрипт .py . При таком сохранении часть кода специфичная для notebooks может быть не выполнена, например
display или отдельно стоящий вывод: df.tail()
- библиотека papermill - вот оно наше сокровище, позволяет запускть jupyter notebooks, а также их параметризовать, ну все как мы любим
.
Для создания параметризированного notebook необходимо:
- указать ячейку с параметрами, например, в ячейке может быть код:
# external params
period = "day"
output_file = "tmp.txt"


- указать, что ячейка = ячейка с параметрами: необходимо добавить cell tag = parameters к той ячейке, где располагаются переменные-параметры. papermill парсит их и устанавливает переданные вами значения. Где это находится смотри на скрине в комментариях или в репо.
- описать обработку параметров в коде (необязательно =))
- запустить jupyter notebook

papermill extract.ipynb -p period "week" -p output_file "weather-forecast.json" executed.ipynb --log-output --log-level INFO

.
Все артефакты найдете в репо

#papermill #jupyter #runasnoscript
🔥2
​​☝️ Последнее слово (но это не точно)
.
В предыдущих сериях по clickhouse-notes мы создали:
- таблицу сырья
- таблицу распарсенных данных и матвью, её наполняющую
- матвью с подневной статистикой и вьюху над ней
.
Дополним сет, данными последней точки: последние полученные данные. Например, нам пригодится для отображения
на дашборде последних значений.
.
Варианты есть различные, мы поступим так:
- создаём таблицу с движком ReplacingMergeTree, odm.openweathermap_last_point - чтобы хранить только последнюю запись (но не всё так просто)
- создаем MV для поиска этой последней записи, odm.openweathermap_last_point_weather_mv
.
Структура таблицы:
CREATE TABLE odm.openweathermap_last_point
(
last_ts DateTime,
temp Float64,
pressure Float64,
humidity Float64,
metric String
)
ENGINE = ReplacingMergeTree(last_ts)
ORDER BY metric
SETTINGS index_granularity = 8192

- ReplacingMergeTree(last_ts) - оставляем только последнюю запись
- ORDER BY metric - синтетическое поле, по его совпадению и "схлопываются" дубли
.
Полный код таблицы и MV в репе. А пока посмотрим на возможные артефакты:
- от СClickhouse у нас есть гарантия 100% схлопывания дублей
- это может произойти в любой момент времени (хоть сразу, после вставки дубля, хоть через полдня)
.
Что в такой ситуации делать:
- нативный метод - схлопнуть дубли в момент запроса, используя ключевое слово FINAL
SELECT *
FROM odm.openweathermap_last_point
FINAL


FINAL - использование может быть очень накладно, если нужно схлопнуть довольно большой объем данных (не наш случай, поэтому мы им и воспользуемся). Чтобы скрыть от пользователя особенности технической реализации необходимо создать, например, odm.openweathermap_last_point_weather_v
.
Другие способы найдете в репе (сортировка и выборка 1 строки, использование argMax).

🫶 Скоро зафиналим какой-нибудь клевый дашборд

#clickhouse #mv #replacingmergetree
👍2🔥2
Пятничное радостное
.
Вот такой получился рефакторинг😍

#airflow #dag #рефакторинг
🔥3👍2🏆2🦄1
​​🔊 Митап, митап, митап
.
С частью команды DWH, во-первых встретились, во-вторых, встречу провели с пользой: сходили на митап по Greenplum от ЯCloud.
.
1️⃣ В первой части Дмитрий Немчин из Tinkoff поведал тайны обслуживания GP. Презантация была скорее обзорной, тк интереснее было послушать про то как устроены их небольшие фреймворки по обслуживанию, а названия у них очень забавные:
- отстрел плохишей по различным правилам - Raskolnokov
- управление вьюхами и ролевой моделью gpACL, gpViews
- управление жизненным циклом данных (удаление, охлаждение, партиционирование) Guillotine
- мониторинг запросов, очередей, нагрузок - Inquisitor
.
Понравился процесс работы с песочницей: данные в ней живут 21 день, после дропаются без разговоров - очень хочется внедрить такой процесс у нас, так sandbox - маленькое болотце. Есть и другой подход, который базируется не на времени, а на пропорции между кол-вом запросов в песочницу и в прод, если доля песочницы первыщает то идем разбираться.
.
2️⃣ Во второй части, Кирилл рассказал о расширении Yezzey - позиционируется как охлаждение данных в s3, для организации гибридного хранения. Но по результатам тестом выглядит так, будто можно все данных хранить в s3, а мощности GP только для Compute использовать, ну и диски будут только для spill файлов😁
Эта часть нас и заинтересовала, тк есть большая потребность в охлаждении данных с последующим их использованием.
.
Ну и конечно, не обошлось без технических шуток. Номинация приз вечера получила шутка: Вы всё еще используете HEAP таблицы 😉

#gp #greenplum #охлаждение #meetup
👍3🔥2
Это заслуживает 5 звёзд
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:


CASE WHEN (json_answer ->> 'answer') LIKE '%{%' THEN json_object_keys((json_answer ->> 'answer')::json)
ELSE Null END

.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:

1 строка, 1 строка из 2млн, Карл


#de #work #буднидатаинженера
😢1
​​Как много мы пишем boilerplate code🤨
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора jinja2 😍
.
Рекомендую ознакомиться с jinja2, мне отлично помог краткий курс, тк шаблонизация встречается всё чаще и чаще:
- шаблоны при веб-разработке (Flask\Django)
- dbt = SQL over jinja2
- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2
1
​​Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷‍♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:

def etl():
pass

with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)


Можно написать:

@dag(....)
def create_dag():

@task
def etl():
pass

create_dag()

.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:

@task
def get_data_from_api() -> List[]:
...
return data

api_data = get_data_from_api()

@task
def etl(data: List[]) -> None:
# transfrom and save data
...

etl = etl(data=api_data)

.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например, partial, expand, что это нам дает:

1️⃣ Генерим несколько дат и выполняем etl за каждую

@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']

@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at

dates = get_dates()

# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)


2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить

@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]

files = find_s3_files()

# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)

.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥

Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.

🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)

#airflow #taskflow #mapping
🔥4👍1
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase

Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍4