De ce ai putea avea nevoie de un ExecutorService pentru 1 fir?

Puteți utiliza metoda Executors.newSingleThreadExecutor pentru a crea un ExecutorService cu un pool care include un singur fir. Logica piscinei este următoarea:

  • Serviciul execută o singură sarcină la un moment dat.
  • Dacă trimitem N sarcini pentru execuție, toate cele N sarcini vor fi executate una după alta de către un singur thread.
  • Dacă firul de execuție este întrerupt, va fi creat un nou fir pentru a executa toate sarcinile rămase.

Să ne imaginăm o situație în care programul nostru necesită următoarele funcționalități:

Trebuie să procesăm cererile utilizatorilor în 30 de secunde, dar nu mai mult de o solicitare pe unitatea de timp.

Creăm o clasă de activități pentru procesarea unei cereri de utilizator:


class Task implements Runnable {
   private final int taskNumber;

   public Task(int taskNumber) {
       this.taskNumber = taskNumber;
   }

   @Override
   public void run() {
       try {
           Thread.sleep(1000);
       } catch (InterruptedException ignored) {
       }
       System.out.printf("Processed request #%d on thread id=%d\\n", taskNumber, Thread.currentThread().getId());
   }
}
    

Clasa modelează comportamentul procesării unei cereri primite și afișează numărul acesteia.

Apoi, în metoda principală , creăm un ExecutorService pentru 1 fir, pe care îl vom folosi pentru a procesa secvenţial cererile primite. Deoarece condițiile sarcinii stipulează „în termen de 30 de secunde”, adăugăm 30 de secunde de așteptare și apoi oprim forțat ExecutorService .


public static void main(String[] args) throws InterruptedException {
   ExecutorService executorService = Executors.newSingleThreadExecutor();

   for (int i = 0; i < 1_000; i++) {
       executorService.execute(new Task(i));
   }
   executorService.awaitTermination(30, TimeUnit.SECONDS);
   executorService.shutdownNow();
}
    

După pornirea programului, consola afișează mesaje despre procesarea cererilor:

Cererea #0 procesată pe id=16
Cererea procesată #1 pe id=16
Cererea procesată #2 pe id=16

Solicitarea #29 pe id=16

După procesarea cererilor timp de 30 de secunde, executorService apelează metoda shutdownNow() , care oprește sarcina curentă (cea care se execută) și anulează toate sarcinile în așteptare. După aceea, programul se încheie cu succes.

Dar totul nu este întotdeauna atât de perfect, deoarece programul nostru ar putea avea cu ușurință o situație în care una dintre sarcinile preluate de singurul thread al pool-ului nostru funcționează incorect și chiar încheie firul nostru. Putem simula această situație pentru a ne da seama cum funcționează executorService cu un singur fir în acest caz.

Pentru a face acest lucru, în timp ce una dintre sarcini este în curs de executare, încheiem firul nostru folosind metoda nesigură și învechită Thread.currentThread().stop() . Facem acest lucru în mod intenționat pentru a simula situația în care una dintre sarcini încheie firul.

Vom schimba metoda de rulare în clasa Task :


@Override
public void run() {
   try {
       Thread.sleep(1000);
   } catch (InterruptedException ignored) {
   }

   if (taskNumber == 5) {
       Thread.currentThread().stop();
   }

   System.out.printf("Processed request #%d on thread id=%d\\n", taskNumber, Thread.currentThread().getId());
}
    

Vom întrerupe sarcina #5.

Să vedem cum arată rezultatul cu firul întrerupt la sfârșitul sarcinii #5:

Cererea procesată #0 pe id-ul firului=16
Cererea procesată #1 pe id-ul firului=16
Cererea procesată #2 pe id-ul firului=16
Cererea procesată #3 pe id-ul firului=16
Cererea procesată #4 pe id-ul firului=16 Cererea
#6 procesată pe thread id=17
Cerere procesată #7 pe thread id=17

Procesat cerere #29 pe thread id=17

Vedem că după ce firul de execuție este întrerupt la sfârșitul sarcinii 5, sarcinile încep să fie executate într-un thread al cărui identificator este 17, deși anterior fuseseră executate pe thread-ul cu identificatorul căruia este 16. Și pentru că pool-ul nostru are un un singur thread, acest lucru poate însemna doar un lucru: executorService a înlocuit firul oprit cu unul nou și a continuat să execute sarcinile.

Astfel, ar trebui să folosim newSingleThreadExecutor cu un pool cu ​​un singur thread atunci când dorim să procesăm sarcini secvențial și doar una câte una și dorim să continuăm procesarea sarcinilor din coadă indiferent de finalizarea sarcinii anterioare (de exemplu, cazul în care din sarcinile noastre omoara firul).

ThreadFactory

Când vorbim de crearea și recrearea de fire, nu putem să nu menționămThreadFactory.

AThreadFactoryeste un obiect care creează fire noi la cerere.

Ne putem crea propria fabrică de creare a firelor și putem transmite o instanță a acesteia metodei Executors.newSingleThreadExecutor(ThreadFactory threadFactory) .


ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MyThread");
            }
        });
                    
Ignorăm metoda de creare a unui fir nou, pasând un nume de fir constructorului.

ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "MyThread");
                thread.setPriority(Thread.MAX_PRIORITY);
                return thread;
            }
        });
                    
Am schimbat numele și prioritatea firului creat.

Deci vedem că avem 2 metode Executors.newSingleThreadExecutor supraîncărcate. Unul fără parametri și al doilea cu un parametru ThreadFactory .

Folosind o ThreadFactory , puteți configura firele create după cum este necesar, de exemplu, setând priorități, folosind subclase de fire, adăugând un UncaughtExceptionHandler la fir și așa mai departe.