Я предлагаю такое решение:
SELECT
event_type,
date AS date_start,
LEAD(date) OVER(ORDER BY date) AS date_end
FROM (
SELECT
event_type,
date,
LAG(event_type, 1, -1) OVER(ORDER BY date) AS prev_event_type
FROM events) t
WHERE event_type != prev_event_type;
Что происходит?
LAG - предыдущий элемент, LEAD - последующий.offset = 1 - берем предыдущий элемент, c 2 был бы предпредыдущий и т.д.default_value = -1 - применится для самой первой строки, т.к. еще не с чем сравнивать. Отрицательные события вряд ли будут, поэтому значение кажется безопасным для любой выборки.После этого этапа в данных лежит:
event_type date prev_event_type
1 2024-01-01 -1
2 2024-01-05 1
1 2024-01-08 2
...
Готово!
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥8❤🔥2❤2
Как считать датасет в Spark?
Все, мы импортировали все, что нужно, создали сессию, нашли датасет, готовы работать!
Самые популярные способы:
⭐ csv
На месте path может быть 'file.csv', несколько файлов 'file1.csv,file2.csv' или вся папка целиком 'myfiles/csv_data'.
Из опций основные:
🐱
🐱
🐱
На больших данных использовать
Почему? Потому что спарк пойдет сканировать каждую строчку, чтобы определить корректный тип. Допустим, есть колонка age, заполненная числами типа int, но в сааамом конце у нас все поехало и попало число типа double. В конечном итоге столбец будет тоже иметь тип double, но для этого мы просканировали все на всякий случай. И так с каждым полем. Ну такое😕
Поэтому надо самим задать схему и указать ее при чтении файла:
⭐ json, parquet, hive-таблица
У нас есть HDFS (Hadoop Distributed File System) - это распределенная файловая система, в которой обычно хранятся большие данные. Там они лежат в виде файликов внутри папочек. А Hive - это обертка, чтобы мы могли читать данные из файликов через sql-запросы🐝
#spark
Все, мы импортировали все, что нужно, создали сессию, нашли датасет, готовы работать!
Самые популярные способы:
df = (
spark.read
.option('header', True)
.option('delimiter', ';')
.option('inferSchema', False)
.csv(path)
)
val df = spark.read
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "false")
.csv(path)
На месте path может быть 'file.csv', несколько файлов 'file1.csv,file2.csv' или вся папка целиком 'myfiles/csv_data'.
Из опций основные:
header - заголовокdelimiter - разделитель полейinferSchema - автоматический определитель типов полейНа больших данных использовать
inferSchema опасно!Почему? Потому что спарк пойдет сканировать каждую строчку, чтобы определить корректный тип. Допустим, есть колонка age, заполненная числами типа int, но в сааамом конце у нас все поехало и попало число типа double. В конечном итоге столбец будет тоже иметь тип double, но для этого мы просканировали все на всякий случай. И так с каждым полем. Ну такое
Поэтому надо самим задать схему и указать ее при чтении файла:
table_schema = StructType([
StructField('name', StringType(), True), # True - допустимы ли null
StructField('age', IntegerType(), True),
...
])
df = (
spark.read
.option(...)
.schema(table_schema)
.csv(path)
)
StructType - класс для структуры датафрейма. StructField - для поля. Лежат в spark.sql.types.df = spark.read.json(path)
df = spark.read.parquet(path)
df = spark.table('db_schema.table_name')
У нас есть HDFS (Hadoop Distributed File System) - это распределенная файловая система, в которой обычно хранятся большие данные. Там они лежат в виде файликов внутри папочек. А Hive - это обертка, чтобы мы могли читать данные из файликов через sql-запросы
#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥5⚡2💯1
В таблице test есть поле dt с датой и временем в виде строки. Будет ли разница в запросах?
1. SELECT MAX(dt::date) FROM test;
2. SELECT MAX(dt)::date FROM test;
1. SELECT MAX(dt::date) FROM test;
2. SELECT MAX(dt)::date FROM test;
Anonymous Quiz
36%
да, первый запрос отработает быстрее
51%
да, второй запрос отработает быстрее
13%
нет, запросы одинаковы
🔥8
К чему был предыдущий вопрос?
🧳 На днях у нас с коллегой возник такой кейс: есть 50 таблиц, содержащих колонку с датой, но там есть дата, дата и время и unix timestamp. Цель - нужно их все объединить. БД - ClickHouse.
Итерация 1:
Количество считанных строк - 46 млрд. Время работы - 2,5 минуты. Ну... долго ждать, давайте еще раз внимательно посмотрим🤓
Итерация 2 - вынесли приведение даты со временем к дате в самый конец:
Количество считанных строк - 46 млрд. Время работы - 1,5 минуты. Уже лучше, но можно ли еще быстрее?
Итерация 3 - вынесли приведение unix timestamp к дате со временем:
Количество считанных строк - 25 млн. Время работы - 6 секунд. Супер! Нам этом остановимся👍
В итоге мы сначала находим максимальные значения и только потом кастуем. Выигрыш во времени - в 25 раз.
🕒 Время и строки можно посмотреть в системной табличке
🐢 В первых двух случаях в плане запроса есть шаги:
🐆 В третьем:
👍
#sql_tips
Итерация 1:
SELECT
max(
if(
table_name LIKE 'offline%',
toDate(FROM_UNIXTIME(dateTime::INT)),
toDate(dateTime)
)
) AS max_date
FROM schema.source_table
Количество считанных строк - 46 млрд. Время работы - 2,5 минуты. Ну... долго ждать, давайте еще раз внимательно посмотрим
Итерация 2 - вынесли приведение даты со временем к дате в самый конец:
SELECT
toDate(
max(
if(
table_name LIKE 'offline%',
FROM_UNIXTIME(dateTime::INT),
dateTime
)
)
) AS max_date
FROM schema.source_table
Количество считанных строк - 46 млрд. Время работы - 1,5 минуты. Уже лучше, но можно ли еще быстрее?
Итерация 3 - вынесли приведение unix timestamp к дате со временем:
SELECT
toDate(
if(
table_name LIKE 'offline%',
FROM_UNIXTIME(max(dateTime)::INT),
max(dateTime)
)
) AS max_date
FROM schema.source_table
Количество считанных строк - 25 млн. Время работы - 6 секунд. Супер! Нам этом остановимся
В итоге мы сначала находим максимальные значения и только потом кастуем. Выигрыш во времени - в 25 раз.
system.query_log в столбцах query_duration_ms, read_rows.ReadFromMergeTree (schema.source_table) - читаем таблицуExpression ((Conversion before UNION + (Projection + Before ORDER BY))) - кастуем к дате в каждом подзапросеReadFromStorage (MergeTree(with Aggregate projection _minmax_count_projection)) - вот эта проекция сильно быстрее, чем скан всей таблицы, а лишних приведений уже нет. Profit#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👏6❤🔥1💯1🆒1
У нас есть список, кортеж, что угодно, и нам нужно достать оттуда элементы:
person = (
'Eveline',
(1990, 12, 31),
'Moscow',
'+79271111111',
'+79272222222'
)
# Вместо того, чтобы обращаться по индексу:
name, birth_date, city, phone1, phone2 = person[0], person[1], person[2], person[3], person[4]
# можем сразу распаковать вот так:
name, birth_date, city, phone1, phone2 = person
Тогда в name пойдет имя, в birth_date - дата рождения и т.д.
Самое главное - количество переменных слева и элементов в объекте справа должно совпадать.
# Хочу скипнуть некоторые поля:
name, _, city, phone1, _ = person
# Хочу сложить телефоны в список:
name, birth_date, city, *phone_list = person
# список может идти в начале и середине тоже
# Хочу взять имя и phone2:
name, *_, phone2 = person
# Хочу взять только имя:
name, *_ = person
# Хочу взять только phone2:
*_, phone2 = person
# Хочу взять имя и год рождения:
name, (year, *_), *_ = person
#python_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18❤2⚡1
Как переименовать базу данных?
У меня часто бывали случаи, когда нужно было переименовать базу данных, но она не переименовывалась. В том проекте базу каждый сам создавал под свою задачу, поэтому операции с ней ни на кого не влияли.
Что нужно сделать?
Диалект - MS SQL Server
#sql_tips
У меня часто бывали случаи, когда нужно было переименовать базу данных, но она не переименовывалась. В том проекте базу каждый сам создавал под свою задачу, поэтому операции с ней ни на кого не влияли.
Что нужно сделать?
Диалект - MS SQL Server
--1. Переключиться в другую базу
USE master;
GO
--2. Установить однопользовательский режим
ALTER DATABASE [old_database] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
GO
--3. Поменять название
ALTER DATABASE [old_database] MODIFY NAME = [new_database];
GO
--4. Восстановить режим
ALTER DATABASE [new_database] SET MULTI_USER;
GO
#sql_tips
🔥15🌚1
Как создать пустую таблицу из другой?
Бывают кейсы, когда есть табличка, но нам нужна пустая на ее основе.
Есть два варианта, которые будут работать везде:
1️⃣ TOP/LIMIT
2️⃣ заведомо ложное условие
#sql_tips
Бывают кейсы, когда есть табличка, но нам нужна пустая на ее основе.
Есть два варианта, которые будут работать везде:
SELECT *
INTO new_table
FROM old_table
LIMIT 0
SELECT TOP 0 *
INTO new_table
FROM old_table
SELECT *
INTO new_table
FROM old_table
WHERE 1=0
#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
❤10🔥10👻4
В спарке есть два типа операций: действия и трансформации.
Как и все мы, спарк очень любит лениться и ничего не делать. И он не будет тратить свою энергию на то, что никогда не понадобится. Так вот трансформации будут складироваться в цепочку по порядку, но никогда не выполнятся, если не будет запущено действие.
select() - выбираем столбцыwithColumn() - добавляем новое полеwhere()/filter() - фильтруемorderBy()/sort() - сортируемjoin() - джойнимgroupBy() - группируемunion() - объединяемdistinct() - удаляем дубликаты по всем столбцамdropDuplicates() - удаляем дубликаты по некоторым столбцамdrop() - удаляем столбцыSQL на максималках
show() - выводим датафрейм на экранcount() - считаем количество строкtoPandas() - собираем датафрейм в pandas dfКак использовать?
# Трансформации возвращают датафрейм
# Результата на экране нет
df = df.select('col1', 'col2').sort('col1')
# Действия возвращают НЕ датафрейм
# Результат на экране есть
df.show()
#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👏2
Запрос:
SELECT country, COUNT(customer_id)
FROM shop.customers
WHERE date = '2024-02-08'
GROUP BY country
HAVING COUNT(customer) > 5;
Варианты на Spark:
1) df = (
spark.table("shop.customers")
.select("country", "count(customer_id)")
.where("date = '2024-02-08'")
.groupBy("country")
.having("count(customer_id) > 5")
)
2) df = (
spark.table("shop.customers")
.where("date = '2024-02-08'")
.groupBy("country")
.agg(F.count("customer_id").alias("cnt"))
.where("cnt > 5")
)
3) df = (
spark.table("shop.customers")
.where("date = '2024-02-08'")
.select("country")
.count("customer_id")
.groupBy("country")
.having("count(customer_id) > 5")
)
4) df = (
spark.table("shop.customers")
.filter(F.col("date") == '2024-02-08')
.groupBy("country")
.agg(F.count("customer_id"))
.where("count(customer_id)> 5")
)
Голосовалка ниже
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥1
Выберите правильную запись/правильные записи sql-запроса на Spark (пост выше)
Anonymous Poll
26%
1
38%
2
22%
3
45%
4
🔥1🤔1
Правильные варианты - 2, 4.
Тут есть несколько интересных моментов:
Делается через
.agg() - на вход подается функция, которая что-то считает:F.min(), F.max(), F.count(), F.countDistinct(), F.sum(), F.sumDistinct(), F.avg(), F.mean и т.д.Дополнительно в селекте прописывать не нужно:
df.groupBy("department") \
.agg(F.sum("salary"), \
F.avg("salary"))df.groupBy("department")
.agg(F.sum("salary").alias("sum_salary"), \
F.avg("salary").alias("avg_salary"))После агрегации у нас появляется новое поле, по которому сразу можно фильтровать. Без элиасов поля будут называться как функция(поле)
# равнозначны
.agg(F.count("customer_id").alias("cnt"))
.where("cnt > 5")
.agg(F.count("customer_id"))
.where("count(customer_id) > 5")
Внутри можно использовать как стандартный sql в кавычках, так и делать питоновские/скаловские сравнения:
.where("date = '2024-02-08'")
.filter("date = '2024-02-08'")
.where(F.col('date') == '2024-02-08')
.filter(F.col('date') == '2024-02-08')Обратите внимание, что на Scala тройное равно:
.where("date = '2024-02-08'")
.filter("date = '2024-02-08'")
.where(col("date") === "2024-02-08")
.filter(col("date") === "2024-02-08")col - это обертка над колонкой, которая дает доступ к более крутым функциям поверх: alias(), desc(), like(), isNull() и т.д. #spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7
⚔️UNION vs CASE WHEN
🔵 Когда-то давно мне на собесах попадались задачки, где нужно было вывести показатели либо в одном столбце, но в разных строчках, либо в разных столбцах, но в одной строке.
Тогда я затупила, но у вас есть шансы пропустить этот этап😁
Они проверяли знание union и case when:
1️⃣ UNION/UNION ALL - для одного столбца
Если разницы для запроса нет, лучше выбирать
Элиасы во втором и последующем запросах можно не указывать, они будут тянуться из первого.
2️⃣ CASE WHEN - для разных столбцов
#sql_tips
Тогда я затупила, но у вас есть шансы пропустить этот этап
Они проверяли знание union и case when:
SELECT 'amt_not_null' as agg_type, SUM(amount) AS val
FROM test
WHERE id IS NOT NULL
UNION ALL
SELECT 'amt_null', SUM(amount)
FROM test
WHERE id IS NULL
UNION - удаляет дубликаты строк, UNION ALL - нет.Если разницы для запроса нет, лучше выбирать
UNION ALL, потому что без проверки на дубли он более производительный.Элиасы во втором и последующем запросах можно не указывать, они будут тянуться из первого.
SELECT
SUM(CASE WHEN id IS NOT NULL THEN amount ELSE 0 END) AS amt_not_null,
SUM(CASE WHEN id IS NULL THEN amount ELSE 0 END) AS amt_null
FROM test
#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥15❤🔥1
👛Особенность CASE WHEN
Если нам нужно добавить интервалы/категории и т.д., то можно воспользоваться этим примером:
Здесь не нужно ограничивать с двух сторон:
потому что условия проверяются по порядку и после первого попадания дальше сравниваться не будут. Поэтому нужно начинать с самых узких.
#sql_tips
Если нам нужно добавить интервалы/категории и т.д., то можно воспользоваться этим примером:
SELECT
num,
CASE
WHEN num < 0 THEN '< 0'
WHEN num < 10 THEN '[0; 10)'
WHEN num < 50 THEN '[10; 50)'
WHEN num < 100 THEN '[50; 100)'
END AS interval
FROM test
Здесь не нужно ограничивать с двух сторон:
WHEN num > 0 AND num < 10 THEN '[0; 10)'
потому что условия проверяются по порядку и после первого попадания дальше сравниваться не будут. Поэтому нужно начинать с самых узких.
#sql_tips
🔥8❤2👏2
Я частенько оставляю в заметках команды, которые попадаются не раз, тут списочек для терминала:
# путь к дефолтному питону, на котором все запускается
which python
# /c/Users/Username/AppData/Local/Programs/Python/Python311/python
# все пути к установленным питонам
where python
whereis python
# версия
spark-submit --version
# установить все зависимости из файлика
pip install -r requirements.txt
Вместо питона может быть java, mvn (maven - сборщик проектов) - все, что угодно.
# поменять сообщение последнего коммита
git commit --amend -m "new message"
# запуллить актуальную версию из УКАЗАННОЙ веточки репозитория и затереть все свои локальные изменения
git reset --hard origin/develop
# запуллить актуальную версию из ТЕКУЩЕЙ веточки репозитория и затереть все свои локальные изменения
git reset --hard HEAD
# удалить локальную веточку
git branch -d localBranchName
# удалить веточку в репозитории
git push origin --delete remoteBranchName
# найти файл по названию/расширению/паттерну в ТЕКУЩЕЙ директории
find . -name "*.txt"
# искать, начиная с корня
find / -name "*.txt"
# найти файлы в ТЕКУЩЕЙ директории, в которых содержится строка
grep -r "import" .
# искать, начиная с корня
grep -r "import" /
# заархивировать папку
zip -r file.zip source_folder
# разархивировать
unzip file.zip -d destination_folder
# убить приложение
yarn app -kill <app_id>
# найти, кто убил ваше приложение
yarn app -status <app_id> | grep killed
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥13❤🔥4🤓1
occurrences - это сколько раз инн повторяется в таблицеnum_inn - количество инн, которые встречаются occurrences раз156, 444, 777 - встречаются 1 раз, но всего их 3
283 - 2 раза, всего таких инн 1
900 - 4 раза, всего тоже 1
Я нашла как минимум 2 варианта, один из которых более лаконичный, чуть менее оптимальный, но довольно необычный в качестве зарядки для мозга.
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7🤔2
SELECT
occurrences,
COUNT(1) AS num_inn
FROM (
SELECT COUNT(1) AS occurrences
FROM test
GROUP BY inn
)
GROUP BY occurrences
ORDER BY num_inn DESC, occurrences;
Я обычно пишу count(1) вместо count(*), чтобы на всякий база не пошла читать все столбцы. Но многие говорят, что это миф и они абсолютно одинаковы, так что решать вам
Также в подзапросе можно группировать по столбцу, но не включать его в селект.
from -> join -> where -> group by -> having -> select (вместе с оконками) -> order by -> limit
То есть на этапе селекта мы уже сделали все группировки и можем спокойно использовать:
SELECT DISTINCT
COUNT(inn) AS occurrences,
COUNT(COUNT(inn)) OVER (PARTITION BY COUNT(inn)) AS num_inn
FROM test
GROUP BY inn
ORDER BY num_inn DESC, occurrences;
Sort(cost=43.09..43.59)
Unique(cost=52.31..53.81)
Второй запрос тяжелее, но вам может встретиться кейс, когда первый вариант не подойдет, так что имейте в виду
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥5⚡2💯1
Когда стоит вопрос: использовать
LEFT или LIKE - надо брать второе. Особенно в базах, которые поддерживают индексацию в LIKE.Какие есть варианты запроса?
SELECT * FROM accounts
WHERE LEFT(account_num, 1) = '5'
SELECT * FROM accounts
WHERE account_num LIKE '5%'
План запроса:
|--Clustered Index Scan(..., WHERE:(substring(account_num,(1),(1))=N'5'))
|--Clustered Index Seek(..., SEEK:(account_num >= N'5' AND account_num < N'6'), WHERE:(account_num like N'5%') ORDERED FORWARD)
Index Scan - вытаскиваем все строки из таблицы и ищем совпадение с подстрокой.Index Seek - сразу вытаскиваем нужную группу записей, потом фильтруем.PROFIT
Проблема в использовании функций. Поэтому, если возможно, заменяйте функции поверх полей на операторы, чтобы индексы могли нормально работать:
SELECT * FROM documents
WHERE YEAR(valid_from) = 2023 and MONTH(valid_from) = 6
--Index Scan
SELECT * FROM documents
WHERE valid_from BETWEEN '2023-06-01' AND '2023-06-30'
--Index Seek 👍
#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥17👏4
SQL джойны в Spark
🎹 В спарке всего 7 видов:
1) inner
2) left outer
3) right outer
4) full outer
5) cross
6) left semi
7) left anti
На первых четырех останавливаться не будем, вы их точно знаете😉
Схема такая:
- с какой табличкой джойним
- условия джойна
- тип джойна
Почти для каждого типа есть несколько обозначений, например, left, leftouter, left_outer - все равнозначны. Посмотреть на примеры можно тут
✨ Часто бывает, что столбцы называются одинаково, тут в ход идут элиасы. Так мы не привязываемся к конкретным названиям датафреймов:
CROSS JOIN
В спарк конфиге при создании сессии есть параметр, который может включать и отключать cross join:
Если отключите, то в коде вызовется исключение. В Spark3 по умолчанию True, в Spark2 - False.
LEFT SEMI
То же самое, что inner join, но возвращаются только колонки из левого датасета для сметчившихся строк.
LEFT ANTI
Возвращает колонки из левого датасета, но для НЕсметчившихся строк.
🤗 Вообще left semi я особо не использую, т.к. если уж нужно джойнить с другой табличкой, то наверняка оттуда надо что-то забрать😁 А вот left anti - удобная штука, когда надо поресерчить проблемные строки и не загромождать экран ненужными колонками
#spark
1) inner
2) left outer
3) right outer
4) full outer
5) cross
6) left semi
7) left anti
На первых четырех останавливаться не будем, вы их точно знаете
Схема такая:
- с какой табличкой джойним
- условия джойна
- тип джойна
df = df1.join(df2, F.col('id') == F.col('parent_id'), 'left')Почти для каждого типа есть несколько обозначений, например, left, leftouter, left_outer - все равнозначны. Посмотреть на примеры можно тут
df = df1.alias('df1').join(df2.alias('df2'), F.col('df1.id') == F.col('df2.id'), 'left')CROSS JOIN
В спарк конфиге при создании сессии есть параметр, который может включать и отключать cross join:
.config("spark.sql.crossJoin.enabled", True)Если отключите, то в коде вызовется исключение. В Spark3 по умолчанию True, в Spark2 - False.
LEFT SEMI
То же самое, что inner join, но возвращаются только колонки из левого датасета для сметчившихся строк.
LEFT ANTI
Возвращает колонки из левого датасета, но для НЕсметчившихся строк.
#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18❤3
Data Engineer в Яндекс 360
Москва/СПБ
middle, senior
Яндекс 360 — это Почта, Диск, Телемост с видеозвонками, Документы, Мессенджер, Рассылки, Заметки, Календарь, Трекер, Вики и Формы. Каждый день этими сервисами пользуются миллионы человек и тысячи компаний.
Наши продукты, команды и, конечно, объёмы данных быстро растут. Мы ищем дата-инженера, который поможет организовать хранилище, навести порядок и вывести аналитику наших сервисов на новый уровень.
Какие задачи вас ждут
- Вести разработку ETL-процессов поставки данных
- Выявлять неэффективность в существующих процессах и оптимизировать их
- Проектировать структуру хранения данных
- Погружаться в данные из прикладных бизнес-областей и становиться в них экспертом
- Сопровождать разработанные ETL-процессы
- Участвовать в развитии платформы DWH
Мы ждём, что вы
- Уверенно владеете Python
- Уверенно владеете SQL (join, агрегация, оконные функции, оптимизация сложных запросов)
- Умеете пользоваться Git
- Знаете, как строить и оптимизировать ETL-процессы
Будет плюсом, если вы
- Разбираетесь в слоях DWH и моделях хранения данных
- Работали с большими объёмами данных
Откликнуться
Please open Telegram to view this post
VIEW IN TELEGRAM
yandex.ru
Вакансия «Data Engineer в Мессенджер (Ереван)» в Яндексе — работа в компании Яндекс для IT-специалистов
Работа в компании Яндекс для специалиста «Data Engineer в Мессенджер (Ереван)» с уровнем квалификации от «Специалист» до «Старший» — Высокая заработная плата и социальные гарантии в IT-компании России
❤6🔥2💯1
CROSS JOIN LATERAL, LEFT JOIN LATERAL. Так что с другими бд, я надеюсь, тоже несложно разобраться.В чем суть?
По факту это те же самые джойны, но не с табличкой, а с результатом функции, которая возвращает табличку.
Допустим, есть функция поиска книги по автору:
CREATE FUNCTION getBookByAuthorId(@AuthorId int)
RETURNS TABLE AS RETURN
(
SELECT * FROM book
WHERE author_id = @AuthorId
)
Используем в запросе:
SELECT a.author_name, b.book_name, b.price
FROM author a
CROSS APPLY getBookByAuthorId(a.id) b
Ненайденные строчки также заполняются NULL.
В этом случае аналогично подзапросу в селекте:
--outer apply
SELECT a.author_name, book_list
FROM author a
OUTER APPLY (
SELECT STRING_AGG(book_name, ', ') AS book_list
FROM book b
WHERE b.author_id = a.id
) sub
--подзапрос
SELECT a.author_name, (
SELECT STRING_AGG(book_name, ', ')
FROM book b
WHERE b.author_id = a.id
) AS book_list
FROM author a
--STRING_AGG соединяет строки в одну ячейку с разделителем
С агрегирующими функциями прокатит только outer apply, потому что агрегировать поверх подзапроса нельзя. А так можно:
SELECT a.author_name, count(book_id) as book_num
FROM author a
OUTER APPLY (
SELECT b.id as book_id
FROM book b
WHERE b.author_id = a.id
) sub
GROUP BY a.author_name
#sql_tips
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥5❤2👏2
Все sql-джойны внутри превращаются в один из 4х типов:
Какой из них используется - смотрим через
df.explain()Сейчас пройдемся по каждому и посмотрим на условия срабатывания.
- условие на равенство
F.col('df1.id') == F.col('df2.id')- ключи должны быть сортируемы
Примеры несортируемых ключей: бинарный формат, сложные структуры
Что такое broadcast?
shuffle - группируются одинаковые ключи из разных экзекьюторов на одном => большие расходы на сеть. Но если у нас есть маленькая табличка (десятки мегабайт, но по умолчанию просто 10), то мы можем скопировать ее на все экзекьюторы, поджойнить там же и избежать шафла. Лимит на размер - память самого экзекьютора.- условие на равенство
- включен broadcast в настройках сессии
# 2й аргумент - размер в МБ
.config('spark.sql.autoBroadcastJoinThreshold', 100)
# так отключается broadcast
.config('spark.sql.autoBroadcastJoinThreshold', -1)
# broadcast join
df1.join(F.broadcast(df2), condition, join_type)
- условие на неравенство
- включен broadcast
NestedLoop - потому что мы итерируемся по маленькому датасету и проверяем каждую строчку
- условие на неравенство
Самый медленный, может приводить к OOM-ошибкам (Out of Memory).
#spark
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥10⚡2💯2❤🔥1❤1