CodeGym /Các khóa học /JAVA 25 SELF /thenCompose + Executor tùy chỉnh + timeout

thenCompose + Executor tùy chỉnh + timeout

JAVA 25 SELF
Mức độ , Bài học
Có sẵn

1. thenCompose vs. thenApply: khác nhau thế nào và khi nào dùng

Trong lập trình bất đồng bộ trên Java (qua CompletableFuture), ta thường cần thực thi các chuỗi thao tác. Có hai phương thức tương tự: thenApplythenCompose. Nhưng chúng hoạt động khác nhau!

thenApply

Phương thức thenApply dùng khi bước tiếp theo chỉ là phép biến đổi giá trị đơn giản, không khởi chạy thao tác bất đồng bộ mới. Nó nhận kết quả của bước trước, xử lý và trả về một giá trị mới (không phải CompletableFuture).

Nếu bạn quen với Stream API, thì thenApply hoạt động gần giống map: lấy kết quả, áp dụng hàm và trả về phiên bản đã biến đổi.

Ví dụ:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "42");
CompletableFuture<Integer> lengthFuture = cf.thenApply(s -> s.length());
// lengthFuture chứa 2 (độ dài chuỗi "42")

Nói ngắn gọn, thenApply là cách nói: “Khi kết quả sẵn sàng, hãy làm với nó việc này”.

thenCompose

  • Dùng khi bước tiếp theo là một thao tác bất đồng bộ khác (trả về CompletableFuture).
  • Cho phép “mở phẳng” các CompletableFuture lồng nhau (tương tự flatMap).
  • Nếu dùng thenApply với hàm bất đồng bộ, bạn sẽ nhận CompletableFuture<CompletableFuture<T>> — rất bất tiện!

Ví dụ:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "user42");

// Giả sử ta cần theo tên người dùng lấy đơn hàng của họ (bất đồng bộ)
CompletableFuture<List<Order>> ordersFuture = cf.thenCompose(username -> fetchOrdersAsync(username));
// fetchOrdersAsync trả về CompletableFuture<List<Order>>

Trực quan:

  • thenApply: CF<String>thenApply(s -> s.length())CF<Integer>
  • thenCompose: CF<User>thenCompose(u -> fetchOrdersAsync(u.id))CF<List<Order>>

Khi nào dùng cái nào?

  • Hàm trả về giá trị thông thường — dùng thenApply.
  • Hàm trả về CompletableFuture — dùng thenCompose.

Ví dụ lỗi:

cf.thenApply(username -> fetchOrdersAsync(username)); // Nhận CF<CF<List<Order>>>
cf.thenCompose(username -> fetchOrdersAsync(username)); // Nhận CF<List<Order>>

2. Quản lý pool luồng (Executor): tại sao và cách dùng Executor riêng

Mặc định: ForkJoinPool.commonPool()

Khi bạn viết CompletableFuture.supplyAsync(...) hoặc thenApplyAsync(...) mà không chỉ rõ Executor, Java dùng pool luồng chung — ForkJoinPool.commonPool(). Điều này tiện, nhưng không phải lúc nào cũng phù hợp:

  • Nếu bạn có nhiều thao tác lâu hoặc chặn (yêu cầu mạng, làm việc với tệp), pool chung có thể “tắc nghẽn” và mọi tác vụ sẽ phải chờ.
  • Đôi khi cần cô lập các tác vụ với độ ưu tiên khác nhau hoặc giới hạn số luồng chạy đồng thời.

Khi nào cần Executor riêng?

  • Tác vụ lâu, chặn (ví dụ, truy vấn DB, HTTP, đọc tệp).
  • Cô lập tác vụ: để tác vụ người dùng không ảnh hưởng tác vụ hệ thống.
  • Giới hạn tài nguyên: chẳng hạn, không chạy quá 10 tải xuống đồng thời.

Cách tạo Executor riêng

Thường dùng ThreadPoolExecutor hoặc các factory trong Executors:

ExecutorService myExecutor = Executors.newFixedThreadPool(10);

Cách dùng Executor riêng với CompletableFuture

  • Ở các phương thức supplyAsync, runAsync, thenApplyAsync, thenComposeAsync và các phương thức khác, bạn có thể truyền đối số thứ hai là Executor của bạn.

Ví dụ:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(
    () -> loadDataFromNetwork(), myExecutor
);

cf.thenApplyAsync(data -> processData(data), myExecutor)
  .thenAcceptAsync(result -> System.out.println(result), myExecutor);

Lưu ý: nếu không chỉ rõ Executor, sẽ dùng ForkJoinPool.commonPool().

Khi nào Executor mặc định là đủ?

  • Cho các tác vụ ngắn, CPU-bound (tính toán đơn giản).
  • Khi không quan trọng tác vụ chạy ở luồng nào.

3. Xử lý timeout: orTimeout và completeOnTimeout

Các thao tác bất đồng bộ có thể treo hoặc chạy quá lâu (ví dụ, máy chủ không phản hồi). Để không phải chờ vô hạn, trong CompletableFuture có các phương thức làm việc với timeout.

orTimeout

  • Kết thúc CompletableFuture với ngoại lệ TimeoutException nếu thao tác không hoàn tất trong thời gian cho trước.
  • Không hủy tác vụ thực sự đang chạy, nhưng chuỗi downstream sẽ nhận lỗi.

Cú pháp:

cf.orTimeout(3, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("Hết thời gian chờ: " + ex);
      return null;
  });

Ví dụ:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000); // mô phỏng tác vụ lâu
    return "OK";
});

cf.orTimeout(2, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("Lỗi: " + ex);
      return "TIMEOUT";
  });

Kết quả:

Sau 2 giây sẽ ném TimeoutException, và exceptionally sẽ xử lý lỗi.

completeOnTimeout

  • Kết thúc CompletableFuture với giá trị chỉ định nếu thao tác không hoàn tất trong thời gian timeout.
  • Không ném ngoại lệ, mà trả về giá trị “dự phòng”.

Cú pháp:

cf.completeOnTimeout("DEFAULT", 2, TimeUnit.SECONDS);

Ví dụ:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000);
    return "OK";
});

cf.completeOnTimeout("TIMEOUT", 2, TimeUnit.SECONDS)
  .thenAccept(System.out::println); // Sau 2 giây sẽ in "TIMEOUT"

So sánh orTimeout và completeOnTimeout

Phương thức Làm gì khi timeout? Xử lý tiếp theo thế nào?
orTimeout
Kết thúc với TimeoutException Có thể xử lý qua exceptionally/handle
completeOnTimeout
Kết thúc với giá trị chỉ định thenAccept/thenApply sẽ nhận giá trị này

4. Thực hành: ví dụ với thenCompose, Executor tùy chỉnh và timeout

Bài toán:

  • Lấy người dùng theo id (bất đồng bộ, có độ trễ).
  • Sau đó bất đồng bộ lấy danh sách đơn hàng của người dùng (cũng có độ trễ).
  • Dùng Executor tùy chỉnh.
  • Thêm timeout cho việc lấy đơn hàng.
import java.util.concurrent.*;
import java.util.*;

public class AsyncDemo {
    static ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    // Mô phỏng lấy người dùng bất đồng bộ
    static CompletableFuture<String> fetchUserAsync(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "user" + userId;
        }, ioExecutor);
    }

    // Mô phỏng lấy đơn hàng của người dùng bất đồng bộ
    static CompletableFuture<List<String>> fetchOrdersAsync(String username) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(3000); // Tác vụ lâu!
            return List.of("order1", "order2");
        }, ioExecutor);
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    public static void main(String[] args) {
        fetchUserAsync(42)
            .thenCompose(username ->
                fetchOrdersAsync(username)
                    .orTimeout(2, TimeUnit.SECONDS) // Timeout khi lấy đơn hàng
                    .exceptionally(ex -> {
                        System.out.println("Không thể lấy đơn hàng: " + ex);
                        return List.of();
                    })
            )
            .thenAccept(orders -> System.out.println("Đơn hàng: " + orders))
            .join(); // Chờ hoàn tất toàn bộ chuỗi

        ioExecutor.shutdown();
    }
}

Điều gì xảy ra:

  • Lấy người dùng (1 giây).
  • Lấy đơn hàng (3 giây, nhưng timeout 2 giây).
  • Nếu không kịp — nhận TimeoutException, trả về danh sách rỗng.
  • Tất cả chạy trên Executor tùy chỉnh.

Kết quả:

Không thể lấy đơn hàng: java.util.concurrent.TimeoutException
Đơn hàng: []

Nếu giảm độ trễ trong fetchOrdersAsync xuống 1_000 ms — bạn sẽ thấy các đơn hàng thực.

5. Lỗi thường gặp và lưu ý

Lỗi số 1: Dùng thenApply thay vì thenCompose cho thao tác bất đồng bộ.
Nếu hàm trả về CompletableFuture mà bạn dùng thenApply, bạn sẽ nhận kiểu lồng CompletableFuture<CompletableFuture<T>>. Điều này làm chuỗi phức tạp và tạo lớp bọc thừa. Giải pháp: dùng thenCompose để “làm phẳng” kết quả thành CompletableFuture<T>.

Lỗi số 2: Chạy tác vụ lâu hoặc IO mà không có Executor riêng.
Mặc định các tác vụ chạy trong ForkJoinPool.commonPool(). Nếu quá tải pool này, độ trễ sẽ tăng và các tác vụ khác trong ứng dụng có thể chậm lại. Giải pháp: tạo ExecutorService riêng và truyền nó vào supplyAsync/thenApplyAsync.

Lỗi số 3: Kỳ vọng rằng orTimeout sẽ hủy tác vụ đang chạy.
orTimeout chỉ kết thúc CompletableFuture bằng ngoại lệ do timeout, còn tác vụ vẫn tiếp tục chạy ở nền. Giải pháp: nếu cần dừng thực thi, dùng cancel(true) hoặc cơ chế ngắt riêng.

Lỗi số 4: Hiểu sai phạm vi của timeout.
orTimeoutcompleteOnTimeout chỉ áp dụng cho một bước cụ thể trong chuỗi, không phải toàn bộ chuỗi. Giải pháp: nếu cần timeout tổng thể cho cả chuỗi, hãy bọc toàn bộ vào một CompletableFuture riêng và áp dụng timeout cho nó.

Lỗi số 5: Không đóng ExecutorService.
Nếu sau khi chạy xong không gọi shutdown()/shutdownNow() trên ExecutorService, các luồng sẽ tiếp tục chạy và chương trình có thể “treo”. Giải pháp: luôn đóng ExecutorService trong finally hoặc dùng try-with-resources ở Java 21+.

1
Khảo sát/đố vui
, cấp độ , bài học
Không có sẵn
Lập trình bất đồng bộ
Lập trình bất đồng bộ
Bình luận
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION