Scippy

UG

Ubiquity Generator framework

paraLoadCoordinator.cpp
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraLoadCoordinator.cpp
27  * @brief Load coordinator.
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 #ifdef _MSC_VER
37 #include <functional>
38 #else
39 #include <unistd.h>
40 #endif
41 #include <cstdlib>
42 #include <cmath>
43 #include <ctime>
44 #include <cfloat>
45 #include <cstdio>
46 #include <cerrno>
47 #include <cstring>
48 #include <climits>
49 #include <algorithm>
50 #include <iomanip>
51 
52 #ifdef UG_WITH_ZLIB
53 #include "gzstream.h"
54 #endif
55 
56 #include "paraLoadCoordinator.h"
57 #include "paraInitialStat.h"
58 
59 using namespace UG;
60 
62 #ifdef UG_WITH_UGS
63  UGS::UgsParaCommMpi *inCommUgs,
64 #endif
65  int inNHandlers,
66  ParaComm *inComm,
67  ParaParamSet *inParaParamSet,
68  ParaInitiator *inParaInitiator,
69  bool *inRacingSolversExist,
70  ParaTimer *inParaTimer,
71  ParaDeterministicTimer *inParaDetTimer
72  )
73  : nHandlers(inNHandlers),
74  messageHandler(0),
75  racingRampUpMessageHandler(0),
76  globalSubtreeIdGen(0),
77  paraParams(inParaParamSet),
78  paraInitiator(inParaInitiator),
79  racingSolversExist(inRacingSolversExist),
80  restarted(false),
81  runningPhase(RampUpPhase),
82  computationIsInterrupted(false),
83  interruptedFromControlTerminal(false),
84  hardTimeLimitIsReached(false),
85  memoryLimitIsReached(false),
86  interruptIsRequested(false),
87  paraSolverPool(0),
88  paraRacingSolverPool(0),
89  nSolvedInInterruptedRacingSolvers(-1),
90  nTasksLeftInInterruptedRacingSolvers(-1),
91  previousCheckpointTime(0.0),
92  eps(MINEPSILON),
93  racingWinner(-1),
94  racingWinnerParams(0),
95  racingTermination(false),
96  nSolvedRacingTermination(0),
97  nTerminated(0),
98  paraTimer(inParaTimer),
99  paraDetTimer(inParaDetTimer),
100  osLogSolvingStatus(0),
101  osLogTasksTransfer(0),
102  osStatisticsFinalRun(0),
103  osStatisticsRacingRampUp(0),
104  pendingSolution(0),
105  terminationIssued(false)
106 {
107 #ifdef UG_WITH_UGS
108  commUgs = inCommUgs;
109 #endif
110  paraComm = inComm;
111 
112  ///
113  /// register message handlers
114  ///
116  for( int i = 0; i < nHandlers; i++ )
117  {
118  messageHandler[i] = 0;
119  }
127  {
129  }
130 
131  ///
132  /// set up status log and transfer log
133  ///
136  {
137  std::ostringstream s;
138 #ifdef UG_WITH_UGS
139  if( commUgs )
140  {
142  << commUgs->getMySolverName() << "_"
143  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
144  }
145  else
146  {
148  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
149  }
150 #else
152  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
153 #endif
154  ofsLogSolvingStatus.open(s.str().c_str(), std::ios::app );
155  if( !ofsLogSolvingStatus )
156  {
157  std::cout << "Solving status log file cannot open : file name = " << s.str() << std::endl;
158  exit(1);
159  }
161  }
162 
165  {
166  std::ostringstream s;
167 #ifdef UG_WITH_UGS
168  if( commUgs )
169  {
171  << commUgs->getMySolverName() << "_"
172  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
173  }
174  else
175  {
177  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
178  }
179 #else
181  << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
182 #endif
183  ofsLogTasksTransfer.open(s.str().c_str(), std::ios::app);
184  if( !ofsLogTasksTransfer )
185  {
186  std::cout << "Task transfer log file cannot open : file name = " << s.str() << std::endl;
187  exit(1);
188  }
190  }
191 
193  {
194  //
195  // open statistic files
196  //
197  std::ostringstream ssfr;
198 #ifdef UG_WITH_UGS
199  if( commUgs )
200  {
202  << commUgs->getMySolverName() << "_"
203  << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
204  }
205  else
206  {
208  << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
209  }
210 #else
212  << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
213 #endif
214  ofsStatisticsFinalRun.open(ssfr.str().c_str(), std::ios::app);
215  if( !ofsStatisticsFinalRun )
216  {
217  std::cout << "Statistics file for final run cannot open : file name = " << ssfr.str() << std::endl;
218  exit(1);
219  }
221 
222 // if( paraParams->getIntParamValue(RampUpPhaseProcess) == 1 ||
223 // paraParams->getIntParamValue(RampUpPhaseProcess) == 2
224 // ) /** racing ramp-up */
225 // {
226 // std::ostringstream ssrru;
227 //#ifdef UG_WITH_UGS
228 // if( commUgs )
229 // {
230 // ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
231 // << commUgs->getMySolverName() << "_"
232 // << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
233 // }
234 // else
235 // {
236 // ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
237 // << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
238 // }
239 //#else
240 // ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
241 // << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
242 //#endif
243 // ofsStatisticsRacingRampUp.open(ssrru.str().c_str(), std::ios::app);
244 // if( !ofsStatisticsRacingRampUp )
245 // {
246 // std::cout << "Statistics file for racing ramp-up cannot open : file name = " << ssrru.str() << std::endl;
247 // exit(1);
248 // }
249 // osStatisticsRacingRampUp = &ofsStatisticsRacingRampUp;
250 // }
251  }
252 
254 
255  lastCheckpointTimeStr[0] = ' ';
256  lastCheckpointTimeStr[1] = '\0';
257 
259  {
260  assert(paraDetTimer);
261  }
262 
263 }
264 
265 int
267  int source,
268  int tag
269  )
270 {
271 #ifdef _COMM_CPP11
272  std::lock_guard<std::mutex> lock(routineMutex);
273 #endif
274 
276  paraSolverTerminationState->receive(paraComm, source, tag);
277 
278 // std::cout << "TagTerminated received from " << source << ", num active solvers = " << paraSolverPool->getNumActiveSolvers() << std::endl;
279 
280  if( paraDetTimer )
281  {
282  if( paraDetTimer->getElapsedTime() < paraSolverTerminationState->getDeterministicTime() )
283  {
284  paraDetTimer->update( paraSolverTerminationState->getDeterministicTime() - paraDetTimer->getElapsedTime() );
285  }
287  paraComm->send( NULL, 0, ParaBYTE, source, TagAckCompletion )
288  );
289  }
290 
292  {
293  *osStatisticsFinalRun << paraSolverTerminationState->toString(paraInitiator);
294  osStatisticsFinalRun->flush();
295  }
297  {
298  std::cout << paraSolverTerminationState->toString(paraInitiator) << std::endl;
299  }
300 
301  if( (!racingTermination) && paraSolverTerminationState->getInterruptedMode() == 1 )
302  {
304  }
305 
306  paraSolverPool->terminated(source);
307  nTerminated++;
308 
309  delete paraSolverTerminationState;
310 
311  return 0;
312 }
313 
314 int
316  int source,
317  int tag
318  )
319 {
321  paraComm->receive( NULL, 0, ParaBYTE, source, TagHardTimeLimit)
322  );
323  hardTimeLimitIsReached = true;
324  return 0;
325 }
326 
327 int
329  int source,
330  int tag
331  )
332 {
333 
334  int token[2];
336  paraComm->receive( token, 2, ParaINT, source, TagToken)
337  );
338  if( !paraSolverPool->isTerminated(token[0]) )
339  {
341  paraComm->send( token, 2, ParaINT, token[0], TagToken )
342  );
343  }
344  else
345  {
346  int startRank = token[0];
347  token[0] = ( token[0] % (paraComm->getSize() - 1) ) + 1;
348  while( paraSolverPool->isTerminated(token[0]) && token[0] != startRank )
349  {
350  token[0] = ( token[0] % (paraComm->getSize() - 1) ) + 1;
351  }
352  if( !paraSolverPool->isTerminated(token[0]) )
353  {
355  paraComm->send( token, 2, ParaINT, token[0], TagToken )
356  );
357  }
358  }
359 
360  paraComm->setToken(0, token); // for debug
361 
362  return 0;
363 }
364 
365 void
367  )
368 {
369  for( int i = 1; i < paraComm->getSize(); i++ )
370  {
372  paraComm->send( NULL, 0, ParaBYTE, i, TagRampUp )
373  );
374  }
375 }
376 
377 void
379  )
380 {
381  terminationIssued = true;
382  int exitSolverRequest = 0; // do nothing
383  for( int i = 1; i < paraComm->getSize(); i++ )
384  {
386  {
388  paraComm->send( &exitSolverRequest, 1, ParaINT, i, TagInterruptRequest )
389  );
390  }
392  {
394  paraComm->send( NULL, 0, ParaBYTE, i, TagTerminateRequest )
395  );
398  {
399  int token[2];
400  token[0] = i;
401  token[1] = -2;
403  paraComm->send( token, 2, ParaINT, token[0], TagToken )
404  );
405  }
406  }
407  }
408 }
409 
410 void
412  int rank
413  )
414 {
415  // output comp information to tree log file
417  {
418  *osLogTasksTransfer << "[Solver-ID: " << rank
419  << "] ParaTask was sent " << (paraSolverPool->getCurrentTask(rank))->toString() << std::endl;
420  }
421 }
422 
423 void
425  int rank,
426  ParaCalculationState *state
427  )
428 {
429  // output comp information to tree log file
431  {
432  *osLogTasksTransfer << "[Solver-ID: " << rank
433  << "] Solved " << (paraSolverPool->getCurrentTask(rank))->toString() << std::endl;
434  *osLogTasksTransfer << "[Solver-ID: " << rank
435  << "] " << state->toString() << std::endl;
436  }
437 }
438 
439 void
441  int rank
442  )
443 {
444  // output comp information to tree log file
446  {
447  *osLogTasksTransfer << "[Solver-ID: " << rank
448  << "] ParaTask was sent " << (paraRacingSolverPool->getCurrentTask(rank))->toString() << std::endl;
449  }
450 }
451 
452 void
454  int rank,
455  ParaCalculationState *state
456  )
457 {
458  // output comp information to tree log file
460  {
461  *osLogTasksTransfer << "[Solver-ID: " << rank
462  << "] Solved " << (paraRacingSolverPool->getCurrentTask(rank))->toString() << std::endl;
463  *osLogTasksTransfer << "[Solver-ID: " << rank
464  << "] " << state->toString() << std::endl;
465  }
466 }
467 
468 void
470  const int tag
471  )
472 {
473  for( int i = 1; i < paraComm->getSize(); i++ )
474  {
476  paraComm->send( NULL, 0, ParaBYTE, i, tag )
477  );
478  }
479 }
ramp-up phase
MessageHandlerFunctionPointer * messageHandler
message handlers table for primary phase
virtual int processTagSolution(int source, int tag)=0
function to process TagSolution message
std::ostream * osStatisticsFinalRun
ostream for statistics of the final run
virtual ParaSolverTerminationState * createParaSolverTerminationState()=0
create ParaSolverTerminationState object by default constructor
ParaSolverPool * paraSolverPool
Pools in LoadCoordinator.
virtual std::string toString(ParaInitiator *initiator)=0
stringify ParaSolverTerminationState object
virtual ParaTask * getCurrentTask(int rank)=0
get current solving ParaTask in the Solver specified by rank
static const int LogSolvingStatusFilePath
Definition: paraParamSet.h:127
int getInterruptedMode()
getter of interrupted flag
virtual void writeTransferLogInRacing(int rank, ParaCalculationState *state)
write transfer log in racing
virtual double getEpsilon()=0
get epsilon specified
static const int TagTerminateRequest
Definition: paraTagDef.h:56
static ScipParaInitiator * paraInitiator
Definition: fscip.cpp:76
int(UG::ParaLoadCoordinator::* MessageHandlerFunctionPointer)(int, int)
double getDeterministicTime()
getter of deterministic time
virtual bool isSolverActive(int rank)=0
check if the Solver specified by rank is active or not
static const int TagRampUp
Definition: paraTagDef.h:50
ParaInitiator * paraInitiator
initiator
virtual void setToken(int rank, int *token)
set received token to this communicator
Definition: paraComm.h:221
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
std::ostream * osLogSolvingStatus
ostream for solving status to switch output location
Base class of Calculation state in a ParaSolver.
virtual bool isTerminated(int rank)=0
check if the Solver specified by rank is terminated or not
virtual int processTagSolverState(int source, int tag)=0
function to process TagSolverState message
void sendTagToAllSolvers(const int tag)
send a message with the specified tag (no payload) to all Solvers
virtual int processTagTerminated(int source, int tag)
function to process TagTerminated message
virtual void receive(ParaComm *comm, int source, int tag)=0
receive this object
ParaRacingSolverPool * paraRacingSolverPool
racing solver pool
ParaDeterministicTimer * paraDetTimer
deterministic timer used in case of deterministic mode; this timer needs to be created in case of deter...
Class for initiator.
Definition: paraInitiator.h:62
class for deterministic timer
virtual void terminateAllSolvers()
terminate all solvers
Base class for initial statistics collecting class.
virtual const char * getProbName()=0
get problem name
#define PARA_COMM_CALL(paracommcall)
Definition: paraComm.h:47
static const int Deterministic
Definition: paraParamSet.h:76
virtual int getRank()=0
get rank of this process or this thread depending on run-time environment
virtual int processTagTask(int source, int tag)=0
Message handlers.
std::ofstream ofsLogTasksTransfer
ofstream for task transfer info.
std::ofstream ofsLogSolvingStatus
ofstream for solving status
virtual double getElapsedTime()=0
getter of the deterministic time
std::mutex routineMutex
used for exclusive control of routines
static const int TagSolverState
Definition: paraTagDef.h:53
static const int TagSolution
Definition: paraTagDef.h:51
ParaParamSet * paraParams
UG parameter set.
virtual int processTagToken(int source, int tag)
function to process TagToken message
virtual void terminateRequested(int rank)=0
mark the Solver specified by rank as terminate requested
static const int TagToken
Definition: paraTagDef.h:63
static const int LogTasksTransfer
Definition: paraParamSet.h:74
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
virtual int processTagHardTimeLimit(int source, int tag)
function to process TagHardTimeLimit message
virtual int receive(void *bufer, int count, const int datatypeId, int source, const int tag)=0
receive function for standard ParaData types
ParaComm * paraComm
communicator used
ParaLoadCoordinator(int nHandlers, ParaComm *inComm, ParaParamSet *inParaParamSet, ParaInitiator *paraInitiator, bool *racingSolversExist, ParaTimer *paraTimer, ParaDeterministicTimer *detTimer)
constructor
bool logTasksTransferFlag
indicate if task transfer info. is logged or not
class ParaParamSet
Definition: paraParamSet.h:850
std::ostream * osLogTasksTransfer
ostream for task transfer info. to switch output location
Load Coordinator.
const char * getStringParamValue(int param)
for char parameters
virtual int processTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message
static const int ParaBYTE
Definition: paraComm.h:79
static const int TagTerminated
Definition: paraTagDef.h:58
virtual void terminated(int rank)=0
mark the Solver specified by rank as terminated
bool computationIsInterrupted
indicate that current computation is interrupted or not
virtual bool isTerminateRequested(int rank)=0
check if the Solver specified by rank is terminate requested or not
static const int Quiet
Definition: paraParamSet.h:71
bool hardTimeLimitIsReached
indicate that hard time limit is reached or not
bool racingTermination
racing termination information
virtual bool isInterruptRequested(int rank)=0
check if the Solver specified by rank is interrupt requested or not
static const int TagInterruptRequest
Definition: paraTagDef.h:57
virtual void writeTransferLog(int rank, ParaCalculationState *state)
write transfer log
virtual int send(void *bufer, int count, const int datatypeId, int dest, const int tag)=0
send function for standard ParaData types
virtual void sendRampUpToAllSolvers()
notify ramp-up to all solvers
virtual ParaTask * getCurrentTask(int rank)=0
get root ParaTask object of the Solver specified
static const int LogSolvingStatus
Definition: paraParamSet.h:73
virtual int getSize()=0
get number of UG processes or UG threads depending on run-time environment
bool logSolvingStatusFlag
output streams and flags which indicate the output is specified or not
static const int TagTask
Definition: paraTagDef.h:47
virtual ParaInstance * getParaInstance()=0
get instance object
#define MINEPSILON
Definition: paraDef.h:50
static const int ParaINT
Definition: paraComm.h:66
static const int StatisticsToStdout
Definition: paraParamSet.h:77
class ParaTimer
Definition: paraTimer.h:48
int nHandlers
number of valid handlers
bool terminationIssued
indicate termination request is issued
size_t nTerminated
counter to check if all solvers are terminated or not
virtual void update(double value)=0
update function of the deterministic time. the deterministic time is a kind of counter ...
virtual std::string toString()=0
stringify ParaCalculationState
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
static const double eps
std::ofstream ofsStatisticsFinalRun
ofstream for statistics of the final run
char lastCheckpointTimeStr[26]
lastCheckpointTimeStr[0] == ' ' means no checkpoint
static const int LogTasksTransferFilePath
Definition: paraParamSet.h:128
Base class of communicator object.
Definition: paraComm.h:101
static const int TagAckCompletion
Definition: paraTagDef.h:62
bool getBoolParamValue(int param)
for bool parameters