Канал Андрея про бекенд – Telegram
Канал Андрея про бекенд
1.68K subscribers
9 photos
13 links
👨🏻‍🎓 Я Андрей Суховицкий - tech lead и лектор Университета ИТМО, @sukhoa

🔥 Темы в канале: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Посты по средам

Присоединяйтесь!
Download Telegram
Буферизация запросов

🚶🏻‍♀️🚶🏻🚶🏼‍♂️🚶🏻‍♀️ Очередь - очень полезная структура данных. Мы уже обсуждали ее использование для коммуникации потоков приложения. Аналогично она может использоваться и для общения между сервисами через шину/брокера, например Kafka или RabbitMQ.

🧳 В этом посте давайте обсудим другой распространенный пример использования очередей - буферизация запросов. Если сервис не может выполнить входящий запрос немедленно, то можно на время помещать этот запрос в контейнер/буфер для последующей обработки.

🧨 Такой буфер может быть помещен на входе в сервис (очередь http-запросов). Множество таких очередей может присутствовать и внутри сервиса и часто довольно неявно для вас. Вы можете даже не подозревать об их присутствии. Примеры: очередь на получение коннекции к БД из пула, очереди на получение блокировки (любой лок, семафор, мьютекс), очередь к пулу потоков (они тоже могут быть скрыты внутри библиотек и фреймворков, которые вы используете).

📦 Проблема запросов, которые не могут быть обработаны сервисом немедленно

Когда скорость поступления запросов начинает превышать скорость их выполнения вашим сервисом, вы оказываетесь перед выбором, что делать:

1. Отклонять поступающие запросы сразу: возвращать отправителю со словами, что возможности сервиса превышены. Это самый простой и иногда самый верный способ.

2. “Попридерживать” запросы, пока не сможем их выполнить. Тут мы и начинаем помещать такие запросы в очередь.

💀 Важно в этот момент понимать, что очередь никак не ускоряет работу вашего сервиса, она лишь копит запросы в надежде, что в будущем нагрузка ослабнет (или сервис начнет работать быстрее) и запрос будет выполнен. Если же входная нагрузка на ваш сервис всегда превышает его возможность обработать такое количество запросов, то очередь будет расти бесконечно и вы имеете все шансы потратить всю память приложения на хранение запросов, которые никогда не будут обработаны. Заметим также, что из-за содержания в очереди может расти и само время обработки запроса.

✍️ Вывод: все очереди должны быть ограничены. А подходящие размеры вы подбираете сами, опираясь на знания о вашем сервисе и требования к обработке трафика.

🎃 Получается, очереди ничем не могут нам помочь и только откладывают неизбежное?

Это не так. Буферизация запросов очень полезна в моменты временных повышений нагрузки, она помогает “придержать” в очередях избыточное количество трафика и обработать его позже, когда нагрузка спадет, не потеряв запросы. Однако вам придется принять решение, как поступать при переполнении очередей. Стратегии могут быть различные от выкидывания запросов до сохранения их в персистентные хранилища для последующей обработки. Если вы используете ExecutorService, то можете определить вашу собственную стратегию при переполнении очереди с помощью класса RejectedExecutionHandler или использовать одну из дефолтных стратегий.

#highload #queues
👍13🔥63😍1
Forwarded from Oleg
А что делать в ситуации, когда сервис набрал запросов в очередь и упал? А нам как раз важно не терять запросы
5
☝🏽 Зависит от природы очереди. Если это очередь в памяти приложения (например, очередь ExecutorService), то вместе с падением приложения потеряется и содержимое очереди (можно что-то накрутить, но никакой гарантии не будет).

🚌 Чтобы избежать такой ситуации можно, например, помещать трафик в персистентное хранилище (Kafka, RabitMQ в персистентном режиме, или просто БД), вычитывать оттуда в темпе, комфортном для приложения, обрабатывать, а потом "коммитить" сообщения, то есть отмечать в очереди, что сообщение обработано (ну или вообще удалять). В Kafka для этого есть возможность делать "коммит (acknowlegment)", сохранять индекс последнего обработанного сообщения в партиции. В Rabbit тоже есть механизм consumer acknowlegment. В базе данных это может быть просто флаг isProcessed, например.

🪨 Есть даже вариант сохранять все сообщения прямо на файловую систему во втроенную БД (например, RocksDB). Или в очередь Redis (c включенным persistent режимом), хотя там есть свои нюансы.

🔐 Если же не хочется терять много сообщений, но чуть-чуть не страшно, то можно просто ограничивать размеры очередей, чтобы не потерять миллионы скопившихся сообщений.

🔩 Ну и ограничивать каким-то образом ваших клиентов, чтобы не "убивали" вас потоком сообщений. Для этого есть механизмы back-pressure.

- Вы можете установить лимиты скорости - rate limits (количество сообщений в единицу времени), на параллельные запросы (пример, не более 10 параллельных запросов от отного клиента в каждый момент), лимиты на входящие коннекты и тд.

- В хорошем случае вы можете давать клиенту фидбек ("я готов принять столько-то сообщений" / "стоп, пока не шли") и тд. А в идеальном случае вы сами сможете контроллировать поток сообщений "вычитывая" (read/pull) сообщения от клиента. Вроде того, как вы вычитываете из Кафки. Тут вы прямо минимизируете количество трафика в памяти и соответственно потенциальные потери.

💎 Ну и самое важное в вашем случае - чтобы клиент (тот сервис, который вам послал эти сообщения) имел механизм retry, помнил, какие сообщение не были обработаны и повторял попытки. В общем, сам обо всем заботился :)
🔥14👍5🍾2
Кейс с собеседования

⬇️ Ниже приведен фрагмент кода. Представьте, что этот код работает у вас на продакшене прямо сейчас.

Один поток Producer генерирует последовательность целых чисел. Несколько потоков Consumer получают эти числа из очереди и производят маппинг этих чисел на другие.

class Consumer(
private val codesQueue: BlockingQueue<Int>,
private val codeMappings: List<Int>
) {
@Volatile
var isActive: Boolean = true

fun consume() {
while (isActive || codesQueue.isNotEmpty()) {
val code = codesQueue.take()
log.info("Code $code mapped to ${codeMappings[code]}")
// do something with errorCodeMapping
}
}
}

class Producer(private val codesQueue: BlockingQueue<Int>) {
@Volatile
var isActive: Boolean = true

fun supply() {
while (isActive) {
codesQueue.put(Random().nextInt())
}
}
}


🦥 В какой-то момент вы замечаете, что маппинг не происходит: программа перестает писать в файл info-логи. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, он жив — то есть программа не завершилась.

👩🏻‍⚕️ Вы не знаете воспроизведется ли в следующий раз такая ситуация, поэтому не хотите останавливать процесс. Ваша задача придумать способ провести диагностику происходящего "на лету", здесь и сейчас. Что бы вы сделали?

🥟 Не спешите открывать ответ, подумайте, какие варианты вы бы могли предложить на собеседовании. Пишите ваши варианты в комментарии. Ответ, который я ожидал бы услышать: thread dump.Подробности в следующем посте.

💡 Попробуйте проанализировать код, попытайтесь найти обстоятельства, которые привели к симптомам, описанным выше. Какие изменения вы бы внесли в код для предотвращения подобных ситуаций? Пишите в комменты!

#java #kotlin #threads #case #interview
🔥11👍53🤝1
Thread dump и анализ кейса

📸 Дампом потоков называют снимок состояния всех текущих потоков JVM. Он очень полезен для диагностики проблем с производительностью, блокировками, утечками ресурсов. Дамп представляет из себя текстовый файл, где для каждого из существующих потоков java-процесса вы можете найти его состояние (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED) и полный стек вызовов.

⬆️ Вспоминаем ситуацию, описанную в предыдущем посте:

Программа перестает писать info-логи в файл. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, - он жив, то есть программа не завершилась.

🔧 В этом случае для диагностики вы можете прибегнуть к thread dump. Примеры утилит, которые помогут вам сделать снимок потоков: jstack, jvisualvm, JMC (Java Mission Control), jcmd. Ниже пример команды для утилиты jstack, на вход необходимо передать pid java-процесса.

jstack <pid> > threaddump.txt


🧨 Ниже приведен фрагмент thread dump, представляющий интересующий нас поток-producer.

"Thread-0" #16 daemon prio=5 os_prio=31 cpu=0.97ms elapsed=3.79s tid=0x00007fd90d0ed800 nid=0x5f03 waiting on condition  [0x000070000a578000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.9/Native Method)
. . . . . . .
at java.util.concurrent.LinkedBlockingQueue.put(java.base@17.0.9/LinkedBlockingQueue.java:343)
at ru.quipy.Producer.supply(Example.kt:30)
at ru.quipy.ExampleKt.main$lambda-4(Example.kt:48)
at ru.quipy.ExampleKt$$Lambda$32/0x0000000132014658.run(Unknown Source)
at java.lang.Thread.run(java.base@17.0.9/Thread.java:840)


🧐 Как мы видим, поток находится в состоянии перманентного (не ограниченного временем) ожидания (java.lang.Thread.State: WAITING) в методе LinkedBlockingQueue.put. Если вы находитесь на собеседовании, то можете сказать, что лучше использовать аналогичный метод LinkedBlockingQueue.offer, который не блокируется навечно, а использует ограниченную временем блокировку, что даст нам возможность выйти из ожидания, понять, что операция не удалась и сделать об этом запись в лог. Аналогично и для метода LinkedBlockingQueue.take: вообще все блокирущие методы стоит ограничивать временем, тогда не возникнет ситуация с блокировкой потока при завершении программы, которую вы описали в комментах к предыдущему посту.

🔍 Очень подозрительно, но мы не можем найти дамп потока consumer из чего может следовать вывод, что такового в JVM нет. Вероятно, данный поток уже завершился. Это объясняет, почему он больше не делает записи в логи и почему поток supplier не может поместить ничего в очередь: она достигла максимального размера из-за отсутствия потока-потребителя.

🕵🏽 Поток завершился в следствие появления неожиданного исключения. Это могло произойти в следующей строке: log.info("Consumed $code. Mapped to ${codeMappings[code]}"), если элемента с индексом code в списке не существует.

🤯 А исключения мы могли не увидеть в логах по нескольким причинам. Например, вызывающий нас код мог поймать исключение, завершить поток и не записать об этом в лог. Или же был использован базовый обработчик исключения в потоке (interface UncaughtExceptionHandler), дефолтная версия которого (class ThreadGroup) просто делает запись в System.err, который мог не перенаправляться в лог.

🥇 Какие выводы мы можем сделать?

1. Следует оборачивать происходящее в потоке в глобальный обработчик исключений try-catch, логировать перехваченные исключения и поддерживать работу потока, чтобы приложение могло прогрессировать даже вопреки ошибочным ситуациям.

2. Следует явно ограничивать временем методы, которые блокируют поток исполнения или использовать их варианты/альтернативы с таймаутом!

#java #kotlin #threads #interview #case
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18👍42🤝1
👨🏻‍🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa

🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Вы можете проголосовать за канал
_______________________________

Мой Youtube-канал
Мой курс по event-sourcing. Промокод TG_TEAM на -30%. Сейчас доступен только тариф без ментора.
_______________

Навигация:
#java, #kotlin - посты, релевантные для разработчика на этих языках
#threads - многопоточка и другое полезное о потоках
#highload - все, связанное с реализацией высоконагруженных систем
#coroutines - связанное с kotlin-coroutines, их применением и реализацией
#queues - кейсы, связанные с различными видами очередей - in-memory, брокеры и тд
#case - описание реальных ситуаций и их траблшутинг
#interview - потенциальные вопросы на собеседовании, анализ, разбор
#metrics_basics - основы сбора, хранения и визуализации метрик, prometheus
🤝7🔥1
Канал Андрея про бекенд pinned «👨🏻‍🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa 🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system…»
Давайте потокам осмысленные имена

🚨 Это архиважная вещь в промышленной разработке на java. Вы могли заметить, что необходимый нам поток из поста выше 👆🏼носит название Thread-0 . Я без труда нашел его в thread dump только потому, что программа была мала. Но даже ее дамп включал 21 поток, подавляющее большинство которых было служебными потоками JVM (в основном потоками сборщика мусора).

🔍 Крупные сервисы могут иметь сотни и даже тысячи потоков. Может быть очень сложно найти среди них именно те, что нужны вам. Но задача будет гораздо проще, если вы будете давать осмысленные имена потокам и пулам потоков, которые работают над одним типом задач.

🧵 Давайте начнем с одиночных потоков. Конечно, мы не часто создаем потоки через конструктор класса Thread, но полезно знать, что этот конструктор позволяет вам дать потоку имя.

val producerThread = Thread(null, { producer.supply() }, "producer-thread")


🏭Теперь давайте перейдем к пулам потоков. Статический метод Executors.newFixedThreadPool, через который удобно создавать пулы потоков, принимает на вход необходимый размер пула и еще один интересный параметр с типом ThreadFactory. У этого интерфейса всего один метод Thread newThread(Runnable task). Пул потоков делегирует переданному объекту ThreadFactory задачу создания потоков. Давайте реализуем свой вариант ThreadFactory, который именует потоки необходимым образом и передадим такой объект нашему пулу:

class NamedThreadFactory(private val prefix: String) : ThreadFactory {
private val sequence = AtomicInteger(1)

override fun newThread(r: Runnable): Thread {
val thread = Thread(r)
val seq = sequence.getAndIncrement()
thread.name = prefix + (if (seq > 1) "-$seq" else "")
return thread
}
}


🚲 На вход классу NamedThreadFactory передаем строку, которая будет являться префиксом имени всех потоков, созданных этой фабрикой. С помощью атомарного каунтера sequence мы сможем создавать уникальную часть имени для наших потоков🥇🥈🥉. Теперь давайте передадим эту фабрику пулу:

val httpCallsExecutor = Executors.newFixedThreadPool(16, NamedThreadFactory(“http-calls-pool”))


🎯 Вуаля, теперь в thread dump ваши потоки будут иметь осмысленные имена и вам не придется тратить уйму времени на изучение стека ненужных потоков!

#java #kotlin #threads
Please open Telegram to view this post
VIEW IN TELEGRAM
👍16🔥4🤝1
Использование Thread.sleep для тестирования асинхронных программ

📲 Тестируем сервис отправки push-уведомлений. Внешние процессы вызвают его, чтобы отправить push-уведомление c заданным текстом на мобильное устройство. Сервис возвращает идентификатор уведомления, по которому можно проверить его статус. Само уведомление может быть отправлено позже, для этого наш сервис совершает http-вызов к внешнему провайдеру.

interface NotificationService {
fun submitNotification(deviceId: UUID, text: String): UUID
}


Как будем тестировать?

👺 Если тестирование модульное, то, вероятно, мы хотим сделать mock для http-client-a, с помощью которого будет посылаться уведомление и проверить, что один из его методов был вызван. Аналогично можно сделать mock классов доступа к базе данных и проверить, что статус оповещения был обновлен. Однако, если написать такой код, то может оказаться, что тест в большом количестве случаев не проходит:

notificationService.submitNotification(deviceId, “Hello”)
verify(dbService, times(1)).updateStatus(any());


Между вызовом сервиса и проверкой прошло очень мало времени, запрос мог провести его в буферах-очередях, ждать выполнения http-вызова, или выполнения запроса в БД. Вероятность этого еще выше, если тест интеграционный - сразу после самбита уведомления мы пытаемся получить его статус из базы данных, но он не успевает измениться.

Чтобы сделать тест "зеленым", частенько выбирают некоторую “взятую с потолка” константу, например, 5 секунд и помещают инструкцию, которая заставляет поток "уснуть", между вызовом сервиса и проверкой:

notificationService.submitNotification(deviceId, “Hello”)
Thread.sleep(5000L) // 5000 milliseconds
verify(dbService, times(1)).updateStatus(any());


Как правило, это решает проблему прохождения теста, но генерирует несколько новых:

1. Пяти секунд может быть недостаточно в определенных обстоятельствах и тест будет падать, но не всегда, а, например, в 1% случаев. Не так много, чтобы переписать тест, но достаточно, чтобы с завидной регулярностью фейлить весь билд и раздражать 😤. То есть вы своими руками делаете свои тесты недетерминированными, flucky тестами.

2. Вторая причина часто более заметна для команды. Инструкция Thread.sleep(5000L) выставляет нижнюю границу выполнения теста равной пяти секундам, тогда как в удачном сценарии тест мог занять 10, 50, 100, 200 миллисекунд, секунду. Что, если у вас 200 тестов? Время выполнения 1000 секунд или около 16 минут, тогда как в удачном сценарии ваш билд мог занять во много раз меньше - 1, 2 … 4 минуты. Часто проблема нарастает очень плавно с увеличением количества тестов и неопытные команды не могут понять, в чем причина увеличения времени сборки 🤔.

Ставьте 🔥, если интересно прочитать про решение данной проблемы. Кто уже сталкивался - кидайте в комментарии ваши методы и интересные библиотеки, которые используются в вашей команде.

#kotlin #java #async
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥682👍1🤝1
Как “не спать” при тестировании асинхронных программ.

🛌 В предыдущем посте я описал мотивацию использования Thread.sleep при тестировании асинхронного кода и связанные с этим проблемы. Однако, пассивное ожидание потока очень помогает нам при тестировании асинхронного кода, ведь тесту нужно дождаться завершения асинхронной операции, чтобы начать проводить различные проверки.

Проблему для нас создает тот факт, что мы не знаем, сколько времени в действительности займет тестируемая операция (от выполнения к выполнению ее длительность может варьироваться). Поэтому приходится подбирать некую константу, которая в большинстве случаев превышает максимальную длительность операции. Пример: отправка оповещения в среднем занимает около 150ms, но в 3% случаев превышает 5 секунд. Thread.sleep(5_000) даст нам зеленый тест в c 97% вероятностью.

Нам бы хотелось, чтобы поток ожидал примерно столько же, сколько в действительности выполняется тестируемая операция. Если ее фактическая длительность 150ms, то поток должен находиться в ожидании (150ms + некоторая фиксированная константа, например, еще 50ms). Если операция выполнялась 5s, то и время ожидания должно быть 5s + 50ms.

Для этого необходим механизм, умеющий с заданной периодичностью проверять является ли некоторое условие истинным. Если условие не выполняется, то “засыпать” на короткий промежуток времени, если условие выполняется, то выходить из ожидания.

🔔Пример: клиент отправляет уведомление. Мы хотим протестировать, что после отправки его статус в базе данных будет обновлен на processed. Будем с периодичностью в 50ms делать запрос в базу данных, получать статус и, если он еще не изменен на processed, то ждать следующие 50ms. Нам станет известно, что статус обновился, через кратчайший промежуток времени.

Давайте посмотрим на пример кода, где в качестве такого механизма используется библиотека Awaitility.

id = service.submitNotification(deviceId, “Hello”)

with()
.pollInterval(50_MILLISECONDS)
.await()
.atMost(8, SECONDS)
.until(notificationStatusIsUpdated());


⚙️ У библиотеки есть большое количество конфигурируемых параметров. Например:
1. Период времени между проверкой условия (poll interval)
2. Начальная задержка перед первой проверкой
3. Минимальное время ожидания (например, вы хотите, чтобы оповещение отправлялось не раньше, чем через 5 секунд)
4. Максимальное время ожидания, за которое ваше условие должно выполниться
5. Игнорируемые исключения
6. Пул потоков, на котором будет выполняться тестирование

🐈‍⬛ В библиотеке также присутствует модуль для Kotlin, который помогает использовать API библиотеки в конструкциях, похожих на естественный язык, используя инфиксные функции.

🏋️‍♀️ Вы можете для тренировки написать собственный небольшой инструмент для ожидания (на java, kotlin, kotlin-coroutines) и поделиться кодом в комментариях 😊

#java #kotlin #async
Please open Telegram to view this post
VIEW IN TELEGRAM
👍25🔥8👎3
Траблшутинг кейс

💣 У вас есть простой сервис, который принимает входящие http-запросы, в среднем около 1к rps (requests per second). Он добавляет к запросу небольшое количество метаданных и пересылает по HTTP далее, в следующий сервис.

👀 Вы заметили, что сервис стал хуже справляться с пиковыми нагрузками: 6k rps. Симптомы следующие:

1. При увеличении входящего трафика начинают скапливаться внутренние очереди

2. Время выполнения http запроса в следующий сервис возрастает в разы или даже на порядки. (В норме среднее 30ms, при нагрузке начинает нарастать и растет вплоть до 20-30s)

3. Возрастает значение метрики количества используемых JVM файловых дескрипторов (рост примерно со 100 до 170).

4. Возрастают также и другие показатели: RAM, CPU.

🔎 Давайте исследовать:

1. Подозрение падает на следующий в цепочке сервис. Мы предполагаем, что под нагрузкой время его ответа увеличивается, и это отражается на нашем приложении.

🏓Команда сервиса опровергает эту догадку, предъявляя нам метрики, где видно: среднее время обработки запроса на их стороне оставалось стабильным в течение всего времени.

🧑🏻‍🔬 Мы проводим и свой (не самый репрезентативный) эксперимент - во время пиковой нагрузки, используя какой-нибудь http-client, например, curl, делаем несколько запросов к их сервису с нашей машины и видим, что время ответа около 30ms.

2. Рост файловых дескрипторов. Вспоминаем, что в Linux TCP коннекция является файлом, то есть с ростом числа подключений растет и количество захваченных дескрипторов.

📡 Кто может раздувать количество соединений? - Http-client. Зачем? Из-за особенности протокола HTTP1. Запросы в нем блокируют подключение: пока один запрос не будет выполнен, следующий запрос не отправляется.

3. Зная эту особенность HTTP, давайте считать пропускную способность:

📲 Длительность запроса 30ms, то есть одно подключение может за секунду обрабатывать 1000ms/30ms≈33.3 запроса. Чтобы удовлетворить обычную нагрузку на сервер в 1к, нужно около 1000rps/33.3req ≈ 30 подключений к следующему сервису. Мы знаем, что у нас было 100 дескрипторов, но не знаем, какие из них были подключениями.

4. Мы можем посчитать, сколько нужно подключений, чтобы сервис выдержал 6к запросов без накапливания очереди.

⚖️ Итак 6000 / 33.33 будет ≈ 180 подключений. У нас же есть около 170 (и мы не знаем, сколько из них действительно наши). В любом случае у нас будет накапливаться очередь внутри http-client, в которой запросы могут проводить долгое время, особенно в период повышенной нагрузки.

⚙️ Что можно сделать

1. Увеличить количество подключений per host. Почти у всех http-клиентов есть возможность ограничить количество подключений к одному хосту. Часто это совсем небольшое значение. Например, в OkHttp client или AsyncHttpClient оно по умолчанию равно 5. И все клиенты (почти) дают возможность выставлять кастомное значение этого параметра, например:

AsyncHttpClientConfig.Builder.setMaxConnectionsPerHost(maxConnectionsPerHost)


2. Перейти на протокол HTTP версии 2. Он лишен такого недостатка, как блокирующиеся коннекции. С помощью процесса мультиплексирования HTTP 2 дает возможность одновременно передавать и получать данные большого количества запросов, используя даже единственное соединение. Но переход на него в данном случае возможен только, если вы можете перевести на него и следующий в цепочке сервис, что не всегда в вашей власти.

Ставьте 🔥, если такой формат постов кажется вам интересным :)

#http #case #interview
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥118👍8🤝1
Недавно у меня возникла следующая задача: необходимо запросить данные у трех сервисов, но достаточно получить первый валидный ответ от любого их них, остальные 2 вызова можно не ожидать.

🤝 Интерфейс вызова следующий:
fun performCall(key: Key) : CompletableFuture<Value>


🤯 Я попытылся написать это на java, но ничего адекватного по количеству строк и общей читаемости кода у меня не получилось.

🐣 Решение я нашел в использовании корутин. Первое, что я сделал: превратил CompletableFuture<Value> в Deferred<Value>, используя extension-функцию public fun <T> CompletionStage<T>.asDeferred(): Deferred<T>. Это необходимо, чтобы переместить наш код в плоскость корутин.

🪢 Далее я использовал интересную функцию - select. Ей на вход можно передать любое количество “ожидающих (suspended)” функций и select будет ждать, пока любая из этих функций не вернет результат. Тогда исполнение продолжится и select вернет вам результат этой "победившей" корутины.

🍎 Таким образом, моя задача сводится к довольно простой вещи - поместим все наши вызовы в список и передадим функции select, а она вернет нам первый полученный результат. Это код, который нам нужен:

val calls = services.map { performCall(key).asDeferred() }.toList()
val res = select {
calls.forEach { call ->
call.onAwait { it } // it это результат вызова
}
}


💩 В реальности сервисы могут кидать исключения или возвращать пустой результат null. А нам необходимо дождаться именно первого валидного результата. Мы можем сделать следующее - проанализируем результат и в случае null будем снова заходить в select.

🚨 И тут надо помнить еще один интересный нюанс: если вы снова передадите в select тот же самый список, то он повторно выдаст вам результат ошибочного вызова, ведь он и правда завершен :) Поэтому давайте удалять из списка те вызовы, которые выполнились и не являются валидными. Только не забывайте поместить их в thread-safe контейнер, иначе легко словите ConcurrentModificationException.

Финальное решение выглядит как-то так (за исключением try-catch, чтобы сократить):
val calls = services.map { performCall(key).asDeferred() }.toCollection(ConcurrentHashMap.newKeySet())

var res: Value? = null
while (calls.isNotEmpty()) {
val res = select {
calls.forEach { call ->
call.onAwait { callRes ->
calls.remove(call)
callRes
}
}
}
if (res != null) break
}


☄️ Отмечу еще, что select бывает невероятно полезен в сочетании с использованием каналов (kotlinx.coroutines.Channel). Канал - аналог BlockingQueue в мире корутин, позже выложу пост о них. Если у вас есть задача мониторить множество очередей на предмет новых элементов и сразу же передавать их на обработку, то лучше средства не найти.

🪐 Интересно, что функция select по свой сути является аналогом системного вызова select на основе которого реализован "неблокирующий" ввод-вывод. Он позволяет мониторить множество TCP соединений на предмет новых событий IO и превращать их в единый "уплотненный" или иначе "мультиплексированный" поток событий, что позволяет очень эффективно использовать ресурсы системы. О "мультиплексировании" уже говорили тут и будем говорить еще.

Ставьте 🔥, если было интересно. Если вы знаете код на java, который решает аналогичную задачу и является плюс-минус таким же лаконичным, делитесь в комментах!

#kotlin #coroutines #java
Please open Telegram to view this post
VIEW IN TELEGRAM
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥37👍73🆒1
🗒 У нас есть следующий фрагмент кода:

fun task() {
val data = prepareData()
process(data)
}


👀 Функция prepareData производит получение и подготовку данных для их последующей обработки функцией process.

🎃 При исполнении на одном потоке функция prepareData занимает 5% времени выполнения, process 95%. Единственный поток позволяет выполнять 10 операций в секунду. Важно отметить, что функция prepare работает с разделяемыми данными и должна выполняться под локом в многопоточной среде.

🐌 Наша задача: увеличить пропускную способность, чтобы функция task могла выполняться 100 раз в секунду. Сколько потоков понадобится, чтобы достичь желаемого прироста?

🎈 На эту задачу есть несколько правильных ответов. Какие-то можно обосновать теоретическими расчетами, какие-то практическим экспериментом. Но гораздо интереснее, как бы вы стали рассуждать, если бы получили такую задачу на собеседовании. Так что пиши ваши рассуждения в комменты!

#interview #threads
🔥11🤔4👍3
Сколько бы потоков выделили?
Anonymous Poll
12%
7
41%
11
24%
16
29%
20
7%
32
Ответ 20 на предыдущий опрос может быть не очевидным с первого взгляда. Мы получаем его, воспользовшись законом Амдала

🧶 Итак, в этом посте будем вычислять ответ ответ, опираясь только на теорию.

🎯 Нам нужно увеличить пропускную способность в 10 раз (с 10 операций в секунду до 100). Кажется, что, если один поток может выполнять 10 операций в секунду, то нам нужно увеличить количество потоков пропорционально, до 10.

🔩 Однако, в данной задаче есть нюанс, который делает рост производительности не линейным относительно количества потоков. Иными словами, увеличение потоков в 10 раз не дает пропорциональный выигрыш в производительности.

🔓 Этим нюансом является наличие функции prepareData, которая требует выполнения под локом. Фактически, функция не параллелизуется. Сколько бы потоков мы не добавили, у нас не получится получить для нее никакой выигрыш, говорит закон Амдала. То есть время, которое prepareData занимает от общего времени выполнения, всегда будут оставаться прежним.

🎁 Зато мы можем получать линейный выигрыш от параллелизации остальных 95%, которые занимает функция process.

Доля кода, который никак не получится распараллелить (последовательного) 0.05 (5%). Доля параллелизуемого - 0.95 (95%). Давайте считать:

1. Если у вас 10 потоков, то вы можете в 10 раз ускорить параллелизуемый код, то есть из 0.95 превратить его в 0.095. Сложим теперь доли ускоренного и неускоряемого кода: 0.05 + 0.095 = 0.145. (14.5%, то есть мы стали быстрее на 85.5%). Чтобы понять ВО сколько раз мы ускорились надо единицу поделить на 0.145, получится 6.89. А нам необходим прирост в 10 раз.

2. Если у вас 15 потоков, то вы можете в 15 раз ускорить параллелизуемый код. 0.95 -> 0.063. 0.05 + 0.063 = 0.113. 1 / 0.113 = 8.84. Все еще не 10!

3. Если у вас 20 потоков, то вы можете в 20 раз ускорить параллелизуемый код. 0.95 -> 0.0475. 0.05 + 0.0475 = 0.0975. 1 / 0.0975 = 10.25. Ура! 🏆 Получается, что нам нужно 20 потоков (19 тоже хватит на самом деле).

📈 Данный пост демонстрирует нам закон Амдала, который бывает чрезмерно пессимистичным в своей оценке и рассматривает худший сценарий исполнения. Однако, стоит вынести из него важную идею:

❗️❗️❗️Производительность вашего кода не всегда растет пропорционально увеличению количества потоков. Чем больше в вашем коде фрагментов, требующих последовательного (однопоточного) выполнения, тем меньше прирост производительности при параллелизации.

В следующих постах проверим, выполняется ли закон Амдала на практике для данного кода и увидим еще один способ увеличить пропускную способность нашей функции.
🔥34👍6👌3💯2
⬆️ В прошлом посте мы с помощью закона Амдала выясняли, сколько нужно потоков, чтобы ускорить наш код в 10 раз. Полученный ответ, 20 потоков, занял второе место в нашем опросе, но все еще может казаться довольно удивительным.

🧑‍💻 Как и обещал, я провел небольшой эксперимент для проверки теоретических расчетов и постарался найти объяснение полученным на практике результатам.

Подробнее об этом, а также о выводах и практических рекомендациях читайте по ссылке
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥171👍1🆒1
Последний пост из серии "как ускорить функцию task" 😁

⬆️ Напоминаю, мы с вами задавались вопросом, сколько потребуется потоков, чтобы в 10 раз ускорить функцию task, где 5% кода выполняется под локом. Вычисленный с помощью закона Амдала ответ в 20 потоков был позже поставлен под сомнение практическими экспериментами. Сегодня ставим точку в этой серии постов.

🧱 Предлагаю немного переработать архитектуру данного нам кода:

1. Выделить один отдельный поток, который будет заниматься исключительно препроцессингом данных (функцией prepareData, которая требует исполнения под локом).

2. Выделить группу потоков, которая будет заниматься параллельной обработкой подготовленных данных.

💡 Какие у нас ожидания производительности от данного рефакторинга?

🔎 Вся функция task целиком требует 100ms на выполнение. Функция prepareData занимает 5% этого времени (5ms). Получается, что если поток будет последовательно заниматься только выполнением prepareData, то он сможет делать целых 200 исполнений за одну секунду. Кстати, лок теперь можно убирать, ведь код выполняется на одном потоке.

🧑🏽‍💻 Модифицируем код следующим образом:
class Test {
private val lock = ReentrantLock()

private val sequentialWorker = Executors.newSingleThreadExecutor()
private val parallelWorkers = Executors.newFixedThreadPool(10)

fun task() {
sequentialWorker.submit {
val data = prepareData()
parallelWorkers.submit {
process(data)
}
}
}

private fun prepareData(): Data {
Thread.sleep(5L)
Data(1)
}

private fun process(data: Data) {
Thread.sleep(95L)
}

data class Data(val d: Int)
}


🚀 Здесь используется 11 потоков - один отдельный поток выполняет последовательную часть кода и передает подготовленные данные в группу из оставшихся 10 потоков, которые выполняют обработку подготовленных данных. Как и ожидалось, такой код выполняет около 100 операций в секунду. Более того, при увеличении группы потоков с 10 до 20 этот код выполняет около 200 операций в секунду, что является его максимальным возможным ускорением.

⚖️ Безусловно, данное решение не всегда применимо. Часто блокировки "раскиданы" по всему коду и мы не имеем возможности вынести их в отдельную группу потоков. Но в данном случае, когда препроцессинг всегда осуществляется в одном месте перед вычислениями, мы можем использовать такую архитектуру 🔥.

#threads
🔥15👍4👌1🏆1
Аналог блокирующей очереди для корутин

В одном из предыдущих постов мы обсуждали удобство использования блокирующий очереди (интерфейс java.util.concurrent.BlockingQueue), для коммуникации между потоками. Одна группа потоков помещают данные в хвост очереди, другая группа получает данные из ее головы.

Почему бы нам не использовать такую же очередь для общения корутин друг с другом?

🏈 Дело в том, что операции BlockingQueue могут заблокировать поток исполнения:

- Получение элемента блокируется, если очередь пуста

- Вставка в очередь может блокироваться, если максимальный размер очереди уже достигнут

🧨 Но использовать внутри корутин операции, блокирующие поток, антипаттерн. Поэтому, например, существует отдельный набор мьютексов, которые не блокируют поток, а реализуют взаимное исключение именно для корутин.

🥁 Есть ли неблокирующий аналог для BlockingQueue в мире корутин? Да, интерфейс kotlinx.coroutines.channels.Channel и его реализации концептуально являются воплощениями блокирующей очереди. "Канал" предоставляет методы send и receive, которые тоже обладают блокирующей семантикой, но блокируют они корутину, а не поток, на котором она исполняется.

✍️ Как и очередь, ченнел может иметь ограниченный размер буфера элементов. Вы можете определить его, передав в конструктор. Но есть специальные, зарезервированные значения длины буфера, в зависимости от которых вам будет предоставлена наиболее оптимальная реализация канала. Например, в качестве размера вы можете передать следующие константы:

1. Channel.RENDEZVOUS: в этом случае вам будет создан канал не имеющий никакого буфера. Каждый вызов метода send будет блокировать корутину, пока на встречу (на рандеву) с ним не придет соответствующий метод receive.

2. Channel.UNLIMITED: тут все понятно, буфер будет “бесконечным”, что фактически можно читать, как “сколько памяти хватит”. Метод send в этой реализации не заблокирует корутину никогда.

3. Channel.CONFLATED: довольно интересная реализация канала. Поддерживает буфер размером в один элемент. Каждый последний вызов метода send подменяет то, что было записано ранее. Метод receive соответственно всегда прочитает последнее записанное значение. Как и в прошлом пункте метод send никогда не блокируется.

Создание ченнела с размером буфера равным десяти, запись и чтение из него:

val channel = Channel<Element>(capacity)

// Producer code
channel.send(elementToSend)

// Consumer code
val elementReceived = channel.receive()


Ставьте 🔥, если было полезно и отвечайте на опрос ниже ⬇️

#kotlin #coroutines
🔥133👍2
Все еще в шоке, от того что моя аудитория по большей части не пишет на котлин 😁 Но для тех, кто пишет, продолжу делать посты о нем и о корутинах, очень сильно их люблю.

📝 Напишите в коментах, почему так сложилось, что не используете котлин или корутины в разработке: может быть что-то понравилось / в компании используют другой язык / просто не было возможности попробовать?
14👍5🔥1🥱1🐳1