CodeGym/Blog Java/Random-ES/Mejor juntos: Java y la clase Thread. Parte V: Ejecutor, ...
John Squirrels
Nivel 41
San Francisco

Mejor juntos: Java y la clase Thread. Parte V: Ejecutor, ThreadPool, Fork/Join

Publicado en el grupo Random-ES

Introducción

Entonces, sabemos que Java tiene hilos. Puede leer sobre eso en la revisión titulada Mejor juntos: Java y la clase Thread. Parte I — Hilos de ejecución . Mejor juntos: Java y la clase Thread.  Parte V: Ejecutor, ThreadPool, Fork/Join - 1Echemos otro vistazo al código típico:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Como puede ver, el código para iniciar una tarea es bastante típico, pero tenemos que repetirlo para la nueva tarea. Una solución es ponerlo en un método separado, por ejemplo execute(Runnable runnable), . Pero los creadores de Java consideraron nuestra situación y crearon la Executorinterfaz:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Este código es claramente más conciso: ahora simplemente escribimos código para iniciar el Runnablehilo. Eso es genial, ¿no? Pero esto es sólo el comienzo: Mejor juntos: Java y la clase Thread.  Parte V: Ejecutor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Como puede ver, la Executorinterfaz tiene una ExecutorServicesubinterfaz. El Javadoc para esta interfaz dice que ExecutorServicedescribe un particular Executorque proporciona métodos para apagar el Executor. También permite obtener una java.util.concurrent.Futurepara realizar un seguimiento del proceso de ejecución. Previamente, en Mejor juntos: Java y la clase Thread. Parte IV: Callable, Future y amigos , revisamos brevemente las capacidades de Future. Si lo olvidaste o nunca lo leíste, te sugiero que refresques tu memoria ;) ¿Qué más dice el Javadoc? Nos dice que tenemos una java.util.concurrent.Executorsfábrica especial que nos permite crear implementaciones predeterminadas de ExecutorService.

EjecutorService

Revisemos. Tenemos Executorque ejecutar (es decir, invocar execute()) una determinada tarea en un subproceso, y el código que crea el subproceso está oculto para nosotros. Tenemos ExecutorService— un específico Executorque tiene varias opciones para controlar el progreso. Y tenemos la Executorsfábrica que nos permite crear un archivo ExecutorService. Ahora hagámoslo nosotros mismos:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Puede ver que especificamos un grupo de subprocesos fijos cuyo tamaño es 2. Luego, enviamos las tareas al grupo una por una. Cada tarea devuelve un correo electrónico Stringque contiene el nombre del subproceso ( currentThread().GetName()). Es importante cerrar el ExecutorServiceal final, porque de lo contrario nuestro programa no terminará. La Executorsfábrica tiene métodos de fábrica adicionales. Por ejemplo, podemos crear un grupo que consta de un solo subproceso ( newSingleThreadExecutor) o un grupo que incluye un caché ( newCachedThreadPool) del que se eliminan los subprocesos después de que estén inactivos durante 1 minuto. En realidad, estos ExecutorServiceestán respaldados por una cola de bloqueo , en la que se colocan las tareas y desde la cual se ejecutan las tareas. Puede encontrar más información sobre el bloqueo de colas en este video . También puedes leer estorevisión sobre BlockingQueue . Y consulte la respuesta a la pregunta "¿Cuándo preferir LinkedBlockingQueue sobre ArrayBlockingQueue?" En los términos más simples, un BlockingQueuebloquea un hilo en dos casos:
  • el hilo intenta obtener elementos de una cola vacía
  • el subproceso intenta poner elementos en una cola completa
Si observamos la implementación de los métodos de fábrica, podemos ver cómo funcionan. Por ejemplo:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
o
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Como podemos ver, las implementaciones de ExecutorServicese crean dentro de los métodos de fábrica. Y en su mayor parte, estamos hablando de ThreadPoolExecutor. Solo se modifican los parámetros que afectan al trabajo. Mejor juntos: Java y la clase Thread.  Parte V: Ejecutor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Como vimos anteriormente, ThreadPoolExecutores lo que generalmente se crea dentro de los métodos de fábrica. La funcionalidad se ve afectada por los argumentos que pasamos como el número máximo y mínimo de subprocesos, así como el tipo de cola que se utiliza. Pero java.util.concurrent.BlockingQueuese puede utilizar cualquier implementación de la interfaz. Hablando de ThreadPoolExecutor, debemos mencionar algunas características interesantes. Por ejemplo, no puede enviar tareas a ThreadPoolExecutorsi no hay espacio disponible:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Este código se bloqueará con un error como este:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
En otras palabras, taskno se puede enviar, porque SynchronousQueueestá diseñado para que en realidad consta de un solo elemento y no nos permite poner nada más en él. Podemos ver que tenemos cero queued tasks("tareas en cola = 0") aquí. Pero no hay nada extraño en esto, porque es una característica especial de SynchronousQueue, que de hecho es una cola de 1 elemento que siempre está vacía. Cuando un subproceso pone un elemento en la cola, esperará hasta que otro subproceso tome el elemento de la cola. En consecuencia, podemos reemplazarlo con new LinkedBlockingQueue<>(1)y el error cambiará para mostrar ahora queued tasks = 1. Debido a que la cola es solo 1 elemento, no podemos agregar un segundo elemento. Y eso es lo que hace que el programa falle. Continuando con nuestra discusión sobre la cola, vale la pena señalar que elThreadPoolExecutorLa clase tiene métodos adicionales para dar servicio a la cola. Por ejemplo, el threadPoolExecutor.purge()método eliminará todas las tareas canceladas de la cola para liberar espacio en la cola. Otra función interesante relacionada con la cola es el controlador de tareas rechazadas:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
En este ejemplo, nuestro controlador simplemente muestra Rejectedcada vez que se rechaza una tarea en la cola. Conveniente, ¿no? Además, ThreadPoolExecutortiene una subclase interesante: ScheduledThreadPoolExecutor, que es un ScheduledExecutorService. Proporciona la capacidad de realizar una tarea basada en un temporizador.

ScheduledExecutorService

ScheduledExecutorService(que es un tipo de ExecutorService) nos permite ejecutar tareas en un horario. Veamos un ejemplo:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Todo es simple aquí. Las tareas se envían y luego obtenemos un archivo java.util.concurrent.ScheduledFuture. Un horario también puede ser útil en la siguiente situación:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aquí enviamos una Runnabletarea para su ejecución a una frecuencia fija ("FixedRate") con un cierto retraso inicial. En este caso, después de 1 segundo, la tarea comenzará a ejecutarse cada 2 segundos. Hay una opción similar:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Pero en este caso, las tareas se realizan con un intervalo específico ENTRE cada ejecución. Es decir, se taskejecutará después de 1 segundo. Luego, tan pronto como se complete, pasarán 2 segundos y luego se iniciará una nueva tarea. Aquí hay algunos recursos adicionales sobre este tema: Mejor juntos: Java y la clase Thread.  Parte V: Ejecutor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

TrabajoRobarPiscina

Además de los grupos de subprocesos anteriores, hay uno más. Honestamente podemos decir que es un poco especial. Se llama grupo de robo de trabajo. En resumen, el robo de trabajo es un algoritmo en el que los subprocesos inactivos comienzan a tomar tareas de otros subprocesos o tareas de una cola compartida. Veamos un ejemplo:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Si ejecutamos este código, creará ExecutorService5 subprocesos para nosotros, porque cada subproceso se colocará en la cola de espera para el objeto de bloqueo. Ya descubrimos monitores y bloqueos en Mejor juntos: Java y la clase Thread. Parte II — Sincronización . Ahora reemplacemos Executors.newCachedThreadPool()con Executors.newWorkStealingPool(). ¿Qué cambiará? Veremos que nuestras tareas se ejecutan en menos de 5 subprocesos. ¿Recuerdas que CachedThreadPoolse crea un hilo para cada tarea? Esto se debe a que wait()bloqueó el subproceso, las tareas posteriores quieren completarse y se crearon nuevos subprocesos para ellos en el grupo. Con un grupo de robo, los subprocesos no permanecen inactivos para siempre. Comienzan a realizar las tareas de sus vecinos. ¿ Qué hace que un WorkStealingPoolgrupo sea tan diferente de otros grupos de subprocesos? El hecho de que la magiaForkJoinPoolvive dentro de ella:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
En realidad, hay una diferencia más. De forma predeterminada, los subprocesos creados para a ForkJoinPoolson subprocesos daemon, a diferencia de los subprocesos creados a través de un onrdinary ThreadPool. En general, debe recordar los subprocesos daemon porque, por ejemplo, CompletableFuturetambién utiliza subprocesos daemon a menos que especifique uno propio ThreadFactoryque cree subprocesos que no sean daemon. ¡Estas son las sorpresas que pueden acechar en lugares inesperados! :)

TenedorÚnetePiscina

En esta parte, volveremos a hablar sobre ForkJoinPool(también llamado marco de bifurcación/unión), que vive "bajo el capó" de WorkStealingPool. En general, el marco fork/join apareció en Java 1.7. Y aunque Java 11 está cerca, vale la pena recordarlo. Esta no es la implementación más común, pero es bastante interesante. Hay una buena revisión sobre esto en la web: Comprensión de Java Fork-Join Framework con ejemplos . El ForkJoinPooldepende de java.util.concurrent.RecursiveTask. También java.util.concurrent.RecursiveActionhay RecursiveActionno devuelve un resultado. Por lo tanto, RecursiveTaskes similar a Callable, y RecursiveActiones similar a unnable. Podemos ver que el nombre incluye los nombres de dos métodos importantes: forky join. ElforkEl método inicia alguna tarea de forma asíncrona en un subproceso separado. Y el joinmétodo le permite esperar a que se realice el trabajo. Para obtener la mejor comprensión, debe leer From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Resumen

Bueno, eso concluye esta parte de la revisión. Hemos aprendido que Executorse inventó originalmente para ejecutar hilos. Luego, los creadores de Java decidieron continuar con la idea y crearon ExecutorService. ExecutorServicenos permite enviar tareas para su ejecución usando submit()y invoke(), y también cerrar el servicio. Debido ExecutorServicea que necesita implementaciones, escribieron una clase con métodos de fábrica y la llamaron Executors. Le permite crear grupos de subprocesos ( ThreadPoolExecutor). Además, existen grupos de subprocesos que también nos permiten especificar un cronograma de ejecución. Y un ForkJoinPoolse esconde detrás de un WorkStealingPool. Espero que haya encontrado lo que escribí anteriormente no solo interesante, sino también comprensible :) Siempre me complace escuchar sus sugerencias y comentarios. Mejor juntos: Java y la clase Thread. Parte I — Hilos de ejecución Mejor juntos: Java y la clase Thread. Parte II — Sincronización Mejor juntos: Java y la clase Thread. Parte III — Interacción Mejor juntos: Java y la clase Thread. Parte IV — Callable, Future y amigos Mejor juntos: Java y la clase Thread. Parte VI — ¡Dispara!
Comentarios
  • Populares
  • Nuevas
  • Antiguas
Debes iniciar sesión para dejar un comentario
Esta página aún no tiene comentarios