Scippy

UG

Ubiquity Generator framework

paraLoadCoordinator.cpp
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubiquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraLoadCoordinator.cpp
27 * @brief Load coordinator.
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36#ifdef _MSC_VER
37#include <functional>
38#else
39#include <unistd.h>
40#endif
41#include <cstdlib>
42#include <cmath>
43#include <ctime>
44#include <cfloat>
45#include <cstdio>
46#include <cerrno>
47#include <cstring>
48#include <climits>
49#include <algorithm>
50#include <iomanip>
51
52#ifdef UG_WITH_ZLIB
53#include "gzstream.h"
54#endif
55
56#include "paraLoadCoordinator.h"
57#include "paraInitialStat.h"
58
59using namespace UG;
60
61ParaLoadCoordinator::ParaLoadCoordinator(
62#ifdef UG_WITH_UGS
63 UGS::UgsParaCommMpi *inCommUgs,
64#endif
65 int inNHandlers,
66 ParaComm *inComm,
67 ParaParamSet *inParaParamSet,
68 ParaInitiator *inParaInitiator,
69 bool *inRacingSolversExist,
70 ParaTimer *inParaTimer,
71 ParaDeterministicTimer *inParaDetTimer
72 )
73 : nHandlers(inNHandlers),
74 messageHandler(0),
75 racingRampUpMessageHandler(0),
76 globalSubtreeIdGen(0),
77 paraParams(inParaParamSet),
78 paraInitiator(inParaInitiator),
79 racingSolversExist(inRacingSolversExist),
80 restarted(false),
81 runningPhase(RampUpPhase),
82 computationIsInterrupted(false),
83 interruptedFromControlTerminal(false),
84 hardTimeLimitIsReached(false),
85 memoryLimitIsReached(false),
86 interruptIsRequested(false),
87 paraSolverPool(0),
88 paraRacingSolverPool(0),
89 nSolvedInInterruptedRacingSolvers(-1),
90 nTasksLeftInInterruptedRacingSolvers(-1),
91 previousCheckpointTime(0.0),
93 racingWinner(-1),
94 racingWinnerParams(0),
95 racingTermination(false),
96 nSolvedRacingTermination(0),
97 nTerminated(0),
98 paraTimer(inParaTimer),
99 paraDetTimer(inParaDetTimer),
100 osLogSolvingStatus(0),
101 osLogTasksTransfer(0),
102 osStatisticsFinalRun(0),
103 osStatisticsRacingRampUp(0),
104 pendingSolution(0),
105 terminationIssued(false)
106{
107#ifdef UG_WITH_UGS
108 commUgs = inCommUgs;
109#endif
110 paraComm = inComm;
111
112 ///
113 /// register message handlers
114 ///
116 for( int i = 0; i < nHandlers; i++ )
117 {
118 messageHandler[i] = 0;
119 }
127 {
129 }
130
131 ///
132 /// set up status log and transfer log
133 ///
136 {
137 std::ostringstream s;
138#ifdef UG_WITH_UGS
139 if( commUgs )
140 {
142 << commUgs->getMySolverName() << "_"
143 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
144 }
145 else
146 {
148 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
149 }
150#else
152 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".status";
153#endif
154 ofsLogSolvingStatus.open(s.str().c_str(), std::ios::app );
156 {
157 std::cout << "Solving status log file cannot open : file name = " << s.str() << std::endl;
158 exit(1);
159 }
161 }
162
165 {
166 std::ostringstream s;
167#ifdef UG_WITH_UGS
168 if( commUgs )
169 {
171 << commUgs->getMySolverName() << "_"
172 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
173 }
174 else
175 {
177 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
178 }
179#else
181 << paraInitiator->getParaInstance()->getProbName() << "_LC" << paraComm->getRank() << ".transfer";
182#endif
183 ofsLogTasksTransfer.open(s.str().c_str(), std::ios::app);
185 {
186 std::cout << "Task transfer log file cannot open : file name = " << s.str() << std::endl;
187 exit(1);
188 }
190 }
191
193 {
194 //
195 // open statistic files
196 //
197 std::ostringstream ssfr;
198#ifdef UG_WITH_UGS
199 if( commUgs )
200 {
202 << commUgs->getMySolverName() << "_"
203 << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
204 }
205 else
206 {
208 << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
209 }
210#else
212 << paraInitiator->getParaInstance()->getProbName() << "_statistics_final_LC" << paraComm->getRank();
213#endif
214 ofsStatisticsFinalRun.open(ssfr.str().c_str(), std::ios::app);
216 {
217 std::cout << "Statistics file for final run cannot open : file name = " << ssfr.str() << std::endl;
218 exit(1);
219 }
221
222// if( paraParams->getIntParamValue(RampUpPhaseProcess) == 1 ||
223// paraParams->getIntParamValue(RampUpPhaseProcess) == 2
224// ) /** racing ramp-up */
225// {
226// std::ostringstream ssrru;
227//#ifdef UG_WITH_UGS
228// if( commUgs )
229// {
230// ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
231// << commUgs->getMySolverName() << "_"
232// << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
233// }
234// else
235// {
236// ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
237// << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
238// }
239//#else
240// ssrru << paraParams->getStringParamValue(LogSolvingStatusFilePath)
241// << paraInitiator->getParaInstance()->getProbName() << "_statistics_racing_LC" << paraComm->getRank();
242//#endif
243// ofsStatisticsRacingRampUp.open(ssrru.str().c_str(), std::ios::app);
244// if( !ofsStatisticsRacingRampUp )
245// {
246// std::cout << "Statistics file for racing ramp-up cannot open : file name = " << ssrru.str() << std::endl;
247// exit(1);
248// }
249// osStatisticsRacingRampUp = &ofsStatisticsRacingRampUp;
250// }
251 }
252
254
255 lastCheckpointTimeStr[0] = ' ';
256 lastCheckpointTimeStr[1] = '\0';
257
259 {
260 assert(paraDetTimer);
261 }
262
263}
264
265int
267 int source,
268 int tag
269 )
270{
271#ifdef _COMM_CPP11
272 std::lock_guard<std::mutex> lock(routineMutex);
273#endif
274
276 paraSolverTerminationState->receive(paraComm, source, tag);
277
278// std::cout << "TagTerminated received from " << source << ", num active solvers = " << paraSolverPool->getNumActiveSolvers() << std::endl;
279
280 if( paraDetTimer )
281 {
282 if( paraDetTimer->getElapsedTime() < paraSolverTerminationState->getDeterministicTime() )
283 {
284 paraDetTimer->update( paraSolverTerminationState->getDeterministicTime() - paraDetTimer->getElapsedTime() );
285 }
287 paraComm->send( NULL, 0, ParaBYTE, source, TagAckCompletion )
288 );
289 }
290
292 {
293 *osStatisticsFinalRun << paraSolverTerminationState->toString(paraInitiator);
294 osStatisticsFinalRun->flush();
295 }
297 {
298 std::cout << paraSolverTerminationState->toString(paraInitiator) << std::endl;
299 }
300
301 if( (!racingTermination) && paraSolverTerminationState->getInterruptedMode() == 1 )
302 {
304 }
305
306 paraSolverPool->terminated(source);
307 nTerminated++;
308
309 delete paraSolverTerminationState;
310
311 return 0;
312}
313
314int
316 int source,
317 int tag
318 )
319{
321 paraComm->receive( NULL, 0, ParaBYTE, source, TagHardTimeLimit)
322 );
324 return 0;
325}
326
327int
329 int source,
330 int tag
331 )
332{
333
334 int token[2];
336 paraComm->receive( token, 2, ParaINT, source, TagToken)
337 );
338 if( !paraSolverPool->isTerminated(token[0]) )
339 {
341 paraComm->send( token, 2, ParaINT, token[0], TagToken )
342 );
343 }
344 else
345 {
346 int startRank = token[0];
347 token[0] = ( token[0] % (paraComm->getSize() - 1) ) + 1;
348 while( paraSolverPool->isTerminated(token[0]) && token[0] != startRank )
349 {
350 token[0] = ( token[0] % (paraComm->getSize() - 1) ) + 1;
351 }
352 if( !paraSolverPool->isTerminated(token[0]) )
353 {
355 paraComm->send( token, 2, ParaINT, token[0], TagToken )
356 );
357 }
358 }
359
360 paraComm->setToken(0, token); // for debug
361
362 return 0;
363}
364
365void
367 )
368{
369 for( int i = 1; i < paraComm->getSize(); i++ )
370 {
372 paraComm->send( NULL, 0, ParaBYTE, i, TagRampUp )
373 );
374 }
375}
376
377void
379 )
380{
381 terminationIssued = true;
382 int exitSolverRequest = 0; // do nothing
383 for( int i = 1; i < paraComm->getSize(); i++ )
384 {
386 {
388 paraComm->send( &exitSolverRequest, 1, ParaINT, i, TagInterruptRequest )
389 );
390 }
392 {
395 );
398 {
399 int token[2];
400 token[0] = i;
401 token[1] = -2;
403 paraComm->send( token, 2, ParaINT, token[0], TagToken )
404 );
405 }
406 }
407 }
408}
409
410void
412 int rank
413 )
414{
415 // output comp information to tree log file
417 {
418 *osLogTasksTransfer << "[Solver-ID: " << rank
419 << "] ParaTask was sent " << (paraSolverPool->getCurrentTask(rank))->toString() << std::endl;
420 }
421}
422
423void
425 int rank,
427 )
428{
429 // output comp information to tree log file
431 {
432 *osLogTasksTransfer << "[Solver-ID: " << rank
433 << "] Solved " << (paraSolverPool->getCurrentTask(rank))->toString() << std::endl;
434 *osLogTasksTransfer << "[Solver-ID: " << rank
435 << "] " << state->toString() << std::endl;
436 }
437}
438
439void
441 int rank
442 )
443{
444 // output comp information to tree log file
446 {
447 *osLogTasksTransfer << "[Solver-ID: " << rank
448 << "] ParaTask was sent " << (paraRacingSolverPool->getCurrentTask(rank))->toString() << std::endl;
449 }
450}
451
452void
454 int rank,
456 )
457{
458 // output comp information to tree log file
460 {
461 *osLogTasksTransfer << "[Solver-ID: " << rank
462 << "] Solved " << (paraRacingSolverPool->getCurrentTask(rank))->toString() << std::endl;
463 *osLogTasksTransfer << "[Solver-ID: " << rank
464 << "] " << state->toString() << std::endl;
465 }
466}
467
468void
470 const int tag
471 )
472{
473 for( int i = 1; i < paraComm->getSize(); i++ )
474 {
476 paraComm->send( NULL, 0, ParaBYTE, i, tag )
477 );
478 }
479}
Base class of Calculation state in a ParaSolver.
virtual std::string toString()=0
stringify ParaCalculationState
Base class of communicator object.
Definition: paraComm.h:102
virtual void setToken(int rank, int *token)
set received token to this communicator
Definition: paraComm.h:221
virtual int getSize()=0
get number of UG processes or UG threads depending on run-time environment
virtual int send(void *bufer, int count, const int datatypeId, int dest, const int tag)=0
send function for standard ParaData types
virtual ParaSolverTerminationState * createParaSolverTerminationState()=0
create ParaSolverTerminationState object by default constructor
virtual int receive(void *bufer, int count, const int datatypeId, int source, const int tag)=0
receive function for standard ParaData types
virtual int getRank()=0
get rank of this process or this thread depending on run-time environment
class for deterministic timer
virtual void update(double value)=0
update function of the deterministic time; the deterministic time is a kind of counter
virtual double getElapsedTime()=0
getter of the deterministic time
Class for initiator.
Definition: paraInitiator.h:63
virtual ParaInstance * getParaInstance()=0
get instance object
virtual double getEpsilon()=0
get epsilon specified
virtual const char * getProbName()=0
get problem name
ParaParamSet * paraParams
UG parameter set.
bool hardTimeLimitIsReached
indicate that hard time limit is reached or not
ParaComm * paraComm
communicator used
char lastCheckpointTimeStr[26]
lastCheckpointTimeStr[0] == ' ' means no checkpoint
virtual void terminateAllSolvers()
terminate all solvers
virtual int processTagTerminated(int source, int tag)
function to process TagTerminated message
std::mutex routineMutex
used for exclusive control of routines
bool terminationIssued
indicate termination request is issued
std::ostream * osLogTasksTransfer
ostream for task transfer info. to switch output location
virtual int processTagCompletionOfCalculation(int source, int tag)=0
function to process TagCompletionOfCalculation message
virtual void writeTransferLog(int rank, ParaCalculationState *state)
write transfer log
size_t nTerminated
counter to check if all solvers are terminated or not
virtual int processTagSolverState(int source, int tag)=0
function to process TagSolverState message
ParaSolverPool * paraSolverPool
Pools in LoadCoordinator.
virtual void sendRampUpToAllSolvers()
notify ramp-up to all solvers
std::ofstream ofsStatisticsFinalRun
ofstream for statistics of the final run
ParaRacingSolverPool * paraRacingSolverPool
racing solver pool
virtual int processTagSolution(int source, int tag)=0
function to process TagSolution message
void sendTagToAllSolvers(const int tag)
send specified tag to all solvers
int(UG::ParaLoadCoordinator::* MessageHandlerFunctionPointer)(int, int)
ParaInitiator * paraInitiator
initiator
std::ostream * osStatisticsFinalRun
ostream for statistics of the final run
std::ofstream ofsLogTasksTransfer
ofstream for task transfer info.
virtual int processTagToken(int source, int tag)
function to process TagToken message
virtual int processTagTask(int source, int tag)=0
Message handlers.
std::ostream * osLogSolvingStatus
ostream for solving status to switch output location
virtual void writeTransferLogInRacing(int rank, ParaCalculationState *state)
write transfer log in racing
std::ofstream ofsLogSolvingStatus
ofstream for solving status
bool logTasksTransferFlag
indicate if task transfer info. is logged or not
ParaDeterministicTimer * paraDetTimer
deterministic timer used in case of deterministic mode; this timer needs to be created in case of deter...
virtual int processTagHardTimeLimit(int source, int tag)
function to process TagHardTimeLimit message
bool logSolvingStatusFlag
output streams and flags which indicate the output is specified or not
int nHandlers
number of valid handlers
MessageHandlerFunctionPointer * messageHandler
message handlers table for primary phase
bool computationIsInterrupted
indicate that current computation is interrupted or not
bool racingTermination
racing termination information
class ParaParamSet
Definition: paraParamSet.h:850
bool getBoolParamValue(int param)
get bool parameter value
const char * getStringParamValue(int param)
get string parameter value
virtual ParaTask * getCurrentTask(int rank)=0
get root ParaTask object of the Solver specified
virtual ParaTask * getCurrentTask(int rank)=0
get current solving ParaTask in the Solver specified by rank
virtual bool isInterruptRequested(int rank)=0
check if the Solver specified by rank is interrupt requested or not
virtual bool isTerminateRequested(int rank)=0
check if the Solver specified by rank is terminate requested or not
virtual void terminateRequested(int rank)=0
mark the Solver specified by rank as terminate requested
virtual void terminated(int rank)=0
mark the Solver specified by rank as terminated
virtual bool isSolverActive(int rank)=0
check if the Solver specified by rank is active or not
virtual bool isTerminated(int rank)=0
check if the Solver specified by rank is terminated or not
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
int getInterruptedMode()
getter of interrupted flag
double getDeterministicTime()
getter of deterministic time
virtual std::string toString(ParaInitiator *initiator)=0
stringify ParaSolverTerminationState object
virtual void receive(ParaComm *comm, int source, int tag)=0
receive this object
class ParaTimer
Definition: paraTimer.h:49
static ScipParaInitiator * paraInitiator
Definition: fscip.cpp:76
Utilities for handling gzipped input and output streams.
static const int TagAckCompletion
Definition: paraTagDef.h:62
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
static const int TagSolution
Definition: paraTagDef.h:51
static const int TagToken
Definition: paraTagDef.h:63
static const int TagInterruptRequest
Definition: paraTagDef.h:57
static const double eps
static const int LogTasksTransferFilePath
Definition: paraParamSet.h:128
static const int ParaINT
Definition: paraComm.h:66
static const int LogSolvingStatus
Definition: paraParamSet.h:73
static const int TagTerminated
Definition: paraTagDef.h:58
static const int LogSolvingStatusFilePath
Definition: paraParamSet.h:127
static const int TagTerminateRequest
Definition: paraTagDef.h:56
static const int ParaBYTE
Definition: paraComm.h:79
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagSolverState
Definition: paraTagDef.h:53
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
static const int Deterministic
Definition: paraParamSet.h:76
@ RampUpPhase
ramp-up phase
static const int TagTask
Definition: paraTagDef.h:47
static const int LogTasksTransfer
Definition: paraParamSet.h:74
static const int Quiet
Definition: paraParamSet.h:71
static const int StatisticsToStdout
Definition: paraParamSet.h:77
#define PARA_COMM_CALL(paracommcall)
Definition: paraComm.h:47
#define MINEPSILON
Definition: paraDef.h:50
Base class for initial statistics collecting class.
Load Coordinator.