Scippy

UG

Ubiquity Generator framework

paraLoadCoordinator.h
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraLoadCoordinator.h
27  * @brief Load Coordinator.
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 
37 #ifndef __PARA_LOADCOORDINATOR_H__
38 #define __PARA_LOADCOORDINATOR_H__
39 
40 #include <fstream>
41 #include <list>
42 #include <queue>
43 #include <mutex>
44 #include "paraDef.h"
45 #include "paraComm.h"
46 #include "paraCalculationState.h"
47 #include "paraTask.h"
48 #include "paraSolverState.h"
49 #include "paraSolverPool.h"
50 #include "paraInitiator.h"
51 #include "paraTimer.h"
52 #include "paraDeterministicTimer.h"
53 
54 #ifdef UG_WITH_UGS
55 #include "ugs/ugsDef.h"
56 #include "ugs/ugsParaCommMpi.h"
57 #endif
58 
59 namespace UG
60 {
61 
62 ///
63 /// running phase definition
64 ///
66 {
67  RampUpPhase, ///< ramp-up phase
68  NormalRunningPhase, ///< normal running phase (primary phase)
69  TerminationPhase ///< termination phase, includes interrupting phase
70 };
71 
72 ///
73 /// Class for LoadCoordinator
74 ///
76 {
77 
78 protected:
79 
81 
82  int nHandlers; ///< number of valid handlers
83  MessageHandlerFunctionPointer *messageHandler; ///< message handlers table for primary phase
84  MessageHandlerFunctionPointer *racingRampUpMessageHandler; ///< message handlers table for racing stage
85 
86  int globalSubtreeIdGen; ///< global subtree id generator
87 #ifdef UG_WITH_UGS
88  UGS::UgsParaCommMpi *commUgs; ///< communicator used for UGS: non-null means LC is running under UGS
89 #endif
90  ParaComm *paraComm; ///< communicator used
91  ParaParamSet *paraParams; ///< UG parameter set
92  ParaInitiator *paraInitiator; ///< initiator
93  bool *racingSolversExist; ///< indicate if racing solvers exist or not, true: exist
94  bool restarted; ///< indicates that this run is restarted from checkpoint files
95  RunningPhase runningPhase; ///< status of LoadCoordinator
96 
97  bool computationIsInterrupted; ///< indicate that current computation is interrupted or not
98  bool interruptedFromControlTerminal; ///< interrupted from control terminal
99  bool hardTimeLimitIsReached; ///< indicate that hard time limit is reached or not
100  bool memoryLimitIsReached; ///< indicate if memory limit is reached or not in a solver, when the base solver has a memory management feature
101  bool interruptIsRequested; ///< indicate whether an interrupt message to all solvers has been requested or not
102 
103  //
104  // Pools in LoadCoordinator
105  //
106  ParaSolverPool *paraSolverPool; ///< solver pool
107  ParaRacingSolverPool *paraRacingSolverPool; ///< racing solver pool
108 
109  long long nSolvedInInterruptedRacingSolvers; ///< number of tasks solved by the winner solver in the racing solvers
110  long long nTasksLeftInInterruptedRacingSolvers; ///< number of tasks remaining in the winner solver of the racing solvers
111 
112  //
113  // For checkpoint
114  //
115  double previousCheckpointTime; ///< previous checkpoint time
116  char lastCheckpointTimeStr[26]; ///< lastCheckpointTimeStr[0] == ' ' means no checkpoint
117 
118  //
119  // epsilon
120  //
121  double eps; ///< absolute values smaller than this are considered zero
122 
123  //
124  // racing winner information
125  //
126  int racingWinner; ///< racing winner, -1: not determined
127  ParaRacingRampUpParamSet *racingWinnerParams; ///< racing winner parameter set
128 
129  //
130  // racing termination information
131  //
132  bool racingTermination; ///< racing termination flag, true: if a racing solver solved the problem
133  int nSolvedRacingTermination; ///< number of tasks solved at the racing termination solver
134 
135  //
136  // counter to check if all solvers are terminated or not
137  //
138  size_t nTerminated; ///< number of terminated Solvers
139 
140  //
141  // Timers for LoadCoordinator
142  //
143  ParaTimer *paraTimer; ///< normal timer used
144  ParaDeterministicTimer *paraDetTimer; ///< deterministic timer used in case of deterministic mode;
145  ///< this timer needs to be created in deterministic mode
146 
147  //
148  // output streams and flags which indicate whether the output is specified or not
149  //
150  bool logSolvingStatusFlag; ///< indicate if solving status is logged or not
151  std::ofstream ofsLogSolvingStatus; ///< ofstream for solving status
152  std::ostream *osLogSolvingStatus; ///< ostream for solving status to switch output location
153  bool logTasksTransferFlag; ///< indicate if task transfer info. is logged or not
154  std::ofstream ofsLogTasksTransfer; ///< ofstream for task transfer info.
155  std::ostream *osLogTasksTransfer; ///< ostream for task transfer info. to switch output location
156  std::ofstream ofsStatisticsFinalRun; ///< ofstream for statistics of the final run
157  std::ostream *osStatisticsFinalRun; ///< ostream for statistics of the final run
158  std::ofstream ofsStatisticsRacingRampUp; ///< ofstream for statistics for racing solvers
159  std::ostream *osStatisticsRacingRampUp; ///< ostream for statistics for racing solvers to switch output location
160 
161  ParaSolution *pendingSolution; ///< pending solution during merging
162  bool terminationIssued; ///< indicate termination request is issued
163  std::mutex routineMutex; ///< used for exclusive control of routines
164 
165  ///
166  /// write transfer log for the given solver rank and calculation state
167  ///
168  virtual void writeTransferLog(
169  int rank, ///< solver rank
170  ParaCalculationState *state ///< calculation status
171  );
172  ///
173  /// write transfer log (overload without calculation state)
174  ///
175  virtual void writeTransferLog(
176  int rank ///< solver rank
177  );
178 
179  ///
180  /// write transfer log in racing for the given solver rank and calculation state
181  ///
182  virtual void writeTransferLogInRacing(
183  int rank, ///< solver rank
184  ParaCalculationState *state ///< calculation status
185  );
186 
187  ///
188  /// write transfer log in racing (overload without calculation state)
189  ///
190  virtual void writeTransferLogInRacing(
191  int rank ///< solver rank
192  );
193 
194  ///
195  /// notify ramp-up to all solvers
196  ///
197  virtual void sendRampUpToAllSolvers(
198  );
199 
200  ///
201  /// notify retry ramp-up to all solvers (may be removed from this base class in the future)
202  ///
204  )
205  {
206  }
207 
208  ///
209  /// send interrupt request to all solvers (pure virtual: must be implemented by a derived class)
210  ///
211  virtual void sendInterruptRequest(
212  ) = 0;
213 
214  ///
215  /// terminate all solvers
216  ///
217  virtual void terminateAllSolvers(
218  );
219 
220  ///
221  /// create a new global subtree Id
222  /// @return global subtree id generated
223  ///
225  )
226  {
227  return ++globalSubtreeIdGen;
228  }
229 
230  ///////////////////////
231  //
232  // Message handlers
233  //
234  ///////////////////////
235 
236  ///
237  /// function to process TagTask message
238  /// @return always 0 (for extension)
239  ///
240  virtual int processTagTask(
241  int source, ///< source solver rank
242  int tag ///< TagTask
243  ) = 0;
244 
245  ///
246  /// function to process TagSolution message
247  /// @return always 0 (for extension)
248  ///
249  virtual int processTagSolution(
250  int source, ///< source solver rank
251  int tag ///< TagSolution
252  ) = 0;
253 
254  ///
255  /// function to process TagSolverState message
256  /// @return always 0 (for extension)
257  ///
258  virtual int processTagSolverState(
259  int source, ///< source solver rank
260  int tag ///< TagSolverState
261  ) = 0;
262 
263  ///
264  /// function to process TagCompletionOfCalculation message
265  /// @return always 0 (for extension)
266  ///
268  int source, ///< source solver rank
269  int tag ///< TagCompletionOfCalculation
270  ) = 0;
271 
272  ///
273  /// function to process TagTerminated message
274  /// @return always 0 (for extension)
275  ///
276  virtual int processTagTerminated(
277  int source, ///< source solver rank
278  int tag ///< TagTerminated
279  );
280 
281  ///
282  /// function to process TagHardTimeLimit message
283  /// @return always 0 (for extension)
284  ///
285  virtual int processTagHardTimeLimit(
286  int source, ///< source solver rank
287  int tag ///< TagHardTimeLimit
288  );
289 
290  ///
291  /// function to process TagToken message
292  /// @return always 0 (for extension)
293  ///
294  virtual int processTagToken(
295  int source, ///< source solver rank
296  int tag ///< TagToken
297  );
298 
299  ///////////////////////
300  //
301  // message handlers specialized for racing ramp-up
302  //
303  ///////////////////////
304 
305  ///
306  /// function to process TagSolverState message in racing ramp-up stage
307  /// @return always 0 (for extension)
308  ///
310  int source, ///< source solver rank
311  int tag ///< TagSolverState
312  ) = 0;
313 
314  ///
315  /// function to process TagCompletionOfCalculation message in racing ramp-up stage
316  /// @return always 0 (for extension)
317  ///
319  int source, ///< source solver rank
320  int tag ///< TagCompletionOfCalculation
321  ) = 0;
322 
323  //
324  // check if current stage is in racing or not (isRacingStage() is currently disabled)
325  // @return true, if current stage is in racing
326  //
327 // bool isRacingStage(
328 // )
329 // {
330 // if( // ( !paraInitiator->getPrefixWarm() ) &&
331 // runningPhase == RampUpPhase &&
332 // paraParams->getIntParamValue(RampUpPhaseProcess) > 0 && racingWinner < 0 )
333 // return true;
334 // else
335 // return false;
336 // }
337 
338  ///
339  /// send specified tag to all solvers
340  ///
341  void sendTagToAllSolvers(
342  const int tag ///< tag which is sent to all solvers
343  );
344 
345 #ifdef UG_WITH_UGS
346 
347  //
348  // check and read incumbent solution (checkAndReadIncumbent() is currently disabled)
349  //
350  // int checkAndReadIncumbent(
351  // );
352 
353 #endif
354 
355  ///
356  /// run function to start main process
357  ///
358  virtual void run(
359  ) = 0;
360 
361  ///
362  /// send ParaTasks to idle solvers
363  /// @return true, if a ParaTask is sent
364  ///
365  virtual bool sendParaTasksToIdleSolvers(
366  ) = 0;
367 
368 #ifdef UG_WITH_ZLIB
369 
370  ///
371  /// function to update checkpoint files
372  ///
373  virtual void updateCheckpointFiles(
374  ) = 0;
375 
376 #endif
377 
378 public:
379 
380  ///
381  /// constructor
382  ///
384 #ifdef UG_WITH_UGS
385  UGS::UgsParaCommMpi *inComUgs, ///< communicator used for UGS
386 #endif
387  int nHandlers, ///< number of handlers
388  ParaComm *inComm, ///< communicator used
389  ParaParamSet *inParaParamSet, ///< UG parameter set used
390  ParaInitiator *paraInitiator, ///< ParaInitiator for initialization of solving algorithm
391  bool *racingSolversExist, ///< indicate racing solver exits or not
392  ParaTimer *paraTimer, ///< ParaTimer used
393  ParaDeterministicTimer *detTimer ///< DeterministicTimer used
394  );
395 
396  ///
397  /// destructor
398  ///
400  )
401  {
402  /// destructor should be implemented appropriately in a derived class of ParaLoadCoordinator
403  if( paraSolverPool ) delete paraSolverPool;
404  if( paraRacingSolverPool ) delete paraRacingSolverPool;
405 
406  if( messageHandler ) delete [] messageHandler;
407  if( racingRampUpMessageHandler ) delete [] racingRampUpMessageHandler;
408  }
409 
410  ///
411  /// interrupt from outside
412  ///
413  virtual void interrupt(
414  )
415  {
416  interruptedFromControlTerminal = true;
418  }
419 
420 #ifdef UG_WITH_ZLIB
421 
422  ///
423  /// warm start (restart); the default implementation does nothing
424  ///
425  virtual void warmStart(
426  )
427  {
428  // to support warm start (restart), a derived class needs to implement this
429  }
430 
431 #endif
432 
433  ///
434  /// run for normal ramp-up; the default implementation does nothing
435  ///
436  virtual void run(
437  ParaTask *paraTask ///< root ParaTask
438  )
439  {
440  }
441 
442  ///
443  /// run for racing ramp-up; the default implementation does nothing
444  ///
445  virtual void run(
446  ParaTask *paraTask, ///< root ParaTask
447  int nRacingSolvers, ///< number of racing solvers
448  ParaRacingRampUpParamSet **racingRampUpParams ///< racing parameters
449  )
450  {
451  }
452 
453  ///
454  /// execute the UG parallel solver in a fully solver-dependent way; the default simply calls run()
455  ///
456  virtual void parallelDispatch(
457  )
458  {
459  run();
460  }
461 
462 };
463 
464 }
465 
466 #endif // __PARA_LOADCOORDINATOR_H__
467 
Base class for calculation state.
ParaSolution * pendingSolution
pending solution during merging
ramp-up phase
MessageHandlerFunctionPointer * messageHandler
message handlers table for primary phase
virtual int processTagSolution(int source, int tag)=0
function to process TagSolution message
bool memoryLimitIsReached
indicate if memory limit is reached or not in a solver, when base solver has memory management feature
ParaTimer * paraTimer
normal timer used
std::ostream * osStatisticsFinalRun
ostream for statistics of the final run
This class has solver state to be transferred.
virtual void sendInterruptRequest()=0
send interrupt request to all solvers
ParaSolverPool * paraSolverPool
solver pool
std::ostream * osStatisticsRacingRampUp
ostream for statistics for racing solvers to switch output location
class ParaRacingSolverPool (Racing Solver Pool)
std::ofstream ofsStatisticsRacingRampUp
ofstream for statistics for racing solvers
long long nSolvedInInterruptedRacingSolvers
number of tasks solved of the winner solver in the racing solvers
virtual void writeTransferLogInRacing(int rank, ParaCalculationState *state)
write transfer log in racing
int racingWinner
racing winner, -1: not determined
bool restarted
indicates that this run is restarted from checkpoint files
int(UG::ParaLoadCoordinator::* MessageHandlerFunctionPointer)(int, int)
Solver pool.
ParaInitiator * paraInitiator
initiator
Base class for ParaTask.
virtual int processRacingRampUpTagSolverState(int source, int tag)=0
function to process TagSolverState message in racing ramp-up stage
int nSolvedRacingTermination
number of tasks solved at the racing termination solver
std::ostream * osLogSolvingStatus
ostream for solving status to switch output location
bool interruptedFromControlTerminal
interrupted from control terminal
virtual void sendRetryRampUpToAllSolvers()
notify retry ramp-up to all solvers (may be removed from this base class in the future)
long long nTasksLeftInInterruptedRacingSolvers
number of tasks remaining in the winner solver of the racing solvers
Base class of Calculation state in a ParaSolver.
virtual int processTagSolverState(int source, int tag)=0
function to process TagSolverState message
Base class for deterministic timer.
void sendTagToAllSolvers(const int tag)
send specified tag to all solvers
virtual void parallelDispatch()
execute UG parallel solver totally solver dependent way
Defines for UG Framework.
virtual int processTagTerminated(int source, int tag)
function to process TagTerminated message
bool * racingSolversExist
indicate if racing solvers exist or not, true: exist
ParaRacingSolverPool * paraRacingSolverPool
racing solver pool
ParaDeterministicTimer * paraDetTimer
deterministic timer used in case of deterministic mode; this timer needs to be created in deterministic mode
Class for initiator.
Definition: paraInitiator.h:62
class for deterministic timer
virtual void terminateAllSolvers()
terminate all solvers
virtual ~ParaLoadCoordinator()
destructor
double previousCheckpointTime
previous checkpoint time
Base class for Timer.
virtual int processTagTask(int source, int tag)=0
function to process TagTask message
std::ofstream ofsLogTasksTransfer
ofstream for task transfer info.
std::ofstream ofsLogSolvingStatus
ofstream for solving status
std::mutex routineMutex
used to exclusive control of routines
bool interruptIsRequested
indicate that all solver interrupt message is requested or not
ParaParamSet * paraParams
UG parameter set.
virtual int processTagToken(int source, int tag)
function to process TagToken message
int createNewGlobalSubtreeId()
create a new global subtree Id
Base class of communicator for UG Framework.
virtual int processTagHardTimeLimit(int source, int tag)
function to process TagHardTimeLimit message
virtual void run()=0
run function to start main process
ParaComm * paraComm
communicator used
ParaLoadCoordinator(int nHandlers, ParaComm *inComm, ParaParamSet *inParaParamSet, ParaInitiator *paraInitiator, bool *racingSolversExist, ParaTimer *paraTimer, ParaDeterministicTimer *detTimer)
constructor
virtual bool sendParaTasksToIdleSolvers()=0
send ParaTasks to idle solvers
bool logTasksTransferFlag
indicate if task transfer info. is logged or not
class ParaParamSet
Definition: paraParamSet.h:850
std::ostream * osLogTasksTransfer
ostream for task transfer info. to switch output location
virtual void interrupt()
interrupt from outside
Class for LoadCoordinator.
RunningPhase
running phase definition
virtual int processTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message
bool computationIsInterrupted
indicate that current computation is interrupted or not
Base class of initiator that maintains original problem and incumbent solution.
termination phase, includes interrupting phase
bool hardTimeLimitIsReached
indicate that hard time limit is reached or not
class ParaSolverPool (Solver Pool base class)
bool racingTermination
racing termination flag, true: if a racing solver solved the problem
normal running phase (primary phase)
RunningPhase runningPhase
status of LoadCoordinator
virtual void writeTransferLog(int rank, ParaCalculationState *state)
write transfer log
MessageHandlerFunctionPointer * racingRampUpMessageHandler
message handlers table for racing stage
virtual void sendRampUpToAllSolvers()
notify ramp-up to all solvers
ParaRacingRampUpParamSet * racingWinnerParams
racing winner parameter set
int globalSubtreeIdGen
global subtree id generator
bool logSolvingStatusFlag
indicate if solving status is logged or not
virtual void run(ParaTask *paraTask, int nRacingSolvers, ParaRacingRampUpParamSet **racingRampUpParams)
run for racing ramp-up
class ParaTimer
Definition: paraTimer.h:48
int nHandlers
number of valid handlers
bool terminationIssued
indicate termination request is issued
size_t nTerminated
number of terminated Solvers
class ParaRacingRampUpParamSet (parameter set for racing ramp-up)
std::ofstream ofsStatisticsFinalRun
ofstream for statistics of the final run
char lastCheckpointTimeStr[26]
lastCheckpointTimeStr[0] == ' ' means no checkpoint
Base class of communicator object.
Definition: paraComm.h:101
class for solution
Definition: paraSolution.h:53
virtual void run(ParaTask *paraTask)
run for normal ramp-up
virtual int processRacingRampUpTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message in racing ramp-up stage
class ParaTask
Definition: paraTask.h:541