Канал Андрея про бекенд – Telegram
Канал Андрея про бекенд
1.68K subscribers
9 photos
13 links
👨🏻‍🎓 Я Андрей Суховицкий - tech lead и лектор Университета ИТМО, @sukhoa

🔥 Темы в канале: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Посты по средам

Присоединяйтесь!
Download Telegram
👩‍❤️‍👨 Потокобезопасная очередь (в java интерфейс BlockingQueue) часто используется для коммуникации между потоками приложения.

🚌 Если потоку требуется передать какую-то информацию другому потоку, то он помещает (produce) ее в очередь, а поток на другой стороне очереди потребляет (consume) ее и совершает необходимые действия с использованием полученных данных. Обычно общаются между собой не два потока, а две группы потоков, которые соответственно именуются consumers & producers.

🦾 Решение удобное:

- Потоки-продюсеры всегда пишут в конец очереди и в случае, если очередь полностью заполнена, могут приостанавливать "производство" новых сообщений, ожидая, пока появится место.
- Потоки-потребители находятся в бесконечном цикле, получая очередную задачу из очереди, выполняя и сразу направляясь за следующей.

🎎 Продюсеры и консьюмеры при этом могут не знать ничего друг о друге, не иметь разделяемых структур данных.

🕹️ Масштабируется такое решение довольно хорошо. Если потребители не успевают обрабатывать элементы из очереди и ее размер начинает возрастать, то можно увеличить количество консьюмеров или же уменьшить количество производителей. И наоборот, если очередь разгребается мгновенно и какие-то из консьюмеров простаивают - можно убрать лишние или же увеличить количество потоков-производителей.

💼 Еще одним плюсом такого решения является то, что потоки в данной реализации можно создать один раз и переиспользовать в течении всего времени работы приложения. То есть поддерживать некий "пул" потоков. Оптимизация состоит в том, что не требуется постоянно создавать новые потоки, что является очень затратной операцией.

🤝 На самом деле паттерн “пул” применяется и к другим ресурсам, создание которых дорого стоит. Например, TCP подключения, которые требуют round trip на проведение первого рукопожатия . Если сетевая задержка, 50ms, то это целых 100ms на создание одного нового коннекта.

♨️ В следующем посте поговорим про готовую реализацию пула потоков в Java
🔥94👍3💯3
🔼 В посте выше мы обсудили, что для передачи данных между группами потоков удобно использовать потокобезопасную очередь, а для оптимизации дорогостоящего создания потоков стоит использовать "пул" потоков, которые можно переиспользовать в течение работы приложения.

👂 Нет необходимости реализовывать вышеописанные концепции вручную. Интерфейс ExecutorService и его реализации, например, ThreadPoolExecutor, берут эту работу на себя. Они уже поддерживают внутри себя очередь (по умолчанию LinkedBlockingQueue), пул предсозданных потоков, выставляют довольно простое API для отправки задач на выполнение и дают 100500 возможностей изменять свое поведение, как вам необходимо.

✍️ Как создать такой экзекутор? Воспользоваться статическими фабричными методами класса Executors. Самый простой из них принимает на вход только один параметр: необходимое количество потоков в пуле:

val threadsNum = 16 // number of threads-consumers
val executor = Executors.newFixedThreadPool(threadsNum)


🕶️ API экзекуторов дает вам возможность влиять на различные параментры и аспекты его поведения, например:

1. Количество потоков
2. Виды и размеры очередей
3. Стратегии поведения при переполнении очереди (интерфейс RejectedExecutionHandler): выбросить непоместившийся элемент / ждать, пока не появится место / любая кастомная стратегия: сохранене в базу данных, логирование и тд.
4. Стратегия создания потоков (интерфейс ThreadFactory): как будут именоваться потоки, будут ли они потоками демонами или нет и тд.

✍️ Как отправить задачу на выполнение в пул потоков? Достаточно вызвать метод submit и передать туда функцию (интерфейсы Runnable/Callable). Эта функция не будет выполнена мгновенно на текущем потоке, а будет помещена в очередь и выполнится на одном из потоков, поддерживаемых в пуле. Поток вызова в этот момент может продолжать выполняться дальше и жить своей жизнью.

executor.submit({ System.out.println(“я исполняюсь в потоке-консьюмере”) })


#kotlin #java #threads
🔥13👍32🤝2
"Блокирующие" / "синхронные" операции. Почему я их не люблю и что с ними делать.

🔒 "Блокирующими" они называются не просто так - такие операции действительно "блокируют" поток, на котором выполняются, до тех пор, пока не будет получен результат операции. Другое их название: "синхронные".

Арифметическая операция может служить примером блокирующей, например:
val a = b + 3;


💪🏽 В предыдущем примере поток выполняет полезную для нас работу (нам по каким-то причинам необходимо было сложить b и 3).

🤬 Примерами "вредных" блокирующих операций могут служить те, которые вызывают "пассивное" ожидание потока. Приведу пример: чтение байтов из TCP соединения (из сокета). Поток не может контроллировать наличие этих байтов, таким образом, он вынужен "пассивно" ожидать, пока в соединении появится что-то, что можно прочитать.

🚶🏼‍♂️🕳🚶🏼‍♂️🚶🏼‍♂️🕳🕳🕳🚶🏼‍♂️🚶🏼‍♂️ Мы не можем занять этот промежуток безделия чем-то полезным, дать потоку другую работу на выполнение, он просто ждет, пока операция будет выполнена. Поток является нашим производственным ресурсом, как конвейер на фабрике. И в моменты пассивного ожидания он простаивает, мы не добиваемся его максимальной утилизации. Образуются, как я говорю, “дырки” в его использовании.

Есть два основных пути, которыми мы идем, чтобы оптимизировать недостатки блокирующих вызовов.

1. Увеличение количества потоков
2. Использование aсинхронных операций

Ситуация: вы выполняете HTTP вызовы к системе рассылки оповещений. В среднем выполнение вызова занимает 200ms. Вам необходимо достичь пропускной способности 1000 (rps) запросов в секунду.

📈 Метод увеличения количества потоков

При использовании блокирующего вызова каждый поток будет пассивно ожидать результата 200ms, что дает вам пропускную способность 5rps для одного потока. Чтобы сделать 1000 rps вам, таким образом, необходимо 200 потоков. Это уже довольно много - каждый поток занимает место в памяти, есть некие накладные расходы на переключение контекста потоков и тд. Если бы необходимая пропускная способност была бы 10000rps, то количество потоков выросло бы до 2000.

🚀 Метод использования Асинхронных операций

Если ваш http-client поддерживает асинхронное API, то вы можете передать callback в вашу операцию. Приблизительно так:

 httpCLient.execute(httpRequest, new Callback() {
override fun onResponse(response: HttpResponse) {
// handling
}
})


🚶🏼‍♂️🚶🏼‍♂️🚶🏼‍♂️🚶🏼‍♂️🚶🏼‍♂️ И это действительно поможет решить проблемы с пассивным ожиданием потока. Те самые “дырки” - “схлопнутся”, поток будет выполнять исключительно полезную работу: отправлять операцию на исполнение и не будет дожидаться ее результата. В этом случае вам может хватить нескольких потоков, в отличие от сотен в предыдущем случае.

📞 Использование асинхронных операций с коллбеком оптимизирует код, НО ухудшает его читаемость. Можно получить callback-hell (коллбеки, в которых есть другие коллбеки и тд) и потерять все концы при траблшутинге.

Как языки программирования решают эту проблему расскажу в следующем посте

#threads
🔥166👍5
🧠 Итак, нам нужен способ получить все преимущества асинхронного программирования, где потоки не простаивают без работы, но сохранить преимущества блокирующих вызовов - простоту кода. Вызвали операцию - дождались результата в этом же потоке и идем дальше. Скажем, при выполнении HTTP вызова мы хотим использовать такую конструкцию:

val response = performCall(request)


🚜 Но одновременно мы хотим достичь того, чтобы в момент выполнения performCall поток исполнения не блокировался, а передавался для выполнения другой полезной работы. А в момент, когда HTTP вызов закончится исполнение вернулось бы к этому месту в коде и продолжилось.

Именно этого результата позволяет достичь концепция "виртуальных" потоков (их называют и иначе: "легковесными", "зелеными")

🍏 Такие потоки не являются отражением реальных потоков операционной системы, это абстракция языка программирования (в go - горутины, в kotlin - корутины, в java - виртуальные потоки).

🧬 Давайте вспомним, что множество потоков могут использовать одно единственное ядро процессора для "псевдопараллельного" выполнения: выделяем потоку квант времени процессора, потом сохраняем контекст его выполнения в памяти, загружаем контекст другого потока и даем выполняться ему. Таким образом сотни и даже тысячи потоков могут создавать иллюзию одновременного выполнения на ограниченном количестве ядер.

🚀 Аналогично сотни и тысячи виртуальных потоков могут выполняться на единственном потоке операционной системы. Реализация языка программирования должна уметь сохранять контекст выполнения виртуальных потоков в памяти, уметь его восстанавливать и продолжать исполнение.

Еще раз: множество физических потоков может выполняться на одном ядре, а множество виртуальных потоков может выполняться на одном физическом потоке

🫁 В языке Kotlin корутины (coroutines) являются воплощением виртуальных потоков. Используя их, вы можете писать код так, как будто бы в нем нет асинхронности, но одновременно получать ее преимущества.

#kotlin #coroutines #threads
🔥15👍9🆒31
Буферизация запросов

🚶🏻‍♀️🚶🏻🚶🏼‍♂️🚶🏻‍♀️ Очередь - очень полезная структура данных. Мы уже обсуждали ее использование для коммуникации потоков приложения. Аналогично она может использоваться и для общения между сервисами через шину/брокера, например Kafka или RabbitMQ.

🧳 В этом посте давайте обсудим другой распространенный пример использования очередей - буферизация запросов. Если сервис не может выполнить входящий запрос немедленно, то можно на время помещать этот запрос в контейнер/буфер для последующей обработки.

🧨 Такой буфер может быть помещен на входе в сервис (очередь http-запросов). Множество таких очередей может присутствовать и внутри сервиса и часто довольно неявно для вас. Вы можете даже не подозревать об их присутствии. Примеры: очередь на получение коннекции к БД из пула, очереди на получение блокировки (любой лок, семафор, мьютекс), очередь к пулу потоков (они тоже могут быть скрыты внутри библиотек и фреймворков, которые вы используете).

📦 Проблема запросов, которые не могут быть обработаны сервисом немедленно

Когда скорость поступления запросов начинает превышать скорость их выполнения вашим сервисом, вы оказываетесь перед выбором, что делать:

1. Отклонять поступающие запросы сразу: возвращать отправителю со словами, что возможности сервиса превышены. Это самый простой и иногда самый верный способ.

2. “Попридерживать” запросы, пока не сможем их выполнить. Тут мы и начинаем помещать такие запросы в очередь.

💀 Важно в этот момент понимать, что очередь никак не ускоряет работу вашего сервиса, она лишь копит запросы в надежде, что в будущем нагрузка ослабнет (или сервис начнет работать быстрее) и запрос будет выполнен. Если же входная нагрузка на ваш сервис всегда превышает его возможность обработать такое количество запросов, то очередь будет расти бесконечно и вы имеете все шансы потратить всю память приложения на хранение запросов, которые никогда не будут обработаны. Заметим также, что из-за содержания в очереди может расти и само время обработки запроса.

✍️ Вывод: все очереди должны быть ограничены. А подходящие размеры вы подбираете сами, опираясь на знания о вашем сервисе и требования к обработке трафика.

🎃 Получается, очереди ничем не могут нам помочь и только откладывают неизбежное?

Это не так. Буферизация запросов очень полезна в моменты временных повышений нагрузки, она помогает “придержать” в очередях избыточное количество трафика и обработать его позже, когда нагрузка спадет, не потеряв запросы. Однако вам придется принять решение, как поступать при переполнении очередей. Стратегии могут быть различные от выкидывания запросов до сохранения их в персистентные хранилища для последующей обработки. Если вы используете ExecutorService, то можете определить вашу собственную стратегию при переполнении очереди с помощью класса RejectedExecutionHandler или использовать одну из дефолтных стратегий.

#highload #queues
👍13🔥63😍1
Forwarded from Oleg
А что делать в ситуации, когда сервис набрал запросов в очередь и упал? А нам как раз важно не терять запросы
5
☝🏽 Зависит от природы очереди. Если это очередь в памяти приложения (например, очередь ExecutorService), то вместе с падением приложения потеряется и содержимое очереди (можно что-то накрутить, но никакой гарантии не будет).

🚌 Чтобы избежать такой ситуации можно, например, помещать трафик в персистентное хранилище (Kafka, RabitMQ в персистентном режиме, или просто БД), вычитывать оттуда в темпе, комфортном для приложения, обрабатывать, а потом "коммитить" сообщения, то есть отмечать в очереди, что сообщение обработано (ну или вообще удалять). В Kafka для этого есть возможность делать "коммит (acknowlegment)", сохранять индекс последнего обработанного сообщения в партиции. В Rabbit тоже есть механизм consumer acknowlegment. В базе данных это может быть просто флаг isProcessed, например.

🪨 Есть даже вариант сохранять все сообщения прямо на файловую систему во втроенную БД (например, RocksDB). Или в очередь Redis (c включенным persistent режимом), хотя там есть свои нюансы.

🔐 Если же не хочется терять много сообщений, но чуть-чуть не страшно, то можно просто ограничивать размеры очередей, чтобы не потерять миллионы скопившихся сообщений.

🔩 Ну и ограничивать каким-то образом ваших клиентов, чтобы не "убивали" вас потоком сообщений. Для этого есть механизмы back-pressure.

- Вы можете установить лимиты скорости - rate limits (количество сообщений в единицу времени), на параллельные запросы (пример, не более 10 параллельных запросов от отного клиента в каждый момент), лимиты на входящие коннекты и тд.

- В хорошем случае вы можете давать клиенту фидбек ("я готов принять столько-то сообщений" / "стоп, пока не шли") и тд. А в идеальном случае вы сами сможете контроллировать поток сообщений "вычитывая" (read/pull) сообщения от клиента. Вроде того, как вы вычитываете из Кафки. Тут вы прямо минимизируете количество трафика в памяти и соответственно потенциальные потери.

💎 Ну и самое важное в вашем случае - чтобы клиент (тот сервис, который вам послал эти сообщения) имел механизм retry, помнил, какие сообщение не были обработаны и повторял попытки. В общем, сам обо всем заботился :)
🔥14👍5🍾2
Кейс с собеседования

⬇️ Ниже приведен фрагмент кода. Представьте, что этот код работает у вас на продакшене прямо сейчас.

Один поток Producer генерирует последовательность целых чисел. Несколько потоков Consumer получают эти числа из очереди и производят маппинг этих чисел на другие.

class Consumer(
private val codesQueue: BlockingQueue<Int>,
private val codeMappings: List<Int>
) {
@Volatile
var isActive: Boolean = true

fun consume() {
while (isActive || codesQueue.isNotEmpty()) {
val code = codesQueue.take()
log.info("Code $code mapped to ${codeMappings[code]}")
// do something with errorCodeMapping
}
}
}

class Producer(private val codesQueue: BlockingQueue<Int>) {
@Volatile
var isActive: Boolean = true

fun supply() {
while (isActive) {
codesQueue.put(Random().nextInt())
}
}
}


🦥 В какой-то момент вы замечаете, что маппинг не происходит: программа перестает писать в файл info-логи. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, он жив — то есть программа не завершилась.

👩🏻‍⚕️ Вы не знаете воспроизведется ли в следующий раз такая ситуация, поэтому не хотите останавливать процесс. Ваша задача придумать способ провести диагностику происходящего "на лету", здесь и сейчас. Что бы вы сделали?

🥟 Не спешите открывать ответ, подумайте, какие варианты вы бы могли предложить на собеседовании. Пишите ваши варианты в комментарии. Ответ, который я ожидал бы услышать: thread dump.Подробности в следующем посте.

💡 Попробуйте проанализировать код, попытайтесь найти обстоятельства, которые привели к симптомам, описанным выше. Какие изменения вы бы внесли в код для предотвращения подобных ситуаций? Пишите в комменты!

#java #kotlin #threads #case #interview
🔥11👍53🤝1
Thread dump и анализ кейса

📸 Дампом потоков называют снимок состояния всех текущих потоков JVM. Он очень полезен для диагностики проблем с производительностью, блокировками, утечками ресурсов. Дамп представляет из себя текстовый файл, где для каждого из существующих потоков java-процесса вы можете найти его состояние (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED) и полный стек вызовов.

⬆️ Вспоминаем ситуацию, описанную в предыдущем посте:

Программа перестает писать info-логи в файл. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, - он жив, то есть программа не завершилась.

🔧 В этом случае для диагностики вы можете прибегнуть к thread dump. Примеры утилит, которые помогут вам сделать снимок потоков: jstack, jvisualvm, JMC (Java Mission Control), jcmd. Ниже пример команды для утилиты jstack, на вход необходимо передать pid java-процесса.

jstack <pid> > threaddump.txt


🧨 Ниже приведен фрагмент thread dump, представляющий интересующий нас поток-producer.

"Thread-0" #16 daemon prio=5 os_prio=31 cpu=0.97ms elapsed=3.79s tid=0x00007fd90d0ed800 nid=0x5f03 waiting on condition  [0x000070000a578000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.9/Native Method)
. . . . . . .
at java.util.concurrent.LinkedBlockingQueue.put(java.base@17.0.9/LinkedBlockingQueue.java:343)
at ru.quipy.Producer.supply(Example.kt:30)
at ru.quipy.ExampleKt.main$lambda-4(Example.kt:48)
at ru.quipy.ExampleKt$$Lambda$32/0x0000000132014658.run(Unknown Source)
at java.lang.Thread.run(java.base@17.0.9/Thread.java:840)


🧐 Как мы видим, поток находится в состоянии перманентного (не ограниченного временем) ожидания (java.lang.Thread.State: WAITING) в методе LinkedBlockingQueue.put. Если вы находитесь на собеседовании, то можете сказать, что лучше использовать аналогичный метод LinkedBlockingQueue.offer, который не блокируется навечно, а использует ограниченную временем блокировку, что даст нам возможность выйти из ожидания, понять, что операция не удалась и сделать об этом запись в лог. Аналогично и для метода LinkedBlockingQueue.take: вообще все блокирущие методы стоит ограничивать временем, тогда не возникнет ситуация с блокировкой потока при завершении программы, которую вы описали в комментах к предыдущему посту.

🔍 Очень подозрительно, но мы не можем найти дамп потока consumer из чего может следовать вывод, что такового в JVM нет. Вероятно, данный поток уже завершился. Это объясняет, почему он больше не делает записи в логи и почему поток supplier не может поместить ничего в очередь: она достигла максимального размера из-за отсутствия потока-потребителя.

🕵🏽 Поток завершился в следствие появления неожиданного исключения. Это могло произойти в следующей строке: log.info("Consumed $code. Mapped to ${codeMappings[code]}"), если элемента с индексом code в списке не существует.

🤯 А исключения мы могли не увидеть в логах по нескольким причинам. Например, вызывающий нас код мог поймать исключение, завершить поток и не записать об этом в лог. Или же был использован базовый обработчик исключения в потоке (interface UncaughtExceptionHandler), дефолтная версия которого (class ThreadGroup) просто делает запись в System.err, который мог не перенаправляться в лог.

🥇 Какие выводы мы можем сделать?

1. Следует оборачивать происходящее в потоке в глобальный обработчик исключений try-catch, логировать перехваченные исключения и поддерживать работу потока, чтобы приложение могло прогрессировать даже вопреки ошибочным ситуациям.

2. Следует явно ограничивать временем методы, которые блокируют поток исполнения или использовать их варианты/альтернативы с таймаутом!

#java #kotlin #threads #interview #case
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18👍42🤝1
👨🏻‍🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa

🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Вы можете проголосовать за канал
_______________________________

Мой Youtube-канал
Мой курс по event-sourcing. Промокод TG_TEAM на -30%. Сейчас доступен только тариф без ментора.
_______________

Навигация:
#java, #kotlin - посты, релевантные для разработчика на этих языках
#threads - многопоточка и другое полезное о потоках
#highload - все, связанное с реализацией высоконагруженных систем
#coroutines - связанное с kotlin-coroutines, их применением и реализацией
#queues - кейсы, связанные с различными видами очередей - in-memory, брокеры и тд
#case - описание реальных ситуаций и их траблшутинг
#interview - потенциальные вопросы на собеседовании, анализ, разбор
#metrics_basics - основы сбора, хранения и визуализации метрик, prometheus
🤝7🔥1
Канал Андрея про бекенд pinned «👨🏻‍🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa 🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system…»
Давайте потокам осмысленные имена

🚨 Это архиважная вещь в промышленной разработке на java. Вы могли заметить, что необходимый нам поток из поста выше 👆🏼носит название Thread-0 . Я без труда нашел его в thread dump только потому, что программа была мала. Но даже ее дамп включал 21 поток, подавляющее большинство которых было служебными потоками JVM (в основном потоками сборщика мусора).

🔍 Крупные сервисы могут иметь сотни и даже тысячи потоков. Может быть очень сложно найти среди них именно те, что нужны вам. Но задача будет гораздо проще, если вы будете давать осмысленные имена потокам и пулам потоков, которые работают над одним типом задач.

🧵 Давайте начнем с одиночных потоков. Конечно, мы не часто создаем потоки через конструктор класса Thread, но полезно знать, что этот конструктор позволяет вам дать потоку имя.

val producerThread = Thread(null, { producer.supply() }, "producer-thread")


🏭Теперь давайте перейдем к пулам потоков. Статический метод Executors.newFixedThreadPool, через который удобно создавать пулы потоков, принимает на вход необходимый размер пула и еще один интересный параметр с типом ThreadFactory. У этого интерфейса всего один метод Thread newThread(Runnable task). Пул потоков делегирует переданному объекту ThreadFactory задачу создания потоков. Давайте реализуем свой вариант ThreadFactory, который именует потоки необходимым образом и передадим такой объект нашему пулу:

class NamedThreadFactory(private val prefix: String) : ThreadFactory {
private val sequence = AtomicInteger(1)

override fun newThread(r: Runnable): Thread {
val thread = Thread(r)
val seq = sequence.getAndIncrement()
thread.name = prefix + (if (seq > 1) "-$seq" else "")
return thread
}
}


🚲 На вход классу NamedThreadFactory передаем строку, которая будет являться префиксом имени всех потоков, созданных этой фабрикой. С помощью атомарного каунтера sequence мы сможем создавать уникальную часть имени для наших потоков🥇🥈🥉. Теперь давайте передадим эту фабрику пулу:

val httpCallsExecutor = Executors.newFixedThreadPool(16, NamedThreadFactory(“http-calls-pool”))


🎯 Вуаля, теперь в thread dump ваши потоки будут иметь осмысленные имена и вам не придется тратить уйму времени на изучение стека ненужных потоков!

#java #kotlin #threads
Please open Telegram to view this post
VIEW IN TELEGRAM
👍16🔥4🤝1