Hãy tìm ra phương thức newWorkStealingPool , phương thức chuẩn bị một ExecutorService cho chúng ta.

Nhóm chủ đề này là đặc biệt. Hành vi của nó dựa trên ý tưởng "ăn cắp" công việc.

Các tác vụ được xếp hàng đợi và phân phối giữa các bộ xử lý. Nhưng nếu một bộ xử lý đang bận, thì một bộ xử lý rảnh rỗi khác có thể đánh cắp một tác vụ từ nó và thực thi nó. Định dạng này được giới thiệu trong Java để giảm xung đột trong các ứng dụng đa luồng. Nó được xây dựng trên khuôn khổ fork/join .

rẽ nhánh/tham gia

Trong khuôn khổ fork/join , các tác vụ được phân tách theo cách đệ quy, nghĩa là chúng được chia thành các tác vụ con. Sau đó, các nhiệm vụ con được thực hiện riêng lẻ và kết quả của các nhiệm vụ con được kết hợp để tạo thành kết quả của nhiệm vụ ban đầu.

Phương thức fork bắt đầu một tác vụ không đồng bộ trên một số luồng và phương thức nối cho phép bạn đợi tác vụ này kết thúc.

mớiCông việcTrộm cắpBể bơi

Phương thức newWorkStealingPool có hai cách triển khai:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Ngay từ đầu, chúng tôi lưu ý rằng dưới mui xe, chúng tôi không gọi hàm tạo ThreadPoolExecutor . Ở đây chúng tôi đang làm việc với thực thể ForkJoinPool . Giống như ThreadPoolExecutor , nó là một triển khai của AbstractExecutorService .

Chúng tôi có 2 phương pháp để lựa chọn. Đầu tiên, chúng tôi tự chỉ ra mức độ song song mà chúng tôi muốn thấy. Nếu chúng tôi không chỉ định giá trị này, thì tính song song của nhóm của chúng tôi sẽ bằng với số lõi bộ xử lý có sẵn cho máy ảo Java.

Vẫn còn phải tìm ra cách nó hoạt động trong thực tế:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Chúng tôi tạo 10 tác vụ hiển thị trạng thái hoàn thành của riêng chúng. Sau đó, chúng tôi khởi chạy tất cả các tác vụ bằng cách sử dụng phương thức invokeAll .

Kết quả khi thực thi 10 task trên 10 thread trong pool:

Yêu cầu người dùng đã xử lý số 9 trên luồng ForkJoinPool-1-worker-10
Yêu cầu người dùng đã xử lý số 4 trên luồng ForkJoinPool-1-worker-5
Yêu cầu người dùng đã xử lý số 7 trên luồng ForkJoinPool-1-worker-8 Yêu
cầu người dùng đã xử lý số 1 trên ForkJoinPool- Chủ đề 1-worker-2
Yêu cầu người dùng đã xử lý #2 trên luồng ForkJoinPool-1-worker-3 Yêu
cầu người dùng đã xử lý #3 trên luồng ForkJoinPool-1-worker-4 Yêu
cầu người dùng đã xử lý #6 trên luồng ForkJoinPool-1-worker-7
Người dùng đã xử lý yêu cầu số 0 trên luồng ForkJoinPool-1-worker-1
Yêu cầu người dùng đã xử lý số 5 trên luồng ForkJoinPool-1-worker-6 Yêu
cầu người dùng đã xử lý số 8 trên luồng ForkJoinPool-1-worker-9

Chúng ta thấy rằng sau khi hàng đợi được hình thành, các luồng sẽ nhận nhiệm vụ để thực thi. Bạn cũng có thể kiểm tra xem 20 tác vụ sẽ được phân bổ như thế nào trong nhóm 10 luồng.

Yêu cầu người dùng đã xử lý số 3 trên luồng ForkJoinPool-1-worker-4
Yêu cầu người dùng đã xử lý số 7 trên luồng ForkJoinPool-1-worker-8 Yêu
cầu người dùng đã xử lý số 2 trên luồng ForkJoinPool-1-worker-3 Yêu cầu
người dùng đã xử lý số 4 trên ForkJoinPool- Chuỗi 1-worker-5
Yêu cầu người dùng đã xử lý số 1 trên chuỗi ForkJoinPool-1-worker-2 Yêu
cầu người dùng đã xử lý số 5 trên chuỗi ForkJoinPool-1-worker-6 Yêu
cầu người dùng đã xử lý số 8 trên chuỗi ForkJoinPool-1-worker-9
Người dùng đã xử lý yêu cầu số 9 trên luồng ForkJoinPool-1-worker-10
Yêu cầu người dùng đã xử lý số 0 trên luồng ForkJoinPool-1-worker-1
Yêu cầu người dùng đã xử lý số 6 trên luồng ForkJoinPool-1-worker-7 Yêu
cầu người dùng đã xử lý số 10 trên ForkJoinPool-1- công nhân-9 chủ đề
Yêu cầu người dùng đã xử lý số 12 trên luồng ForkJoinPool-1-worker-1
Yêu cầu người dùng đã xử lý số 13 trên luồng ForkJoinPool-1-worker-8 Yêu
cầu người dùng đã xử lý số 11 trên luồng ForkJoinPool-1-worker-6 Yêu cầu
người dùng đã xử lý số 15 trên ForkJoinPool- Chuỗi 1-worker-8
Yêu cầu người dùng đã xử lý #14 trên chuỗi ForkJoinPool-1-worker-1
Yêu cầu người dùng đã xử lý #17 trên chuỗi ForkJoinPool-1-worker-6 Yêu
cầu người dùng đã xử lý #16 trên chuỗi ForkJoinPool-1-worker-7 Người
dùng đã xử lý yêu cầu #19 trên luồng ForkJoinPool-1-worker-6
Đã xử lý yêu cầu người dùng #18 trên luồng ForkJoinPool-1-worker-1

Từ đầu ra, chúng ta có thể thấy rằng một số luồng quản lý để hoàn thành một số tác vụ ( ForkJoinPool-1-worker-6 đã hoàn thành 4 tác vụ), trong khi một số chỉ hoàn thành một tác vụ ( ForkJoinPool-1-worker-2 ). Nếu độ trễ 1 giây được thêm vào quá trình triển khai phương thức gọi , thì bức tranh sẽ thay đổi.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Để thử nghiệm, hãy chạy mã tương tự trên một máy khác. Kết quả đầu ra:

Yêu cầu người dùng đã xử lý số 2 trên luồng ForkJoinPool-1-worker-23
Yêu cầu người dùng đã xử lý số 7 trên luồng ForkJoinPool-1-worker-31 Yêu
cầu người dùng đã xử lý số 4 trên luồng ForkJoinPool-1-worker-27 Yêu cầu
người dùng đã xử lý số 5 trên ForkJoinPool- Chủ đề 1-worker-13
Yêu cầu người dùng đã xử lý #0 trên luồng ForkJoinPool-1-worker-19 Yêu
cầu người dùng đã xử lý #8 trên luồng ForkJoinPool-1-worker-3 Yêu
cầu người dùng đã xử lý #9 trên luồng ForkJoinPool-1-worker-21 Người
dùng đã xử lý yêu cầu số 6 trên luồng ForkJoinPool-1-worker-17
Yêu cầu người dùng đã xử lý số 3 trên luồng ForkJoinPool-1-worker-9
Yêu cầu người dùng đã xử lý số 1 trên luồng ForkJoinPool-1-worker-5 Yêu
cầu người dùng đã xử lý số 12 trên ForkJoinPool-1- chủ đề công nhân-23
Yêu cầu người dùng đã xử lý số 15 trên luồng ForkJoinPool-1-worker-19
Yêu cầu người dùng đã xử lý số 14 trên luồng ForkJoinPool-1-worker-27 Yêu
cầu người dùng đã xử lý số 11 trên luồng ForkJoinPool-1-worker-3 Yêu cầu
người dùng đã xử lý số 13 trên ForkJoinPool- Chủ đề 1-worker-13
Yêu cầu người dùng đã xử lý #10 trên luồng ForkJoinPool-1-worker-31 Yêu
cầu người dùng đã xử lý #18 trên luồng ForkJoinPool-1-worker-5 Yêu
cầu người dùng đã xử lý #16 trên luồng ForkJoinPool-1-worker-9
Người dùng đã xử lý yêu cầu số 17 trên chuỗi ForkJoinPool-1-worker-21
Yêu cầu người dùng đã xử lý số 19 trên chuỗi ForkJoinPool-1-worker-17

Trong kết quả này, điều đáng chú ý là chúng tôi đã "yêu cầu" các chủ đề trong nhóm. Hơn nữa, tên của chuỗi công nhân không đi từ một đến mười, nhưng thay vào đó, đôi khi cao hơn mười. Nhìn vào những tên riêng, chúng ta thấy rằng thực sự có mười công nhân (3, 5, 9, 13, 17, 19, 21, 23, 27 và 31). Ở đây khá hợp lý để hỏi tại sao điều này lại xảy ra? Bất cứ khi nào bạn không hiểu chuyện gì đang xảy ra, hãy sử dụng trình gỡ rối.

Đây là những gì chúng ta sẽ làm. Hãy đúcngười thi hànhDịch vụphản đối một ForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Chúng tôi sẽ sử dụng hành động Đánh giá biểu thức để kiểm tra đối tượng này sau khi gọi phương thức invokeAll . Để thực hiện việc này, sau phương thức invokeAll , hãy thêm bất kỳ câu lệnh nào, chẳng hạn như một sout trống, và đặt một điểm dừng trên đó.

Chúng ta có thể thấy rằng nhóm có 10 luồng, nhưng kích thước của mảng luồng công nhân là 32. Lạ, nhưng không sao. Hãy tiếp tục đào. Khi tạo một nhóm, hãy thử đặt mức song song thành hơn 32, chẳng hạn như 40.

ExecutorService executorService = Executors.newWorkStealingPool(40);

Trong trình gỡ lỗi, hãy xemforkJoinPool một lần nữa.

Bây giờ, kích thước của mảng luồng công nhân là 128. Chúng ta có thể cho rằng đây là một trong những tối ưu hóa bên trong của JVM. Hãy thử tìm nó trong mã của JDK (openjdk-14):

Đúng như chúng tôi đã nghi ngờ: kích thước của mảng luồng công nhân được tính bằng cách thực hiện các thao tác bitwise trên giá trị song song. Chúng ta không cần cố gắng tìm hiểu chính xác điều gì đang xảy ra ở đây. Chỉ cần biết rằng có sự tối ưu hóa như vậy là đủ.

Một khía cạnh thú vị khác của ví dụ của chúng ta là việc sử dụng phương thức InvokeAll . Điều đáng chú ý là phương thức invokeAll có thể trả về một kết quả, hay đúng hơn là một danh sách các kết quả (trong trường hợp của chúng tôi là List <Future<Void>>) , nơi chúng tôi có thể tìm thấy kết quả của từng tác vụ.

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Loại dịch vụ và nhóm luồng đặc biệt này có thể được sử dụng trong các tác vụ có mức độ đồng thời có thể dự đoán được hoặc ít nhất là ngầm định.