1. CountDownLatch: start na sygnał
W świecie wielowątkowym często trzeba zorganizować zgraną pracę grupy wątków — tak, aby wszystkie zaczęły, skończyły lub przeszły do kolejnego etapu razem. Na przykład:
Wyobraź sobie wyścig. Samochody stoją na starcie — ktoś już rozgrzał silnik, ktoś jeszcze sprawdza opony. Ale dopóki sędzia nie machnie flagą, nikt nie rusza. To właśnie zadanie koordynacji.
Albo inny przykład: przygotowujesz kolację z przyjaciółmi — ktoś kroi warzywa, ktoś stawia wodę, ktoś szuka, gdzie podziała się sól. Najważniejsze, by wszyscy skończyli przygotowania, zanim zaczniecie gotować.
Do takich przypadków Java daje gotowe narzędzia synchronizacji — bezpieczne, zrozumiałe i bez bólu z wait() i notify(). Jednym z najprzydatniejszych jest CountDownLatch. Działa jak blokada-licznik: dopóki nie spadnie do zera, „drzwi” są zamknięte i nikt nie idzie dalej. A kiedy wszyscy się zameldują — latch otwiera się i wątki synchronicznie ruszają do boju.
CountDownLatch
CountDownLatch to „jednorazowy zawór”, który pozwala jednemu lub kilku wątkom czekać, aż inne wątki zakończą określoną liczbę operacji.
To jak start maratonu: wszyscy biegacze stoją na linii i czekają na strzał z pistoletu startowego. Gdy tylko sędzia strzeli (countdown dojdzie do zera) — wszyscy biegną.
Jak to w ogóle działa
CountDownLatch to jak gwizdek startowy dla wątków. Przy tworzeniu podajesz liczbę — na przykład 3. To jak trzy sygnały, które trzeba otrzymać, zanim zacznie się wyścig.
Wątki, które mają czekać na start, wywołują await(). Stoją na linii i są gotowe ruszyć, ale trzymają hamulec. Inne wątki, wykonując przygotowania, w miarę gotowości wywołują countDown() — jakby mówiły: „Jestem gotowy!”.
Gdy licznik dojdzie do zera — bam! — wszystkie oczekujące wątki startują równocześnie.
Ale pamiętaj: CountDownLatch jest jednorazowy. Po tym, jak licznik spadnie do zera, nie da się go przywrócić. To nie rewolwer, tylko petarda: huknie — i koniec.
Przykład: oczekiwanie zakończenia N zadań
import java.util.concurrent.CountDownLatch;
public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
int workers = 3;
CountDownLatch latch = new CountDownLatch(workers);
for (int i = 1; i <= workers; i++) {
int id = i;
new Thread(() -> {
System.out.println("Pracownik " + id + " rozpoczął pracę");
try { Thread.sleep(500 + id * 200); } catch (InterruptedException ignored) {}
System.out.println("Pracownik " + id + " zakończył pracę");
latch.countDown(); // zmniejszamy licznik
}).start();
}
System.out.println("Główny wątek czeka na zakończenie wszystkich pracowników...");
latch.await(); // czekamy, aż wszyscy pracownicy zakończą
System.out.println("Wszyscy pracownicy skończyli! Kontynuujemy pracę główną.");
}
}
Wynik:
Główny wątek czeka na zakończenie wszystkich pracowników...
Pracownik 1 rozpoczął pracę
Pracownik 2 rozpoczął pracę
Pracownik 3 rozpoczął pracę
Pracownik 1 zakończył pracę
Pracownik 2 zakończył pracę
Pracownik 3 zakończył pracę
Wszyscy pracownicy skończyli! Kontynuujemy pracę główną.
Przykład: jednoczesny start „na sygnał”
CountDownLatch startSignal = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " czeka na start");
startSignal.await(); // czekamy na sygnał
System.out.println(Thread.currentThread().getName() + " startuje!");
} catch (InterruptedException ignored) {}
}).start();
}
Thread.sleep(1000);
System.out.println("Sygnał do startu!");
startSignal.countDown(); // wszystkie wątki startują jednocześnie
2. CyclicBarrier: wielokrotne fazy i działania barierowe
CyclicBarrier: spotykamy się przy ognisku
CyclicBarrier to miejsce spotkania wątków. Każdy biegnie własną trasą, robi coś swojego, a potem wszyscy zbierają się przy „barierze” — jak przy ognisku w górach. Gdy zbiorą się wszyscy, bariera się otwiera, a grupa idzie dalej.
Główna różnica w porównaniu z CountDownLatch — tę barierę można używać w kółko. Po każdym wspólnym postoju „przeładowuje się” i drużyna może iść do kolejnego etapu.
Wyobraźmy sobie: Grupa turystów idzie długim szlakiem. Każdy porusza się w swoim tempie: ktoś fotografuje motyle, ktoś szuka Wi‑Fi. Ale na każdym przełęczy spotykają się przy ognisku, czekają na siebie i decydują, dokąd iść dalej. Tak właśnie działa CyclicBarrier.
Jak to działa
Tworzysz barierę i podajesz, ilu uczestników ma się zebrać, na przykład 4. Każdy wątek, docierając do punktu kontrolnego, wywołuje await() — i czeka na resztę. Gdy cała czwórka się zbierze, bariera „pyka” i puszcza wszystkich dalej.
Można nawet ustawić „działanie barierowe” — kawałek kodu, który wykona się dokładnie raz, gdy grupa się zbierze. Na przykład rozpalić to ognisko albo zapisać log: „Etap zakończony, idziemy dalej”. W tym celu do konstruktora przekazuje się Runnable.
Ważne: w przeciwieństwie do jednorazowego CountDownLatch, CyclicBarrier jest wielorazowy. Po każdym „zebraniu” znów jest gotowy na następny etap — jak wieczne ognisko biwakowe, które można rozpalać znów i znów.
Przykład: synchronizacja faz
import java.util.concurrent.CyclicBarrier;
public class BarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("Wszyscy podeszli do bariery! Zaczynamy nową fazę.");
});
for (int i = 1; i <= parties; i++) {
int id = i;
new Thread(() -> {
try {
System.out.println("Wątek " + id + " pracuje w fazie 1");
Thread.sleep(300 + id * 200);
System.out.println("Wątek " + id + " czeka na barierę");
barrier.await(); // czekamy na pozostałych
System.out.println("Wątek " + id + " pracuje w fazie 2");
Thread.sleep(200 + id * 100);
System.out.println("Wątek " + id + " czeka na barierę (2)");
barrier.await(); // czekamy ponownie
System.out.println("Wątek " + id + " zakończył pracę");
} catch (Exception e) {
System.out.println("Błąd: " + e);
}
}).start();
}
}
}
Wynik:
Wątek 1 pracuje w fazie 1
Wątek 2 pracuje w fazie 1
Wątek 3 pracuje w fazie 1
Wątek 1 czeka na barierę
Wątek 2 czeka na barierę
Wątek 3 czeka na barierę
Wszyscy podeszli do bariery! Zaczynamy nową fazę.
Wątek 1 pracuje w fazie 2
...
Działanie barierowe
Do konstruktora CyclicBarrier można przekazać działanie (Runnable), które wykona się raz, gdy wszystkie wątki podejdą do bariery (np. zaktualizować stan, wypisać log).
Pułapki: co jeśli jeden wątek padł?
Jeśli jeden z wątków wyrzuci wyjątek lub nie dotrze do bariery, pozostałe będą czekać w nieskończoność — albo dostaną BrokenBarrierException. Bariera „psuje się” i trzeba ją utworzyć ponownie.
Tak można przepisać ten rozdział w bardziej żywym, obrazowym i potocznym stylu — aby brzmiał jak naturalna kontynuacja „orkiestrowej” linii:
3. Phaser: wprawny dyrygent wielkiego koncertu
Phaser to coś w rodzaju „superbariery”. Łączy zalety CountDownLatch i CyclicBarrier, a przy tym jest znacznie bardziej elastyczny. To jak orkiestra, w której muzycy mogą przychodzić i odchodzić między częściami koncertu, a dyrygent i tak dba, by każda część zaczęła się wtedy, gdy wszyscy są gotowi.
W odróżnieniu od zwykłej bariery Phaser pracuje etapami — fazy następują jedna po drugiej. Ktoś gra tylko w pierwszej części, ktoś dołącza później, a ktoś odchodzi wcześniej — Phaser radzi sobie z tym spokojnie.
Jak to działa
Najpierw tworzy się Phaser, zwykle z podaną liczbą uczestników — parties. Każdy wątek rejestruje się (register()), wykonuje swoją partię i na końcu fazy wywołuje arriveAndAwaitAdvance() — zgłasza zakończenie i czeka na innych. Gdy wszyscy dotrą do tego punktu, Phaser przełącza się na kolejną fazę i proces się powtarza.
Jeśli uczestnik nie jest już potrzebny — może ładnie „ukłonić się” i zejść ze sceny przez arriveAndDeregister(). Nowi z kolei mogą dołączyć w trakcie koncertu — przez register().
Kiedy Phaser jest lepszy niż Barrier
Phaser warto wybrać, jeśli program żyje w więcej niż jednym rytmie:
- liczba wątków zmienia się w locie,
- są liczne etapy i nie wszyscy uczestnicy muszą brać udział w każdym,
- albo po prostu chcesz maksymalnej elastyczności bez zbędnej zabawy z ręczną synchronizacją.
W istocie Phaser to dyrygent. Nie tylko macha batutą, ale dostosowuje się do składu orkiestry, liczby części koncertu, a nawet do tego, że ktoś się spóźnił lub wyszedł wcześniej.
Przykład: przetwarzanie etapowe z dynamiczną liczbą wątków
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // wątek główny
for (int i = 1; i <= 3; i++) {
phaser.register(); // rejestrujemy uczestnika
int id = i;
new Thread(() -> {
for (int phase = 1; phase <= 2; phase++) {
System.out.println("Wątek " + id + " pracuje w fazie " + phase);
try { Thread.sleep(200 + id * 100); } catch (InterruptedException ignored) {}
phaser.arriveAndAwaitAdvance(); // czekamy na pozostałych
}
System.out.println("Wątek " + id + " zakończył pracę");
phaser.arriveAndDeregister(); // wychodzimy z phasera
}).start();
}
// Wątek główny też uczestniczy w fazach
for (int phase = 1; phase <= 2; phase++) {
phaser.arriveAndAwaitAdvance();
System.out.println("Wątek główny: zakończono fazę " + phase);
}
phaser.arriveAndDeregister();
System.out.println("Wszystkie fazy zakończone!");
}
}
Cechy szczególne:
- Można dodawać/usuwać uczestników w locie.
- Można poznać numer bieżącej fazy: phaser.getPhase().
- Można zakończyć phasera: phaser.forceTermination().
4. Exchanger: wymiana porcjami danych między wątkami
Exchanger<T> to synchronizator do wymiany danych między dwoma wątkami. Każdy wątek wywołuje exchange(data), a gdy oba wątki się spotkają, wymieniają się danymi.
Analogia: Dwóch kurierów spotyka się na skrzyżowaniu i wymienia paczki.
Jak to działa?
- Jeden wątek wywołuje exchange(data1) — czeka na drugi.
- Drugi wątek wywołuje exchange(data2) — oba otrzymują dane drugiego.
- Jeśli drugi wątek nie przyszedł — pierwszy czeka (można ustawić timeout).
Przykład: wymiana buforów między producer i consumer
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// Producer
new Thread(() -> {
String data = "Dane od producer";
try {
System.out.println("Producer: wysyła dane");
String response = exchanger.exchange(data);
System.out.println("Producer: otrzymał odpowiedź: " + response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// Consumer
new Thread(() -> {
try {
String received = exchanger.exchange("Odpowiedź od consumer");
System.out.println("Consumer: otrzymał dane: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Wynik:
Producer: wysyła dane
Consumer: otrzymał dane: Dane od producer
Producer: otrzymał odpowiedź: Odpowiedź od consumer
Zastosowania:
- Wymiana buforów między wątkami (np. jeden czyta z pliku, drugi zapisuje do sieci).
- Synchronizacja faz między dwoma wątkami.
5. Praktyka: równoległe przetwarzanie potokowe
Zadanie: „tik” gry (fazy)
Załóżmy, że mamy kilka wątków, z których każdy odpowiada za swoją część świata gry (np. fizyka, AI, rendering). Wszystkie muszą zsynchronizować się przy każdym „tiku” (fazie), aby nie było desynchronizacji.
Rozwiązanie: Użyj CyclicBarrier lub Phaser.
import java.util.concurrent.CyclicBarrier;
public class GameTickDemo {
public static void main(String[] args) {
int subsystems = 3;
CyclicBarrier barrier = new CyclicBarrier(subsystems, () -> {
System.out.println("Wszystkie podsystemy zakończyły tik. Zaczynamy następny.");
});
for (int i = 1; i <= subsystems; i++) {
int id = i;
new Thread(() -> {
for (int tick = 1; tick <= 5; tick++) {
System.out.println("Podsystem " + id + " działa w tiku " + tick);
try { Thread.sleep(100 + id * 50); } catch (InterruptedException ignored) {}
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
Zadanie: „zawór” dla dużej liczby workerów
Załóżmy, że mamy 100 wątków‑workerów, które mają wystartować jednocześnie po przygotowaniu (np. test obciążenia).
Rozwiązanie: Użyj CountDownLatch.
import java.util.concurrent.CountDownLatch;
public class MassStartDemo {
public static void main(String[] args) throws InterruptedException {
int workers = 100;
CountDownLatch ready = new CountDownLatch(workers);
CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < workers; i++) {
new Thread(() -> {
System.out.println("Wątek gotów do startu");
ready.countDown(); // sygnalizujemy gotowość
try {
start.await(); // czekamy na wspólny sygnał
System.out.println("Wątek startuje!");
} catch (InterruptedException ignored) {}
}).start();
}
ready.await(); // czekamy, aż wszystkie wątki będą gotowe
System.out.println("Wszyscy gotowi! START!");
start.countDown(); // dajemy sygnał do startu
}
}
6. Typowe błędy przy pracy z synchronizatorami
Błąd nr 1: Używanie CountDownLatch jako wielorazowej bariery.
CountDownLatch jest jednorazowy! Po osiągnięciu zera nie można go „przeładować”. Do wielorazowych faz używaj CyclicBarrier lub Phaser.
Błąd nr 2: Brak obsługi wyjątków (InterruptedException, BrokenBarrierException).
Metody await() mogą wyrzucać wyjątki — zawsze je obsługuj, w przeciwnym razie wątek może „zawisnąć” albo zakończyć się błędem. Zwracaj uwagę na InterruptedException i BrokenBarrierException.
Błąd nr 3: Jeden z wątków nie dotarł do bariery.
Jeśli jeden wątek „padł” albo nie wywołał await(), pozostałe będą czekać w nieskończoność (lub dostaną BrokenBarrierException). Dopilnuj, by wszyscy uczestnicy docierali do bariery.
Błąd nr 4: Zapomniane deregister() w Phaser.
Jeśli wątek zakończył pracę, ale nie wywołał arriveAndDeregister(), Phaser będzie czekał na „martwego” uczestnika. Zawsze poprawnie usuwaj wątki z Phaser.
Błąd nr 5: Używanie Exchanger dla więcej niż dwóch wątków.
Exchanger działa tylko do wymiany między dwoma wątkami. Jeśli wątków jest więcej — skończy się zakleszczeniem (deadlock).
Błąd nr 6: Mieszanie różnych synchronizatorów bez zrozumienia ich działania.
Nie używaj jednocześnie kilku różnych barier/latchy dla tej samej grupy wątków — może to prowadzić do zamieszania i zawieszeń.
GO TO FULL VERSION