Java 101: параллелизм в Java без проблем, часть 1

В связи с усложнением параллельных приложений многие разработчики обнаруживают, что низкоуровневые возможности потоковой передачи Java недостаточны для удовлетворения их потребностей в программировании. В таком случае, возможно, пришло время открыть для себя утилиты Java Concurrency Utilities. Начните с java.util.concurrentподробного введения Джеффа Фризена в структуру Executor, типы синхронизаторов и пакет Java Concurrent Collections.

Java 101: следующее поколение

Первая статья в этой новой серии JavaWorld знакомит с Java Date and Time API .

Платформа Java обеспечивает возможности низкоуровневой обработки потоков, которые позволяют разработчикам писать параллельные приложения, в которых одновременно выполняются разные потоки. Однако стандартная многопоточность Java имеет некоторые недостатки:

  • Низкого уровня Примитивы параллелизма в Java ( synchronized, volatile, wait(), notify(), и notifyAll()) не просто правильно использовать. Опасности потоковой передачи, такие как взаимоблокировка, нехватка потоков и состояния гонки, возникающие в результате неправильного использования примитивов, также трудно обнаружить и отладить.
  • Использование synchronizedкоординации доступа между потоками приводит к проблемам с производительностью, которые влияют на масштабируемость приложения, что является требованием для многих современных приложений.
  • Базовые возможности многопоточности Java находятся на слишком низком уровне. Разработчикам часто требуются конструкции более высокого уровня, такие как семафоры и пулы потоков, которых не предлагают возможности низкоуровневой потоковой передачи Java. В результате разработчики будут создавать свои собственные конструкции, что отнимает много времени и подвержено ошибкам.

Платформа JSR 166: Concurrency Utilities была разработана для удовлетворения потребности в средствах высокоуровневой обработки потоков. Основанная в начале 2002 года, структура была формализована и реализована двумя годами позже в Java 5. Усовершенствования последовали в Java 6, Java 7 и готовящейся к выпуску Java 8.

Эта состоящая из двух частей серия Java 101: следующее поколение знакомит разработчиков программного обеспечения, знакомых с основами потоковой передачи Java, в пакеты и среду Java Concurrency Utilities. В части 1 я представляю обзор инфраструктуры Java Concurrent Utilities и представляю ее среду Executor, утилиты синхронизатора и пакет Java Concurrent Collections.

Понимание потоков Java

Прежде чем погрузиться в эту серию статей, убедитесь, что вы знакомы с основами многопоточности. Начните с введения Java 101 в возможности низкоуровневой обработки потоков в Java:

  • Часть 1. Введение в потоки и исполняемые файлы
  • Часть 2: Синхронизация потоков
  • Часть 3. Планирование потоков, ожидание / уведомление и прерывание потока
  • Часть 4. Группы потоков, волатильность, локальные переменные потока, таймеры и смерть потока

Внутри утилит Java Concurrency

Платформа Java Concurrency Utilities - это библиотека типов , предназначенная для использования в качестве строительных блоков для создания параллельных классов или приложений. Эти типы поточно-ориентированы, тщательно протестированы и обладают высокой производительностью.

Типы в утилитах параллелизма Java организованы в небольшие структуры; а именно, среда Executor, синхронизатор, параллельные коллекции, блокировки, атомарные переменные и Fork / Join. Далее они разделены на основной пакет и пару подпакетов:

  • java.util.concurrent содержит типы утилит высокого уровня, которые обычно используются в параллельном программировании. Примеры включают семафоры, барьеры, пулы потоков и параллельные хэш-карты.
    • Java.util.concurrent.atomic подпакет содержит служебные классы низкого уровня, поддержку безблокировочного поточно-программирование на холостом переменных.
    • Java.util.concurrent.locks подпакет содержит типы коммунального низкого уровня для блокировки и ожидания условий, которые отличаются от использования синхронизации низкого уровня в Java и мониторов.

Платформа Java Concurrency Utilities также предоставляет аппаратные инструкции низкого уровня сравнения и замены (CAS) , варианты которых обычно поддерживаются современными процессорами. CAS намного легче, чем механизм синхронизации Java на основе монитора, и используется для реализации некоторых высокомасштабируемых параллельных классов. java.util.concurrent.locks.ReentrantLockНапример, класс на основе CAS более производительный, чем эквивалентный synchronizedпримитив на основе монитора . ReentrantLockпредлагает больший контроль над блокировкой. (В части 2 я подробнее объясню, как работает CAS java.util.concurrent.)

System.nanoTime ()

Фреймворк Java Concurrency Utilities включает long nanoTime(), который является членом java.lang.Systemкласса. Этот метод обеспечивает доступ к источнику времени с наносекундной детализацией для проведения измерений относительного времени.

В следующих разделах я расскажу о трех полезных функциях Java Concurrency Utilities, сначала объясняя, почему они так важны для современного параллелизма, а затем продемонстрирую, как они работают для повышения скорости, надежности, эффективности и масштабируемости параллельных приложений Java.

Фреймворк Executor

В потоках задача - это единица работы. Одна из проблем с низкоуровневой потоковой передачей в Java заключается в том, что отправка задачи тесно связана с политикой выполнения задачи, как показано в листинге 1.

Листинг 1. Server.java (Версия 1)

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; class Server { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(9000); while (true) { final Socket s = socket.accept(); Runnable r = new Runnable() { @Override public void run() { doWork(s); } }; new Thread(r).start(); } } static void doWork(Socket s) { } }

Приведенный выше код описывает простое серверное приложение ( doWork(Socket)для краткости оставлено пустым). Серверный поток постоянно вызывает socket.accept()ожидание входящего запроса, а затем запускает поток для обслуживания этого запроса, когда он поступает.

Поскольку это приложение создает новый поток для каждого запроса, оно плохо масштабируется при работе с огромным количеством запросов. Например, каждый созданный поток требует памяти, а слишком много потоков могут исчерпать доступную память, вынуждая приложение завершить работу.

Вы можете решить эту проблему, изменив политику выполнения задач. Вместо того, чтобы всегда создавать новый поток, вы можете использовать пул потоков, в котором фиксированное количество потоков будет обслуживать входящие задачи. Однако вам придется переписать приложение, чтобы внести это изменение.

java.util.concurrentвключает платформу Executor, небольшую структуру типов, которая отделяет отправку задачи от политик выполнения задачи. Используя среду Executor, можно легко настроить политику выполнения задач программы без необходимости значительного переписывания кода.

Внутри фреймворка Executor

Платформа Executor основана на Executorинтерфейсе, который описывает исполнителя как любой объект, способный выполнять java.lang.Runnableзадачи. Этот интерфейс объявляет следующий отдельный метод выполнения Runnableзадачи:

void execute(Runnable command)

Вы отправляете Runnableзадачу, передав ее пользователю execute(Runnable). Если исполнитель не может выполнить задачу по какой-либо причине (например, если исполнитель был выключен), этот метод выдаст файл RejectedExecutionException.

Ключевой концепцией является то, что отправка задачи не связана с политикой выполнения задачи , которая описывается Executorреализацией. Таким образом, выполняемая задача может выполняться через новый поток, объединенный поток, вызывающий поток и т. Д.

Обратите внимание, что Executorэто очень ограничено. Например, вы не можете завершить работу исполнителя или определить, завершена ли асинхронная задача. Вы также не можете отменить запущенную задачу. По этим и другим причинам платформа Executor предоставляет интерфейс ExecutorService, который расширяется Executor.

ExecutorServiceОсобого внимания заслуживают пять из методов:

  • boolean awaitTermination (long timeout, TimeUnit unit) блокирует вызывающий поток до тех пор, пока все задачи не завершат выполнение после запроса на завершение работы , не истечет время ожидания или текущий поток не будет прерван, в зависимости от того, что произойдет раньше. Максимальное время ожидания определяется timeout, и это значение выражается в unitединицах, указанных в TimeUnitперечислении; например TimeUnit.SECONDS,. Этот метод генерируется, java.lang.InterruptedExceptionкогда текущий поток прерывается. Он возвращает истину, когда исполнитель завершает работу, и ложь, когда тайм-аут истекает до завершения.
  • boolean isShutdown () возвращает истину, когда исполнитель был выключен.
  • void shutdown () инициирует упорядоченное завершение работы, при котором выполняются ранее отправленные задачи, но новые задачи не принимаются.
  • Future submit (Callable task) отправляет на выполнение задачу, возвращающую значение, и возвращает Futureожидающие результаты задачи.
  • Future submit (Runnable task) отправляет Runnableзадачу на выполнение и возвращает объект, Futureпредставляющий эту задачу.

FutureИнтерфейс представляет результат асинхронных вычислений. Результат известен как будущее, потому что он обычно не будет доступен до определенного момента в будущем. Вы можете вызывать методы для отмены задачи, возврата результата задачи (неопределенное ожидание или истечение тайм-аута, когда задача не завершена) и определять, была ли задача отменена или завершена.

CallableИнтерфейс похож на Runnableинтерфейс в том , что он обеспечивает единый метод , описывающий задачу для выполнения. В отличии от Runnable«S void run()методы, Callable» s V call() throws Exceptionметод может возвращать значение и бросить исключение.

Фабричные методы исполнителя

В какой-то момент вам захочется получить исполнителя. Платформа Executor предоставляет Executorsдля этой цели служебный класс. Executorsпредлагает несколько заводских методов для получения различных типов исполнителей, которые предлагают определенные политики выполнения потоков. Вот три примера:

  • ExecutorService newCachedThreadPool () создает пул потоков, который создает новые потоки по мере необходимости, но повторно использует ранее созданные потоки, когда они доступны. Потоки, которые не использовались в течение 60 секунд, завершаются и удаляются из кеша. Этот пул потоков обычно повышает производительность программ, которые выполняют множество краткосрочных асинхронных задач.
  • ExecutorService newSingleThreadExecutor () создает исполнитель, который использует один рабочий поток, работающий в неограниченной очереди - задачи добавляются в очередь и выполняются последовательно (одновременно может быть не более одной задачи). Если этот поток завершается из-за сбоя во время выполнения перед завершением работы исполнителя, будет создан новый поток, который займет его место, когда необходимо выполнить последующие задачи.
  • ExecutorService newFixedThreadPool (int nThreads) создает пул потоков, который повторно использует фиксированное количество потоков, работающих в общей неограниченной очереди. Большинство nThreadsпотоков активно обрабатывают задачи. Если дополнительные задачи отправляются, когда все потоки активны, они ждут в очереди, пока поток не станет доступным. Если какой-либо поток завершается из-за сбоя во время выполнения перед завершением работы, будет создан новый поток, который займет его место, когда необходимо выполнить следующие задачи. Потоки пула существуют до завершения работы исполнителя.

Каркасный Исполнитель предлагает дополнительные типы (например, ScheduledExecutorServiceинтерфейс), но типы вы, вероятно , работать с наиболее часто ExecutorService, Future, Callable, и Executors.

См. java.util.concurrentДокументацию Javadoc для изучения дополнительных типов.

Работа с фреймворком Executor

Вы обнаружите, что с фреймворком Executor довольно легко работать. В листинге 2 я использовал Executorи Executorsдля замены примера сервера из листинга 1 на более масштабируемую альтернативу на основе пула потоков.

Листинг 2. Server.java (Версия 2)

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.Executors; class Server { static Executor pool = Executors.newFixedThreadPool(5); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(9000); while (true) { final Socket s = socket.accept(); Runnable r = new Runnable() { @Override public void run() { doWork(s); } }; pool.execute(r); } } static void doWork(Socket s) { } }

В листинге 2 newFixedThreadPool(int)получен исполнитель на основе пула потоков, который повторно использует пять потоков. Он также заменяет new Thread(r).start();на pool.execute(r);для выполнения выполняемых задач через любой из этих потоков.

В листинге 3 представлен еще один пример, в котором приложение считывает содержимое произвольной веб-страницы. Он выводит результирующие строки или сообщение об ошибке, если содержимое недоступно в течение максимум пяти секунд.