$darkmode
DENOPTIM
FPRunner.java
Go to the documentation of this file.
1/*
2 * DENOPTIM
3 * Copyright (C) 2019 Marco Foscato <marco.foscato@uib.no>
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19package denoptim.programs.fitnessevaluator;
20
21import java.io.File;
22import java.util.ArrayList;
23import java.util.HashMap;
24import java.util.List;
25import java.util.Map;
26import java.util.concurrent.ArrayBlockingQueue;
27import java.util.concurrent.Future;
28import java.util.concurrent.RejectedExecutionHandler;
29import java.util.concurrent.ThreadPoolExecutor;
30import java.util.concurrent.TimeUnit;
31import java.util.logging.Level;
32
33import org.apache.commons.io.FileUtils;
34import org.apache.commons.lang3.time.StopWatch;
35import org.openscience.cdk.interfaces.IAtomContainer;
36
37import denoptim.constants.DENOPTIMConstants;
38import denoptim.exception.DENOPTIMException;
39import denoptim.graph.DGraph;
40import denoptim.io.DenoptimIO;
41import denoptim.logging.StaticLogger;
42import denoptim.programs.combinatorial.FragSpaceExplorer;
43import denoptim.programs.denovo.GARunner;
44import denoptim.task.FitnessTask;
45
46
56public class FPRunner
57{
61 final Map<FitnessTask,Future<Object>> futures;
62
66 final List<FitnessTask> submitted;
67
71 final ThreadPoolExecutor tpe;
72
77
78
82 // TODO: use getNumCPU from GAParameters to launch fitness evaluations in parallel.
83 private int numThreads = 1;
84
85//-----------------------------------------------------------------------------
86
92 {
93 this.settings = settings;
94 futures = new HashMap<FitnessTask,Future<Object>>();
95 submitted = new ArrayList<>(numThreads);
96
97 tpe = new ThreadPoolExecutor(numThreads, numThreads, Long.MAX_VALUE,
98 TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1));
99
100 Runtime.getRuntime().addShutdownHook(new Thread()
101 {
102 @Override
103 public void run()
104 {
105 tpe.shutdown(); // Disable new tasks from being submitted
106 try
107 {
108 // Wait a while for existing tasks to terminate
109 if (!tpe.awaitTermination(5, TimeUnit.SECONDS))
110 {
111 tpe.shutdownNow(); // Cancel currently executing tasks
112 }
113
114 if (!tpe.awaitTermination(10, TimeUnit.SECONDS))
115 {
116 // pool didn't terminate after the second try
117 }
118 }
119 catch (InterruptedException ie)
120 {
121 cleanup();
122 // (Re-)Cancel if current thread also interrupted
123 tpe.shutdownNow();
124 // Preserve interrupt status
125 Thread.currentThread().interrupt();
126 }
127 }
128 });
129
130 // by default the ThreadPoolExecutor will throw an exception
131 tpe.setRejectedExecutionHandler(new RejectedExecutionHandler()
132 {
133 @Override
134 public void rejectedExecution(Runnable r,
135 ThreadPoolExecutor executor)
136 {
137 try
138 {
139 // this will block if the queue is full
140 executor.getQueue().put(r);
141 }
142 catch (InterruptedException ex)
143 {
144 ex.printStackTrace();
145 String msg = "EXCEPTION in rejectedExecution.";
146 StaticLogger.appLogger.log(Level.WARNING,msg);
147 }
148 }
149 });
150 }
151
152//------------------------------------------------------------------------------
153
158 public void stopRun()
159 {
160 cleanup();
161 tpe.shutdown();
162 }
163
164//------------------------------------------------------------------------------
165
171 public void run() throws Exception
172 {
173 String msg = "";
174 StopWatch watch = new StopWatch();
175 watch.start();
176
177 List<DGraph> graphs = DenoptimIO.readDENOPTIMGraphsFromFile(
179 List<IAtomContainer> iacs = DenoptimIO.readSDFFile(
180 settings.getInputFile().getAbsolutePath());
181 if (graphs.size() != iacs.size())
182 {
183 throw new DENOPTIMException("Found " + graphs.size() + " and "
184 + iacs.size() + " in " + settings.getInputFile());
185 }
186
187 tpe.prestartAllCoreThreads();
188
189 int evaluationCount = 0;
190 for (int i=0; i<graphs.size(); i++)
191 {
192 DGraph graph = graphs.get(i);
193 IAtomContainer iac = iacs.get(i);
194
196 graph, iac, settings.getWorkDirectory(),
197 settings.getOutputFile().getAbsolutePath()+"_"+i);
198
199 submitted.add(task);
200 futures.put(task,tpe.submit(task));
201 evaluationCount++;
202 if (evaluationCount>(numThreads*2))
203 {
205 }
206 }
207
208 // wait a bit for pending tasks to finish
209 tpe.shutdown();
210 tpe.awaitTermination(settings.getWallTime(), TimeUnit.SECONDS);
211 String collectiveOutput = settings.getOutputFile().getAbsolutePath();
212 for (int i=0; i<graphs.size(); i++)
213 {
214 String tmpFileFromProvider = collectiveOutput + "_" + i;
215 File tmpFile = new File(tmpFileFromProvider);
216 if (!tmpFile.exists() || !tmpFile.canRead())
217 {
218 throw new Error("File '" + tmpFileFromProvider + "' should "
219 + "have been procuded by fitness provider, but is not "
220 + "found after " + settings.getWallTime()
221 + " seconds.");
222 }
223 String content = DenoptimIO.readText(tmpFileFromProvider);
224 // Get rid of trailing newline character
225 content = content.substring(0, content.length()-1);
226 DenoptimIO.writeData(collectiveOutput, content, true);
227 FileUtils.deleteQuietly(new File(tmpFileFromProvider));
228 }
229
230 watch.stop();
231 String plural = "";
232 if (evaluationCount>1)
233 plural = "s";
234 msg = "Overall time: " + watch.toString() + ". " + DENOPTIMConstants.EOL
235 + "Run " + evaluationCount + " evaluation" + plural
236 + " of fitness." + DENOPTIMConstants.EOL
237 + "FitnessRunner run completed." + DENOPTIMConstants.EOL;
238 StaticLogger.appLogger.log(Level.INFO, msg);
239 }
240
241//------------------------------------------------------------------------------
242
246 private void cleanupCompleted()
247 {
248 List<FitnessTask> completed = new ArrayList<FitnessTask>();
249
250 for (FitnessTask t : submitted)
251 {
252 if (t.isCompleted())
253 completed.add(t);
254 }
255
256 for (FitnessTask t : completed)
257 {
258 submitted.remove(t);
259 futures.get(t).cancel(true);
260 futures.remove(t);
261 }
262 }
263
264//------------------------------------------------------------------------------
265
270 private void cleanup()
271 {
272 for (FitnessTask tsk: submitted)
273 {
274 tsk.stopTask();
275 }
276 for (FitnessTask tsk : futures.keySet())
277 {
278 futures.get(tsk).cancel(true);
279 }
280 futures.clear();
281 submitted.clear();
282 tpe.getQueue().clear();
283 }
284
285//------------------------------------------------------------------------------
286
287}
General set of constants used in DENOPTIM.
static final String EOL
new line character
Container for the list of vertices and the edges that connect them.
Definition: DGraph.java:102
Utility methods for input/output.
static ArrayList< IAtomContainer > readSDFFile(String fileName)
Reads a file containing multiple molecules (multiple SD format))
static ArrayList< DGraph > readDENOPTIMGraphsFromFile(File inFile)
Reads a list of <DGraphs from file.
static String readText(String fileName)
Read text from file.
static void writeData(String fileName, String data, boolean append)
Write text-like data file.
Logger class for DENOPTIM.
static final Logger appLogger
String getWorkDirectory()
Gets the pathname to the working directory.
Runs a fitness provider task as defined in the static parameters.
Definition: FPRunner.java:57
void cleanup()
clean all reference to submitted tasks
Definition: FPRunner.java:270
void cleanupCompleted()
Removes only tasks that are marked as completed.
Definition: FPRunner.java:246
final Map< FitnessTask, Future< Object > > futures
Storage of references to the submitted subtasks as Future.
Definition: FPRunner.java:61
void run()
Create and run the fitness task.
Definition: FPRunner.java:171
final ThreadPoolExecutor tpe
Asynchronous tasks manager.
Definition: FPRunner.java:71
FPRunner(FRParameters settings)
Constructor.
Definition: FPRunner.java:91
int numThreads
Number of parallel fitness evaluations we run.
Definition: FPRunner.java:83
void stopRun()
Stops all subtasks and shutdown executor.
Definition: FPRunner.java:158
FRParameters settings
The parameters controlling the.
Definition: FPRunner.java:76
final List< FitnessTask > submitted
Storage of references to the submitted subtasks.
Definition: FPRunner.java:66
Parameters controlling execution of FitnessRunner.
long getWallTime()
Returns the maximum number of seconds to wait for the fitness provider to deliver a result.
Task that assesses the fitness of a given graph.