Qual è il modo più semplice per parallelizzare un’attività in java?

Diciamo che ho un compito come:

for(Object object: objects) { Result result = compute(objects); list.add(result); } 

Qual è il modo più semplice per parallelizzare ogni calcolo () (assumendo che siano già parallelizzabili)?

Non ho bisogno di una risposta che corrisponda rigorosamente al codice sopra, solo una risposta generale. Ma se hai bisogno di maggiori informazioni: i miei compiti sono legati all’IO e questo è per un’applicazione Spring Web e le attività verranno eseguite in una richiesta HTTP.

Consiglierei di dare un’occhiata a ExecutorService .

In particolare, qualcosa del genere:

 ExecutorService EXEC = Executors.newCachedThreadPool(); List> tasks = new ArrayList>(); for (final Object object: objects) { Callable c = new Callable() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } List> results = EXEC.invokeAll(tasks); 

Si noti che l’uso di newCachedThreadPool potrebbe essere negativo se gli objects sono una grande lista. Un pool di thread memorizzato nella cache potrebbe creare un thread per attività! Potresti voler usare newFixedThreadPool(n) dove n è qualcosa di ragionevole (come il numero di core che hai, assumendo che compute() sia legato alla CPU).

Ecco il codice completo che viene eseguito in realtà:

 import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceExample { private static final Random PRNG = new Random(); private static class Result { private final int wait; public Result(int code) { this.wait = code; } } public static Result compute(Object obj) throws InterruptedException { int wait = PRNG.nextInt(3000); Thread.sleep(wait); return new Result(wait); } public static void main(String[] args) throws InterruptedException, ExecutionException { List objects = new ArrayList(); for (int i = 0; i < 100; i++) { objects.add(new Object()); } List> tasks = new ArrayList>(); for (final Object object : objects) { Callable c = new Callable() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } ExecutorService exec = Executors.newCachedThreadPool(); // some other exectuors you could try to see the different behaviours // ExecutorService exec = Executors.newFixedThreadPool(3); // ExecutorService exec = Executors.newSingleThreadExecutor(); try { long start = System.currentTimeMillis(); List> results = exec.invokeAll(tasks); int sum = 0; for (Future fr : results) { sum += fr.get().wait; System.out.println(String.format("Task waited %d ms", fr.get().wait)); } long elapsed = System.currentTimeMillis() - start; System.out.println(String.format("Elapsed time: %d ms", elapsed)); System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d))); } finally { exec.shutdown(); } } } 

Per una risposta più dettagliata, leggi Concurrency Java in Practice e usa java.util.concurrent .

Ecco qualcosa che uso nei miei progetti:

 public class ParallelTasks { private final Collection tasks = new ArrayList(); public ParallelTasks() { } public void add(final Runnable task) { tasks.add(task); } public void go() throws InterruptedException { final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); try { final CountDownLatch latch = new CountDownLatch(tasks.size()); for (final Runnable task : tasks) threads.execute(new Runnable() { public void run() { try { task.run(); } finally { latch.countDown(); } } }); latch.await(); } finally { threads.shutdown(); } } } // ... public static void main(final String[] args) throws Exception { ParallelTasks tasks = new ParallelTasks(); final Runnable waitOneSecond = new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } } }; tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); final long start = System.currentTimeMillis(); tasks.go(); System.err.println(System.currentTimeMillis() - start); } 

Che stampa un po ‘più di 2000 sulla mia scatola dual-core.

È ansible utilizzare ThreadPoolExecutor . Ecco un esempio di codice: http://programmingexamples.wikidot.com/threadpoolexecutor (troppo lungo per portarlo qui)

L’ array parallelo Fork / Join è un’opzione

Si può semplicemente creare qualche thread e ottenere il risultato.

 Thread t = new Mythread(object); if (t.done()) { // get result // add result } 

EDIT: Penso che altre soluzioni sono più cool.

Stavo per menzionare una class esecutore. Ecco alcuni esempi di codice che inseriresti nella class executor.

  private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4); private List> callableList = new ArrayList>(); public void addCallable(Callable callable) { this.callableList.add(callable); } public void clearCallables(){ this.callableList.clear(); } public void executeThreads(){ try { threadLauncher.invokeAll(this.callableList); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public Object[] getResult() { List> resultList = null; Object[] resultArray = null; try { resultList = threadLauncher.invokeAll(this.callableList); resultArray = new Object[resultList.size()]; for (int i = 0; i < resultList.size(); i++) { resultArray[i] = resultList.get(i).get(); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return resultArray; } 

Quindi per usarlo si farebbero chiamate alla class executor per popolarlo ed eseguirlo.

 executor.addCallable( some implementation of callable) // do this once for each task Object[] results = executor.getResult(); 

Con Java8 e versioni successive è ansible creare uno stream e quindi eseguire l’elaborazione in parallelo con parallelStream :

 List objects = ...; List result = objects.parallelStream().map(object -> { return compute(object); }).collect(Collectors.toList()); 

Nota: l’ordine dei risultati potrebbe non corrispondere all’ordine degli oggetti nell’elenco.

Dettagli su come impostare il giusto numero di thread sono disponibili in questa domanda StackOverflow how-many-threads-are-spawned-in-parallelstream-in-java-8