CodeGym /Kursy /JAVA 25 SELF /Synchronizatory wysokiego poziomu

Synchronizatory wysokiego poziomu

JAVA 25 SELF
Poziom 58 , Lekcja 2
Dostępny

1. CountDownLatch: start na sygnał

W świecie wielowątkowym często trzeba zorganizować zgraną pracę grupy wątków — tak, aby wszystkie zaczęły, skończyły lub przeszły do kolejnego etapu razem. Na przykład:

Wyobraź sobie wyścig. Samochody stoją na starcie — ktoś już rozgrzał silnik, ktoś jeszcze sprawdza opony. Ale dopóki sędzia nie machnie flagą, nikt nie rusza. To właśnie zadanie koordynacji.

Albo inny przykład: przygotowujesz kolację z przyjaciółmi — ktoś kroi warzywa, ktoś stawia wodę, ktoś szuka, gdzie podziała się sól. Najważniejsze, by wszyscy skończyli przygotowania, zanim zaczniecie gotować.

Do takich przypadków Java daje gotowe narzędzia synchronizacji — bezpieczne, zrozumiałe i bez bólu z wait() i notify(). Jednym z najprzydatniejszych jest CountDownLatch. Działa jak blokada-licznik: dopóki nie spadnie do zera, „drzwi” są zamknięte i nikt nie idzie dalej. A kiedy wszyscy się zameldują — latch otwiera się i wątki synchronicznie ruszają do boju.

CountDownLatch

CountDownLatch to „jednorazowy zawór”, który pozwala jednemu lub kilku wątkom czekać, aż inne wątki zakończą określoną liczbę operacji.

To jak start maratonu: wszyscy biegacze stoją na linii i czekają na strzał z pistoletu startowego. Gdy tylko sędzia strzeli (countdown dojdzie do zera) — wszyscy biegną.

Jak to w ogóle działa

CountDownLatch to jak gwizdek startowy dla wątków. Przy tworzeniu podajesz liczbę — na przykład 3. To jak trzy sygnały, które trzeba otrzymać, zanim zacznie się wyścig.

Wątki, które mają czekać na start, wywołują await(). Stoją na linii i są gotowe ruszyć, ale trzymają hamulec. Inne wątki, wykonując przygotowania, w miarę gotowości wywołują countDown() — jakby mówiły: „Jestem gotowy!”.

Gdy licznik dojdzie do zera — bam! — wszystkie oczekujące wątki startują równocześnie.

Ale pamiętaj: CountDownLatch jest jednorazowy. Po tym, jak licznik spadnie do zera, nie da się go przywrócić. To nie rewolwer, tylko petarda: huknie — i koniec.

Przykład: oczekiwanie zakończenia N zadań

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch latch = new CountDownLatch(workers);

        for (int i = 1; i <= workers; i++) {
            int id = i;
            new Thread(() -> {
                System.out.println("Pracownik " + id + " rozpoczął pracę");
                try { Thread.sleep(500 + id * 200); } catch (InterruptedException ignored) {}
                System.out.println("Pracownik " + id + " zakończył pracę");
                latch.countDown(); // zmniejszamy licznik
            }).start();
        }

        System.out.println("Główny wątek czeka na zakończenie wszystkich pracowników...");
        latch.await(); // czekamy, aż wszyscy pracownicy zakończą
        System.out.println("Wszyscy pracownicy skończyli! Kontynuujemy pracę główną.");
    }
}

Wynik:

Główny wątek czeka na zakończenie wszystkich pracowników...
Pracownik 1 rozpoczął pracę
Pracownik 2 rozpoczął pracę
Pracownik 3 rozpoczął pracę
Pracownik 1 zakończył pracę
Pracownik 2 zakończył pracę
Pracownik 3 zakończył pracę
Wszyscy pracownicy skończyli! Kontynuujemy pracę główną.

Przykład: jednoczesny start „na sygnał”

CountDownLatch startSignal = new CountDownLatch(1);

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " czeka na start");
            startSignal.await(); // czekamy na sygnał
            System.out.println(Thread.currentThread().getName() + " startuje!");
        } catch (InterruptedException ignored) {}
    }).start();
}

Thread.sleep(1000);
System.out.println("Sygnał do startu!");
startSignal.countDown(); // wszystkie wątki startują jednocześnie

2. CyclicBarrier: wielokrotne fazy i działania barierowe

CyclicBarrier: spotykamy się przy ognisku

CyclicBarrier to miejsce spotkania wątków. Każdy biegnie własną trasą, robi coś swojego, a potem wszyscy zbierają się przy „barierze” — jak przy ognisku w górach. Gdy zbiorą się wszyscy, bariera się otwiera, a grupa idzie dalej.

Główna różnica w porównaniu z CountDownLatch — tę barierę można używać w kółko. Po każdym wspólnym postoju „przeładowuje się” i drużyna może iść do kolejnego etapu.

Wyobraźmy sobie: Grupa turystów idzie długim szlakiem. Każdy porusza się w swoim tempie: ktoś fotografuje motyle, ktoś szuka Wi‑Fi. Ale na każdym przełęczy spotykają się przy ognisku, czekają na siebie i decydują, dokąd iść dalej. Tak właśnie działa CyclicBarrier.

Jak to działa

Tworzysz barierę i podajesz, ilu uczestników ma się zebrać, na przykład 4. Każdy wątek, docierając do punktu kontrolnego, wywołuje await() — i czeka na resztę. Gdy cała czwórka się zbierze, bariera „pyka” i puszcza wszystkich dalej.

Można nawet ustawić „działanie barierowe” — kawałek kodu, który wykona się dokładnie raz, gdy grupa się zbierze. Na przykład rozpalić to ognisko albo zapisać log: „Etap zakończony, idziemy dalej”. W tym celu do konstruktora przekazuje się Runnable.

Ważne: w przeciwieństwie do jednorazowego CountDownLatch, CyclicBarrier jest wielorazowy. Po każdym „zebraniu” znów jest gotowy na następny etap — jak wieczne ognisko biwakowe, które można rozpalać znów i znów.

Przykład: synchronizacja faz

import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("Wszyscy podeszli do bariery! Zaczynamy nową fazę.");
        });

        for (int i = 1; i <= parties; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    System.out.println("Wątek " + id + " pracuje w fazie 1");
                    Thread.sleep(300 + id * 200);
                    System.out.println("Wątek " + id + " czeka na barierę");
                    barrier.await(); // czekamy na pozostałych

                    System.out.println("Wątek " + id + " pracuje w fazie 2");
                    Thread.sleep(200 + id * 100);
                    System.out.println("Wątek " + id + " czeka na barierę (2)");
                    barrier.await(); // czekamy ponownie

                    System.out.println("Wątek " + id + " zakończył pracę");
                } catch (Exception e) {
                    System.out.println("Błąd: " + e);
                }
            }).start();
        }
    }
}

Wynik:

Wątek 1 pracuje w fazie 1
Wątek 2 pracuje w fazie 1
Wątek 3 pracuje w fazie 1
Wątek 1 czeka na barierę
Wątek 2 czeka na barierę
Wątek 3 czeka na barierę
Wszyscy podeszli do bariery! Zaczynamy nową fazę.
Wątek 1 pracuje w fazie 2
...

Działanie barierowe

Do konstruktora CyclicBarrier można przekazać działanie (Runnable), które wykona się raz, gdy wszystkie wątki podejdą do bariery (np. zaktualizować stan, wypisać log).

Pułapki: co jeśli jeden wątek padł?

Jeśli jeden z wątków wyrzuci wyjątek lub nie dotrze do bariery, pozostałe będą czekać w nieskończoność — albo dostaną BrokenBarrierException. Bariera „psuje się” i trzeba ją utworzyć ponownie.

Tak można przepisać ten rozdział w bardziej żywym, obrazowym i potocznym stylu — aby brzmiał jak naturalna kontynuacja „orkiestrowej” linii:

3. Phaser: wprawny dyrygent wielkiego koncertu

Phaser to coś w rodzaju „superbariery”. Łączy zalety CountDownLatch i CyclicBarrier, a przy tym jest znacznie bardziej elastyczny. To jak orkiestra, w której muzycy mogą przychodzić i odchodzić między częściami koncertu, a dyrygent i tak dba, by każda część zaczęła się wtedy, gdy wszyscy są gotowi.

W odróżnieniu od zwykłej bariery Phaser pracuje etapami — fazy następują jedna po drugiej. Ktoś gra tylko w pierwszej części, ktoś dołącza później, a ktoś odchodzi wcześniej — Phaser radzi sobie z tym spokojnie.

Jak to działa

Najpierw tworzy się Phaser, zwykle z podaną liczbą uczestników — parties. Każdy wątek rejestruje się (register()), wykonuje swoją partię i na końcu fazy wywołuje arriveAndAwaitAdvance() — zgłasza zakończenie i czeka na innych. Gdy wszyscy dotrą do tego punktu, Phaser przełącza się na kolejną fazę i proces się powtarza.

Jeśli uczestnik nie jest już potrzebny — może ładnie „ukłonić się” i zejść ze sceny przez arriveAndDeregister(). Nowi z kolei mogą dołączyć w trakcie koncertu — przez register().

Kiedy Phaser jest lepszy niż Barrier

Phaser warto wybrać, jeśli program żyje w więcej niż jednym rytmie:

  • liczba wątków zmienia się w locie,
  • są liczne etapy i nie wszyscy uczestnicy muszą brać udział w każdym,
  • albo po prostu chcesz maksymalnej elastyczności bez zbędnej zabawy z ręczną synchronizacją.

W istocie Phaser to dyrygent. Nie tylko macha batutą, ale dostosowuje się do składu orkiestry, liczby części koncertu, a nawet do tego, że ktoś się spóźnił lub wyszedł wcześniej.

Przykład: przetwarzanie etapowe z dynamiczną liczbą wątków

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // wątek główny

        for (int i = 1; i <= 3; i++) {
            phaser.register(); // rejestrujemy uczestnika
            int id = i;
            new Thread(() -> {
                for (int phase = 1; phase <= 2; phase++) {
                    System.out.println("Wątek " + id + " pracuje w fazie " + phase);
                    try { Thread.sleep(200 + id * 100); } catch (InterruptedException ignored) {}
                    phaser.arriveAndAwaitAdvance(); // czekamy na pozostałych
                }
                System.out.println("Wątek " + id + " zakończył pracę");
                phaser.arriveAndDeregister(); // wychodzimy z phasera
            }).start();
        }

        // Wątek główny też uczestniczy w fazach
        for (int phase = 1; phase <= 2; phase++) {
            phaser.arriveAndAwaitAdvance();
            System.out.println("Wątek główny: zakończono fazę " + phase);
        }
        phaser.arriveAndDeregister();
        System.out.println("Wszystkie fazy zakończone!");
    }
}

Cechy szczególne:

  • Można dodawać/usuwać uczestników w locie.
  • Można poznać numer bieżącej fazy: phaser.getPhase().
  • Można zakończyć phasera: phaser.forceTermination().

4. Exchanger: wymiana porcjami danych między wątkami

Exchanger<T> to synchronizator do wymiany danych między dwoma wątkami. Każdy wątek wywołuje exchange(data), a gdy oba wątki się spotkają, wymieniają się danymi.

Analogia: Dwóch kurierów spotyka się na skrzyżowaniu i wymienia paczki.

Jak to działa?

  • Jeden wątek wywołuje exchange(data1) — czeka na drugi.
  • Drugi wątek wywołuje exchange(data2) — oba otrzymują dane drugiego.
  • Jeśli drugi wątek nie przyszedł — pierwszy czeka (można ustawić timeout).

Przykład: wymiana buforów między producer i consumer

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        // Producer
        new Thread(() -> {
            String data = "Dane od producer";
            try {
                System.out.println("Producer: wysyła dane");
                String response = exchanger.exchange(data);
                System.out.println("Producer: otrzymał odpowiedź: " + response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Consumer
        new Thread(() -> {
            try {
                String received = exchanger.exchange("Odpowiedź od consumer");
                System.out.println("Consumer: otrzymał dane: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Wynik:

Producer: wysyła dane
Consumer: otrzymał dane: Dane od producer
Producer: otrzymał odpowiedź: Odpowiedź od consumer

Zastosowania:

  • Wymiana buforów między wątkami (np. jeden czyta z pliku, drugi zapisuje do sieci).
  • Synchronizacja faz między dwoma wątkami.

5. Praktyka: równoległe przetwarzanie potokowe

Zadanie: „tik” gry (fazy)

Załóżmy, że mamy kilka wątków, z których każdy odpowiada za swoją część świata gry (np. fizyka, AI, rendering). Wszystkie muszą zsynchronizować się przy każdym „tiku” (fazie), aby nie było desynchronizacji.

Rozwiązanie: Użyj CyclicBarrier lub Phaser.

import java.util.concurrent.CyclicBarrier;

public class GameTickDemo {
    public static void main(String[] args) {
        int subsystems = 3;
        CyclicBarrier barrier = new CyclicBarrier(subsystems, () -> {
            System.out.println("Wszystkie podsystemy zakończyły tik. Zaczynamy następny.");
        });

        for (int i = 1; i <= subsystems; i++) {
            int id = i;
            new Thread(() -> {
                for (int tick = 1; tick <= 5; tick++) {
                    System.out.println("Podsystem " + id + " działa w tiku " + tick);
                    try { Thread.sleep(100 + id * 50); } catch (InterruptedException ignored) {}
                    try {
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

Zadanie: „zawór” dla dużej liczby workerów

Załóżmy, że mamy 100 wątków‑workerów, które mają wystartować jednocześnie po przygotowaniu (np. test obciążenia).

Rozwiązanie: Użyj CountDownLatch.

import java.util.concurrent.CountDownLatch;

public class MassStartDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 100;
        CountDownLatch ready = new CountDownLatch(workers);
        CountDownLatch start = new CountDownLatch(1);

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                System.out.println("Wątek gotów do startu");
                ready.countDown(); // sygnalizujemy gotowość
                try {
                    start.await(); // czekamy na wspólny sygnał
                    System.out.println("Wątek startuje!");
                } catch (InterruptedException ignored) {}
            }).start();
        }

        ready.await(); // czekamy, aż wszystkie wątki będą gotowe
        System.out.println("Wszyscy gotowi! START!");
        start.countDown(); // dajemy sygnał do startu
    }
}

6. Typowe błędy przy pracy z synchronizatorami

Błąd nr 1: Używanie CountDownLatch jako wielorazowej bariery.
CountDownLatch jest jednorazowy! Po osiągnięciu zera nie można go „przeładować”. Do wielorazowych faz używaj CyclicBarrier lub Phaser.

Błąd nr 2: Brak obsługi wyjątków (InterruptedException, BrokenBarrierException).
Metody await() mogą wyrzucać wyjątki — zawsze je obsługuj, w przeciwnym razie wątek może „zawisnąć” albo zakończyć się błędem. Zwracaj uwagę na InterruptedException i BrokenBarrierException.

Błąd nr 3: Jeden z wątków nie dotarł do bariery.
Jeśli jeden wątek „padł” albo nie wywołał await(), pozostałe będą czekać w nieskończoność (lub dostaną BrokenBarrierException). Dopilnuj, by wszyscy uczestnicy docierali do bariery.

Błąd nr 4: Zapomniane deregister() w Phaser.
Jeśli wątek zakończył pracę, ale nie wywołał arriveAndDeregister(), Phaser będzie czekał na „martwego” uczestnika. Zawsze poprawnie usuwaj wątki z Phaser.

Błąd nr 5: Używanie Exchanger dla więcej niż dwóch wątków.
Exchanger działa tylko do wymiany między dwoma wątkami. Jeśli wątków jest więcej — skończy się zakleszczeniem (deadlock).

Błąd nr 6: Mieszanie różnych synchronizatorów bez zrozumienia ich działania.
Nie używaj jednocześnie kilku różnych barier/latchy dla tej samej grupy wątków — może to prowadzić do zamieszania i zawieszeń.

Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION