$darkmode
DENOPTIM
TasksBatchManager.java
Go to the documentation of this file.
1/*
2 * DENOPTIM
3 * Copyright (C) 2019 Vishwesh Venkatraman <vishwesh.venkatraman@ntnu.no>
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19package denoptim.task;
20
21import java.util.ArrayList;
22import java.util.List;
23import java.util.concurrent.CompletionService;
24import java.util.concurrent.ExecutionException;
25import java.util.concurrent.ExecutorCompletionService;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.Future;
29import java.util.concurrent.TimeUnit;
30
31import denoptim.exception.DENOPTIMException;
32import denoptim.graph.Candidate;
33
34
40{
41 private List<Task> taskList;
42 private ExecutorService eservice;
43 private List<Future<Object>> futures;
44
45//------------------------------------------------------------------------------
46
51 {
52 }
53
54//------------------------------------------------------------------------------
55
63 public List<Candidate> executeTasks(List<Task> syncronisedTasks,
64 int numOfProcessors) throws DENOPTIMException
65 {
66 taskList = syncronisedTasks;
67 int numOfJobs = syncronisedTasks.size();
68
69 int n = Math.min(numOfJobs, numOfProcessors);
70
71 // create a pool with a fixed number of threads that are reused.
72 // Here the number of available processors is passed to the factory so
73 // the ExecutorService is created with as many threads in the pool as
74 // available processors.
75
76 eservice = Executors.newFixedThreadPool(n);
77 CompletionService<Object> cservice =
78 new ExecutorCompletionService<>(eservice);
79 futures = new ArrayList<>();
80
81 for (int i=0; i<numOfJobs; i++)
82 {
83 futures.add(cservice.submit(syncronisedTasks.get(i)));
84 }
85
86 Thread shutDownHook = new Thread()
87 {
88 @Override
89 public void run()
90 {
91 eservice.shutdown(); // Disable new tasks from being submitted
92 try
93 {
94 // Wait a while for existing tasks to terminate
95 if (!eservice.awaitTermination(30, TimeUnit.SECONDS))
96 {
97 eservice.shutdownNow(); // Cancel currently executing tasks
98 }
99
100 if (!eservice.awaitTermination(60, TimeUnit.SECONDS))
101 {
102 // pool didn't terminate after the second try
103 }
104 }
105 catch (InterruptedException ie)
106 {
107 for (Task tsk : syncronisedTasks)
108 {
109 tsk.stopTask();
110 }
111
112 syncronisedTasks.clear();
113
114 for (Future<Object> f : futures)
115 {
116 f.cancel(true);
117 }
118
119 // (Re-)Cancel if current thread also interrupted
120 eservice.shutdownNow();
121 // Preserve interrupt status
122 Thread.currentThread().interrupt();
123 }
124 }
125 };
126 Runtime.getRuntime().addShutdownHook(shutDownHook);
127
128 // Waits for completion of tasks
129 ArrayList<Candidate> results = new ArrayList<>();
130 try
131 {
132 for (int i=0; i<syncronisedTasks.size(); i++)
133 {
134 Candidate taskResult = (Candidate) cservice.take().get();
135 results.add(taskResult);
136 }
137 }
138 catch (InterruptedException ie)
139 {
140 // (Re-)Cancel if current thread also interrupted
141 eservice.shutdownNow();
142 // Preserve interrupt status
143 Thread.currentThread().interrupt();
144 throw new DENOPTIMException(ie);
145 }
146 catch (ExecutionException ee)
147 {
148 throw new DENOPTIMException(ee);
149 }
150 finally
151 {
152 eservice.shutdown();
153 Runtime.getRuntime().removeShutdownHook(shutDownHook);
154 shutDownHook = null;
155 }
156
157 // Cleanup
158 syncronisedTasks.clear();
159 for (Future<Object> f : futures)
160 {
161 f.cancel(true);
162 }
163 futures.clear();
164
165 return results;
166 }
167
168//------------------------------------------------------------------------------
169
170 public void stop()
171 {
172 try
173 {
174 eservice.shutdown(); // Disable new tasks from being submitted
175 for (Task tsk : taskList)
176 {
177 tsk.stopTask();
178 }
179 for (Future<Object> f : futures)
180 {
181 f.cancel(true);
182 }
183
184 // Wait a while for existing tasks to terminate
185 if (!eservice.awaitTermination(5, TimeUnit.SECONDS))
186 {
187 eservice.shutdownNow(); // Cancel currently executing tasks
188 }
189 }
190 catch (InterruptedException ie)
191 {
192 // (Re-)Cancel if current thread also interrupted
193 eservice.shutdownNow();
194 }
195 }
196
197//------------------------------------------------------------------------------
198
199}
A candidate is the combination of a denoptim graph with molecular representation and may include also...
Definition: Candidate.java:40
A task that can throw exceptions.
Definition: Task.java:30
Class that manages the submission of a batch of tasks.
List< Candidate > executeTasks(List< Task > syncronisedTasks, int numOfProcessors)
Execute the list of tasks.
List< Future< Object > > futures
TasksBatchManager()
Constructs a new task manager meant to run tasks in batches.