$darkmode
DENOPTIM
ParallelAsynchronousTaskExecutor.java
Go to the documentation of this file.
1/*
2 * DENOPTIM
3 * Copyright (C) 2019 Marco Foscato <marco.foscato@uib.no>
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19package denoptim.task;
20
21import java.io.IOException;
22import java.util.ArrayList;
23import java.util.HashMap;
24import java.util.List;
25import java.util.Map;
26import java.util.concurrent.ArrayBlockingQueue;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.Future;
29import java.util.concurrent.RejectedExecutionHandler;
30import java.util.concurrent.ThreadPoolExecutor;
31import java.util.concurrent.TimeUnit;
32import java.util.logging.Level;
33import java.util.logging.Logger;
34
35import org.apache.commons.lang3.time.StopWatch;
36
37import denoptim.constants.DENOPTIMConstants;
38import denoptim.exception.DENOPTIMException;
39
40
48{
55 private final Map<Task,Future<Object>> futures;
56
61 private final List<Task> submitted;
62
66 protected final List<Object> results;
67
71 final ThreadPoolExecutor tpe;
72
76 private Throwable thrownByTask;
77
81 private Logger logger;
82
86 private int numThreads = 1;
87
88
89//-----------------------------------------------------------------------------
90
95 public ParallelAsynchronousTaskExecutor(int numberOfTasks, Logger logger)
96 {
97 this.logger = logger;
98 futures = new HashMap<Task,Future<Object>>();
99 submitted = new ArrayList<>();
100 results = new ArrayList<>();
101 numThreads = numberOfTasks;
102
103 tpe = new ThreadPoolExecutor(numThreads,
105 Long.MAX_VALUE,
106 TimeUnit.NANOSECONDS,
107 new ArrayBlockingQueue<Runnable>(1));
108
109 Runtime.getRuntime().addShutdownHook(new Thread()
110 {
111 @Override
112 public void run()
113 {
114 tpe.shutdown(); // Disable new tasks from being submitted
115 try
116 {
117 // Wait a while for existing tasks to terminate
118 if (!tpe.awaitTermination(30, TimeUnit.SECONDS))
119 {
120 tpe.shutdownNow(); // Cancel currently executing tasks
121 }
122
123 if (!tpe.awaitTermination(60, TimeUnit.SECONDS))
124 {
125 // pool didn't terminate after the second try
126 }
127 }
128 catch (InterruptedException ie)
129 {
131 cleanup();
132 // (Re-)Cancel if current thread also interrupted
133 tpe.shutdownNow();
134 // Preserve interrupt status
135 Thread.currentThread().interrupt();
136 }
137 }
138 });
139
140 // by default the ThreadPoolExecutor will throw an exception
141 tpe.setRejectedExecutionHandler(new RejectedExecutionHandler()
142 {
143 @Override
144 public void rejectedExecution(Runnable r,
145 ThreadPoolExecutor executor)
146 {
147 try
148 {
149 // this will block if the queue is full
150 executor.getQueue().put(r);
151 }
152 catch (InterruptedException ex)
153 {
154 ex.printStackTrace();
155 String msg = "EXCEPTION in rejectedExecution.";
156 logger.log(Level.WARNING,msg);
157 }
158 }
159 });
160 }
161
162//------------------------------------------------------------------------------
163
167 public void stopRun()
168 {
170 cleanup();
171 tpe.shutdown();
172 }
173
174//------------------------------------------------------------------------------
175
181 protected boolean subtaskHasException()
182 {
183 boolean hasException = false;
184 for (Task tsk : submitted)
185 {
186 if (tsk.foundException())
187 {
188 hasException = true;
189 thrownByTask = tsk.getException();
190 break;
191 }
192 }
193 return hasException;
194 }
195
196//------------------------------------------------------------------------------
197
202 protected Throwable getExceptionFromSubTask()
203 {
204 return thrownByTask;
205 }
206
207//------------------------------------------------------------------------------
208
213 protected boolean allTasksCompleted()
214 {
215 boolean allDone = true;
216 for (Task tsk : submitted)
217 {
218 if (!tsk.isCompleted())
219 {
220 allDone = false;
221 break;
222 }
223 }
224 return allDone;
225 }
226
227//------------------------------------------------------------------------------
228
233 public void run() throws DENOPTIMException, IOException
234 {
235 String msg = "";
236 StopWatch watch = new StopWatch();
237 watch.start();
238
240 return;
241
242 // Start parallel threads for generating and/or manipulating fragments
243 tpe.prestartAllCoreThreads();
244
246
247 // This sounds weird when reading the doc, but the following does wait
248 // for the threads to complete.
249 // TODO try ExecutorCompletionService
250
251 // shutdown thread pool
252 tpe.shutdown();
253 try
254 {
255 // wait a bit for pending tasks to finish
256 while (!tpe.awaitTermination(5, TimeUnit.SECONDS))
257 {
258 // do nothing
259 }
260 }
261 catch (InterruptedException ex)
262 {
263 //Do nothing
264 }
265
266 // This collects also the results and frees-up memory
268
270 return;
271
272 // closing messages
273 watch.stop();
274 msg = "Overall time: " + watch.toString() + ". "
275 + DENOPTIMConstants.EOL
276 + this.getClass().getSimpleName() + " run completed."
278 logger.log(Level.INFO, msg);
279 }
280
281//------------------------------------------------------------------------------
282
283 protected void submitTask(Task task, String logFilePathname)
284 {
285 submitted.add(task);
286 futures.put(task, tpe.submit(task));
287 String msg = task.getClass().getSimpleName() + " "
288 + task.getId() + " submitted.";
289 if (logFilePathname!=null && !logFilePathname.isBlank())
290 {
291 msg = msg + " Log file: " + logFilePathname;
292 }
293 logger.log(Level.INFO, msg);
294 if (submitted.size()>numThreads*2)
295 {
297 }
298 }
299
300//------------------------------------------------------------------------------
301
307 abstract protected void createAndSubmitTasks();
308
309//------------------------------------------------------------------------------
310
311 abstract protected boolean doPostFlightOperations();
312
313//------------------------------------------------------------------------------
314
315 abstract protected boolean doPreFlightOperations();
316
317//------------------------------------------------------------------------------
318
322 private void cleanupCompleted()
323 {
324 List<Task> completed = new ArrayList<Task>();
325
326 for (Task t : submitted)
327 {
328 if (t.isCompleted())
329 completed.add(t);
330 }
331
332 for (Task t : completed)
333 {
334 try
335 {
336 results.add(futures.get(t).get());
337 } catch (InterruptedException | ExecutionException e)
338 {
339 e.printStackTrace();
340 String msg = "EXCEPTION upon retrieving results from completed "
341 + "task. The result from task '" + t.toString() + "' "
342 + "are lost.";
343 logger.log(Level.WARNING,msg);
344 }
345 submitted.remove(t);
346 futures.get(t).cancel(true);
347 futures.remove(t);
348 }
349 }
350
351//------------------------------------------------------------------------------
352
356 private void cleanup()
357 {
358 for (Task tsk : submitted)
359 {
360 tsk.stopTask();
361 }
362 for (Task tsk : futures.keySet())
363 {
364 futures.get(tsk).cancel(true);
365 futures.remove(tsk);
366 }
367 submitted.clear();
368 tpe.getQueue().clear();
369 }
370
371//------------------------------------------------------------------------------
372
373}
General set of constants used in DENOPTIM.
static final String EOL
new line character
void cleanupCompleted()
Removes only tasks that are marked as completed.
Throwable thrownByTask
If any, here we store the exception returned by a subtask.
boolean allTasksCompleted()
Check for completion of all subtasks.
final Map< Task, Future< Object > > futures
Storage of references to the submitted subtasks and their future returned value.
void stopRun()
Stops all subtasks and shuts down the executor.
final List< Task > submitted
Storage of references to the submitted subtasks.
void cleanup()
Cleans all references to submitted tasks.
abstract void createAndSubmitTasks()
Implementations of this method must call the submitTask(Task, String) method to actually send the tas...
boolean subtaskHasException()
Looks for exceptions in the subtasks and, if any, store its reference locally to allow reporting it b...
ParallelAsynchronousTaskExecutor(int numberOfTasks, Logger logger)
Constructor.
final List< Object > results
List of objects returned by completed tasks.
final ThreadPoolExecutor tpe
Asynchronous tasks manager.
A task that can throw exceptions.
Definition: Task.java:30