👩❤️👨 Потокобезопасная очередь (в java интерфейс BlockingQueue) часто используется для коммуникации между потоками приложения.
🚌 Если потоку требуется передать какую-то информацию другому потоку, то он помещает (produce) ее в очередь, а поток на другой стороне очереди потребляет (consume) ее и совершает необходимые действия с использованием полученных данных. Обычно общаются между собой не два потока, а две группы потоков, которые соответственно именуются consumers & producers.
🦾 Решение удобное:
- Потоки-продюсеры всегда пишут в конец очереди и в случае, если очередь полностью заполнена, могут приостанавливать "производство" новых сообщений, ожидая, пока появится место.
- Потоки-потребители находятся в бесконечном цикле, получая очередную задачу из очереди, выполняя и сразу направляясь за следующей.
🎎 Продюсеры и консьюмеры при этом могут не знать ничего друг о друге, не иметь разделяемых структур данных.
🕹️ Масштабируется такое решение довольно хорошо. Если потребители не успевают обрабатывать элементы из очереди и ее размер начинает возрастать, то можно увеличить количество консьюмеров или же уменьшить количество производителей. И наоборот, если очередь разгребается мгновенно и какие-то из консьюмеров простаивают - можно убрать лишние или же увеличить количество потоков-производителей.
💼 Еще одним плюсом такого решения является то, что потоки в данной реализации можно создать один раз и переиспользовать в течении всего времени работы приложения. То есть поддерживать некий "пул" потоков. Оптимизация состоит в том, что не требуется постоянно создавать новые потоки, что является очень затратной операцией.
🤝 На самом деле паттерн “пул” применяется и к другим ресурсам, создание которых дорого стоит. Например, TCP подключения, которые требуют round trip на проведение первого рукопожатия . Если сетевая задержка, 50ms, то это целых 100ms на создание одного нового коннекта.
♨️ В следующем посте поговорим про готовую реализацию пула потоков в Java
🚌 Если потоку требуется передать какую-то информацию другому потоку, то он помещает (produce) ее в очередь, а поток на другой стороне очереди потребляет (consume) ее и совершает необходимые действия с использованием полученных данных. Обычно общаются между собой не два потока, а две группы потоков, которые соответственно именуются consumers & producers.
🦾 Решение удобное:
- Потоки-продюсеры всегда пишут в конец очереди и в случае, если очередь полностью заполнена, могут приостанавливать "производство" новых сообщений, ожидая, пока появится место.
- Потоки-потребители находятся в бесконечном цикле, получая очередную задачу из очереди, выполняя и сразу направляясь за следующей.
🎎 Продюсеры и консьюмеры при этом могут не знать ничего друг о друге, не иметь разделяемых структур данных.
🕹️ Масштабируется такое решение довольно хорошо. Если потребители не успевают обрабатывать элементы из очереди и ее размер начинает возрастать, то можно увеличить количество консьюмеров или же уменьшить количество производителей. И наоборот, если очередь разгребается мгновенно и какие-то из консьюмеров простаивают - можно убрать лишние или же увеличить количество потоков-производителей.
💼 Еще одним плюсом такого решения является то, что потоки в данной реализации можно создать один раз и переиспользовать в течении всего времени работы приложения. То есть поддерживать некий "пул" потоков. Оптимизация состоит в том, что не требуется постоянно создавать новые потоки, что является очень затратной операцией.
🤝 На самом деле паттерн “пул” применяется и к другим ресурсам, создание которых дорого стоит. Например, TCP подключения, которые требуют round trip на проведение первого рукопожатия . Если сетевая задержка, 50ms, то это целых 100ms на создание одного нового коннекта.
♨️ В следующем посте поговорим про готовую реализацию пула потоков в Java
🔥9❤4👍3💯3
🔼 В посте выше мы обсудили, что для передачи данных между группами потоков удобно использовать потокобезопасную очередь, а для оптимизации дорогостоящего создания потоков стоит использовать "пул" потоков, которые можно переиспользовать в течение работы приложения.
👂 Нет необходимости реализовывать вышеописанные концепции вручную. Интерфейс
✍️ Как создать такой экзекутор? Воспользоваться статическими фабричными методами класса
🕶️ API экзекуторов дает вам возможность влиять на различные параментры и аспекты его поведения, например:
1. Количество потоков
2. Виды и размеры очередей
3. Стратегии поведения при переполнении очереди (интерфейс
4. Стратегия создания потоков (интерфейс
✍️ Как отправить задачу на выполнение в пул потоков? Достаточно вызвать метод
#kotlin #java #threads
👂 Нет необходимости реализовывать вышеописанные концепции вручную. Интерфейс
ExecutorService и его реализации, например, ThreadPoolExecutor, берут эту работу на себя. Они уже поддерживают внутри себя очередь (по умолчанию LinkedBlockingQueue), пул предсозданных потоков, выставляют довольно простое API для отправки задач на выполнение и дают 100500 возможностей изменять свое поведение, как вам необходимо.✍️ Как создать такой экзекутор? Воспользоваться статическими фабричными методами класса
Executors. Самый простой из них принимает на вход только один параметр: необходимое количество потоков в пуле:val threadsNum = 16 // number of threads-consumers
val executor = Executors.newFixedThreadPool(threadsNum)
🕶️ API экзекуторов дает вам возможность влиять на различные параментры и аспекты его поведения, например:
1. Количество потоков
2. Виды и размеры очередей
3. Стратегии поведения при переполнении очереди (интерфейс
RejectedExecutionHandler): выбросить непоместившийся элемент / ждать, пока не появится место / любая кастомная стратегия: сохранене в базу данных, логирование и тд.4. Стратегия создания потоков (интерфейс
ThreadFactory): как будут именоваться потоки, будут ли они потоками демонами или нет и тд.✍️ Как отправить задачу на выполнение в пул потоков? Достаточно вызвать метод
submit и передать туда функцию (интерфейсы Runnable/Callable). Эта функция не будет выполнена мгновенно на текущем потоке, а будет помещена в очередь и выполнится на одном из потоков, поддерживаемых в пуле. Поток вызова в этот момент может продолжать выполняться дальше и жить своей жизнью.executor.submit({ System.out.println(“я исполняюсь в потоке-консьюмере”) })
#kotlin #java #threads
🔥13👍3❤2🤝2
"Блокирующие" / "синхронные" операции. Почему я их не люблю и что с ними делать.
🔒 "Блокирующими" они называются не просто так - такие операции действительно "блокируют" поток, на котором выполняются, до тех пор, пока не будет получен результат операции. Другое их название: "синхронные".
➕ Арифметическая операция может служить примером блокирующей, например:
💪🏽 В предыдущем примере поток выполняет полезную для нас работу (нам по каким-то причинам необходимо было сложить b и 3).
🤬 Примерами "вредных" блокирующих операций могут служить те, которые вызывают "пассивное" ожидание потока. Приведу пример: чтение байтов из TCP соединения (из сокета). Поток не может контроллировать наличие этих байтов, таким образом, он вынужен "пассивно" ожидать, пока в соединении появится что-то, что можно прочитать.
🚶🏼♂️🕳🚶🏼♂️🚶🏼♂️🕳🕳🕳🚶🏼♂️🚶🏼♂️ Мы не можем занять этот промежуток безделия чем-то полезным, дать потоку другую работу на выполнение, он просто ждет, пока операция будет выполнена. Поток является нашим производственным ресурсом, как конвейер на фабрике. И в моменты пассивного ожидания он простаивает, мы не добиваемся его максимальной утилизации. Образуются, как я говорю, “дырки” в его использовании.
Есть два основных пути, которыми мы идем, чтобы оптимизировать недостатки блокирующих вызовов.
1. Увеличение количества потоков
2. Использование aсинхронных операций
Ситуация: вы выполняете HTTP вызовы к системе рассылки оповещений. В среднем выполнение вызова занимает 200ms. Вам необходимо достичь пропускной способности 1000 (rps) запросов в секунду.
📈 Метод увеличения количества потоков
При использовании блокирующего вызова каждый поток будет пассивно ожидать результата 200ms, что дает вам пропускную способность 5rps для одного потока. Чтобы сделать 1000 rps вам, таким образом, необходимо 200 потоков. Это уже довольно много - каждый поток занимает место в памяти, есть некие накладные расходы на переключение контекста потоков и тд. Если бы необходимая пропускная способност была бы 10000rps, то количество потоков выросло бы до 2000.
🚀 Метод использования Асинхронных операций
Если ваш http-client поддерживает асинхронное API, то вы можете передать callback в вашу операцию. Приблизительно так:
🚶🏼♂️🚶🏼♂️🚶🏼♂️🚶🏼♂️🚶🏼♂️ И это действительно поможет решить проблемы с пассивным ожиданием потока. Те самые “дырки” - “схлопнутся”, поток будет выполнять исключительно полезную работу: отправлять операцию на исполнение и не будет дожидаться ее результата. В этом случае вам может хватить нескольких потоков, в отличие от сотен в предыдущем случае.
📞 Использование асинхронных операций с коллбеком оптимизирует код, НО ухудшает его читаемость. Можно получить callback-hell (коллбеки, в которых есть другие коллбеки и тд) и потерять все концы при траблшутинге.
⏬ Как языки программирования решают эту проблему расскажу в следующем посте
#threads
🔒 "Блокирующими" они называются не просто так - такие операции действительно "блокируют" поток, на котором выполняются, до тех пор, пока не будет получен результат операции. Другое их название: "синхронные".
➕ Арифметическая операция может служить примером блокирующей, например:
val a = b + 3;
💪🏽 В предыдущем примере поток выполняет полезную для нас работу (нам по каким-то причинам необходимо было сложить b и 3).
🤬 Примерами "вредных" блокирующих операций могут служить те, которые вызывают "пассивное" ожидание потока. Приведу пример: чтение байтов из TCP соединения (из сокета). Поток не может контроллировать наличие этих байтов, таким образом, он вынужен "пассивно" ожидать, пока в соединении появится что-то, что можно прочитать.
🚶🏼♂️🕳🚶🏼♂️🚶🏼♂️🕳🕳🕳🚶🏼♂️🚶🏼♂️ Мы не можем занять этот промежуток безделия чем-то полезным, дать потоку другую работу на выполнение, он просто ждет, пока операция будет выполнена. Поток является нашим производственным ресурсом, как конвейер на фабрике. И в моменты пассивного ожидания он простаивает, мы не добиваемся его максимальной утилизации. Образуются, как я говорю, “дырки” в его использовании.
Есть два основных пути, которыми мы идем, чтобы оптимизировать недостатки блокирующих вызовов.
1. Увеличение количества потоков
2. Использование aсинхронных операций
Ситуация: вы выполняете HTTP вызовы к системе рассылки оповещений. В среднем выполнение вызова занимает 200ms. Вам необходимо достичь пропускной способности 1000 (rps) запросов в секунду.
📈 Метод увеличения количества потоков
При использовании блокирующего вызова каждый поток будет пассивно ожидать результата 200ms, что дает вам пропускную способность 5rps для одного потока. Чтобы сделать 1000 rps вам, таким образом, необходимо 200 потоков. Это уже довольно много - каждый поток занимает место в памяти, есть некие накладные расходы на переключение контекста потоков и тд. Если бы необходимая пропускная способност была бы 10000rps, то количество потоков выросло бы до 2000.
🚀 Метод использования Асинхронных операций
Если ваш http-client поддерживает асинхронное API, то вы можете передать callback в вашу операцию. Приблизительно так:
httpCLient.execute(httpRequest, new Callback() {
override fun onResponse(response: HttpResponse) {
// handling
}
})
🚶🏼♂️🚶🏼♂️🚶🏼♂️🚶🏼♂️🚶🏼♂️ И это действительно поможет решить проблемы с пассивным ожиданием потока. Те самые “дырки” - “схлопнутся”, поток будет выполнять исключительно полезную работу: отправлять операцию на исполнение и не будет дожидаться ее результата. В этом случае вам может хватить нескольких потоков, в отличие от сотен в предыдущем случае.
📞 Использование асинхронных операций с коллбеком оптимизирует код, НО ухудшает его читаемость. Можно получить callback-hell (коллбеки, в которых есть другие коллбеки и тд) и потерять все концы при траблшутинге.
⏬ Как языки программирования решают эту проблему расскажу в следующем посте
#threads
🔥16❤6👍5
🧠 Итак, нам нужен способ получить все преимущества асинхронного программирования, где потоки не простаивают без работы, но сохранить преимущества блокирующих вызовов - простоту кода. Вызвали операцию - дождались результата в этом же потоке и идем дальше. Скажем, при выполнении HTTP вызова мы хотим использовать такую конструкцию:
🚜 Но одновременно мы хотим достичь того, чтобы в момент выполнения
Именно этого результата позволяет достичь концепция "виртуальных" потоков (их называют и иначе: "легковесными", "зелеными")
🍏 Такие потоки не являются отражением реальных потоков операционной системы, это абстракция языка программирования (в go - горутины, в kotlin - корутины, в java - виртуальные потоки).
🧬 Давайте вспомним, что множество потоков могут использовать одно единственное ядро процессора для "псевдопараллельного" выполнения: выделяем потоку квант времени процессора, потом сохраняем контекст его выполнения в памяти, загружаем контекст другого потока и даем выполняться ему. Таким образом сотни и даже тысячи потоков могут создавать иллюзию одновременного выполнения на ограниченном количестве ядер.
🚀 Аналогично сотни и тысячи виртуальных потоков могут выполняться на единственном потоке операционной системы. Реализация языка программирования должна уметь сохранять контекст выполнения виртуальных потоков в памяти, уметь его восстанавливать и продолжать исполнение.
🫁 В языке Kotlin корутины (coroutines) являются воплощением виртуальных потоков. Используя их, вы можете писать код так, как будто бы в нем нет асинхронности, но одновременно получать ее преимущества.
#kotlin #coroutines #threads
val response = performCall(request)
🚜 Но одновременно мы хотим достичь того, чтобы в момент выполнения
performCall поток исполнения не блокировался, а передавался для выполнения другой полезной работы. А в момент, когда HTTP вызов закончится исполнение вернулось бы к этому месту в коде и продолжилось.Именно этого результата позволяет достичь концепция "виртуальных" потоков (их называют и иначе: "легковесными", "зелеными")
🍏 Такие потоки не являются отражением реальных потоков операционной системы, это абстракция языка программирования (в go - горутины, в kotlin - корутины, в java - виртуальные потоки).
🧬 Давайте вспомним, что множество потоков могут использовать одно единственное ядро процессора для "псевдопараллельного" выполнения: выделяем потоку квант времени процессора, потом сохраняем контекст его выполнения в памяти, загружаем контекст другого потока и даем выполняться ему. Таким образом сотни и даже тысячи потоков могут создавать иллюзию одновременного выполнения на ограниченном количестве ядер.
🚀 Аналогично сотни и тысячи виртуальных потоков могут выполняться на единственном потоке операционной системы. Реализация языка программирования должна уметь сохранять контекст выполнения виртуальных потоков в памяти, уметь его восстанавливать и продолжать исполнение.
Еще раз: множество физических потоков может выполняться на одном ядре, а множество виртуальных потоков может выполняться на одном физическом потоке
🫁 В языке Kotlin корутины (coroutines) являются воплощением виртуальных потоков. Используя их, вы можете писать код так, как будто бы в нем нет асинхронности, но одновременно получать ее преимущества.
#kotlin #coroutines #threads
🔥15👍9🆒3❤1
Буферизация запросов
🚶🏻♀️🚶🏻🚶🏼♂️🚶🏻♀️ Очередь - очень полезная структура данных. Мы уже обсуждали ее использование для коммуникации потоков приложения. Аналогично она может использоваться и для общения между сервисами через шину/брокера, например Kafka или RabbitMQ.
🧳 В этом посте давайте обсудим другой распространенный пример использования очередей - буферизация запросов. Если сервис не может выполнить входящий запрос немедленно, то можно на время помещать этот запрос в контейнер/буфер для последующей обработки.
🧨 Такой буфер может быть помещен на входе в сервис (очередь http-запросов). Множество таких очередей может присутствовать и внутри сервиса и часто довольно неявно для вас. Вы можете даже не подозревать об их присутствии. Примеры: очередь на получение коннекции к БД из пула, очереди на получение блокировки (любой лок, семафор, мьютекс), очередь к пулу потоков (они тоже могут быть скрыты внутри библиотек и фреймворков, которые вы используете).
📦 Проблема запросов, которые не могут быть обработаны сервисом немедленно
Когда скорость поступления запросов начинает превышать скорость их выполнения вашим сервисом, вы оказываетесь перед выбором, что делать:
1. Отклонять поступающие запросы сразу: возвращать отправителю со словами, что возможности сервиса превышены. Это самый простой и иногда самый верный способ.
2. “Попридерживать” запросы, пока не сможем их выполнить. Тут мы и начинаем помещать такие запросы в очередь.
💀 Важно в этот момент понимать, что очередь никак не ускоряет работу вашего сервиса, она лишь копит запросы в надежде, что в будущем нагрузка ослабнет (или сервис начнет работать быстрее) и запрос будет выполнен. Если же входная нагрузка на ваш сервис всегда превышает его возможность обработать такое количество запросов, то очередь будет расти бесконечно и вы имеете все шансы потратить всю память приложения на хранение запросов, которые никогда не будут обработаны. Заметим также, что из-за содержания в очереди может расти и само время обработки запроса.
✍️ Вывод: все очереди должны быть ограничены. А подходящие размеры вы подбираете сами, опираясь на знания о вашем сервисе и требования к обработке трафика.
🎃 Получается, очереди ничем не могут нам помочь и только откладывают неизбежное?
Это не так. Буферизация запросов очень полезна в моменты временных повышений нагрузки, она помогает “придержать” в очередях избыточное количество трафика и обработать его позже, когда нагрузка спадет, не потеряв запросы. Однако вам придется принять решение, как поступать при переполнении очередей. Стратегии могут быть различные от выкидывания запросов до сохранения их в персистентные хранилища для последующей обработки. Если вы используете
#highload #queues
🚶🏻♀️🚶🏻🚶🏼♂️🚶🏻♀️ Очередь - очень полезная структура данных. Мы уже обсуждали ее использование для коммуникации потоков приложения. Аналогично она может использоваться и для общения между сервисами через шину/брокера, например Kafka или RabbitMQ.
🧳 В этом посте давайте обсудим другой распространенный пример использования очередей - буферизация запросов. Если сервис не может выполнить входящий запрос немедленно, то можно на время помещать этот запрос в контейнер/буфер для последующей обработки.
🧨 Такой буфер может быть помещен на входе в сервис (очередь http-запросов). Множество таких очередей может присутствовать и внутри сервиса и часто довольно неявно для вас. Вы можете даже не подозревать об их присутствии. Примеры: очередь на получение коннекции к БД из пула, очереди на получение блокировки (любой лок, семафор, мьютекс), очередь к пулу потоков (они тоже могут быть скрыты внутри библиотек и фреймворков, которые вы используете).
📦 Проблема запросов, которые не могут быть обработаны сервисом немедленно
Когда скорость поступления запросов начинает превышать скорость их выполнения вашим сервисом, вы оказываетесь перед выбором, что делать:
1. Отклонять поступающие запросы сразу: возвращать отправителю со словами, что возможности сервиса превышены. Это самый простой и иногда самый верный способ.
2. “Попридерживать” запросы, пока не сможем их выполнить. Тут мы и начинаем помещать такие запросы в очередь.
💀 Важно в этот момент понимать, что очередь никак не ускоряет работу вашего сервиса, она лишь копит запросы в надежде, что в будущем нагрузка ослабнет (или сервис начнет работать быстрее) и запрос будет выполнен. Если же входная нагрузка на ваш сервис всегда превышает его возможность обработать такое количество запросов, то очередь будет расти бесконечно и вы имеете все шансы потратить всю память приложения на хранение запросов, которые никогда не будут обработаны. Заметим также, что из-за содержания в очереди может расти и само время обработки запроса.
✍️ Вывод: все очереди должны быть ограничены. А подходящие размеры вы подбираете сами, опираясь на знания о вашем сервисе и требования к обработке трафика.
🎃 Получается, очереди ничем не могут нам помочь и только откладывают неизбежное?
Это не так. Буферизация запросов очень полезна в моменты временных повышений нагрузки, она помогает “придержать” в очередях избыточное количество трафика и обработать его позже, когда нагрузка спадет, не потеряв запросы. Однако вам придется принять решение, как поступать при переполнении очередей. Стратегии могут быть различные от выкидывания запросов до сохранения их в персистентные хранилища для последующей обработки. Если вы используете
ExecutorService, то можете определить вашу собственную стратегию при переполнении очереди с помощью класса RejectedExecutionHandler или использовать одну из дефолтных стратегий.#highload #queues
👍13🔥6❤3😍1
Forwarded from Oleg
А что делать в ситуации, когда сервис набрал запросов в очередь и упал? А нам как раз важно не терять запросы
❤5
☝🏽 Зависит от природы очереди. Если это очередь в памяти приложения (например, очередь ExecutorService), то вместе с падением приложения потеряется и содержимое очереди (можно что-то накрутить, но никакой гарантии не будет).
🚌 Чтобы избежать такой ситуации можно, например, помещать трафик в персистентное хранилище (Kafka, RabitMQ в персистентном режиме, или просто БД), вычитывать оттуда в темпе, комфортном для приложения, обрабатывать, а потом "коммитить" сообщения, то есть отмечать в очереди, что сообщение обработано (ну или вообще удалять). В Kafka для этого есть возможность делать "коммит (acknowlegment)", сохранять индекс последнего обработанного сообщения в партиции. В Rabbit тоже есть механизм consumer acknowlegment. В базе данных это может быть просто флаг isProcessed, например.
🪨 Есть даже вариант сохранять все сообщения прямо на файловую систему во втроенную БД (например, RocksDB). Или в очередь Redis (c включенным persistent режимом), хотя там есть свои нюансы.
🔐 Если же не хочется терять много сообщений, но чуть-чуть не страшно, то можно просто ограничивать размеры очередей, чтобы не потерять миллионы скопившихся сообщений.
🔩 Ну и ограничивать каким-то образом ваших клиентов, чтобы не "убивали" вас потоком сообщений. Для этого есть механизмы back-pressure.
- Вы можете установить лимиты скорости - rate limits (количество сообщений в единицу времени), на параллельные запросы (пример, не более 10 параллельных запросов от отного клиента в каждый момент), лимиты на входящие коннекты и тд.
- В хорошем случае вы можете давать клиенту фидбек ("я готов принять столько-то сообщений" / "стоп, пока не шли") и тд. А в идеальном случае вы сами сможете контроллировать поток сообщений "вычитывая" (read/pull) сообщения от клиента. Вроде того, как вы вычитываете из Кафки. Тут вы прямо минимизируете количество трафика в памяти и соответственно потенциальные потери.
💎 Ну и самое важное в вашем случае - чтобы клиент (тот сервис, который вам послал эти сообщения) имел механизм retry, помнил, какие сообщение не были обработаны и повторял попытки. В общем, сам обо всем заботился :)
🚌 Чтобы избежать такой ситуации можно, например, помещать трафик в персистентное хранилище (Kafka, RabitMQ в персистентном режиме, или просто БД), вычитывать оттуда в темпе, комфортном для приложения, обрабатывать, а потом "коммитить" сообщения, то есть отмечать в очереди, что сообщение обработано (ну или вообще удалять). В Kafka для этого есть возможность делать "коммит (acknowlegment)", сохранять индекс последнего обработанного сообщения в партиции. В Rabbit тоже есть механизм consumer acknowlegment. В базе данных это может быть просто флаг isProcessed, например.
🪨 Есть даже вариант сохранять все сообщения прямо на файловую систему во втроенную БД (например, RocksDB). Или в очередь Redis (c включенным persistent режимом), хотя там есть свои нюансы.
🔐 Если же не хочется терять много сообщений, но чуть-чуть не страшно, то можно просто ограничивать размеры очередей, чтобы не потерять миллионы скопившихся сообщений.
🔩 Ну и ограничивать каким-то образом ваших клиентов, чтобы не "убивали" вас потоком сообщений. Для этого есть механизмы back-pressure.
- Вы можете установить лимиты скорости - rate limits (количество сообщений в единицу времени), на параллельные запросы (пример, не более 10 параллельных запросов от отного клиента в каждый момент), лимиты на входящие коннекты и тд.
- В хорошем случае вы можете давать клиенту фидбек ("я готов принять столько-то сообщений" / "стоп, пока не шли") и тд. А в идеальном случае вы сами сможете контроллировать поток сообщений "вычитывая" (read/pull) сообщения от клиента. Вроде того, как вы вычитываете из Кафки. Тут вы прямо минимизируете количество трафика в памяти и соответственно потенциальные потери.
💎 Ну и самое важное в вашем случае - чтобы клиент (тот сервис, который вам послал эти сообщения) имел механизм retry, помнил, какие сообщение не были обработаны и повторял попытки. В общем, сам обо всем заботился :)
🔥14👍5🍾2
Кейс с собеседования
⬇️ Ниже приведен фрагмент кода. Представьте, что этот код работает у вас на продакшене прямо сейчас.
Один поток
🦥 В какой-то момент вы замечаете, что маппинг не происходит: программа перестает писать в файл info-логи. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью
👩🏻⚕️ Вы не знаете воспроизведется ли в следующий раз такая ситуация, поэтому не хотите останавливать процесс. Ваша задача придумать способ провести диагностику происходящего "на лету", здесь и сейчас. Что бы вы сделали?
🥟 Не спешите открывать ответ, подумайте, какие варианты вы бы могли предложить на собеседовании. Пишите ваши варианты в комментарии. Ответ, который я ожидал бы услышать:thread dump. Подробности в следующем посте.
💡 Попробуйте проанализировать код, попытайтесь найти обстоятельства, которые привели к симптомам, описанным выше. Какие изменения вы бы внесли в код для предотвращения подобных ситуаций? Пишите в комменты!
#java #kotlin #threads #case #interview
⬇️ Ниже приведен фрагмент кода. Представьте, что этот код работает у вас на продакшене прямо сейчас.
Один поток
Producer генерирует последовательность целых чисел. Несколько потоков Consumer получают эти числа из очереди и производят маппинг этих чисел на другие.class Consumer(
private val codesQueue: BlockingQueue<Int>,
private val codeMappings: List<Int>
) {
@Volatile
var isActive: Boolean = true
fun consume() {
while (isActive || codesQueue.isNotEmpty()) {
val code = codesQueue.take()
log.info("Code $code mapped to ${codeMappings[code]}")
// do something with errorCodeMapping
}
}
}
class Producer(private val codesQueue: BlockingQueue<Int>) {
@Volatile
var isActive: Boolean = true
fun supply() {
while (isActive) {
codesQueue.put(Random().nextInt())
}
}
}
🦥 В какой-то момент вы замечаете, что маппинг не происходит: программа перестает писать в файл info-логи. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью
ps -aux | grep java, он жив — то есть программа не завершилась.👩🏻⚕️ Вы не знаете воспроизведется ли в следующий раз такая ситуация, поэтому не хотите останавливать процесс. Ваша задача придумать способ провести диагностику происходящего "на лету", здесь и сейчас. Что бы вы сделали?
🥟 Не спешите открывать ответ, подумайте, какие варианты вы бы могли предложить на собеседовании. Пишите ваши варианты в комментарии. Ответ, который я ожидал бы услышать:
💡 Попробуйте проанализировать код, попытайтесь найти обстоятельства, которые привели к симптомам, описанным выше. Какие изменения вы бы внесли в код для предотвращения подобных ситуаций? Пишите в комменты!
#java #kotlin #threads #case #interview
🔥11👍5❤3🤝1
Thread dump и анализ кейса
📸 Дампом потоков называют снимок состояния всех текущих потоков JVM. Он очень полезен для диагностики проблем с производительностью, блокировками, утечками ресурсов. Дамп представляет из себя текстовый файл, где для каждого из существующих потоков java-процесса вы можете найти его состояние (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED) и полный стек вызовов.
⬆️ Вспоминаем ситуацию, описанную в предыдущем посте:
🔧 В этом случае для диагностики вы можете прибегнуть к thread dump. Примеры утилит, которые помогут вам сделать снимок потоков: jstack, jvisualvm, JMC (Java Mission Control), jcmd. Ниже пример команды для утилиты jstack, на вход необходимо передать
🧨 Ниже приведен фрагмент thread dump, представляющий интересующий нас поток-producer.
🧐 Как мы видим, поток находится в состоянии перманентного (не ограниченного временем) ожидания (
🔍 Очень подозрительно, но мы не можем найти дамп потока
🕵🏽 Поток завершился в следствие появления неожиданного исключения. Это могло произойти в следующей строке:
🤯 А исключения мы могли не увидеть в логах по нескольким причинам. Например, вызывающий нас код мог поймать исключение, завершить поток и не записать об этом в лог. Или же был использован базовый обработчик исключения в потоке (
🥇 Какие выводы мы можем сделать?
1. Следует оборачивать происходящее в потоке в глобальный обработчик исключений try-catch, логировать перехваченные исключения и поддерживать работу потока, чтобы приложение могло прогрессировать даже вопреки ошибочным ситуациям.
2. Следует явно ограничивать временем методы, которые блокируют поток исполнения или использовать их варианты/альтернативы с таймаутом!
#java #kotlin #threads #interview #case
📸 Дампом потоков называют снимок состояния всех текущих потоков JVM. Он очень полезен для диагностики проблем с производительностью, блокировками, утечками ресурсов. Дамп представляет из себя текстовый файл, где для каждого из существующих потоков java-процесса вы можете найти его состояние (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED) и полный стек вызовов.
⬆️ Вспоминаем ситуацию, описанную в предыдущем посте:
Программа перестает писать info-логи в файл. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, - он жив, то есть программа не завершилась.
🔧 В этом случае для диагностики вы можете прибегнуть к thread dump. Примеры утилит, которые помогут вам сделать снимок потоков: jstack, jvisualvm, JMC (Java Mission Control), jcmd. Ниже пример команды для утилиты jstack, на вход необходимо передать
pid java-процесса.jstack <pid> > threaddump.txt
🧨 Ниже приведен фрагмент thread dump, представляющий интересующий нас поток-producer.
"Thread-0" #16 daemon prio=5 os_prio=31 cpu=0.97ms elapsed=3.79s tid=0x00007fd90d0ed800 nid=0x5f03 waiting on condition [0x000070000a578000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.9/Native Method)
. . . . . . .
at java.util.concurrent.LinkedBlockingQueue.put(java.base@17.0.9/LinkedBlockingQueue.java:343)
at ru.quipy.Producer.supply(Example.kt:30)
at ru.quipy.ExampleKt.main$lambda-4(Example.kt:48)
at ru.quipy.ExampleKt$$Lambda$32/0x0000000132014658.run(Unknown Source)
at java.lang.Thread.run(java.base@17.0.9/Thread.java:840)
java.lang.Thread.State: WAITING) в методе LinkedBlockingQueue.put. Если вы находитесь на собеседовании, то можете сказать, что лучше использовать аналогичный метод LinkedBlockingQueue.offer, который не блокируется навечно, а использует ограниченную временем блокировку, что даст нам возможность выйти из ожидания, понять, что операция не удалась и сделать об этом запись в лог. Аналогично и для метода LinkedBlockingQueue.take: вообще все блокирущие методы стоит ограничивать временем, тогда не возникнет ситуация с блокировкой потока при завершении программы, которую вы описали в комментах к предыдущему посту.🔍 Очень подозрительно, но мы не можем найти дамп потока
consumer из чего может следовать вывод, что такового в JVM нет. Вероятно, данный поток уже завершился. Это объясняет, почему он больше не делает записи в логи и почему поток supplier не может поместить ничего в очередь: она достигла максимального размера из-за отсутствия потока-потребителя. 🕵🏽 Поток завершился в следствие появления неожиданного исключения. Это могло произойти в следующей строке:
log.info("Consumed $code. Mapped to ${codeMappings[code]}"), если элемента с индексом code в списке не существует.interface UncaughtExceptionHandler), дефолтная версия которого (class ThreadGroup) просто делает запись в System.err, который мог не перенаправляться в лог.1. Следует оборачивать происходящее в потоке в глобальный обработчик исключений try-catch, логировать перехваченные исключения и поддерживать работу потока, чтобы приложение могло прогрессировать даже вопреки ошибочным ситуациям.
2. Следует явно ограничивать временем методы, которые блокируют поток исполнения или использовать их варианты/альтернативы с таймаутом!
#java #kotlin #threads #interview #case
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18👍4❤2🤝1
👨🏻🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa
🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.
Вы можете проголосовать за канал
_______________________________
Мой Youtube-канал
Мой курс по event-sourcing. Промокод TG_TEAM на -30%. Сейчас доступен только тариф без ментора.
_______________
Навигация:
#java, #kotlin - посты, релевантные для разработчика на этих языках
#threads - многопоточка и другое полезное о потоках
#highload - все, связанное с реализацией высоконагруженных систем
#coroutines - связанное с kotlin-coroutines, их применением и реализацией
#queues - кейсы, связанные с различными видами очередей - in-memory, брокеры и тд
#case - описание реальных ситуаций и их траблшутинг
#interview - потенциальные вопросы на собеседовании, анализ, разбор
#metrics_basics - основы сбора, хранения и визуализации метрик, prometheus
🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.
Вы можете проголосовать за канал
_______________________________
Мой Youtube-канал
Мой курс по event-sourcing. Промокод TG_TEAM на -30%. Сейчас доступен только тариф без ментора.
_______________
Навигация:
#java, #kotlin - посты, релевантные для разработчика на этих языках
#threads - многопоточка и другое полезное о потоках
#highload - все, связанное с реализацией высоконагруженных систем
#coroutines - связанное с kotlin-coroutines, их применением и реализацией
#queues - кейсы, связанные с различными видами очередей - in-memory, брокеры и тд
#case - описание реальных ситуаций и их траблшутинг
#interview - потенциальные вопросы на собеседовании, анализ, разбор
#metrics_basics - основы сбора, хранения и визуализации метрик, prometheus
Telegram
Канал Андрея про бекенд
Проголосуйте за канал, чтобы он получил больше возможностей.
🤝7🔥1
Канал Андрея про бекенд pinned «👨🏻🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa 🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system…»
Давайте потокам осмысленные имена
🚨 Это архиважная вещь в промышленной разработке на java. Вы могли заметить, что необходимый нам поток из поста выше 👆🏼носит название
🔍 Крупные сервисы могут иметь сотни и даже тысячи потоков. Может быть очень сложно найти среди них именно те, что нужны вам. Но задача будет гораздо проще, если вы будете давать осмысленные имена потокам и пулам потоков, которые работают над одним типом задач.
🧵 Давайте начнем с одиночных потоков. Конечно, мы не часто создаем потоки через конструктор класса Thread, но полезно знать, что этот конструктор позволяет вам дать потоку имя.
🏭Теперь давайте перейдем к пулам потоков. Статический метод
🚲 На вход классу🥇 🥈 🥉 . Теперь давайте передадим эту фабрику пулу:
🎯 Вуаля, теперь в thread dump ваши потоки будут иметь осмысленные имена и вам не придется тратить уйму времени на изучение стека ненужных потоков!
#java #kotlin #threads
🚨 Это архиважная вещь в промышленной разработке на java. Вы могли заметить, что необходимый нам поток из поста выше 👆🏼носит название
Thread-0 . Я без труда нашел его в thread dump только потому, что программа была мала. Но даже ее дамп включал 21 поток, подавляющее большинство которых было служебными потоками JVM (в основном потоками сборщика мусора).🔍 Крупные сервисы могут иметь сотни и даже тысячи потоков. Может быть очень сложно найти среди них именно те, что нужны вам. Но задача будет гораздо проще, если вы будете давать осмысленные имена потокам и пулам потоков, которые работают над одним типом задач.
🧵 Давайте начнем с одиночных потоков. Конечно, мы не часто создаем потоки через конструктор класса Thread, но полезно знать, что этот конструктор позволяет вам дать потоку имя.
val producerThread = Thread(null, { producer.supply() }, "producer-thread")
🏭Теперь давайте перейдем к пулам потоков. Статический метод
Executors.newFixedThreadPool, через который удобно создавать пулы потоков, принимает на вход необходимый размер пула и еще один интересный параметр с типом ThreadFactory. У этого интерфейса всего один метод Thread newThread(Runnable task). Пул потоков делегирует переданному объекту ThreadFactory задачу создания потоков. Давайте реализуем свой вариант ThreadFactory, который именует потоки необходимым образом и передадим такой объект нашему пулу:class NamedThreadFactory(private val prefix: String) : ThreadFactory {
private val sequence = AtomicInteger(1)
override fun newThread(r: Runnable): Thread {
val thread = Thread(r)
val seq = sequence.getAndIncrement()
thread.name = prefix + (if (seq > 1) "-$seq" else "")
return thread
}
}
🚲 На вход классу
NamedThreadFactory передаем строку, которая будет являться префиксом имени всех потоков, созданных этой фабрикой. С помощью атомарного каунтера sequence мы сможем создавать уникальную часть имени для наших потоковval httpCallsExecutor = Executors.newFixedThreadPool(16, NamedThreadFactory(“http-calls-pool”))
🎯 Вуаля, теперь в thread dump ваши потоки будут иметь осмысленные имена и вам не придется тратить уйму времени на изучение стека ненужных потоков!
#java #kotlin #threads
Please open Telegram to view this post
VIEW IN TELEGRAM
👍16🔥4🤝1
Использование Thread.sleep для тестирования асинхронных программ
📲 Тестируем сервис отправки push-уведомлений. Внешние процессы вызвают его, чтобы отправить push-уведомление c заданным текстом на мобильное устройство. Сервис возвращает идентификатор уведомления, по которому можно проверить его статус. Само уведомление может быть отправлено позже, для этого наш сервис совершает http-вызов к внешнему провайдеру.
Как будем тестировать?
👺 Если тестирование модульное, то, вероятно, мы хотим сделать
⏳ Между вызовом сервиса и проверкой прошло очень мало времени, запрос мог провести его в буферах-очередях, ждать выполнения http-вызова, или выполнения запроса в БД. Вероятность этого еще выше, если тест интеграционный - сразу после самбита уведомления мы пытаемся получить его статус из базы данных, но он не успевает измениться.
⏰ Чтобы сделать тест "зеленым", частенько выбирают некоторую “взятую с потолка” константу, например, 5 секунд и помещают инструкцию, которая заставляет поток "уснуть", между вызовом сервиса и проверкой:
Как правило, это решает проблему прохождения теста, но генерирует несколько новых:
1. Пяти секунд может быть недостаточно в определенных обстоятельствах и тест будет падать, но не всегда, а, например, в 1% случаев. Не так много, чтобы переписать тест, но достаточно, чтобы с завидной регулярностью фейлить весь билд и раздражать😤 . То есть вы своими руками делаете свои тесты недетерминированными, flucky тестами.
2. Вторая причина часто более заметна для команды. Инструкция🤔 .
Ставьте 🔥, если интересно прочитать про решение данной проблемы. Кто уже сталкивался - кидайте в комментарии ваши методы и интересные библиотеки, которые используются в вашей команде.
#kotlin #java #async
📲 Тестируем сервис отправки push-уведомлений. Внешние процессы вызвают его, чтобы отправить push-уведомление c заданным текстом на мобильное устройство. Сервис возвращает идентификатор уведомления, по которому можно проверить его статус. Само уведомление может быть отправлено позже, для этого наш сервис совершает http-вызов к внешнему провайдеру.
interface NotificationService {
fun submitNotification(deviceId: UUID, text: String): UUID
}
Как будем тестировать?
👺 Если тестирование модульное, то, вероятно, мы хотим сделать
mock для http-client-a, с помощью которого будет посылаться уведомление и проверить, что один из его методов был вызван. Аналогично можно сделать mock классов доступа к базе данных и проверить, что статус оповещения был обновлен. Однако, если написать такой код, то может оказаться, что тест в большом количестве случаев не проходит:notificationService.submitNotification(deviceId, “Hello”)
verify(dbService, times(1)).updateStatus(any());
⏳ Между вызовом сервиса и проверкой прошло очень мало времени, запрос мог провести его в буферах-очередях, ждать выполнения http-вызова, или выполнения запроса в БД. Вероятность этого еще выше, если тест интеграционный - сразу после самбита уведомления мы пытаемся получить его статус из базы данных, но он не успевает измениться.
⏰ Чтобы сделать тест "зеленым", частенько выбирают некоторую “взятую с потолка” константу, например, 5 секунд и помещают инструкцию, которая заставляет поток "уснуть", между вызовом сервиса и проверкой:
notificationService.submitNotification(deviceId, “Hello”)
Thread.sleep(5000L) // 5000 milliseconds
verify(dbService, times(1)).updateStatus(any());
Как правило, это решает проблему прохождения теста, но генерирует несколько новых:
1. Пяти секунд может быть недостаточно в определенных обстоятельствах и тест будет падать, но не всегда, а, например, в 1% случаев. Не так много, чтобы переписать тест, но достаточно, чтобы с завидной регулярностью фейлить весь билд и раздражать
2. Вторая причина часто более заметна для команды. Инструкция
Thread.sleep(5000L) выставляет нижнюю границу выполнения теста равной пяти секундам, тогда как в удачном сценарии тест мог занять 10, 50, 100, 200 миллисекунд, секунду. Что, если у вас 200 тестов? Время выполнения 1000 секунд или около 16 минут, тогда как в удачном сценарии ваш билд мог занять во много раз меньше - 1, 2 … 4 минуты. Часто проблема нарастает очень плавно с увеличением количества тестов и неопытные команды не могут понять, в чем причина увеличения времени сборки Ставьте 🔥, если интересно прочитать про решение данной проблемы. Кто уже сталкивался - кидайте в комментарии ваши методы и интересные библиотеки, которые используются в вашей команде.
#kotlin #java #async
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥68❤2👍1🤝1
Как “не спать” при тестировании асинхронных программ.
🛌 В предыдущем посте я описал мотивацию использования
⏳ Проблему для нас создает тот факт, что мы не знаем, сколько времени в действительности займет тестируемая операция (от выполнения к выполнению ее длительность может варьироваться). Поэтому приходится подбирать некую константу, которая в большинстве случаев превышает максимальную длительность операции. Пример: отправка оповещения в среднем занимает около 150ms, но в 3% случаев превышает 5 секунд.
⏰ Нам бы хотелось, чтобы поток ожидал примерно столько же, сколько в действительности выполняется тестируемая операция. Если ее фактическая длительность 150ms, то поток должен находиться в ожидании (150ms + некоторая фиксированная константа, например, еще 50ms). Если операция выполнялась 5s, то и время ожидания должно быть 5s + 50ms.
Для этого необходим механизм, умеющий с заданной периодичностью проверять является ли некоторое условие истинным. Если условие не выполняется, то “засыпать” на короткий промежуток времени, если условие выполняется, то выходить из ожидания.
🔔 Пример: клиент отправляет уведомление. Мы хотим протестировать, что после отправки его статус в базе данных будет обновлен на
Давайте посмотрим на пример кода, где в качестве такого механизма используется библиотека Awaitility.
⚙️ У библиотеки есть большое количество конфигурируемых параметров. Например:
1. Период времени между проверкой условия (poll interval)
2. Начальная задержка перед первой проверкой
3. Минимальное время ожидания (например, вы хотите, чтобы оповещение отправлялось не раньше, чем через 5 секунд)
4. Максимальное время ожидания, за которое ваше условие должно выполниться
5. Игнорируемые исключения
6. Пул потоков, на котором будет выполняться тестирование
🐈⬛ В библиотеке также присутствует модуль для Kotlin, который помогает использовать API библиотеки в конструкциях, похожих на естественный язык, используя инфиксные функции.
🏋️♀️ Вы можете для тренировки написать собственный небольшой инструмент для ожидания (на java, kotlin, kotlin-coroutines) и поделиться кодом в комментариях 😊
#java #kotlin #async
🛌 В предыдущем посте я описал мотивацию использования
Thread.sleep при тестировании асинхронного кода и связанные с этим проблемы. Однако, пассивное ожидание потока очень помогает нам при тестировании асинхронного кода, ведь тесту нужно дождаться завершения асинхронной операции, чтобы начать проводить различные проверки.Thread.sleep(5_000) даст нам зеленый тест в c 97% вероятностью.Для этого необходим механизм, умеющий с заданной периодичностью проверять является ли некоторое условие истинным. Если условие не выполняется, то “засыпать” на короткий промежуток времени, если условие выполняется, то выходить из ожидания.
processed. Будем с периодичностью в 50ms делать запрос в базу данных, получать статус и, если он еще не изменен на processed, то ждать следующие 50ms. Нам станет известно, что статус обновился, через кратчайший промежуток времени.Давайте посмотрим на пример кода, где в качестве такого механизма используется библиотека Awaitility.
id = service.submitNotification(deviceId, “Hello”)
with()
.pollInterval(50_MILLISECONDS)
.await()
.atMost(8, SECONDS)
.until(notificationStatusIsUpdated());
1. Период времени между проверкой условия (poll interval)
2. Начальная задержка перед первой проверкой
3. Минимальное время ожидания (например, вы хотите, чтобы оповещение отправлялось не раньше, чем через 5 секунд)
4. Максимальное время ожидания, за которое ваше условие должно выполниться
5. Игнорируемые исключения
6. Пул потоков, на котором будет выполняться тестирование
#java #kotlin #async
Please open Telegram to view this post
VIEW IN TELEGRAM
👍25🔥8👎3
Траблшутинг кейс
💣 У вас есть простой сервис, который принимает входящие http-запросы, в среднем около 1к rps (requests per second). Он добавляет к запросу небольшое количество метаданных и пересылает по HTTP далее, в следующий сервис.
👀 Вы заметили, что сервис стал хуже справляться с пиковыми нагрузками: 6k rps. Симптомы следующие:
1. При увеличении входящего трафика начинают скапливаться внутренние очереди
2. Время выполнения http запроса в следующий сервис возрастает в разы или даже на порядки. (В норме среднее
3. Возрастает значение метрики количества используемых JVM файловых дескрипторов (рост примерно со 100 до 170).
4. Возрастают также и другие показатели: RAM, CPU.
🔎 Давайте исследовать:
1. Подозрение падает на следующий в цепочке сервис. Мы предполагаем, что под нагрузкой время его ответа увеличивается, и это отражается на нашем приложении.
🏓Команда сервиса опровергает эту догадку, предъявляя нам метрики, где видно: среднее время обработки запроса на их стороне оставалось стабильным в течение всего времени.
🧑🏻🔬 Мы проводим и свой (не самый репрезентативный) эксперимент - во время пиковой нагрузки, используя какой-нибудь http-client, например,
2. Рост файловых дескрипторов. Вспоминаем, что в Linux TCP коннекция является файлом, то есть с ростом числа подключений растет и количество захваченных дескрипторов.
📡 Кто может раздувать количество соединений? - Http-client. Зачем? Из-за особенности протокола
3. Зная эту особенность HTTP, давайте считать пропускную способность:
📲 Длительность запроса
4. Мы можем посчитать, сколько нужно подключений, чтобы сервис выдержал 6к запросов без накапливания очереди.
⚖️ Итак
⚙️ Что можно сделать
1. Увеличить количество подключений
2. Перейти на протокол HTTP версии 2. Он лишен такого недостатка, как блокирующиеся коннекции. С помощью процесса мультиплексирования HTTP 2 дает возможность одновременно передавать и получать данные большого количества запросов, используя даже единственное соединение. Но переход на него в данном случае возможен только, если вы можете перевести на него и следующий в цепочке сервис, что не всегда в вашей власти.
Ставьте 🔥, если такой формат постов кажется вам интересным :)
#http #case #interview
💣 У вас есть простой сервис, который принимает входящие http-запросы, в среднем около 1к rps (requests per second). Он добавляет к запросу небольшое количество метаданных и пересылает по HTTP далее, в следующий сервис.
1. При увеличении входящего трафика начинают скапливаться внутренние очереди
2. Время выполнения http запроса в следующий сервис возрастает в разы или даже на порядки. (В норме среднее
30ms, при нагрузке начинает нарастать и растет вплоть до 20-30s)3. Возрастает значение метрики количества используемых JVM файловых дескрипторов (рост примерно со 100 до 170).
4. Возрастают также и другие показатели: RAM, CPU.
🔎 Давайте исследовать:
1. Подозрение падает на следующий в цепочке сервис. Мы предполагаем, что под нагрузкой время его ответа увеличивается, и это отражается на нашем приложении.
🏓Команда сервиса опровергает эту догадку, предъявляя нам метрики, где видно: среднее время обработки запроса на их стороне оставалось стабильным в течение всего времени.
🧑🏻🔬 Мы проводим и свой (не самый репрезентативный) эксперимент - во время пиковой нагрузки, используя какой-нибудь http-client, например,
curl, делаем несколько запросов к их сервису с нашей машины и видим, что время ответа около 30ms.2. Рост файловых дескрипторов. Вспоминаем, что в Linux TCP коннекция является файлом, то есть с ростом числа подключений растет и количество захваченных дескрипторов.
📡 Кто может раздувать количество соединений? - Http-client. Зачем? Из-за особенности протокола
HTTP1. Запросы в нем блокируют подключение: пока один запрос не будет выполнен, следующий запрос не отправляется.3. Зная эту особенность HTTP, давайте считать пропускную способность:
📲 Длительность запроса
30ms, то есть одно подключение может за секунду обрабатывать 1000ms/30ms≈33.3 запроса. Чтобы удовлетворить обычную нагрузку на сервер в 1к, нужно около 1000rps/33.3req ≈ 30 подключений к следующему сервису. Мы знаем, что у нас было 100 дескрипторов, но не знаем, какие из них были подключениями.4. Мы можем посчитать, сколько нужно подключений, чтобы сервис выдержал 6к запросов без накапливания очереди.
6000 / 33.33 будет ≈ 180 подключений. У нас же есть около 170 (и мы не знаем, сколько из них действительно наши). В любом случае у нас будет накапливаться очередь внутри http-client, в которой запросы могут проводить долгое время, особенно в период повышенной нагрузки.1. Увеличить количество подключений
per host. Почти у всех http-клиентов есть возможность ограничить количество подключений к одному хосту. Часто это совсем небольшое значение. Например, в OkHttp client или AsyncHttpClient оно по умолчанию равно 5. И все клиенты (почти) дают возможность выставлять кастомное значение этого параметра, например:AsyncHttpClientConfig.Builder.setMaxConnectionsPerHost(maxConnectionsPerHost)
2. Перейти на протокол HTTP версии 2. Он лишен такого недостатка, как блокирующиеся коннекции. С помощью процесса мультиплексирования HTTP 2 дает возможность одновременно передавать и получать данные большого количества запросов, используя даже единственное соединение. Но переход на него в данном случае возможен только, если вы можете перевести на него и следующий в цепочке сервис, что не всегда в вашей власти.
Ставьте 🔥, если такой формат постов кажется вам интересным :)
#http #case #interview
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥118👍8🤝1
Недавно у меня возникла следующая задача: необходимо запросить данные у трех сервисов, но достаточно получить первый валидный ответ от любого их них, остальные 2 вызова можно не ожидать.
🤝 Интерфейс вызова следующий:
🤯 Я попытылся написать это на java, но ничего адекватного по количеству строк и общей читаемости кода у меня не получилось.
🐣 Решение я нашел в использовании корутин. Первое, что я сделал: превратил
🪢 Далее я использовал интересную функцию -
🍎 Таким образом, моя задача сводится к довольно простой вещи - поместим все наши вызовы в список и передадим функции
💩 В реальности сервисы могут кидать исключения или возвращать пустой результат
🚨 И тут надо помнить еще один интересный нюанс: если вы снова передадите в
Финальное решение выглядит как-то так (за исключением try-catch, чтобы сократить):
☄️ Отмечу еще, что select бывает невероятно полезен в сочетании с использованием каналов (kotlinx.coroutines.Channel). Канал - аналог BlockingQueue в мире корутин, позже выложу пост о них. Если у вас есть задача мониторить множество очередей на предмет новых элементов и сразу же передавать их на обработку, то лучше средства не найти.
🪐 Интересно, что функция
Ставьте 🔥, если было интересно. Если вы знаете код на java, который решает аналогичную задачу и является плюс-минус таким же лаконичным, делитесь в комментах!
#kotlin #coroutines #java
🤝 Интерфейс вызова следующий:
fun performCall(key: Key) : CompletableFuture<Value>
🐣 Решение я нашел в использовании корутин. Первое, что я сделал: превратил
CompletableFuture<Value> в Deferred<Value>, используя extension-функцию public fun <T> CompletionStage<T>.asDeferred(): Deferred<T>. Это необходимо, чтобы переместить наш код в плоскость корутин. 🪢 Далее я использовал интересную функцию -
select. Ей на вход можно передать любое количество “ожидающих (suspended)” функций и select будет ждать, пока любая из этих функций не вернет результат. Тогда исполнение продолжится и select вернет вам результат этой "победившей" корутины. 🍎 Таким образом, моя задача сводится к довольно простой вещи - поместим все наши вызовы в список и передадим функции
select, а она вернет нам первый полученный результат. Это код, который нам нужен:val calls = services.map { performCall(key).asDeferred() }.toList()
val res = select {
calls.forEach { call ->
call.onAwait { it } // it это результат вызова
}
}
null. А нам необходимо дождаться именно первого валидного результата. Мы можем сделать следующее - проанализируем результат и в случае null будем снова заходить в select. 🚨 И тут надо помнить еще один интересный нюанс: если вы снова передадите в
select тот же самый список, то он повторно выдаст вам результат ошибочного вызова, ведь он и правда завершен :) Поэтому давайте удалять из списка те вызовы, которые выполнились и не являются валидными. Только не забывайте поместить их в thread-safe контейнер, иначе легко словите ConcurrentModificationException. Финальное решение выглядит как-то так (за исключением try-catch, чтобы сократить):
val calls = services.map { performCall(key).asDeferred() }.toCollection(ConcurrentHashMap.newKeySet())
var res: Value? = null
while (calls.isNotEmpty()) {
val res = select {
calls.forEach { call ->
call.onAwait { callRes ->
calls.remove(call)
callRes
}
}
}
if (res != null) break
}
☄️ Отмечу еще, что select бывает невероятно полезен в сочетании с использованием каналов (kotlinx.coroutines.Channel). Канал - аналог BlockingQueue в мире корутин, позже выложу пост о них. Если у вас есть задача мониторить множество очередей на предмет новых элементов и сразу же передавать их на обработку, то лучше средства не найти.
🪐 Интересно, что функция
select по свой сути является аналогом системного вызова select на основе которого реализован "неблокирующий" ввод-вывод. Он позволяет мониторить множество TCP соединений на предмет новых событий IO и превращать их в единый "уплотненный" или иначе "мультиплексированный" поток событий, что позволяет очень эффективно использовать ресурсы системы. О "мультиплексировании" уже говорили тут и будем говорить еще.Ставьте 🔥, если было интересно. Если вы знаете код на java, который решает аналогичную задачу и является плюс-минус таким же лаконичным, делитесь в комментах!
#kotlin #coroutines #java
Please open Telegram to view this post
VIEW IN TELEGRAM
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥37👍7❤3🆒1
🗒 У нас есть следующий фрагмент кода:
👀 Функция
🎃 При исполнении на одном потоке функция
🐌 Наша задача: увеличить пропускную способность, чтобы функция
🎈 На эту задачу есть несколько правильных ответов. Какие-то можно обосновать теоретическими расчетами, какие-то практическим экспериментом. Но гораздо интереснее, как бы вы стали рассуждать, если бы получили такую задачу на собеседовании. Так что пиши ваши рассуждения в комменты!
#interview #threads
fun task() {
val data = prepareData()
process(data)
}
👀 Функция
prepareData производит получение и подготовку данных для их последующей обработки функцией process. 🎃 При исполнении на одном потоке функция
prepareData занимает 5% времени выполнения, process 95%. Единственный поток позволяет выполнять 10 операций в секунду. Важно отметить, что функция prepare работает с разделяемыми данными и должна выполняться под локом в многопоточной среде.🐌 Наша задача: увеличить пропускную способность, чтобы функция
task могла выполняться 100 раз в секунду. Сколько потоков понадобится, чтобы достичь желаемого прироста?🎈 На эту задачу есть несколько правильных ответов. Какие-то можно обосновать теоретическими расчетами, какие-то практическим экспериментом. Но гораздо интереснее, как бы вы стали рассуждать, если бы получили такую задачу на собеседовании. Так что пиши ваши рассуждения в комменты!
#interview #threads
🔥11🤔4👍3
