Scippy

UG

Ubiquity Generator framework

paraLoadCoordinator.h
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubiquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraLoadCoordinator.h
27 * @brief Load Coordinator.
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36
37#ifndef __PARA_LOADCOORDINATOR_H__
38#define __PARA_LOADCOORDINATOR_H__
39
40#include <fstream>
41#include <list>
42#include <queue>
43#include <mutex>
44#include "paraDef.h"
45#include "paraComm.h"
47#include "paraTask.h"
48#include "paraSolverState.h"
49#include "paraSolverPool.h"
50#include "paraInitiator.h"
51#include "paraTimer.h"
53
54#ifdef UG_WITH_UGS
55#include "ugs/ugsDef.h"
56#include "ugs/ugsParaCommMpi.h"
57#endif
58
59namespace UG
60{
61
62///
63/// running phase definition
64///
66{
67 RampUpPhase, ///< ramp-up phase
68 NormalRunningPhase, ///< normal running phase (primary phase)
69 TerminationPhase ///< termination phase, includes interrupting phase
70};
71
72///
73/// Class for LoadCoordinator
74///
76{
77
78protected:
79
81
82 int nHandlers; ///< number of valid handlers
83 MessageHandlerFunctionPointer *messageHandler; ///< message handlers table for primary phase
84 MessageHandlerFunctionPointer *racingRampUpMessageHandler; ///< message handlers table for racing stage
85
86 int globalSubtreeIdGen; ///< global subtree id generator
87#ifdef UG_WITH_UGS
88 UGS::UgsParaCommMpi *commUgs; ///< communicator used for UGS: Non-zero means LC is running under UGS
89#endif
90 ParaComm *paraComm; ///< communicator used
91 ParaParamSet *paraParams; ///< UG parameter set
93 bool *racingSolversExist; ///< indicate if racing solver exists or not, true: exists
94 bool restarted; ///< indicates that this run is restarted from checkpoint files
95 RunningPhase runningPhase; ///< status of LoadCoordinator
96
97 bool computationIsInterrupted; ///< indicate that current computation is interrupted or not
98 bool interruptedFromControlTerminal; ///< interrupted from control terminal
99 bool hardTimeLimitIsReached; ///< indicate that hard time limit is reached or not
100 bool memoryLimitIsReached; ///< indicate if memory limit is reached or not in a solver, when base solver has memory management feature
101 bool interruptIsRequested; ///< indicate that all solver interrupt message is requested or not
102
103 ///
104 /// Pools in LoadCoordinator
105 ///
108
109 long long nSolvedInInterruptedRacingSolvers; ///< number of tasks solved of the winner solver in the racing solvers
110 long long nTasksLeftInInterruptedRacingSolvers; ///< number of tasks remaining for the winner solver in the racing solvers
111
112 ///
113 /// For checkpoint
114 ///
115 double previousCheckpointTime; ///< previous checkpoint time
116 char lastCheckpointTimeStr[26]; ///< lastCheckpointTimeStr[0] == ' ' means no checkpoint
117
118 ///
119 /// epsilon
120 ///
121 double eps; ///< absolute values smaller than this are considered zero
122
123 ///
124 /// racing winner information
125 ///
126 int racingWinner; ///< racing winner, -1: not determined
127 ParaRacingRampUpParamSet *racingWinnerParams; ///< racing winner parameter set
128
129 ///
130 /// racing termination information
131 ///
132 bool racingTermination; ///< racing termination flag, true: if a racing solver solved the problem
133 int nSolvedRacingTermination; ///< number of tasks solved at the racing termination solver
134
135 ///
136 /// counter to check if all solvers are terminated or not
137 ///
138 size_t nTerminated; ///< number of terminated Solvers
139
140 ///
141 /// Timers for LoadCoordinator
142 ///
143 ParaTimer *paraTimer; ///< normal timer used
144 ParaDeterministicTimer *paraDetTimer; ///< deterministic timer used in case of deterministic mode
145 ///< this timer needs to be created in case of deterministic mode
146
147 ///
148 /// output streams and flags which indicate the output is specified or not
149 ///
150 bool logSolvingStatusFlag; ///< indicate if solving status is logged or not
151 std::ofstream ofsLogSolvingStatus; ///< ofstream for solving status
152 std::ostream *osLogSolvingStatus; ///< ostream for solving status to switch output location
153 bool logTasksTransferFlag; ///< indicate if task transfer info. is logged or not
154 std::ofstream ofsLogTasksTransfer; ///< ofstream for task transfer info.
155 std::ostream *osLogTasksTransfer; ///< ostream for task transfer info. to switch output location
156 std::ofstream ofsStatisticsFinalRun; ///< ofstream for statistics of the final run
157 std::ostream *osStatisticsFinalRun; ///< ostream for statistics of the final run
158 std::ofstream ofsStatisticsRacingRampUp; ///< ofstream for statistics for racing solvers
159 std::ostream *osStatisticsRacingRampUp; ///< ostream for statistics for racing solvers to switch output location
160
161 ParaSolution *pendingSolution; ///< pending solution during merging
162 bool terminationIssued; ///< indicate termination request is issued
163 std::mutex routineMutex; ///< used for exclusive control of routines
164
165 ///
166 /// write transfer log
167 ///
168 virtual void writeTransferLog(
169 int rank, ///< solver rank
170 ParaCalculationState *state ///< calculation status
171 );
172 ///
173 /// write transfer log
174 ///
175 virtual void writeTransferLog(
176 int rank ///< solver rank
177 );
178
179 ///
180 /// write transfer log in racing
181 ///
182 virtual void writeTransferLogInRacing(
183 int rank, ///< solver rank
184 ParaCalculationState *state ///< calculation status
185 );
186
187 ///
188 /// write transfer log in racing
189 ///
190 virtual void writeTransferLogInRacing(
191 int rank ///< solver rank
192 );
193
194 ///
195 /// notify ramp-up to all solvers
196 ///
197 virtual void sendRampUpToAllSolvers(
198 );
199
200 ///
201 /// notify retry ramp-up to all solvers (maybe this should be removed from this base class)
202 ///
204 )
205 {
206 }
207
208 ///
209 /// send interrupt request to all solvers
210 ///
212 ) = 0;
213
214 ///
215 /// terminate all solvers
216 ///
217 virtual void terminateAllSolvers(
218 );
219
220 ///
221 /// create a new global subtree Id
222 /// @return global subtree id generated
223 ///
225 )
226 {
227 return ++globalSubtreeIdGen;
228 }
229
230 ///////////////////////
231 ///
232 /// Message handlers
233 ///
234 ///////////////////////
235
236 ///
237 /// function to process TagTask message
238 /// @return always 0 (for extension)
239 ///
240 virtual int processTagTask(
241 int source, ///< source solver rank
242 int tag ///< TagTask
243 ) = 0;
244
245 ///
246 /// function to process TagSolution message
247 /// @return always 0 (for extension)
248 ///
250 int source, ///< source solver rank
251 int tag ///< TagSolution
252 ) = 0;
253
254 ///
255 /// function to process TagSolverState message
256 /// @return always 0 (for extension)
257 ///
259 int source, ///< source solver rank
260 int tag ///< TagSolverState
261 ) = 0;
262
263 ///
264 /// function to process TagCompletionOfCalculation message
265 /// @return always 0 (for extension)
266 ///
268 int source, ///< source solver rank
269 int tag ///< TagCompletionOfCalculation
270 ) = 0;
271
272 ///
273 /// function to process TagTerminated message
274 /// @return always 0 (for extension)
275 ///
276 virtual int processTagTerminated(
277 int source, ///< source solver rank
278 int tag ///< TagTerminated
279 );
280
281 ///
282 /// function to process TagHardTimeLimit message
283 /// @return always 0 (for extension)
284 ///
285 virtual int processTagHardTimeLimit(
286 int source, ///< source solver rank
287 int tag ///< TagHardTimeLimit
288 );
289
290 ///
291 /// function to process TagToken message
292 /// @return always 0 (for extension)
293 ///
294 virtual int processTagToken(
295 int source, ///< source solver rank
296 int tag ///< TagToken
297 );
298
299 ///////////////////////
300 ///
301 /// message handlers specialized for racing ramp-up
302 ///
303 ///////////////////////
304
305 ///
306 /// function to process TagSolverState message in racing ramp-up stage
307 /// @return always 0 (for extension)
308 ///
310 int source, ///< source solver rank
311 int tag ///< TagSolverState
312 ) = 0;
313
314 ///
315 /// function to process TagCompletionOfCalculation message in racing ramp-up stage
316 /// @return always 0 (for extension)
317 ///
319 int source, ///< source solver rank
320 int tag ///< TagCompletionOfCalculation
321 ) = 0;
322
323// ///
324// /// check if current stage is in racing or not
325// /// @return true, if current stage is in racing
326// ///
327// bool isRacingStage(
328// )
329// {
330// if( // ( !paraInitiator->getPrefixWarm() ) &&
331// runningPhase == RampUpPhase &&
332// paraParams->getIntParamValue(RampUpPhaseProcess) > 0 && racingWinner < 0 )
333// return true;
334// else
335// return false;
336// }
337
338 ///
339 /// send specified tag to all solvers
340 ///
342 const int tag ///< tag which is sent to all solvers
343 );
344
345#ifdef UG_WITH_UGS
346
347 ///
348 /// check and read incumbent solution
349 ///
350 // int checkAndReadIncumbent(
351 // );
352
353#endif
354
355 ///
356 /// run function to start main process
357 ///
358 virtual void run(
359 ) = 0;
360
361 ///
362 /// send ParaTasks to idle solvers
363 /// @return true, if a ParaTasks is sent
364 ///
366 ) = 0;
367
368#ifdef UG_WITH_ZLIB
369
370 ///
371 /// function to update checkpoint files
372 ///
374 ) = 0;
375
376#endif
377
378public:
379
380 ///
381 /// constructor
382 ///
384#ifdef UG_WITH_UGS
385 UGS::UgsParaCommMpi *inComUgs, ///< communicator used for UGS
386#endif
387 int nHandlers, ///< number of handlers
388 ParaComm *inComm, ///< communicator used
389 ParaParamSet *inParaParamSet, ///< UG parameter set used
390 ParaInitiator *paraInitiator, ///< ParaInitiator for initialization of solving algorithm
391 bool *racingSolversExist, ///< indicate racing solver exists or not
392 ParaTimer *paraTimer, ///< ParaTimer used
393 ParaDeterministicTimer *detTimer ///< DeterministicTimer used
394 );
395
396 ///
397 /// destructor
398 ///
400 )
401 {
402 /// destructor should be implemented appropriately in a derived class of ParaLoadCoordinator
403 if( paraSolverPool ) delete paraSolverPool;
405
406 if( messageHandler ) delete [] messageHandler;
408 }
409
410 ///
411 /// interrupt from outside
412 ///
413 virtual void interrupt(
414 )
415 {
418 }
419
420#ifdef UG_WITH_ZLIB
421
422 ///
423 /// warm start (restart)
424 ///
425 virtual void warmStart(
426 )
427 {
428 // if user want to support warm start (restart), user need to implement this
429 }
430
431#endif
432
433 ///
434 /// run for normal ramp-up
435 ///
436 virtual void run(
437 ParaTask *paraTask ///< root ParaTask
438 )
439 {
440 }
441
442 ///
443 /// run for racing ramp-up
444 ///
445 virtual void run(
446 ParaTask *paraTask, ///< root ParaTask
447 int nRacingSolvers, ///< number of racing solvers
448 ParaRacingRampUpParamSet **racingRampUpParams ///< racing parameters
449 )
450 {
451 }
452
453 ///
454 /// execute the UG parallel solver in a totally solver-dependent way
455 ///
456 virtual void parallelDispatch(
457 )
458 {
459 run();
460 }
461
462};
463
464}
465
466#endif // __PARA_LOADCOORDINATOR_H__
467
Base class of Calculation state in a ParaSolver.
Base class of communicator object.
Definition: paraComm.h:102
class for deterministic timer
Class for initiator.
Definition: paraInitiator.h:63
Class for LoadCoordinator.
ParaParamSet * paraParams
UG parameter set.
virtual void interrupt()
interrupt from outside
bool hardTimeLimitIsReached
indicate that hard time limit is reached or not
ParaComm * paraComm
communicator used
char lastCheckpointTimeStr[26]
lastCheckpointTimeStr[0] == ' ' means no checkpoint
virtual void run()=0
run function to start main process
virtual void terminateAllSolvers()
terminate all solvers
virtual int processTagTerminated(int source, int tag)
function to process TagTerminated message
MessageHandlerFunctionPointer * racingRampUpMessageHandler
message handlers table for racing stage
bool restarted
indicates that this run is restarted from checkpoint files
std::mutex routineMutex
used for exclusive control of routines
virtual int processRacingRampUpTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message in racing ramp-up stage
long long nTasksLeftInInterruptedRacingSolvers
number of tasks remaining for the winner solver in the racing solvers
bool * racingSolversExist
indicate if racing solver exists or not, true: exists
bool terminationIssued
indicate termination request is issued
std::ostream * osLogTasksTransfer
ostream for task transfer info. to switch output location
virtual int processTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message
virtual void writeTransferLog(int rank, ParaCalculationState *state)
write transfer log
size_t nTerminated
counter to check if all solvers are terminated or not
virtual int processTagSolverState(int source, int tag)=0
function to process TagSolverState message
ParaSolverPool * paraSolverPool
Pools in LoadCoordinator.
virtual bool sendParaTasksToIdleSolvers()=0
send ParaTasks to idle solvers
int nSolvedRacingTermination
number of tasks solved at the racing termination solver
virtual void sendRampUpToAllSolvers()
notify ramp-up to all solvers
virtual void warmStart()
warm start (restart)
virtual void sendInterruptRequest()=0
send interrupt request to all solvers
bool memoryLimitIsReached
indicate if memory limit is reached or not in a solver, when base solver has memory management featur...
std::ofstream ofsStatisticsFinalRun
ofstream for statistics of the final run
virtual ~ParaLoadCoordinator()
destructor
ParaRacingSolverPool * paraRacingSolverPool
racing solver pool
virtual void updateCheckpointFiles()=0
function to update checkpoint files
ParaLoadCoordinator(int nHandlers, ParaComm *inComm, ParaParamSet *inParaParamSet, ParaInitiator *paraInitiator, bool *racingSolversExist, ParaTimer *paraTimer, ParaDeterministicTimer *detTimer)
constructor
long long nSolvedInInterruptedRacingSolvers
number of tasks solved of the winner solver in the racing solvers
virtual int processTagSolution(int source, int tag)=0
function to process TagSolution message
void sendTagToAllSolvers(const int tag)
send specified tag to all solvers
int(UG::ParaLoadCoordinator::* MessageHandlerFunctionPointer)(int, int)
ParaSolution * pendingSolution
pending solution during merging
double previousCheckpointTime
For checkpoint.
ParaInitiator * paraInitiator
initiator
int createNewGlobalSubtreeId()
create a new global subtree Id
int globalSubtreeIdGen
global subtree id generator
int racingWinner
racing winner information
ParaTimer * paraTimer
Timers for LoadCoordinator.
RunningPhase runningPhase
status of LoadCoordinator
bool interruptIsRequested
indicate that all solver interrupt message is requested or not
std::ostream * osStatisticsFinalRun
ostream for statistics of the final run
std::ofstream ofsStatisticsRacingRampUp
ofstream for statistics for racing solvers
std::ofstream ofsLogTasksTransfer
ofstream for task transfer info.
bool interruptedFromControlTerminal
interrupted from control terminal
virtual void parallelDispatch()
execute the UG parallel solver in a totally solver-dependent way
virtual int processTagToken(int source, int tag)
function to process TagToken message
virtual void run(ParaTask *paraTask)
run for normal ramp-up
std::ostream * osStatisticsRacingRampUp
ostream for statistics for racing solvers to switch output location
virtual int processTagTask(int source, int tag)=0
Message handlers.
std::ostream * osLogSolvingStatus
ostream for solving status to switch output location
virtual void writeTransferLogInRacing(int rank, ParaCalculationState *state)
write transfer log in racing
std::ofstream ofsLogSolvingStatus
ofstream for solving status
bool logTasksTransferFlag
indicate if task transfer info. is logged or not
ParaDeterministicTimer * paraDetTimer
deterministic timer used in case of deterministic mode; this timer needs to be created in case of deter...
ParaRacingRampUpParamSet * racingWinnerParams
racing winner parameter set
virtual int processTagHardTimeLimit(int source, int tag)
function to process TagHardTimeLimit message
bool logSolvingStatusFlag
output streams and flags which indicate the output is specified or not
int nHandlers
number of valid handlers
virtual void run(ParaTask *paraTask, int nRacingSolvers, ParaRacingRampUpParamSet **racingRampUpParams)
run for racing ramp-up
MessageHandlerFunctionPointer * messageHandler
message handlers table for primary phase
bool computationIsInterrupted
indicate that current computation is interrupted or not
bool racingTermination
racing termination information
virtual void sendRetryRampUpToAllSolvers()
notify retry ramp-up to all solvers (maybe this should be removed from this base class)
virtual int processRacingRampUpTagSolverState(int source, int tag)=0
message handlers specialized for racing ramp-up
class ParaParamSet
Definition: paraParamSet.h:850
class ParaRacingRampUpParamSet (parameter set for racing ramp-up)
class ParaRacingSolverPool (Racing Solver Pool)
class for solution
Definition: paraSolution.h:54
class ParaSolverPool (Solver Pool base class)
class ParaTask
Definition: paraTask.h:542
class ParaTimer
Definition: paraTimer.h:49
RunningPhase
running phase definition
@ TerminationPhase
termination phase, includes interrupting phase
@ RampUpPhase
ramp-up phase
@ NormalRunningPhase
normal running phase (primary phase)
Base class for calculation state.
Base class of communicator for UG Framework.
Defines for UG Framework.
Base class for deterministic timer.
Base class of initiator that maintains original problem and incumbent solution.
Solver pool.
This class has solver state to be transferred.
Base class for ParaTask.
Base class for Timer.