дата инженеретта – Telegram
дата инженеретта
2.98K subscribers
242 photos
28 videos
4 files
102 links
мелкое — крупно,
в глубоком разговоре
мудрость приходит

по вопросам сюда: @aigul_sea
Download Telegram
🔗Обязательные импорты в Spark-приложении

# сессия
from pyspark.sql import SparkSession
# функции
from pyspark.sql import functions as F
# типы данных
from pyspark.sql import types as T
# оконки
from pyspark.sql.window import Window


F и T - это code-style, принятый в PySpark, чтобы избежать пересечений с другими либами. В коде будет так: F.function(args).
И вообще импортируем только то, что нужно. import * - это моветон.

// датафрейм и сессия
import org.apache.spark.sql.{DataFrame, SparkSession}
// функции
import org.apache.spark.sql.functions._ // импорт всего
// udf (кастомные функции) и оконки
import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
// типы данных
import org.apache.spark.sql.types._


В отличие от питона, в скале нужно указывать типы аргументов в функциях, поэтому мы дополнительно импортируем DataFrame, UserDefinedFunction и Window, т.к. они наиболее часто используются. А сами оконки лежат в модуле functions.
def func(df: DataFrame, time_window: Window): DataFrame = {...}


#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥10
🫡Кастомная сортировка

Бывает так, что есть поле, по которому нужно отсортировать, но по алфавиту ну никак не подходит. Тогда можно искусственно создать поле для сортировки через case when и его аналоги:

SELECT color
FROM test
ORDER BY
CASE color
WHEN 'red' THEN 1
WHEN 'orange' THEN 2
WHEN 'yellow' THEN 3
WHEN 'green' THEN 4
WHEN 'blue' THEN 5
WHEN 'indigo' THEN 6
WHEN 'violet' THEN 7
END


#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥38
Объявление переменных Python🐍 vs Scala🏔

Если мы хотим объявить несколько переменных в одной строке, на Python это будет так:
a, b = 'a', 1.1


На Scala нужно добавить ключевое слово val/var и обрамить скобочками:
val (a, b) = ("a", 1.1)


Для удобства можно указать типы полей:
val (a:String, b:Double) = ("a", 1.1)


val - неизменяемый, от слова "value"
var - изменяемый, от слова "variable"

По возможности старайтесь использовать константы, это:
понижает риски появления багов
повышает читаемость кода
позволяет оптимизироваться под капотом, зная, что значения меняться не будут

В Scala есть еще одна интересность - lazy val.
val выполняется, когда мы создали переменную.
lazy val выполняется, когда мы впервые обратились к этой переменной.
Если мы никогда до нее не доберемся, то ресурсы тратится не будут💳

#python_tips #scala_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥95💯1
Вопрос на подумать

Почему вчера запрос отрабатывал 5 минут, а сегодня 2 часа?
Please open Telegram to view this post
VIEW IN TELEGRAM
1
Так почему запрос стал медленнее?

Это вопрос с собесов, и тут главное - не перечислить все варианты, а показать, что вы умняши и соображаете🤓

📝Я бы выделила вот эти группы:
- появилось в разы больше данных
- параллельно с таблицей что-то происходит, и она залочена
- меньше ресурсов
- поменялись индексы
- поменялся план запроса (например, нужно помочь и добавить хинты)
- сетевые проблемы
- конфигурационные изменения (админы взяли и уменьшили параметры)

💬Очень полезные варианты - в комментариях к предыдущему посту.

🧐И, прежде чем столько ждать, лучше провести свой анализ, посмотреть на каунты, план запроса, спросить у коллег. Иначе 2 часа могут растянуться и на подольше🥺

#собес
Please open Telegram to view this post
VIEW IN TELEGRAM
💯12🔥52
🎙Задачка с собеса

Смотрим на картинку.
В первой табличке хранятся приходящие события в разбивке по типу. Во второй - что мы должны получить в итоге.

🛍Представим себе, что это интернет-магазин. Пусть событие 1 - это клик на кнопку "Каталог", а событие 2 - это "Добавить в корзину". Т.е. на нашем сайте сначала 4 раза кликнули по каталогу, потом 3 раза положили в корзину и т.д. Разные дни - не суть важно.

Теперь мы хотим определить, когда каждое из типов событий началось, а когда закончилось. К примеру, 5го января пошло событие 2, и мы считаем, что это конец события 1.

Какие идеи?

#собес
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥5
💡Ответ на задачку💡

🤪Я посмотрела ваши варианты, планы запросов. В целом, интересные идеи, но выглядят немножко громоздкими, и мне пришлось посидеть, подумать, позапускать частями, чтобы понять, что происходит внутри. В плане читаемости сложновато.

Я предлагаю такое решение:
SELECT
event_type,
date AS date_start,
LEAD(date) OVER(ORDER BY date) AS date_end
FROM (
SELECT
event_type,
date,
LAG(event_type, 1, -1) OVER(ORDER BY date) AS prev_event_type
FROM events) t
WHERE event_type != prev_event_type;


Что происходит?

1️⃣ Подтягиваем предыдущий тип события с офсетом и значением по умолчанию.

LAG - предыдущий элемент, LEAD - последующий.
offset = 1 - берем предыдущий элемент, c 2 был бы предпредыдущий и т.д.
default_value = -1 - применится для самой первой строки, т.к. еще не с чем сравнивать. Отрицательные события вряд ли будут, поэтому значение кажется безопасным для любой выборки.

2️⃣ Оставляем только те строки, где текущее событие не равно предыдущему.

После этого этапа в данных лежит:
event_type  date        prev_event_type
1 2024-01-01 -1
2 2024-01-05 1
1 2024-01-08 2
...


3️⃣ Остается только подтянуть следующую дату по порядку.

Готово!
👍
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥8❤‍🔥22
Как считать датасет в Spark?

Все, мы импортировали все, что нужно, создали сессию, нашли датасет, готовы работать!

Самые популярные способы:
csv
df = (
spark.read
.option('header', True)
.option('delimiter', ';')
.option('inferSchema', False)
.csv(path)
)


val df = spark.read
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "false")
.csv(path)


На месте path может быть 'file.csv', несколько файлов 'file1.csv,file2.csv' или вся папка целиком 'myfiles/csv_data'.

Из опций основные:

🐱 header - заголовок
🐱 delimiter - разделитель полей
🐱 inferSchema - автоматический определитель типов полей

На больших данных использовать inferSchema опасно!

Почему? Потому что спарк пойдет сканировать каждую строчку, чтобы определить корректный тип. Допустим, есть колонка age, заполненная числами типа int, но в сааамом конце у нас все поехало и попало число типа double. В конечном итоге столбец будет тоже иметь тип double, но для этого мы просканировали все на всякий случай. И так с каждым полем. Ну такое😕

Поэтому надо самим задать схему и указать ее при чтении файла:
table_schema = StructType([
StructField('name', StringType(), True), # True - допустимы ли null
StructField('age', IntegerType(), True),
...
])

df = (
spark.read
.option(...)
.schema(table_schema)
.csv(path)
)


StructType - класс для структуры датафрейма. StructField - для поля. Лежат в spark.sql.types.

json, parquet, hive-таблица

df = spark.read.json(path)
df = spark.read.parquet(path)
df = spark.table('db_schema.table_name')


У нас есть HDFS (Hadoop Distributed File System) - это распределенная файловая система, в которой обычно хранятся большие данные. Там они лежат в виде файликов внутри папочек. А Hive - это обертка, чтобы мы могли читать данные из файликов через sql-запросы🐝

#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥52💯1
В таблице test есть поле dt с датой и временем в виде строки. Будет ли разница в запросах?
1. SELECT MAX(dt::date) FROM test;
2. SELECT MAX(dt)::date FROM test;
Anonymous Quiz
36%
да, первый запрос отработает быстрее
51%
да, второй запрос отработает быстрее
13%
нет, запросы одинаковы
🔥8
К чему был предыдущий вопрос?

🧳На днях у нас с коллегой возник такой кейс: есть 50 таблиц, содержащих колонку с датой, но там есть дата, дата и время и unix timestamp. Цель - нужно их все объединить. БД - ClickHouse.

Итерация 1:
SELECT 
max(
if(
table_name LIKE 'offline%',
toDate(FROM_UNIXTIME(dateTime::INT)),
toDate(dateTime)
)
) AS max_date
FROM schema.source_table

Количество считанных строк - 46 млрд. Время работы - 2,5 минуты. Ну... долго ждать, давайте еще раз внимательно посмотрим🤓

Итерация 2 - вынесли приведение даты со временем к дате в самый конец:
SELECT
toDate(
max(
if(
table_name LIKE 'offline%',
FROM_UNIXTIME(dateTime::INT),
dateTime
)
)
) AS max_date
FROM schema.source_table

Количество считанных строк - 46 млрд. Время работы - 1,5 минуты. Уже лучше, но можно ли еще быстрее?

Итерация 3 - вынесли приведение unix timestamp к дате со временем:
SELECT
toDate(
if(
table_name LIKE 'offline%',
FROM_UNIXTIME(max(dateTime)::INT),
max(dateTime)
)
) AS max_date
FROM schema.source_table

Количество считанных строк - 25 млн. Время работы - 6 секунд. Супер! Нам этом остановимся👍
В итоге мы сначала находим максимальные значения и только потом кастуем. Выигрыш во времени - в 25 раз.

🕒Время и строки можно посмотреть в системной табличке system.query_log в столбцах query_duration_ms, read_rows.

🐢В первых двух случаях в плане запроса есть шаги:

ReadFromMergeTree (schema.source_table) - читаем таблицу

Expression ((Conversion before UNION + (Projection + Before ORDER BY))) - кастуем к дате в каждом подзапросе

🐆В третьем:

ReadFromStorage (MergeTree(with Aggregate projection _minmax_count_projection)) - вот эта проекция сильно быстрее, чем скан всей таблицы, а лишних приведений уже нет. Profit👍

#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👏6❤‍🔥1💯1🆒1
🎁Распаковка переменных

У нас есть список, кортеж, что угодно, и нам нужно достать оттуда элементы:
person = (
'Eveline',
(1990, 12, 31),
'Moscow',
'+79271111111',
'+79272222222'
)

# Вместо того, чтобы обращаться по индексу:
name, birth_date, city, phone1, phone2 = person[0], person[1], person[2], person[3], person[4]

# можем сразу распаковать вот так:
name, birth_date, city, phone1, phone2 = person


Тогда в name пойдет имя, в birth_date - дата рождения и т.д.
Самое главное - количество переменных слева и элементов в объекте справа должно совпадать.

💫Хотелки
# Хочу скипнуть некоторые поля:
name, _, city, phone1, _ = person

# Хочу сложить телефоны в список:
name, birth_date, city, *phone_list = person
# список может идти в начале и середине тоже

# Хочу взять имя и phone2:
name, *_, phone2 = person

# Хочу взять только имя:
name, *_ = person

# Хочу взять только phone2:
*_, phone2 = person

# Хочу взять имя и год рождения:
name, (year, *_), *_ = person


#python_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥1821
Как переименовать базу данных?

У меня часто бывали случаи, когда нужно было переименовать базу данных, но она не переименовывалась. В том проекте базу каждый сам создавал под свою задачу, поэтому операции с ней ни на кого не влияли.

Что нужно сделать?
Диалект - MS SQL Server
--1. Переключиться в другую базу
USE master;
GO
--2. Установить однопользовательский режим
ALTER DATABASE [old_database] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
GO
--3. Поменять название
ALTER DATABASE [old_database] MODIFY NAME = [new_database];
GO
--4. Восстановить режим
ALTER DATABASE [new_database] SET MULTI_USER;
GO


#sql_tips
🔥15🌚1
Как создать пустую таблицу из другой?

Бывают кейсы, когда есть табличка, но нам нужна пустая на ее основе.

Есть два варианта, которые будут работать везде:

1️⃣ TOP/LIMIT
SELECT *
INTO new_table
FROM old_table
LIMIT 0


SELECT TOP 0 *
INTO new_table
FROM old_table


2️⃣ заведомо ложное условие
SELECT *
INTO new_table
FROM old_table
WHERE 1=0


#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
10🔥10👻4
Spark Actions and Transformations

В спарке есть два типа операций: действия и трансформации.

🦥В чем отличия?
Как и все мы, спарк очень любит лениться и ничего не делать. И он не будет тратить свою энергию на то, что никогда не понадобится. Так вот трансформации будут складироваться в цепочку по порядку, но никогда не выполнятся, если не будет запущено действие.

🔮Примеры трансформаций:
select() - выбираем столбцы
withColumn() - добавляем новое поле
where()/filter() - фильтруем
orderBy()/sort() - сортируем
join() - джойним
groupBy() - группируем
union() - объединяем
distinct() - удаляем дубликаты по всем столбцам
dropDuplicates() - удаляем дубликаты по некоторым столбцам
drop() - удаляем столбцы
SQL на максималках

💃Примеры действий:
show() - выводим датафрейм на экран
count() - считаем количество строк
toPandas() - собираем датафрейм в pandas df

Как использовать?
# Трансформации возвращают датафрейм
# Результата на экране нет
df = df.select('col1', 'col2').sort('col1')

# Действия возвращают НЕ датафрейм
# Результат на экране есть
df.show()


#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👏2
Мини-задачка

Выберите правильную запись/правильные записи sql-запроса на Spark.

Запрос:
SELECT country, COUNT(customer_id)
FROM shop.customers
WHERE date = '2024-02-08'
GROUP BY country
HAVING COUNT(customer) > 5;


Варианты на Spark:
1) df = (
spark.table("shop.customers")
.select("country", "count(customer_id)")
.where("date = '2024-02-08'")
.groupBy("country")
.having("count(customer_id) > 5")
)

2) df = (
spark.table("shop.customers")
.where("date = '2024-02-08'")
.groupBy("country")
.agg(F.count("customer_id").alias("cnt"))
.where("cnt > 5")
)

3) df = (
spark.table("shop.customers")
.where("date = '2024-02-08'")
.select("country")
.count("customer_id")
.groupBy("country")
.having("count(customer_id) > 5")
)

4) df = (
spark.table("shop.customers")
.filter(F.col("date") == '2024-02-08')
.groupBy("country")
.agg(F.count("customer_id"))
.where("count(customer_id)> 5")
)


Голосовалка ниже
👇
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥1
Выберите правильную запись/правильные записи sql-запроса на Spark (пост выше)
Anonymous Poll
26%
1
38%
2
22%
3
45%
4
🔥1🤔1
💡Ответ💡
Правильные варианты - 2, 4.

Тут есть несколько интересных моментов:

1️⃣Агрегация

Делается через .agg() - на вход подается функция, которая что-то считает:

F.min(), F.max(), F.count(), F.countDistinct(), F.sum(), F.sumDistinct(), F.avg(), F.mean и т.д.

Дополнительно в селекте прописывать не нужно:
df.groupBy("department") \
.agg(F.sum("salary"), \
F.avg("salary"))


2️⃣На любое поле можно навесить элиасы

df.groupBy("department")
.agg(F.sum("salary").alias("sum_salary"), \
F.avg("salary").alias("avg_salary"))


3️⃣ having не существует

После агрегации у нас появляется новое поле, по которому сразу можно фильтровать. Без элиасов поля будут называться как функция(поле)
# равнозначны
.agg(F.count("customer_id").alias("cnt"))
.where("cnt > 5")

.agg(F.count("customer_id"))
.where("count(customer_id) > 5")


4️⃣ where и filter взаимозаменяемы

Внутри можно использовать как стандартный sql в кавычках, так и делать питоновские/скаловские сравнения:
.where("date = '2024-02-08'")
.filter("date = '2024-02-08'")
.where(F.col('date') == '2024-02-08')
.filter(F.col('date') == '2024-02-08')


Обратите внимание, что на Scala тройное равно:
.where("date = '2024-02-08'")
.filter("date = '2024-02-08'")
.where(col("date") === "2024-02-08")
.filter(col("date") === "2024-02-08")


col - это обертка над колонкой, которая дает доступ к более крутым функциям поверх: alias(), desc(), like(), isNull() и т.д.

#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7
⚔️UNION vs CASE WHEN

🔵Когда-то давно мне на собесах попадались задачки, где нужно было вывести показатели либо в одном столбце, но в разных строчках, либо в разных столбцах, но в одной строке.

Тогда я затупила, но у вас есть шансы пропустить этот этап😁
Они проверяли знание union и case when:

1️⃣ UNION/UNION ALL - для одного столбца
SELECT 'amt_not_null' as agg_type, SUM(amount) AS val
FROM test
WHERE id IS NOT NULL
UNION ALL
SELECT 'amt_null', SUM(amount)
FROM test
WHERE id IS NULL


UNION - удаляет дубликаты строк, UNION ALL - нет.
Если разницы для запроса нет, лучше выбирать UNION ALL, потому что без проверки на дубли он более производительный.
Элиасы во втором и последующем запросах можно не указывать, они будут тянуться из первого.

2️⃣ CASE WHEN - для разных столбцов
SELECT
SUM(CASE WHEN id IS NOT NULL THEN amount ELSE 0 END) AS amt_not_null,
SUM(CASE WHEN id IS NULL THEN amount ELSE 0 END) AS amt_null
FROM test


#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥15❤‍🔥1
👛Особенность CASE WHEN

Если нам нужно добавить интервалы/категории и т.д., то можно воспользоваться этим примером:

SELECT
num,
CASE
WHEN num < 0 THEN '< 0'
WHEN num < 10 THEN '[0; 10)'
WHEN num < 50 THEN '[10; 50)'
WHEN num < 100 THEN '[50; 100)'
END AS interval
FROM test


Здесь не нужно ограничивать с двух сторон:
WHEN num > 0 AND num < 10 THEN '[0; 10)'

потому что условия проверяются по порядку и после первого попадания дальше сравниваться не будут. Поэтому нужно начинать с самых узких.

#sql_tips
🔥82👏2
💻Полезные консольные команды

Я частенько оставляю в заметках команды, которые попадаются не раз, тут списочек для терминала:

1️⃣ Настройки проекта
# путь к дефолтному питону, на котором все запускается
which python
# /c/Users/Username/AppData/Local/Programs/Python/Python311/python

# все пути к установленным питонам
where python
whereis python

# версия
spark-submit --version

# установить все зависимости из файлика
pip install -r requirements.txt


Вместо питона может быть java, mvn (maven - сборщик проектов) - все, что угодно.

2️⃣ GIT
# поменять сообщение последнего коммита
git commit --amend -m "new message"

# запуллить актуальную версию из УКАЗАННОЙ веточки репозитория и затереть все свои локальные изменения
git reset --hard origin/develop

# запуллить актуальную версию из ТЕКУЩЕЙ веточки репозитория и затереть все свои локальные изменения
git reset --hard HEAD

# удалить локальную веточку
git branch -d localBranchName

# удалить веточку в репозитории
git push origin --delete remoteBranchName


3️⃣ Linux
# найти файл по названию/расширению/паттерну в ТЕКУЩЕЙ директории
find . -name "*.txt"

# искать, начиная с корня
find / -name "*.txt"

# найти файлы в ТЕКУЩЕЙ директории, в которых содержится строка
grep -r "import" .

# искать, начиная с корня
grep -r "import" /

# заархивировать папку
zip -r file.zip source_folder

# разархивировать
unzip file.zip -d destination_folder


4️⃣ YARN
# убить приложение
yarn app -kill <app_id>

# найти, кто убил ваше приложение
yarn app -status <app_id> | grep killed
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥13❤‍🔥4🤓1
🍄Задачка

🌸Есть таблица с полем инн и кучей других полей. Столбец с инн может и будет дублироваться. Нам нужно прийти к структуре второй таблички.

occurrences - это сколько раз инн повторяется в таблице
num_inn - количество инн, которые встречаются occurrences раз

➡️Пример⬅️
156, 444, 777 - встречаются 1 раз, но всего их 3
283 - 2 раза, всего таких инн 1
900 - 4 раза, всего тоже 1

Я нашла как минимум 2 варианта, один из которых более лаконичный, чуть менее оптимальный, но довольно необычный в качестве зарядки для мозга.

А какие у вас идеи?
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7🤔2