Scippy

UG

Ubiquity Generator framework

paraCommMpi.cpp
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraCommMpi.cpp
27  * @brief ParaComm extension for MPI communication.
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 #include <cassert>
37 #include <cstring>
38 #include "paraTagDef.h"
39 #include "paraCommMpi.h"
40 #include "paraIsendRequest.h"
41 
42 using namespace UG;
43 
44 MPI_Datatype
46 
47 const char *
61  TAG_STR(TagRacingRampUpParamSets),
66 };
67 
68 void
71  )
72 {
73  tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
74  if( tagTraceFlag )
75  {
76  if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
77  {
78  tos = &std::cout;
79  }
80  else
81  {
82  std::ostringstream s;
83  s << paraParamSet->getStringParamValue(TagTraceFileName) << myRank;
84  ofs.open(s.str().c_str());
85  tos = &ofs;
86  }
87  }
88  if( paraParamSet->getBoolParamValue(Deterministic) )
89  {
90  token[0] = 0;
91  token[1] = -1;
92  }
93 }
94 
95 void
98  )
99 {
100  tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
101  if( tagTraceFlag )
102  {
103  if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
104  {
105  tos = &std::cout;
106  }
107  else
108  {
109  std::ostringstream s;
110  s << paraParamSet->getStringParamValue(TagTraceFileName) << myRank;
111  ofs.open(s.str().c_str());
112  tos = &ofs;
113  }
114  }
115 }
116 
117 void
119  )
120 {
121  MPI_Abort(MPI_COMM_WORLD, 0);
122 }
123 
124 bool
126  int tempRank
127  )
128 {
129 #ifdef _MUTEX_CPP11
130  std::lock_guard<std::mutex> lock(tokenAccessLock);
131 #else
132  pthread_mutex_lock(&tokenAccessLock);
133 #endif
134  if( token[0] == myRank )
135  {
136 #ifndef _MUTEX_CPP11
137  pthread_mutex_unlock(&tokenAccessLock);
138 #endif
139  return true;
140  }
141  else
142  {
143  int previousRank = myRank - 1;
144  if( previousRank == 0 )
145  {
146  if( token[0] != -1 )
147  {
148  previousRank = myCommSize - 1;
149  }
150  }
151  int receivedTag;
152  MPI_Status mpiStatus;
153  MPI_CALL (
154  MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &mpiStatus)
155  );
156  receivedTag = mpiStatus.MPI_TAG;
157  TAG_TRACE (Probe, From, mpiStatus.MPI_SOURCE, receivedTag);
158  if( receivedTag == TagToken )
159  {
160  receive(token, 2, ParaINT, 0, TagToken);
161  assert(token[0] == myRank);
162 #ifndef _MUTEX_CPP11
163  pthread_mutex_unlock(&tokenAccessLock);
164 #endif
165  return true;
166  }
167  else
168  {
169 #ifndef _MUTEX_CPP11
170  pthread_mutex_unlock(&tokenAccessLock);
171 #endif
172  return false;
173  }
174  }
175 }
176 
177 void
179  int tempRank
180  )
181 {
182 #ifdef _MUTEX_CPP11
183  std::lock_guard<std::mutex> lock(tokenAccessLock);
184 #else
185  pthread_mutex_lock(&tokenAccessLock);
186 #endif
187  assert( token[0] == myRank );
188  token[0] = ( token[0] % (myCommSize - 1) ) + 1;
189  token[1] = -1;
190  send(token, 2, ParaINT, 0, TagToken);
191 #ifndef _MUTEX_CPP11
192  pthread_mutex_unlock(&tokenAccessLock);
193 #endif
194 }
195 
196 bool
198  int tempRank
199  )
200 {
201 #ifdef _MUTEX_CPP11
202  std::lock_guard<std::mutex> lock(tokenAccessLock);
203 #else
204  pthread_mutex_lock(&tokenAccessLock);
205 #endif
206  if( myRank == token[0] )
207  {
208  if( token[1] == token[0] ) token[1] = -2;
209  else if( token[1] == -1 ) token[1] = token[0];
210  token[0] = ( token[0] % (myCommSize - 1) ) + 1;
211  }
212  else
213  {
214  THROW_LOGICAL_ERROR4("Invalid token update. Rank = ", getRank(), ", token = ", token[0] );
215  }
216  send(token, 2, ParaINT, 0, TagToken);
217  if( token[1] == -2 )
218  {
219 #ifndef _MUTEX_CPP11
220  pthread_mutex_unlock(&tokenAccessLock);
221 #endif
222  return true;
223  }
224  else
225  {
226 #ifndef _MUTEX_CPP11
227  pthread_mutex_unlock(&tokenAccessLock);
228 #endif
229  return false;
230  }
231 }
232 
233 /// MPI call wrappers
234 void
235 ParaCommMpi::init( int argc, char **argv )
236 {
237 
238 #ifdef UG_WITH_UGS
239  if( !commUgs )
240  {
241  MPI_Init( &argc, &argv );
242 
243 //
244 // To test if MPI support MPI_THREAD_MULTIPLE
245 //
246 // int provided;
247 // MPI_CALL(
248 // MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)
249 // );
250 // if (provided < MPI_THREAD_MULTIPLE)
251 // {
252 // std::cerr << "Error: the MPI library doesn't provide the required thread level" << std::endl;
253 // MPI_Abort(MPI_COMM_WORLD, 0);
254 // }
255 
256  }
257 #else
258 // MPI_Init( &argc, &argv );
259 
260 //
261 // To test if MPI support MPI_THREAD_MULTIPLE
262 //
263  int provided, claimed;
264  MPI_CALL(
265  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)
266  );
267  MPI_Query_thread( &claimed );
268  // printf( "Query thread level= %d Init_thread level= %d\n", claimed, provided );
269  assert(provided == MPI_THREAD_MULTIPLE);
270  if (provided < MPI_THREAD_MULTIPLE)
271  {
272  std::cerr << "Error: the MPI library doesn't provide the required thread level" << std::endl;
273  MPI_Abort(MPI_COMM_WORLD, 0);
274  }
275  // std::cout << "***** MPI multiple! *****" << std::endl;
276 
277 #endif
278  startTime = MPI_Wtime();
279  char *pprocName = procName;
280  MPI_CALL(
281  MPI_Get_processor_name(pprocName, &namelen)
282  );
283 
284  /// if you add tag, you should add tagStringTale too */
285  // assert( sizeof(tagStringTable)/sizeof(char*) == N_MPI_TAGS );
286  assert( tagStringTableIsSetUpCoorectly() );
287 
288  /// Data Types */
289  datatypes[ParaCHAR] = MPI_CHAR;
290  datatypes[ParaSHORT] = MPI_SHORT;
291  datatypes[ParaINT] = MPI_INT;
292  datatypes[ParaLONG] = MPI_LONG;
293  datatypes[ParaUNSIGNED_CHAR] = MPI_UNSIGNED_CHAR;
294  datatypes[ParaUNSIGNED_SHORT] = MPI_UNSIGNED_SHORT;
295  datatypes[ParaUNSIGNED] = MPI_UNSIGNED;
296  datatypes[ParaUNSIGNED_LONG] = MPI_UNSIGNED_LONG;
297  datatypes[ParaFLOAT] = MPI_FLOAT;
298  datatypes[ParaDOUBLE] = MPI_DOUBLE;
299  datatypes[ParaLONG_DOUBLE] = MPI_LONG_DOUBLE;
300  datatypes[ParaBYTE] = MPI_BYTE;
301 
302 #ifdef _ALIBABA
303  datatypes[ParaSIGNED_CHAR] = MPI_CHAR;
304  datatypes[ParaLONG_LONG] = MPI_LONG;
305  datatypes[ParaUNSIGNED_LONG_LONG] = MPI_UNSIGNED_LONG;
306  datatypes[ParaBOOL] = MPI_INT;
307 #else
308  datatypes[ParaSIGNED_CHAR] = MPI_SIGNED_CHAR;
309  datatypes[ParaLONG_LONG] = MPI_LONG_LONG;
310  datatypes[ParaUNSIGNED_LONG_LONG] = MPI_UNSIGNED_LONG_LONG;
311  datatypes[ParaBOOL] = MPI_INT;
312 #endif
313 
314 }
315 
317 {
318  MPI_Finalize();
319 }
320 
321 bool
323  )
324 {
325  return ( sizeof(tagStringTable)/sizeof(char*) == N_MPI_TAGS );
326 }
327 
328 const char *
330  int tag /// tag to be converted to string
331  )
332 {
333  assert( tag >= 0 && tag < N_MPI_TAGS );
334  return tagStringTable[tag];
335 }
336 
337 int
339  void* buffer,
340  int count,
341  const int datatypeId,
342  int root
343  )
344 {
345  MPI_CALL(
346  MPI_Bcast( buffer, count, datatypes[datatypeId], root, myComm )
347  );
348  return 0;
349 }
350 
351 int
353  void* buffer,
354  int count,
355  const int datatypeId,
356  int dest,
357  const int tag
358  )
359 {
360  MPI_CALL(
361  MPI_Send( buffer, count, datatypes[datatypeId], dest, tag, myComm )
362  );
363  TAG_TRACE (Send, To, dest, tag);
364  return 0;
365 }
366 
367 int
369  void* buffer,
370  int count,
371  const int datatypeId,
372  int dest,
373  const int tag,
374  MPI_Request *req
375  )
376 {
377  MPI_CALL(
378  MPI_Isend( buffer, count, datatypes[datatypeId], dest, tag, myComm, req )
379  );
380  TAG_TRACE (iSend, To, dest, tag);
381  return 0;
382 }
383 
384 int
386  void* buffer,
387  int count,
388  const int datatypeId,
389  int source,
390  const int tag
391  )
392 {
393  MPI_Status mpiStatus;
394  MPI_CALL (
395  MPI_Recv( buffer, count, datatypes[datatypeId], source, tag, myComm, &mpiStatus )
396  );
397  TAG_TRACE (Recv, From, source, tag);
398  return 0;
399 }
400 
401 void
403  const int source,
404  const int tag,
405  int *receivedTag
406  )
407 {
408  MPI_Status mpiStatus;
409  if( tag == TagAny )
410  {
411  MPI_CALL (
412  MPI_Probe(source, MPI_ANY_TAG, myComm, &mpiStatus)
413  );
414  }
415  else
416  {
417  MPI_CALL (
418  MPI_Probe(source, tag, myComm, &mpiStatus)
419  );
420  }
421  if( tag == TagAny )
422  {
423  (*receivedTag) = mpiStatus.MPI_TAG;
424  TAG_TRACE (Probe, From, source, (*receivedTag));
425  return;
426  }
427  else
428  {
429  assert( tag == mpiStatus.MPI_TAG );
430  (*receivedTag) = mpiStatus.MPI_TAG;
431  TAG_TRACE (Probe, From, source, (*receivedTag));
432  return;
433  }
434 }
435 
436 bool
438  int* source,
439  int* tag
440  )
441 {
442  MPI_Status mpiStatus;
443  MPI_CALL (
444  MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &mpiStatus)
445  );
446  *source = mpiStatus.MPI_SOURCE;
447  *tag = mpiStatus.MPI_TAG;
448  TAG_TRACE (Probe, From, *source, *tag);
449  return true;
450 }
451 
452 bool
454  int* source,
455  int* tag
456  )
457 {
458  int flag;
459  MPI_Status mpiStatus;
460  if( *tag == TagAny )
461  {
462  MPI_CALL (
463  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, myComm, &flag, &mpiStatus)
464  );
465  }
466  else
467  {
468  assert(*tag >= 0);
469  MPI_CALL (
470  MPI_Iprobe(MPI_ANY_SOURCE, *tag, myComm, &flag, &mpiStatus)
471  );
472  }
473  if( flag )
474  {
475  *source = mpiStatus.MPI_SOURCE;
476  *tag = mpiStatus.MPI_TAG;
477  TAG_TRACE (Iprobe, From, *source, *tag);
478  }
479  return flag;
480 }
481 
482 int
484  void* buffer,
485  int count,
486  MPI_Datatype datatype,
487  int root
488  )
489 {
490  MPI_CALL(
491  MPI_Bcast( buffer, count, datatype, root, myComm )
492  );
493  return 0;
494 }
495 
496 int
498  void* buffer,
499  int count,
500  MPI_Datatype datatype,
501  int dest,
502  const int tag
503  )
504 {
505  MPI_CALL (
506  MPI_Send( buffer, count, datatype, dest, tag, myComm )
507  // MPI_Ssend( buffer, count, datatype, dest, tag, myComm ) // after racing, program hang
508  );
509  TAG_TRACE (Send, To, dest, tag);
510  return 0;
511 }
512 
513 int
515  void* buffer,
516  int count,
517  MPI_Datatype datatype,
518  int dest,
519  const int tag,
520  MPI_Request *req
521  )
522 {
523  MPI_CALL (
524  MPI_Isend( buffer, count, datatype, dest, tag, myComm, req )
525  // MPI_Ssend( buffer, count, datatype, dest, tag, myComm ) // after racing, program hang
526  );
527  TAG_TRACE (iSend, To, dest, tag);
528  return 0;
529 }
530 
531 int
533  void* buffer,
534  int count,
535  MPI_Datatype datatype,
536  int source,
537  const int tag
538  )
539 {
540  MPI_Status mpiStatus;
541  MPI_CALL (
542  MPI_Recv( buffer, count, datatype, source, tag, myComm, &mpiStatus )
543  );
544  TAG_TRACE (Recv, From, source, tag);
545  return 0;
546 }
547 
548 int
550  )
551 {
552  if( !iSendRequestDeque.empty() )
553  {
554  std::deque<ParaIsendRequest *>::iterator it = iSendRequestDeque.begin();
555  while( it != iSendRequestDeque.end() )
556  {
557  ParaIsendRequest *temp = *it;
558  if( temp->test() )
559  {
560  it = iSendRequestDeque.erase(it);
561  delete temp;
562  }
563  else
564  {
565  it++;
566  }
567  }
568  }
569  return iSendRequestDeque.size();
570 }
571 
572 void
574  )
575 {
576  if( !iSendRequestDeque.empty() )
577  {
578  std::deque<ParaIsendRequest *>::iterator it = iSendRequestDeque.begin();
579  while( it != iSendRequestDeque.end() )
580  {
581  ParaIsendRequest *temp = *it;
582  temp->wait();
583  it = iSendRequestDeque.erase(it);
584  delete temp;
585  }
586  }
587 }
static const int ParaSHORT
Definition: paraComm.h:65
pthread_mutex_t tokenAccessLock
mutex for pthread thread
Definition: paraCommMpi.h:116
void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
Definition: paraCommMpi.cpp:69
static const int TagNotificationId
Definition: paraTagDef.h:55
#define THROW_LOGICAL_ERROR4(msg1, msg2, msg3, msg4)
Definition: paraDef.h:103
static const int ParaCHAR
Definition: paraComm.h:64
static const int TagIncumbentValue
Definition: paraTagDef.h:52
static const char * tagStringTable[]
table for tag name string
Definition: paraCommMpi.h:111
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
virtual void init(int argc, char **argv)
initializer of this communicator
static const int TagTerminateRequest
Definition: paraTagDef.h:56
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommMpi.h:101
int myCommSize
communicator size : number of processes joined in this system
Definition: paraCommMpi.h:97
std::ofstream ofs
output file stream for tag trace
Definition: paraCommMpi.h:102
int ureceive(void *bufer, int count, MPI_Datatype datatype, int source, int tag)
User type receive for created data type.
int getRank()
get rank of caller's thread
Definition: paraCommMpi.h:213
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagAny
Definition: paraTagDef.h:44
static const int ParaLONG_LONG
Definition: paraComm.h:68
static const int ParaUNSIGNED_LONG
Definition: paraComm.h:73
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
virtual ~ParaCommMpi()
destructor of this communicator
static const int ParaBOOL
Definition: paraComm.h:78
static const int TYPE_LIST_SIZE
Definition: paraComm.h:81
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
std::deque< ParaIsendRequest * > iSendRequestDeque
Definition: paraCommMpi.h:133
double startTime
start time of this communicator
Definition: paraCommMpi.h:104
static const int ParaUNSIGNED
Definition: paraComm.h:72
static const int Deterministic
Definition: paraParamSet.h:76
static const int ParaLONG
Definition: paraComm.h:67
static const int TagTraceFileName
Definition: paraParamSet.h:126
static const int ParaLONG_DOUBLE
Definition: paraComm.h:77
static const int ParaDOUBLE
Definition: paraComm.h:76
ParaComm extension for MPI communication.
static const int TagTaskReceived
Definition: paraTagDef.h:48
static const int TagSolverState
Definition: paraTagDef.h:53
static const int ParaSIGNED_CHAR
Definition: paraComm.h:69
static const int TagSolution
Definition: paraTagDef.h:51
virtual int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
static const int TagToken
Definition: paraTagDef.h:63
static const int TagTrace
Definition: paraParamSet.h:72
bool isStringParamDefaultValue(int param)
check if string parameter is default value or not
bool probe(int *source, int *tag)
probe function which waits for a new message
static const int ParaUNSIGNED_CHAR
Definition: paraComm.h:70
class ParaParamSet
Definition: paraParamSet.h:850
MPI_Comm myComm
MPI communicator.
Definition: paraCommMpi.h:96
char procName[MPI_MAX_PROCESSOR_NAME]
process name
Definition: paraCommMpi.h:100
int token[2]
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommMpi.h:105
const char * getStringParamValue(int param)
for char parameters
virtual const char * getTagString(int tag)
get Tag string for debugging
virtual bool waitToken(int rank)
wait token when UG runs with deterministic mode
static const int ParaBYTE
Definition: paraComm.h:79
static const int TagTerminated
Definition: paraTagDef.h:58
Fundamental Tag definitions.
int iUsend(void *bufer, int count, MPI_Datatype datatype, int dest, int tag, MPI_Request *req)
User type send for created data type.
static MPI_Datatype datatypes[TYPE_LIST_SIZE]
data type mapping table to MPI data type
Definition: paraCommMpi.h:110
static const int ParaUNSIGNED_LONG_LONG
Definition: paraComm.h:74
#define TAG_TRACE(call, fromTo, sourceDest, tag)
Definition: paraCommCPP11.h:58
static const int TagDiffSubproblem
Definition: paraTagDef.h:49
virtual bool passTermToken(int rank)
pass termination token from the rank to the next
static const int ParaFLOAT
Definition: paraComm.h:75
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
static const int TagInterruptRequest
Definition: paraTagDef.h:57
#define TAG_STR(tag)
Definition: paraTagDef.h:40
static const int TagTask
Definition: paraTagDef.h:47
void abort()
abort. How it works sometimes depends on communicator used
static const int ParaINT
Definition: paraComm.h:66
int ubcast(void *buffer, int count, MPI_Datatype datatype, int root)
User type bcast for created data type.
virtual void passToken(int rank)
pass token from the rank to the next
std::ostream * tos
output file stream for tag trace to change file name
Definition: paraCommMpi.h:103
static const int ParaUNSIGNED_SHORT
Definition: paraComm.h:71
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
int namelen
length of this process name
Definition: paraCommMpi.h:99
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
#define MPI_CALL(mpicall)
Definition: paraCommMpi.h:68
int myRank
rank of this process
Definition: paraCommMpi.h:98
int usend(void *bufer, int count, MPI_Datatype datatype, int dest, int tag)
User type send for created data type.
int iSend(void *bufer, int count, const int datatypeId, int dest, const int tag, MPI_Request *req)
send function for standard ParaData types
static const int TagWinner
Definition: paraTagDef.h:60
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message has arrived or not
static const int TagAckCompletion
Definition: paraTagDef.h:62
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommMpi.cpp:96
bool getBoolParamValue(int param)
for bool parameters