CodeGym /Kursy /JAVA 25 SELF /Potoki przetwarzania plików: producer–consumer

Potoki przetwarzania plików: producer–consumer

JAVA 25 SELF
Poziom 59 , Lekcja 4
Dostępny

1. Jak skoordynować kilka wątków przy przetwarzaniu plików

Gdy pracujesz z dużymi plikami lub złożonym przetwarzaniem danych, często pojawia się zadanie: podzielić pracę między kilka wątków. Na przykład jeden wątek czyta linie z pliku, a inne — przetwarzają te linie (szukają słów, liczą statystyki itp.).

Jaki jest problem?

  • Jeśli wszystkie wątki będą pracowały z tym samym zasobem (na przykład z plikiem lub wspólną kolekcją), łatwo o błędy: jedne wątki mogą „wyprzedzać” inne, pojawią się wyścigi, utrata danych, przeciążenie pamięci.
  • Jeśli wątków jest dużo, a koordynacji brak — jedne będą bezczynne, inne przeciążone.

Potrzebne jest rozwiązanie, które:

  • Umożliwia bezpieczną wymianę danych między wątkami.
  • Nie dopuszcza do przepełnienia pamięci (jeśli przetwarzanie jest wolniejsze niż odczyt).
  • Pozwala łatwo zakończyć pracę wszystkich wątków, gdy dane się skończą.

2. Wzorzec „Producer–Consumer” (Producent–Konsument)

Producer–Consumer to klasyczny wzorzec, który pomaga wątkom działać w sposób zgrany, nie przeszkadzając sobie nawzajem.

Są w nim dwie role: Producer (producent) tworzy dane — na przykład czyta linie z pliku lub odbiera wiadomości z sieci — i odkłada je do wspólnej kolejki. Consumer (konsument) pobiera dane z tej kolejki i je przetwarza: zlicza słowa, zapisuje do bazy albo do innego pliku.

Główna idea polega na tym, że te dwa typy wątków działają niezależnie. Producent może czytać szybciej, niż konsument nadąża przetwarzać, albo odwrotnie — i nikt nie blokuje drugiego. Pomiędzy nimi znajduje się bufor — kolejka, która wyrównuje tempo pracy.

Wizualny schemat

[Plik] --(czyta)--> [Producer] --(wkłada do kolejki)--> [BlockingQueue] --(pobiera)--> [Consumer] --(przetwarza)

3. Implementacja z BlockingQueue

W Javie do organizacji takiej wymiany idealnie pasuje interfejs BlockingQueue (na przykład implementacja ArrayBlockingQueue).

Czym jest BlockingQueue?

BlockingQueue to bezpieczna względem wątków kolejka o ograniczonym rozmiarze, która sama dba o synchronizację. Jeśli producent próbuje dodać element, a kolejka jest już pełna, wątek automatycznie się blokuje i czeka, aż zwolni się miejsce. Analogicznie, jeśli konsument próbuje pobrać element, a kolejka jest pusta, po prostu czeka, aż ktoś coś do niej włoży.

Taki mechanizm automatycznie rozwiązuje klasyczny problem „wyścigu” wątków i przepełnienia pamięci: producenci nie zasypują kolejki nadmiarem elementów, a konsumenci nie próbują pracować z pustką. Wszystkie wątki spokojnie i zgranie wykonują swoją pracę.

Przykład utworzenia kolejki

import java.util.concurrent.*;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // bufor na 100 elementów

100 to maksymalna liczba elementów w kolejce. Jeśli producent działa szybciej, będzie czekał, aż konsument przetworzy część danych.

4. Koordynacja wątków: backpressure i zakończenie pracy

Ograniczenie rozmiaru kolejki (backpressure)

Backpressure to mechanizm, który nie pozwala producentowi „zalać” całej pamięci, jeśli konsument nie nadąża z przetwarzaniem danych.

  • Jeśli kolejka jest pełna, producent automatycznie „przyhamowuje” (metoda put() jest blokowana).
  • Jeśli kolejka jest pusta, konsument czeka (metoda take() jest blokowana).

To pozwala systemowi działać stabilnie nawet przy różnych prędkościach wątków.

Zakończenie pracy: „poison pill” (zatruta pigułka)

Gdy producent zakończy czytanie pliku, trzeba jakoś przekazać konsumentom, że danych już nie będzie i pora się zakończyć.

Rozwiązanie:

  • Do kolejki odkłada się specjalny obiekt — „poison pill” (na przykład napis "__END__" lub inna specjalna wartość, ale nie null).
  • Konsument, otrzymawszy „poison pill”, rozumie, że pora się zakończyć.

Jeśli konsumentów jest kilka, trzeba włożyć tyle „poison pill”, ilu jest konsumentów!

5. Przykładowy potok: odczyt pliku i przetwarzanie linii

Zaimplementujmy prosty potok:

  • Jeden wątek czyta linie z pliku i odkłada je do kolejki.
  • Kilka wątków pobiera linie z kolejki, zlicza słowa i wypisuje wynik.

Krok 1. Producer — czytelnik pliku

import java.io.*;
import java.util.concurrent.*;

public class FileProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final File file;
    private final int consumerCount;
    private final String POISON_PILL = "__END__";

    public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
        this.queue = queue;
        this.file = file;
        this.consumerCount = consumerCount;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // jeśli kolejka jest pełna — czekamy
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Wkładamy poison pill dla każdego konsumenta
            try {
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(POISON_PILL);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Krok 2. Consumer — przetwarzacz linii

import java.util.concurrent.BlockingQueue;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String POISON_PILL = "__END__";

    public LineConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String line = queue.take(); // jeśli kolejka jest pusta — czekamy
                if (POISON_PILL.equals(line)) {
                    break; // zakończenie pracy
                }
                int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
                System.out.println(Thread.currentThread().getName() + ": " + wordCount + " słów w wierszu: " + line);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Krok 3. Uruchomienie potoku za pomocą Executors

import java.io.File;
import java.util.concurrent.*;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        int consumerCount = 3;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        File file = new File("input.txt"); // twój plik

        // Uruchamiamy producenta
        Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
        producer.start();

        // Uruchamiamy konsumentów przez ExecutorService
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(new LineConsumer(queue));
        }

        // Czekamy na zakończenie producenta
        producer.join();

        // Czekamy na zakończenie konsumentów
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Przetwarzanie zakończone!");
    }
}

6. Wizualizacja: schemat działania potoku

flowchart LR
    A[Producer: czyta plik] -- put() --> Q[BlockingQueue]
    Q -- take() --> C1[Consumer 1: zlicza słowa]
    Q -- take() --> C2[Consumer 2: zlicza słowa]
    Q -- take() --> C3[Consumer 3: zlicza słowa]
    A -.->|poison pill| Q
    Q -.->|poison pill| C1
    Q -.->|poison pill| C2
    Q -.->|poison pill| C3

7. Praktyczne wskazówki i dobre praktyki

  • Ograniczaj rozmiar kolejki — to chroni przed przepełnieniem pamięci.
  • Używaj „poison pill” do zakończenia — inaczej konsumenci mogą zawisnąć na zawsze.
  • Nie używaj null jako „poison pill”, jeśli w kolejce mogą pojawiać się prawdziwe wartości null. Lepiej użyć specjalnego napisu lub obiektu.
  • Obsługuj InterruptedException — to ważne dla poprawnego zakończenia wątków.
  • Używaj ExecutorService do zarządzania pulą wątków — to prostsze i bezpieczniejsze niż ręczne tworzenie wątków.

8. Typowe błędy przy implementacji potoku

Błąd nr 1: Nieograniczona kolejka → OutOfMemoryError. Jeśli użyjesz LinkedBlockingQueue bez ograniczenia rozmiaru, producent może „zalać” całą pamięć, jeśli konsument nie nadąża.

Błąd nr 2: Brak „poison pill” → konsumenci się zawieszają. Jeśli zapomnisz włożyć „zatrute pigułki”, wątki-konsumenci będą czekać na nowe dane w nieskończoność.

Błąd nr 3: Włożono tylko jedną „poison pill”, a konsumentów jest kilka. Każdy wątek-konsument potrzebuje własnej „pigułki” — inaczej nie wszyscy się zakończą.

Błąd nr 4: Nieobsłużone InterruptedException. Jeśli wątek został przerwany, trzeba poprawnie zakończyć pracę (przywrócić znacznik przerwania), w przeciwnym razie możliwe są „zawieszone” wątki.

Błąd nr 5: Wspólna zmienna między wątkami bez synchronizacji. Nie próbuj „wynajdować koła na nowo” — używaj BlockingQueue, a nie zwykłych list lub tablic.

1
Ankieta/quiz
Równoległa praca z plikami, poziom 59, lekcja 4
Niedostępny
Równoległa praca z plikami
Równoległa praca z plikami
Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION