Scippy

UG

Ubiquity Generator framework

paraCommMpi.cpp
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraCommMpi.cpp
27 * @brief ParaComm extension for MPI communication.
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36#include <cassert>
37#include <cstring>
38#include "paraTagDef.h"
39#include "paraCommMpi.h"
40#include "paraIsendRequest.h"
41
42using namespace UG;
43
44MPI_Datatype
45ParaCommMpi::datatypes[TYPE_LIST_SIZE];
46
47const char *
48ParaCommMpi::tagStringTable[] = {
61 TAG_STR(TagRacingRampUpParamSets),
66};
67
68void
69ParaCommMpi::lcInit(
71 )
72{
73 tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
74 if( tagTraceFlag )
75 {
76 if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
77 {
78 tos = &std::cout;
79 }
80 else
81 {
82 std::ostringstream s;
83 s << paraParamSet->getStringParamValue(TagTraceFileName) << myRank;
84 ofs.open(s.str().c_str());
85 tos = &ofs;
86 }
87 }
88 if( paraParamSet->getBoolParamValue(Deterministic) )
89 {
90 token[0] = 0;
91 token[1] = -1;
92 }
93}
94
95void
98 )
99{
100 tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
101 if( tagTraceFlag )
102 {
103 if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
104 {
105 tos = &std::cout;
106 }
107 else
108 {
109 std::ostringstream s;
110 s << paraParamSet->getStringParamValue(TagTraceFileName) << myRank;
111 ofs.open(s.str().c_str());
112 tos = &ofs;
113 }
114 }
115}
116
117void
119 )
120{
121 MPI_Abort(MPI_COMM_WORLD, 0);
122}
123
124bool
126 int tempRank
127 )
128{
129#ifdef _MUTEX_CPP11
130 std::lock_guard<std::mutex> lock(tokenAccessLock);
131#else
132 pthread_mutex_lock(&tokenAccessLock);
133#endif
134 if( token[0] == myRank )
135 {
136#ifndef _MUTEX_CPP11
137 pthread_mutex_unlock(&tokenAccessLock);
138#endif
139 return true;
140 }
141 else
142 {
143 int previousRank = myRank - 1;
144 if( previousRank == 0 )
145 {
146 if( token[0] != -1 )
147 {
148 previousRank = myCommSize - 1;
149 }
150 }
151 int receivedTag;
152 MPI_Status mpiStatus;
153 MPI_CALL (
154 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &mpiStatus)
155 );
156 receivedTag = mpiStatus.MPI_TAG;
157 TAG_TRACE (Probe, From, mpiStatus.MPI_SOURCE, receivedTag);
158 if( receivedTag == TagToken )
159 {
160 receive(token, 2, ParaINT, 0, TagToken);
161 assert(token[0] == myRank);
162#ifndef _MUTEX_CPP11
163 pthread_mutex_unlock(&tokenAccessLock);
164#endif
165 return true;
166 }
167 else
168 {
169#ifndef _MUTEX_CPP11
170 pthread_mutex_unlock(&tokenAccessLock);
171#endif
172 return false;
173 }
174 }
175}
176
177void
179 int tempRank
180 )
181{
182#ifdef _MUTEX_CPP11
183 std::lock_guard<std::mutex> lock(tokenAccessLock);
184#else
185 pthread_mutex_lock(&tokenAccessLock);
186#endif
187 assert( token[0] == myRank );
188 token[0] = ( token[0] % (myCommSize - 1) ) + 1;
189 token[1] = -1;
190 send(token, 2, ParaINT, 0, TagToken);
191#ifndef _MUTEX_CPP11
192 pthread_mutex_unlock(&tokenAccessLock);
193#endif
194}
195
196bool
198 int tempRank
199 )
200{
201#ifdef _MUTEX_CPP11
202 std::lock_guard<std::mutex> lock(tokenAccessLock);
203#else
204 pthread_mutex_lock(&tokenAccessLock);
205#endif
206 if( myRank == token[0] )
207 {
208 if( token[1] == token[0] ) token[1] = -2;
209 else if( token[1] == -1 ) token[1] = token[0];
210 token[0] = ( token[0] % (myCommSize - 1) ) + 1;
211 }
212 else
213 {
214 THROW_LOGICAL_ERROR4("Invalid token update. Rank = ", getRank(), ", token = ", token[0] );
215 }
216 send(token, 2, ParaINT, 0, TagToken);
217 if( token[1] == -2 )
218 {
219#ifndef _MUTEX_CPP11
220 pthread_mutex_unlock(&tokenAccessLock);
221#endif
222 return true;
223 }
224 else
225 {
226#ifndef _MUTEX_CPP11
227 pthread_mutex_unlock(&tokenAccessLock);
228#endif
229 return false;
230 }
231}
232
233/// MPI call wrappers */
234void
235ParaCommMpi::init( int argc, char **argv )
236{
237
238#ifdef UG_WITH_UGS
239 if( !commUgs )
240 {
241 MPI_Init( &argc, &argv );
242
243//
244// To test if MPI support MPI_THREAD_MULTIPLE
245//
246// int provided;
247// MPI_CALL(
248// MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)
249// );
250// if (provided < MPI_THREAD_MULTIPLE)
251// {
252// std::cerr << "Error: the MPI library doesn't provide the required thread level" << std::endl;
253// MPI_Abort(MPI_COMM_WORLD, 0);
254// }
255
256 }
257#else
258// MPI_Init( &argc, &argv );
259
260//
261// To test if MPI support MPI_THREAD_MULTIPLE
262//
263 int provided, claimed;
264 MPI_CALL(
265 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)
266 );
267 MPI_Query_thread( &claimed );
268 // printf( "Query thread level= %d Init_thread level= %d\n", claimed, provided );
269 assert(provided == MPI_THREAD_MULTIPLE);
270 if (provided < MPI_THREAD_MULTIPLE)
271 {
272 std::cerr << "Error: the MPI library doesn't provide the required thread level" << std::endl;
273 MPI_Abort(MPI_COMM_WORLD, 0);
274 }
275 // std::cout << "***** MPI multiple! *****" << std::endl;
276
277#endif
278 startTime = MPI_Wtime();
279 char *pprocName = procName;
280 MPI_CALL(
281 MPI_Get_processor_name(pprocName, &namelen)
282 );
283
284 /// if you add tag, you should add tagStringTale too */
285 // assert( sizeof(tagStringTable)/sizeof(char*) == N_MPI_TAGS );
287
288 /// Data Types */
289 datatypes[ParaCHAR] = MPI_CHAR;
290 datatypes[ParaSHORT] = MPI_SHORT;
291 datatypes[ParaINT] = MPI_INT;
292 datatypes[ParaLONG] = MPI_LONG;
293 datatypes[ParaUNSIGNED_CHAR] = MPI_UNSIGNED_CHAR;
294 datatypes[ParaUNSIGNED_SHORT] = MPI_UNSIGNED_SHORT;
295 datatypes[ParaUNSIGNED] = MPI_UNSIGNED;
296 datatypes[ParaUNSIGNED_LONG] = MPI_UNSIGNED_LONG;
297 datatypes[ParaFLOAT] = MPI_FLOAT;
298 datatypes[ParaDOUBLE] = MPI_DOUBLE;
299 datatypes[ParaLONG_DOUBLE] = MPI_LONG_DOUBLE;
300 datatypes[ParaBYTE] = MPI_BYTE;
301
302#ifdef _ALIBABA
303 datatypes[ParaSIGNED_CHAR] = MPI_CHAR;
304 datatypes[ParaLONG_LONG] = MPI_LONG;
305 datatypes[ParaUNSIGNED_LONG_LONG] = MPI_UNSIGNED_LONG;
306 datatypes[ParaBOOL] = MPI_INT;
307#else
308 datatypes[ParaSIGNED_CHAR] = MPI_SIGNED_CHAR;
309 datatypes[ParaLONG_LONG] = MPI_LONG_LONG;
310 datatypes[ParaUNSIGNED_LONG_LONG] = MPI_UNSIGNED_LONG_LONG;
311 datatypes[ParaBOOL] = MPI_INT;
312#endif
313
314}
315
317{
318 MPI_Finalize();
319}
320
321bool
323 )
324{
325 return ( sizeof(tagStringTable)/sizeof(char*) == N_MPI_TAGS );
326}
327
328const char *
330 int tag /// tag to be converted to string
331 )
332{
333 assert( tag >= 0 && tag < N_MPI_TAGS );
334 return tagStringTable[tag];
335}
336
337int
339 void* buffer,
340 int count,
341 const int datatypeId,
342 int root
343 )
344{
345 MPI_CALL(
346 MPI_Bcast( buffer, count, datatypes[datatypeId], root, myComm )
347 );
348 return 0;
349}
350
351int
353 void* buffer,
354 int count,
355 const int datatypeId,
356 int dest,
357 const int tag
358 )
359{
360 MPI_CALL(
361 MPI_Send( buffer, count, datatypes[datatypeId], dest, tag, myComm )
362 );
363 TAG_TRACE (Send, To, dest, tag);
364 return 0;
365}
366
367int
369 void* buffer,
370 int count,
371 const int datatypeId,
372 int dest,
373 const int tag,
374 MPI_Request *req
375 )
376{
377 MPI_CALL(
378 MPI_Isend( buffer, count, datatypes[datatypeId], dest, tag, myComm, req )
379 );
380 TAG_TRACE (iSend, To, dest, tag);
381 return 0;
382}
383
384int
386 void* buffer,
387 int count,
388 const int datatypeId,
389 int source,
390 const int tag
391 )
392{
393 MPI_Status mpiStatus;
394 MPI_CALL (
395 MPI_Recv( buffer, count, datatypes[datatypeId], source, tag, myComm, &mpiStatus )
396 );
397 TAG_TRACE (Recv, From, source, tag);
398 return 0;
399}
400
401void
403 const int source,
404 const int tag,
405 int *receivedTag
406 )
407{
408 MPI_Status mpiStatus;
409 if( tag == TagAny )
410 {
411 MPI_CALL (
412 MPI_Probe(source, MPI_ANY_TAG, myComm, &mpiStatus)
413 );
414 }
415 else
416 {
417 MPI_CALL (
418 MPI_Probe(source, tag, myComm, &mpiStatus)
419 );
420 }
421 if( tag == TagAny )
422 {
423 (*receivedTag) = mpiStatus.MPI_TAG;
424 TAG_TRACE (Probe, From, source, (*receivedTag));
425 return;
426 }
427 else
428 {
429 assert( tag == mpiStatus.MPI_TAG );
430 (*receivedTag) = mpiStatus.MPI_TAG;
431 TAG_TRACE (Probe, From, source, (*receivedTag));
432 return;
433 }
434}
435
436bool
438 int* source,
439 int* tag
440 )
441{
442 MPI_Status mpiStatus;
443 MPI_CALL (
444 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &mpiStatus)
445 );
446 *source = mpiStatus.MPI_SOURCE;
447 *tag = mpiStatus.MPI_TAG;
448 TAG_TRACE (Probe, From, *source, *tag);
449 return true;
450}
451
452bool
454 int* source,
455 int* tag
456 )
457{
458 int flag;
459 MPI_Status mpiStatus;
460 if( *tag == TagAny )
461 {
462 MPI_CALL (
463 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &flag, &mpiStatus)
464 );
465 }
466 else
467 {
468 assert(*tag >= 0);
469 MPI_CALL (
470 MPI_Iprobe(MPI_ANY_SOURCE, *tag, myComm, &flag, &mpiStatus)
471 );
472 }
473 if( flag )
474 {
475 *source = mpiStatus.MPI_SOURCE;
476 *tag = mpiStatus.MPI_TAG;
477 TAG_TRACE (Iprobe, From, *source, *tag);
478 }
479 return flag;
480}
481
482int
484 void* buffer,
485 int count,
486 MPI_Datatype datatype,
487 int root
488 )
489{
490 MPI_CALL(
491 MPI_Bcast( buffer, count, datatype, root, myComm )
492 );
493 return 0;
494}
495
496int
498 void* buffer,
499 int count,
500 MPI_Datatype datatype,
501 int dest,
502 const int tag
503 )
504{
505 MPI_CALL (
506 MPI_Send( buffer, count, datatype, dest, tag, myComm )
507 // MPI_Ssend( buffer, count, datatype, dest, tag, myComm ) // after racing, program hang
508 );
509 TAG_TRACE (Send, To, dest, tag);
510 return 0;
511}
512
513int
515 void* buffer,
516 int count,
517 MPI_Datatype datatype,
518 int dest,
519 const int tag,
520 MPI_Request *req
521 )
522{
523 MPI_CALL (
524 MPI_Isend( buffer, count, datatype, dest, tag, myComm, req )
525 // MPI_Ssend( buffer, count, datatype, dest, tag, myComm ) // after racing, program hang
526 );
527 TAG_TRACE (iSend, To, dest, tag);
528 return 0;
529}
530
531int
533 void* buffer,
534 int count,
535 MPI_Datatype datatype,
536 int source,
537 const int tag
538 )
539{
540 MPI_Status mpiStatus;
541 MPI_CALL (
542 MPI_Recv( buffer, count, datatype, source, tag, myComm, &mpiStatus )
543 );
544 TAG_TRACE (Recv, From, source, tag);
545 return 0;
546}
547
548int
550 )
551{
552 if( !iSendRequestDeque.empty() )
553 {
554 std::deque<ParaIsendRequest *>::iterator it = iSendRequestDeque.begin();
555 while( it != iSendRequestDeque.end() )
556 {
557 ParaIsendRequest *temp = *it;
558 if( temp->test() )
559 {
560 it = iSendRequestDeque.erase(it);
561 delete temp;
562 }
563 else
564 {
565 it++;
566 }
567 }
568 }
569 return iSendRequestDeque.size();
570}
571
572void
574 )
575{
576 if( !iSendRequestDeque.empty() )
577 {
578 std::deque<ParaIsendRequest *>::iterator it = iSendRequestDeque.begin();
579 while( it != iSendRequestDeque.end() )
580 {
581 ParaIsendRequest *temp = *it;
582 temp->wait();
583 it = iSendRequestDeque.erase(it);
584 delete temp;
585 }
586 }
587}
double startTime
start time of this communicator
Definition: paraCommMpi.h:104
bool probe(int *source, int *tag)
probe function which waits a new message
virtual int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommMpi.cpp:96
static MPI_Datatype datatypes[TYPE_LIST_SIZE]
data type mapping table to MPI data type
Definition: paraCommMpi.h:110
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
int token[2]
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommMpi.h:105
MPI_Comm myComm
MPI communicator.
Definition: paraCommMpi.h:96
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommMpi.h:101
std::deque< ParaIsendRequest * > iSendRequestDeque
Definition: paraCommMpi.h:133
int myRank
rank of this process
Definition: paraCommMpi.h:98
std::ostream * tos
output file stream for tag trace to change file name
Definition: paraCommMpi.h:103
virtual const char * getTagString(int tag)
get Tag string for debugging
int iSend(void *bufer, int count, const int datatypeId, int dest, const int tag, MPI_Request *req)
send function for standard ParaData types
virtual void passToken(int rank)
pass token to from the rank to the next
char procName[MPI_MAX_PROCESSOR_NAME]
process name
Definition: paraCommMpi.h:100
int ubcast(void *buffer, int count, MPI_Datatype datatype, int root)
User type bcast for created data type.
int namelen
length of this process name
Definition: paraCommMpi.h:99
static const char * tagStringTable[]
table for tag name string
Definition: paraCommMpi.h:111
int ureceive(void *bufer, int count, MPI_Datatype datatype, int source, int tag)
User type receive for created data type.
pthread_mutex_t tokenAccessLock
mutex for pthread thread
Definition: paraCommMpi.h:116
int iUsend(void *bufer, int count, MPI_Datatype datatype, int dest, int tag, MPI_Request *req)
User type send for created data type.
int getRank()
get rank of caller's thread
Definition: paraCommMpi.h:213
int myCommSize
communicator size : number of processes joined in this system
Definition: paraCommMpi.h:97
virtual void init(int argc, char **argv)
initializer of this communicator
virtual ~ParaCommMpi()
destructor of this communicator
void abort()
abort. How it works sometimes depends on communicator used
virtual bool passTermToken(int rank)
pass termination token from the rank to the next
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
int usend(void *bufer, int count, MPI_Datatype datatype, int dest, int tag)
User type send for created data type.
std::ofstream ofs
output file stream for tag trace
Definition: paraCommMpi.h:102
virtual bool waitToken(int rank)
wait token when UG runs with deterministic mode
class ParaParamSet
Definition: paraParamSet.h:850
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
static const int ParaUNSIGNED_LONG
Definition: paraComm.h:73
static const int TagAckCompletion
Definition: paraTagDef.h:62
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
static const int TagWinner
Definition: paraTagDef.h:60
static const int TagSolution
Definition: paraTagDef.h:51
static const int ParaUNSIGNED_SHORT
Definition: paraComm.h:71
static const int TagToken
Definition: paraTagDef.h:63
static const int TagTaskReceived
Definition: paraTagDef.h:48
static const int TagInterruptRequest
Definition: paraTagDef.h:57
static const int TagNotificationId
Definition: paraTagDef.h:55
static const int TagIncumbentValue
Definition: paraTagDef.h:52
static const int N_MPI_TAGS
-1 : no tag
Definition: paraTagDef.h:73
static const int ParaLONG_DOUBLE
Definition: paraComm.h:77
static const int ParaINT
Definition: paraComm.h:66
static const int TagTerminated
Definition: paraTagDef.h:58
static const int ParaLONG
Definition: paraComm.h:67
static const int TagTerminateRequest
Definition: paraTagDef.h:56
static const int TagAny
Definition: paraTagDef.h:44
static const int ParaBYTE
Definition: paraComm.h:79
static const int ParaUNSIGNED
Definition: paraComm.h:72
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagSolverState
Definition: paraTagDef.h:53
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
static const int Deterministic
Definition: paraParamSet.h:76
static const int ParaFLOAT
Definition: paraComm.h:75
static const int TYPE_LIST_SIZE
Definition: paraComm.h:81
static const int TagTraceFileName
Definition: paraParamSet.h:126
static const int ParaBOOL
Definition: paraComm.h:78
static const int ParaCHAR
Definition: paraComm.h:64
static const int TagDiffSubproblem
Definition: paraTagDef.h:49
static const int TagTask
Definition: paraTagDef.h:47
static const int ParaSHORT
Definition: paraComm.h:65
static const int TagTrace
Definition: paraParamSet.h:72
static const int ParaUNSIGNED_LONG_LONG
Definition: paraComm.h:74
static const int ParaLONG_LONG
Definition: paraComm.h:68
static const int ParaUNSIGNED_CHAR
Definition: paraComm.h:70
static const int ParaDOUBLE
Definition: paraComm.h:76
static const int ParaSIGNED_CHAR
Definition: paraComm.h:69
#define TAG_TRACE(call, fromTo, sourceDest, tag)
Definition: paraCommCPP11.h:58
ParaComm extension for MPI communication.
#define MPI_CALL(mpicall)
Definition: paraCommMpi.h:68
#define THROW_LOGICAL_ERROR4(msg1, msg2, msg3, msg4)
Definition: paraDef.h:103
iSend request data structure
Fundamental Tag definitions.
#define TAG_STR(tag)
Definition: paraTagDef.h:40