21import java.io.IOException;
22import java.util.ArrayList;
23import java.util.HashMap;
26import java.util.concurrent.ArrayBlockingQueue;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.Future;
29import java.util.concurrent.RejectedExecutionHandler;
30import java.util.concurrent.ThreadPoolExecutor;
31import java.util.concurrent.TimeUnit;
32import java.util.logging.Level;
33import java.util.logging.Logger;
35import org.apache.commons.lang3.time.StopWatch;
37import denoptim.constants.DENOPTIMConstants;
38import denoptim.exception.DENOPTIMException;
55 private final Map<Task,Future<Object>>
futures;
71 final ThreadPoolExecutor
tpe;
98 futures =
new HashMap<Task,Future<Object>>();
106 TimeUnit.NANOSECONDS,
107 new ArrayBlockingQueue<Runnable>(1));
109 Runtime.getRuntime().addShutdownHook(
new Thread()
118 if (!
tpe.awaitTermination(30, TimeUnit.SECONDS))
123 if (!
tpe.awaitTermination(60, TimeUnit.SECONDS))
128 catch (InterruptedException ie)
135 Thread.currentThread().interrupt();
141 tpe.setRejectedExecutionHandler(
new RejectedExecutionHandler()
144 public void rejectedExecution(Runnable r,
145 ThreadPoolExecutor executor)
150 executor.getQueue().put(r);
152 catch (InterruptedException ex)
154 ex.printStackTrace();
155 String msg =
"EXCEPTION in rejectedExecution.";
156 logger.log(Level.WARNING,msg);
183 boolean hasException =
false;
186 if (tsk.foundException())
215 boolean allDone =
true;
218 if (!tsk.isCompleted())
236 StopWatch watch =
new StopWatch();
243 tpe.prestartAllCoreThreads();
256 while (!
tpe.awaitTermination(5, TimeUnit.SECONDS))
261 catch (InterruptedException ex)
274 msg =
"Overall time: " + watch.toString() +
". "
275 + DENOPTIMConstants.EOL
276 + this.getClass().getSimpleName() +
" run completed."
278 logger.log(Level.INFO, msg);
287 String msg = task.getClass().getSimpleName() +
" "
288 + task.
getId() +
" submitted.";
289 if (logFilePathname!=
null && !logFilePathname.isBlank())
291 msg = msg +
" Log file: " + logFilePathname;
293 logger.log(Level.INFO, msg);
324 List<Task> completed =
new ArrayList<Task>();
332 for (
Task t : completed)
337 }
catch (InterruptedException | ExecutionException e)
340 String msg =
"EXCEPTION upon retrieving results from completed "
341 +
"task. The result from task '" + t.toString() +
"' "
343 logger.log(Level.WARNING,msg);
368 tpe.getQueue().clear();
General set of constants used in DENOPTIM.
static final String EOL
new line character
Runs tasks in parallel in an asynchronous fashion.
void cleanupCompleted()
Removes only tasks that are marked as completed.
int numThreads
Number of threads run in parallel.
abstract boolean doPreFlightOperations()
void submitTask(Task task, String logFilePathname)
abstract boolean doPostFlightOperations()
Throwable thrownByTask
If any, here we store the exception returned by a subtask.
void run()
Run the parallelized task.
boolean allTasksCompleted()
Check for completion of all subtasks.
final Map< Task, Future< Object > > futures
Storage of references to the submitted subtasks and their future returned value.
void stopRun()
Stops all subtasks and shuts down the executor.
final List< Task > submitted
Storage of references to the submitted subtasks.
Throwable getExceptionFromSubTask()
void cleanup()
Cleans all references to submitted tasks.
abstract void createAndSubmitTasks()
Implementations of this method must call the submitTask(Task, String) method to actually send the tas...
boolean subtaskHasException()
Looks for exceptions in the subtasks and, if any, stores its reference locally to allow reporting it b...
ParallelAsynchronousTaskExecutor(int numberOfTasks, Logger logger)
Constructor.
final List< Object > results
List of objects returned by completed tasks.
final ThreadPoolExecutor tpe
Asynchronous tasks manager.
A task that can throw exceptions.