CodeGym/Java Blog/무작위의/더 나은 조합: Java와 Thread 클래스. 파트 V — 집행자, ThreadPool, 포크/조인
John Squirrels
레벨 41
San Francisco

더 나은 조합: Java와 Thread 클래스. 파트 V — 집행자, ThreadPool, 포크/조인

무작위의 그룹에 게시되었습니다
회원

소개

따라서 우리는 Java에 스레드가 있다는 것을 알고 있습니다. Better Together: Java and the Thread class 라는 제목의 리뷰에서 이에 대해 읽을 수 있습니다 . 파트 I — 실행 스레드 . 더 나은 조합: Java와 Thread 클래스.  파트 V — 집행자, ThreadPool, 포크/조인 - 1일반적인 코드를 다시 살펴보겠습니다.
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
보시다시피 작업을 시작하는 코드는 매우 일반적이지만 새 작업에 대해 반복해야 합니다. 한 가지 해결책은 이를 별도의 메서드에 넣는 것입니다. 예를 들어 execute(Runnable runnable). 그러나 Java 제작자는 우리의 곤경을 고려하여 다음과 같은 인터페이스를 고안했습니다 Executor.
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
이 코드는 분명히 더 간결합니다. 이제 Runnable스레드에서 시작하는 코드를 작성하기만 하면 됩니다. 대단하지 않나요? 그러나 이것은 시작에 불과합니다. 더 나은 조합: Java와 Thread 클래스.  파트 V — 집행자, ThreadPool, 포크/조인 - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

보시다시피 Executor인터페이스에는 하위 인터페이스가 있습니다 ExecutorService. 이 인터페이스에 대한 Javadoc 에서는 ExecutorService가 . 또한 실행 프로세스를 추적하기 위해 a를 얻을 수 있습니다 . 이전에는 Better Together: Java 및 Thread 클래스에서 설명했습니다. 파트 IV — Callable, Future 및 friends 에서 의 기능을 간략하게 검토했습니다 . 잊었거나 읽지 않았다면 기억을 되살리기를 제안합니다 ;) Javadoc은 또 뭐라고 말합니까? . _ _ ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

ExecutorService

복습 해보자. 스레드에서 특정 작업을 Executor실행(예: 호출)해야 하며 스레드 를 생성하는 코드는 숨겨져 있습니다. 진행 상황을 제어할 수 있는 몇 가지 옵션이 있는 특정 항목이 execute()있습니다 . 그리고 우리 는 . 이제 직접 해봅시다. ExecutorServiceExecutorExecutorsExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
크기가 2인 고정 스레드 풀을 지정한 것을 볼 수 있습니다. 그런 다음 풀에 작업을 하나씩 제출합니다. 각 작업은 String스레드 이름( currentThread().GetName())을 포함하는 를 반환합니다. ExecutorService마지막에 를 종료하는 것이 중요합니다 . 그렇지 않으면 프로그램이 종료되지 않기 때문입니다. 공장 Executors에는 추가 공장 메서드가 있습니다. 예를 들어, 하나의 스레드( )로 구성된 풀 또는 스레드가 1분 동안 유휴 상태가 되면 제거되는 newSingleThreadExecutor캐시( )를 포함하는 풀을 만들 수 있습니다. newCachedThreadPool실제로 이들은 태스크가 배치되고 태스크가 실행되는 블로킹 큐ExecutorService 에 의해 지원됩니다 . 차단 대기열에 대한 자세한 내용은 이 비디오 에서 확인할 수 있습니다 . 당신은 또한 이것을 읽을 수 있습니다BlockingQueue에 대한 검토 . "ArrayBlockingQueue보다 LinkedBlockingQueue를 선호하는 경우는 언제입니까?"라는 질문에 대한 답변을 확인하십시오. 가장 간단한 용어로 a는 BlockingQueue두 가지 경우에 스레드를 차단합니다.
  • 스레드는 빈 큐에서 항목을 가져오려고 시도합니다.
  • 스레드는 전체 대기열에 항목을 넣으려고 시도합니다.
팩토리 메소드의 구현을 보면 어떻게 작동하는지 알 수 있습니다. 예를 들어:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
또는
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
보시다시피 구현은 ExecutorService팩토리 메소드 내부에서 생성됩니다. 그리고 대부분의 경우 ThreadPoolExecutor. 작업에 영향을 주는 매개변수만 변경됩니다. 더 나은 조합: Java와 Thread 클래스.  파트 V — 집행자, ThreadPool, 포크/조인 - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

앞에서 본 것처럼 ThreadPoolExecutor일반적으로 팩토리 메서드 내에서 생성되는 것입니다. 기능은 스레드의 최대 및 최소 수와 사용 중인 대기열 유형으로 전달하는 인수의 영향을 받습니다. 그러나 java.util.concurrent.BlockingQueue인터페이스의 모든 구현을 사용할 수 있습니다. 에 대해 ThreadPoolExecutor몇 가지 흥미로운 기능을 언급해야 합니다. ThreadPoolExecutor예를 들어 사용 가능한 공간이 없는 경우 작업을 제출할 수 없습니다 .
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
이 코드는 다음과 같은 오류와 함께 충돌합니다.
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
즉, task제출할 수 없습니다. SynchronousQueue실제로 단일 요소로 구성되고 더 이상 아무것도 추가할 수 없도록 설계되었기 때문입니다. queued tasks여기에서 0("대기 중인 작업 = 0")이 있음을 알 수 있습니다 . 그러나 이것에 대해 이상한 점은 없습니다. 왜냐하면 이것은 SynchronousQueue사실 항상 비어 있는 1-요소 대기열인 의 특수 기능이기 때문입니다! 한 스레드가 큐에 요소를 넣으면 다른 스레드가 큐에서 요소를 가져올 때까지 기다립니다. 따라서 이를 로 교체하면 new LinkedBlockingQueue<>(1)오류가 now show 로 변경됩니다 queued tasks = 1. 대기열은 1개의 요소이기 때문에 두 번째 요소를 추가할 수 없습니다. 그리고 이것이 프로그램이 실패하는 원인입니다. 대기열에 대한 논의를 계속하면ThreadPoolExecutor클래스에는 대기열을 서비스하기 위한 추가 메서드가 있습니다. 예를 들어 이 threadPoolExecutor.purge()메서드는 대기열의 공간을 확보하기 위해 취소된 모든 작업을 대기열에서 제거합니다. 또 다른 흥미로운 대기열 관련 기능은 거부된 작업에 대한 핸들러입니다.
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
이 예에서 처리기는 Rejected대기열의 작업이 거부될 때마다 단순히 표시합니다. 편리하지 않습니까? 또한 ThreadPoolExecutor에는 흥미로운 하위 클래스 ScheduledThreadPoolExecutor인 가 있습니다 ScheduledExecutorService. 타이머를 기반으로 작업을 수행하는 기능을 제공합니다.

ScheduledExecutorService

ScheduledExecutorService(의 한 유형 ExecutorService) 일정에 따라 작업을 실행할 수 있습니다. 예를 살펴보겠습니다.
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
여기에서는 모든 것이 간단합니다. 작업이 제출되고 java.util.concurrent.ScheduledFuture. 일정은 다음 상황에서도 도움이 될 수 있습니다.
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnable여기에서 특정 초기 지연과 함께 고정 빈도("FixedRate")로 실행할 작업을 제출합니다 . 이 경우 1초 후에 태스크가 2초마다 실행되기 시작합니다. 비슷한 옵션이 있습니다.
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
그러나이 경우 작업은 각 실행 사이의 특정 간격으로 수행됩니다. 즉, 는 task1초 후에 실행됩니다. 그러면 완료되자마자 2초가 지나면 새로운 작업이 시작됩니다. 다음은 이 주제에 대한 몇 가지 추가 리소스입니다. 더 나은 조합: Java와 Thread 클래스.  파트 V — 집행자, ThreadPool, 포크/조인 - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

작업훔치기풀

위의 스레드 풀 외에도 스레드 풀이 하나 더 있습니다. 솔직히 조금 특별하다고 말할 수 있습니다. 일을 훔치는 풀이라고 합니다. 즉, 작업 도용은 유휴 스레드가 다른 스레드에서 작업을 시작하거나 공유 큐에서 작업을 시작하는 알고리즘입니다. 예를 살펴보겠습니다.
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
이 코드를 실행하면 ExecutorService각 스레드가 잠금 개체의 대기 큐에 놓이기 때문에 5개의 스레드가 생성됩니다. 우리는 이미 Better Together: Java and the Thread 클래스 에서 모니터와 잠금을 파악했습니다 . 파트 II — 동기화 . 이제 Executors.newCachedThreadPool(). Executors.newWorkStealingPool()_ 무엇이 바뀔까요? 작업이 5개 미만의 스레드에서 실행되는 것을 볼 수 있습니다. CachedThreadPool각 작업에 대한 스레드를 생성한다는 것을 기억하십니까 ? wait()스레드를 차단하고 후속 작업이 완료되기를 원하며 풀에서 새 스레드가 생성되었기 때문입니다 . 도용 풀을 사용하면 스레드가 영원히 유휴 상태가 되지 않습니다. 그들은 이웃의 작업을 수행하기 시작합니다. WorkStealingPool다른 스레드 풀과 다른 점은 무엇입니까 ? 마법이라는 사실ForkJoinPool그 안에 산다:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
사실 차이점이 하나 더 있습니다. 기본적으로 a에 대해 생성된 스레드는 ForkJoinPoolonrdinary를 통해 생성된 스레드와 달리 데몬 스레드입니다 ThreadPool. 일반적으로 데몬 스레드를 기억해야 합니다. 예를 들어 데몬 스레드가 아닌 스레드를 생성하는 스레드를 CompletableFuture직접 지정하지 않는 한 데몬 스레드도 사용하기 때문입니다. ThreadFactory예상치 못한 곳에 숨어 있을 수 있는 놀라움입니다! :)

포크조인풀

ForkJoinPool이 파트 에서는 WorkStealingPool​​. 일반적으로 fork/join 프레임워크는 Java 1.7에서 다시 나타났습니다. 그리고 Java 11이 가까이에 있지만 여전히 기억할 가치가 있습니다. 이것은 가장 일반적인 구현은 아니지만 매우 흥미롭습니다. 이에 대한 좋은 리뷰가 웹에 있습니다: Understanding Java Fork-Join Framework with Examples . ForkJoinPool에 의존 합니다 java.util.concurrent.RecursiveTask. 또한 있습니다 java.util.concurrent.RecursiveAction. RecursiveAction결과를 반환하지 않습니다. 따라서 는 와 RecursiveTask유사 하고 은 와 유사합니다 . 이름에 두 가지 중요한 메서드의 이름이 포함되어 있음을 알 수 있습니다 . 그만큼CallableRecursiveActionunnableforkjoinfork메서드는 별도의 스레드에서 일부 작업을 비동기적으로 시작합니다. 그리고 이 join방법을 사용하면 작업이 완료될 때까지 기다릴 수 있습니다. 가장 잘 이해하려면 From Imperative Programming to Fork/Join to Parallel Streams in Java 8 을 읽어야 합니다 .

요약

이상으로 이번 리뷰를 마치겠습니다. 우리는 그것이 원래 쓰레드를 실행하기 위해 발명되었다는 것을 배웠습니다 Executor. 그런 다음 Java 제작자는 아이디어를 계속하기로 결정하고 ExecutorService. 및 를 ExecutorService사용하여 실행할 작업을 제출 하고 서비스를 종료할 수도 있습니다. 구현이 필요하기 때문에 팩토리 메소드로 클래스를 작성하고 호출했습니다 . 스레드 풀( )을 생성할 수 있습니다 . 또한 실행 일정을 지정할 수 있는 스레드 풀도 있습니다. 그리고 a는 a 뒤에 숨는다 . 위에서 제가 쓴 내용이 재미있을 뿐만 아니라 이해하기 쉬우셨기를 바랍니다 :) 저는 항상 여러분의 제안과 의견을 듣게 되어 기쁩니다. submit()invoke()ExecutorServiceExecutorsThreadPoolExecutorForkJoinPoolWorkStealingPool더 나은 조합: Java와 Thread 클래스. 1부 — 실행 스레드 Java와 Thread 클래스를 함께 사용하면 더 좋습니다. 2부 — 동기화 함께 하면 더 좋습니다: Java와 Thread 클래스. 3부 — 상호 작용 더 나은 조합: Java와 Thread 클래스. 4부 — 호출 가능, 미래 및 친구 더 나은 조합: Java 및 Thread 클래스. 6부 — 발사!
코멘트
  • 인기
  • 신규
  • 이전
코멘트를 남기려면 로그인 해야 합니다
이 페이지에는 아직 코멘트가 없습니다