Scippy

UG

Ubiquity Generator framework

paraCommCPP11.h
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraCommCPP11.h
27  * @brief ParaComm extension for C++11 thread communication
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 
37 #ifndef __PARA_COMM_CPP11_H__
38 #define __PARA_COMM_CPP11_H__
39 
40 #include <thread>
41 #include <mutex>
42 #include <condition_variable>
43 #include <stdexcept>
44 #include <iostream>
45 #include <ostream>
46 #include <fstream>
47 #include <sstream>
48 #include <string>
49 #include <iomanip>
50 #include <cassert>
51 #include "paraDef.h"
52 #include "paraComm.h"
53 #include "paraSysTimer.h"
54 #include "paraParamSetTh.h"
55 #include "paraTimerTh.h"
56 /// TAG_TRACE( call, fromTo, sourceDest, tag ): if tagTraceFlag is set, writes one trace line ("<call> <fromTo> <sourceDest>" followed by the tag string, or by "as broadcast" when tag < 0) to *getOstream(), or to std::cout while getOstream() still returns null. call and fromTo are stringized, not evaluated.
57 /// NOTE(review): expands to a bare if-block (no do/while(0) wrapper) and uses `tag` unparenthesized, so take care inside unbraced if/else statements and with expression arguments.
58 #define TAG_TRACE( call, fromTo, sourceDest, tag ) \
59  if( tagTraceFlag ) \
60  { \
61  if( getOstream() ) \
62  { \
63  if( tag >= 0 ) \
64  { \
65  /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << ", " << tagStringTable[tag] << std::endl; */ \
66  *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
67  << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
68  } \
69  else \
70  { \
71  /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << std::endl; */ \
72  *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
73  << " " << sourceDest << " as broadcast" << std::endl; \
74  } \
75  } \
76  else \
77  { \
78  if( tag >= 0 ) \
79  { \
80  std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
81  << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
82  } \
83  else \
84  { \
85  std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
86  << " " << sourceDest << " as broadcast" << std::endl; \
87  } \
88  } \
89 }
90 
91 namespace UG
92 {
93 
94 ///
95 /// Ids of the user defined transfer data types. They are numbered consecutively, starting right after TYPE_LAST.
96 /// UG_USER_TYPE_LAST closes the range and has the same value as ParaRacingRampUpParamType (the last id).
97 static const int UG_USER_TYPE_FIRST = TYPE_LAST + 1;
98 static const int ParaInstanceType = UG_USER_TYPE_FIRST + 0;
99 static const int ParaSolutionType = UG_USER_TYPE_FIRST + 1;
100 static const int ParaParamSetType = UG_USER_TYPE_FIRST + 2;
101 static const int ParaTaskType = UG_USER_TYPE_FIRST + 3;
102 static const int ParaSolverStateType = UG_USER_TYPE_FIRST + 4;
103 static const int ParaCalculationStateType = UG_USER_TYPE_FIRST + 5;
104 static const int ParaSolverTerminationStateType = UG_USER_TYPE_FIRST + 6;
105 static const int ParaRacingRampUpParamType = UG_USER_TYPE_FIRST + 7;
106 static const int UG_USER_TYPE_LAST = UG_USER_TYPE_FIRST + 7;
107 
108 static const int ThreadTableSize = 751; ///< size of the thread table (see ParaCommCPP11::threadsTable, indexed by thread rank): this limits the number of threads
109 
110 ///
111 /// Class for message queue element
112 ///
113 /// NOTE : For basic data types, this is a copy of the sender side memory.
114 /// Once the memory has been copied in the receive function, the copy
115 /// has to be freed.
116 /// For user defined data types, this is the receiver side memory,
117 /// because it is better to allocate memory on the sender side for
118 /// mutex locking. Sender side functions have to allocate the memory.
119 /// In this case, the memory does not have to be freed. The memory is for
120 /// the receiver side.
121 ///
123 {
124 
125  int source; ///< source thread rank of this message
126  int count; ///< number of the data type elements
127  int dataTypeId; ///< data type id
128  int tag; ///< tag of the message, -1 : in case of broadcast message
129  void *data; ///< data of the message
130  MessageQueueElement *next; ///< point to next message queue element
131 
132 public:
133 
134  ///
135  /// default constructor of MessageQueueElement
136  ///
138  )
139  : source(-1),
140  count(0),
141  dataTypeId(-1),
142  tag(-1),
143  data(0),
144  next(0)
145  {
146  }
147 
148  ///
149  /// constructor of MessageQueueElement
150  ///
152  int inSource, ///< source thread rank of this message
153  int inCount, ///< number of the data type elements
154  int inDataTypeId, ///< data type id
155  int inTag, ///< tag of the message, -1 : in case of broadcast message
156  void *inData ///< data of the message
157  ) : source(inSource), count(inCount), dataTypeId(inDataTypeId), tag(inTag), data(inData), next(0)
158  {
159  }
160 
161  ///
162  /// destructor of MessageQueueElement
163  ///
165  )
166  {
167  }
168 
169  ///
170  /// getter of source rank
171  /// @return rank of the source
172  ///
174  )
175  {
176  return source;
177  }
178 
179  ///
180  /// Returns the element count recorded for this message (the `count` member).
181  /// @return the number of the data type elements
182  ///
183  int getCount(
184  )
185  {
186  return count;
187  }
188 
189  ///
190  /// getter of the data type id
191  /// @return data type id
192  ///
194  )
195  {
196  return dataTypeId;
197  }
198 
199  ///
200  /// Returns the tag recorded for this message.
201  /// @return tag of the message; -1 in case of a broadcast message
202  ///
203  int getTag(
204  )
205  {
206  return tag;
207  }
208 
209  ///
210  /// Returns the stored data pointer as is; nothing is copied or freed here.
211  /// @return pointer to the data of the message
212  ///
213  void *getData(
214  )
215  {
216  return data;
217  }
218 
219  ///
220  /// getter of the pointer to the next MessageQueueElement
221  /// @return pointer to MessageQueueElement
222  ///
224  )
225  {
226  return next;
227  }
228 
229  ///
230  /// Sets the successor of this element. The previous `next` pointer is simply overwritten; nothing is deleted.
231  ///
232  void link(
233  MessageQueueElement *nextElement ///< element that follows this one (0 for no successor)
234  )
235  {
236  next = nextElement;
237  }
238 };
239 
240 ///
241 /// Class of MessageQueueTableElement
242 ///
244 {
245 
246  MessageQueueElement *head; ///< head of the message queue
247  MessageQueueElement *tail; ///< tail of the message queue
248  int size; ///< number of the messages in queue
249 
250 public:
251 
252  ///
253  /// default constructor of MessageQueueTableElement
254  ///
256  ) : head(0), tail(0), size(0)
257  {
258  }
259 
260  ///
261  /// destructor of MessageQueueTableElement
262  ///
264  )
265  {
266  while( head )
267  {
268  MessageQueueElement *next = head->getNext();
269  delete head;
270  head = next;
271  }
272  }
273 
274  ///
275  /// check if the specified message exists or not
276  /// @return pointer to MessageQueueElement, 0: no element
277  ///
279  int source, ///< source rank
280  int datatypeId, ///< data type id
281  int tag ///< tag of the message
282  )
283  {
284  MessageQueueElement *ret = 0;
285  for( MessageQueueElement *current = head; current; current = current->getNext() )
286  {
287  if( current->getSource() == source
288  && current->getDataTypeId() == datatypeId
289  && current->getTag() == tag )
290  {
291  ret = current;
292  break;
293  }
294  }
295  return ret;
296  }
297 
298  ///
299  /// check if the specified message with tag exists or not
300  /// @return pointer to MessageQueueElement, 0: no element
301  ///
303  int tag ///< tag of the message
304  )
305  {
306  MessageQueueElement *ret = 0;
307  for( MessageQueueElement *current = head; current; current = current->getNext() )
308  {
309  if( current->getTag() == tag )
310  {
311  ret = current;
312  break;
313  }
314  }
315  return ret;
316  }
317 
318  ///
319  /// extracts a message
320  /// @return pointer to the message
321  ///
323  bool *sentMessage, ///< for synchronization
324  int source, ///< source rank
325  int datatypeId, ///< data type id
326  int tag ///< tag of the message
327  )
328  {
329  MessageQueueElement *ret = 0;
330 
331  MessageQueueElement *prev = head;
332  MessageQueueElement *current = head;
333  while( current )
334  {
335  MessageQueueElement *next = current->getNext();
336  if( current->getSource() == source
337  && current->getDataTypeId() == datatypeId
338  && current->getTag() == tag )
339  {
340  ret = current;
341  ret->link(0);
342  if( current == head )
343  {
344  if( current == tail )
345  {
346  head = 0;
347  tail = 0;
348  *sentMessage = false;
349  }
350  else
351  {
352  head=next;
353  }
354  }
355  else
356  {
357  if( current == tail )
358  {
359  tail = prev;
360  }
361  prev->link(next);
362  }
363  size--;
364  break;
365  }
366  else
367  {
368  prev = current;
369  current = next;
370  }
371  }
372  return ret;
373  }
374 
375  ///
376  /// extracts a message
377  /// (This method is only for the destructor of ParaCommCPP11. No lock is necessary.)
378  ///
379  /// @return pointer to the message
380  ///
382  bool *sentMessage ///< for synchronization
383  )
384  {
385  if( head == 0 ) return 0;
386 
387  MessageQueueElement *current = head;
388  MessageQueueElement *next = current->getNext();
389  current->link(0);
390  if( current == tail )
391  {
392  head = 0;
393  tail = 0;
394  *sentMessage = false;
395  }
396  else
397  {
398  head=next;
399  }
400  size--;
401  return current;
402  }
403 
404  ///
405  /// Appends a message to the tail of the queue, increments size, sets *sentMessage to true and wakes one waiter on sentMsg.
406  /// NOTE(review): queueLockMutex is neither locked nor otherwise used in this body; presumably the caller already holds it - confirm.
407  void enqueue(
408  std::condition_variable& sentMsg, ///< condition variable for synchronization; notify_one() is called on it after linking
409  std::mutex& queueLockMutex, ///< mutex for synchronization (unused in this body)
410  bool *sentMessage, ///< flag for synchronization; set to true here
411  MessageQueueElement *newElement ///< message queue element to enter
412  )
413  {
414  if( tail == 0 ) // queue is empty: the new element becomes both head and tail
415  {
416  head = tail = newElement;
417  }
418  else
419  {
420  tail->link(newElement);
421  tail = newElement;
422  }
423  size++;
424  *sentMessage = true;
425  sentMsg.notify_one();
426  }
427 
428  ///
429  /// getter of head
430  /// @return head element pointer
431  ///
433  )
434  {
435  return head;
436  }
437 
438  ///
439  /// Returns the message counter of this queue (incremented by enqueue, decremented when an element is extracted).
440  /// @return size of the message queue
441  ///
442  int getSize(
443  )
444  {
445  return size;
446  }
447 
448  ///
449  /// Checks if the queue is empty or not. Emptiness is decided by the head pointer only.
450  /// @return true, if it is empty, else false
451  ///
452  bool isEmpty(
453  )
454  {
455  bool empty = true;
456  if( head ) empty = false;
457  assert( !empty || size == 0 ); // consistency check: an empty queue must have size 0
458  return empty;
459  }
460 
461  ///
462  /// wait for a message coming to a queue
463  ///
465  std::condition_variable& sentMsg, ///< condition variable for synchronization
466  std::mutex& queueLockMutex, ///< mutex for synchronization
467  bool *sentMessage ///< flag for synchronization
468  )
469  {
470  std::unique_lock<std::mutex> lk(queueLockMutex);
471  sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
472  }
473 
474  ///
475  /// wait for a specified message coming to a queue
476  ///
478  std::condition_variable& sentMsg, ///< condition variable for synchronization
479  std::mutex& queueLockMutex, ///< mutex for synchronization
480  bool *sentMessage, ///< flag for synchronization
481  int source, ///< source rank of the message
482  int tag ///< tag of the message
483  )
484  {
485  for(;;)
486  {
487  std::unique_lock<std::mutex> lk(queueLockMutex);
488  sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
489  MessageQueueElement *current = head;
490  while( current )
491  {
492  MessageQueueElement *next = current->getNext();
493  if( current->getSource() == source
494  && current->getTag() == tag )
495  {
496  break;
497  }
498  current = next;
499  }
500  if( current ) break;
501  *sentMessage = false;
502  }
503  }
504 
505  ///
506  /// wait for a specified message coming to a queue
507  ///
509  std::condition_variable& sentMsg, ///< condition variable for synchronization
510  std::mutex& queueLockMutex, ///< mutex for synchronization
511  bool *sentMessage, ///< flag for synchronization
512  int source, ///< source rank of the message
513  int datatypeId, ///< data type id of the message
514  int tag ///< tag of the message
515  )
516  {
517  for(;;)
518  {
519  std::unique_lock<std::mutex> lk(queueLockMutex);
520  sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
521  MessageQueueElement *current = head;
522  while( current )
523  {
524  MessageQueueElement *next = current->getNext();
525  if( current->getSource() == source
526  && current->getDataTypeId() == datatypeId
527  && current->getTag() == tag )
528  {
529  break;
530  }
531  current = next;
532  }
533  if( current ) break;
534  *sentMessage = false;
535  }
536  }
537 
538  ///
539  /// wait for a specified message coming to a queue
540  ///
542  std::condition_variable& sentMsg, ///< condition variable for synchronization
543  std::mutex& queueLockMutex, ///< mutex for synchronization
544  bool *sentMessage, ///< flag for synchronization
545  int source, ///< source rank of the message
546  int *tag ///< tag of the message
547  )
548  {
549  for(;;)
550  {
551  std::unique_lock<std::mutex> lk(queueLockMutex);
552  sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
553  MessageQueueElement *current = head;
554  if( *tag == TagAny )
555  {
556  while( current )
557  {
558  MessageQueueElement *next = current->getNext();
559  if( current->getSource() == source )
560  {
561  *tag = current->getTag();
562  break;
563  }
564  current = next;
565  }
566  }
567  else
568  {
569  while( current )
570  {
571  MessageQueueElement *next = current->getNext();
572  if( current->getSource() == source
573  && current->getTag() == (*tag) )
574  {
575  break;
576  }
577  current = next;
578  }
579  }
580  if( current ) break;
581  *sentMessage = false;
582  }
583  }
584 
585 };
586 
587 ///
588 /// Class of ThreadsTableElement
589 ///
591 {
592 
593  int rank; ///< rank of this thread
594  std::ostream *tos; ///< tag trace stream for this thread
595 
596 public:
597 
598  ///
599  /// default constructor of ThreadsTableElement
600  ///
602  )
603  : rank(0),
604  tos(0)
605  {
606  }
607 
608  ///
609  /// constructor of ThreadsTableElement
610  ///
612  int inRank, ///< rank of this thread
613  ParaParamSet *paraParamSet ///< UG parameter set
614  )
615  : rank(inRank),
616  tos(0)
617  {
618  if( paraParamSet->getBoolParamValue(TagTrace) )
619  {
620  if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
621  {
622  tos = &std::cout;
623  }
624  else
625  {
626  std::ostringstream s;
627  s << paraParamSet->getStringParamValue(TagTraceFileName) << inRank;
628  std::ofstream *ofs = new std::ofstream();
629  ofs->open(s.str().c_str());
630  tos = ofs;
631  }
632  }
633  }
634 
635  ///
636  /// destructor of ThreadsTableElement
637  ///
639  )
640  {
641  if( tos )
642  {
643  if( tos != &std::cout )
644  {
645  std::ofstream *ofs = dynamic_cast<std::ofstream *>(tos);
646  ofs->close();
647  delete ofs;
648  }
649  }
650  }
651 
652  ///
653  /// Returns the rank stored in this table element.
654  /// @return the thread rank
655  ///
656  int getRank(
657  )
658  {
659  return rank;
660  }
661 
662  ///
663  /// Overwrites the rank stored in this table element; the tag trace stream is not touched.
664  ///
665  void setRank(
666  int r ///< rank to be set
667  )
668  {
669  rank = r;
670  }
671 
672  ///
673  /// Returns the tag trace stream of this rank. It is 0 unless the constructor saw the TagTrace parameter enabled, and may be &std::cout.
674  /// @return pointer to the tag trace stream (may be 0)
675  ///
676  std::ostream *getOstream(
677  )
678  {
679  return tos;
680  }
681 };
682 
684 class ParaTask;
685 class ParaSolverState;
687 class ParaDiffSubproblem;
688 class ParaInstance;
689 class ParaSolution;
690 
691 ///
692 /// Communicator object for C++11 thread communications
693 ///
694 class ParaCommCPP11 : public ParaComm
695 {
696 protected:
697  int comSize; ///< communicator size : number of threads joined in this system
698  bool tagTraceFlag; ///< indicate if tags are traced or not
699  int **token; ///< index 0: token
700  ///< index 1: token color
701  ///< -1: green
702  ///< > 0: yellow ( termination origin solver number )
703  ///< -2: red ( means the solver can terminate )
704  ParaSysTimer timer; ///< system timer
705  static const char *tagStringTable[]; ///< tag name string table
706  static ThreadsTableElement *threadsTable[ThreadTableSize]; ///< threads table: index is thread rank
707  static thread_local int localRank; ///< local thread rank
708  MessageQueueTableElement **messageQueueTable; ///< message queue table
709  bool *sentMessage; ///< sent message flag for synchronization
710  std::mutex *queueLockMutex; ///< mutex for synchronization
711  std::condition_variable *sentMsg; ///< condition variable for synchronization
712 
713  std::mutex *tokenAccessLockMutex; ///< mutex to access token
714  std::mutex rankLockMutex; ///< mutex to access rank
715  std::mutex applicationLockMutex; ///< mutex for applications
716 
717  ///
718  /// allocate memory and copy message
719  ///
720  void *allocateMemAndCopy(
721  const void* buffer, ///< pointer to buffer of the message
722  int count, ///< the number of data element in the message
723  const int datatypeId ///< data type of each element in the message
724  );
725 
726  ///
727  /// copy message
728  ///
729  void copy(
730  void *dest, ///< destination to copy the data
731  const void *src, ///< source of the data
732  int count, ///< the number of data element
733  int datatypeId ///< data type of each element
734  );
735 
736  ///
737  /// free memory
738  ///
739  void freeMem(
740  void* buffer, ///< pointer to buffer of the message
741  int count, ///< the number of data element
742  const int datatypeId ///< data type of each element
743  );
744 
745  ///
746  /// free memory
747  /// @return
748  ///
749  bool freeStandardTypes(
750  MessageQueueElement *elem ///< pointer to a message queue element
751  );
752 
753  ///
754  /// check if tag string table (for debugging) set up correctly
755  /// @return true if tag string table is set up correctly, false otherwise
756  ///
757  virtual bool tagStringTableIsSetUpCoorectly(
758  );
759 
760 public:
761 
762  // using UG::ParaComm::lockApp;
763  // using UG::ParaComm::unlockApp;
764 
765  ///
766  /// constructor of ParaCommCPP11
767  ///
769  )
770  : comSize(-1),
771  tagTraceFlag(false),
772  token(0),
773  messageQueueTable(0),
774  sentMessage(0),
775  queueLockMutex(0),
776  sentMsg(0),
777  tokenAccessLockMutex(0)
778  {
779  }
780 
781  ///
782  /// destructor of this communicator
783  ///
784  virtual ~ParaCommCPP11(
785  );
786 
787  ///
788  /// initializer of this communicator
789  ///
790  virtual void init(
791  int argc, ///< the number of arguments
792  char **argv ///< pointers to the arguments
793  );
794 
795  ///
796  /// Sets the local rank in case of using a communicator for shared memory.
797  /// localRank is declared thread_local, so the assignment only affects the calling thread.
798  virtual void setLocalRank(
799  int inRank ///< rank to store for the calling thread
800  )
801  {
802  localRank = inRank;
803  }
804 
805  ///
806  /// Returns the start time reported by this communicator's system timer (timer.getStartTime()).
807  /// (should not be used)
808  /// @return start time
809  ///
810  double getStartTime(
811  )
812  {
813  return timer.getStartTime();
814  }
815 
816  ///
817  /// get rank of caller's thread
818  /// @return rank of caller's thread
819  ///
820  int getRank(
821  );
822 
823  ///
824  /// Returns the size of this communicator (comSize), which indicates how many threads are in a UG process.
825  /// @return the number of threads; -1 is the value set by the constructor
826  ///
827  int getSize(
828  )
829  {
830  return comSize;
831  }
832 
833  ///
834  /// get size of the messageQueueTable
835  /// @return if dest >= 0 then return the number of messages waiting to be sent to dest only,
836  /// otherwise return the number of all messages waiting to be sent.
837  ///
839  int dest=-1
840  )
841  {
842  if( dest >= 0 )
843  {
844  return messageQueueTable[dest]->getSize();
845  }
846  else
847  {
848  int nTotalMessages = 0;
849  for( int i = 0; i < ( comSize + 1 ); i++ )
850  {
851  nTotalMessages += messageQueueTable[i]->getSize();
852  }
853  return nTotalMessages;
854  }
855  }
856 
857  ///
858  /// initializer for LoadCoordinator
859  ///
860  virtual void lcInit(
861  ParaParamSet *paraParamSet ///< UG parameter set
862  );
863 
864  ///
865  /// Initializer for Solvers. This overload has an empty body: nothing is done and paraParamSet is not used.
866  ///
867  virtual void solverInit(
868  ParaParamSet *paraParamSet ///< UG parameter set (unused here)
869  )
870  {
871  }
872 
873  ///
874  /// initializer for a specific Solver
875  ///
876  virtual void solverInit(
877  int rank, ///< rank of the Solver
878  ParaParamSet *paraParamSet ///< UG parameter set
879  );
880 
881  ///
882  /// reinitializer of a specific Solver
883  ///
884  virtual void solverReInit(
885  int rank, ///< rank of the Solver
886  ParaParamSet *paraParamSet ///< UG parameter set
887  );
888 
889  ///
890  /// delete Solver from this communicator
891  ///
892  virtual void solverDel(
893  int rank ///< rank of the Solver
894  );
895 
896  ///
897  /// Abort. How it works depends on the communicator used; this one simply calls std::abort().
898  ///
899  void abort(
900  )
901  {
902  std::abort();
903  }
904 
905  ///
906  /// Function to wait for a Terminated message. This implementation does not wait for anything.
907  /// (This function is not used currently)
908  /// @return always false in this thread communicator (documented elsewhere as: true when MPI communication is used)
909  ///
910  virtual bool waitTerminatedMessage(
911  )
912  {
913  return false;
914  }
915 
916  ///
917  /// wait token when UG runs with deterministic mode
918  /// @return true, when token is arrived to the rank
919  ///
920  virtual bool waitToken(
921  int rank ///< rank to check if token is arrived
922  );
923 
924  ///
925  /// pass token from the rank to the next
926  ///
927  virtual void passToken(
928  int rank ///< from this rank, the token is passed
929  );
930 
931  ///
932  /// pass termination token from the rank to the next
933  /// @return true, when the termination token is passed from this rank, false otherwise
934  ///
935  virtual bool passTermToken(
936  int rank ///< from this rank, the termination token is passed
937  );
938 
939  ///
940  /// set received token to this communicator
941  ///
942  virtual void setToken(
943  int rank, ///< rank to set the token
944  int *inToken ///< token to be set
945  );
946 
947  ///
948  /// get ostream pointer
949  /// @return pointer to ostream
950  ///
951  std::ostream *getOstream(
952  );
953 
954  ///
955  /// Locks rankLockMutex; blocks until the mutex is acquired. Must be paired with unlockRank().
956  ///
957  void lockRank()
958  {
959  rankLockMutex.lock();
960  }
961 
962  ///
963  /// Unlocks rankLockMutex, which was locked by lockRank().
964  ///
965  void unlockRank()
966  {
967  rankLockMutex.unlock();
968  }
969 
970  ///
971  /// Locks the UG application mutex (applicationLockMutex) to synchronize with other threads; blocks until acquired.
972  ///
973  virtual void lockApp(
974  )
975  {
976  applicationLockMutex.lock();
977  // std::cout << "### Rank" << this->getRank() << ":AppLocked!" << std::endl;
978  }
979 
980  ///
981  /// Locks the UG application mutex to synchronize with other threads and then reports the lock on std::cout.
982  /// (for debug) The message is printed while the mutex is held.
983  ///
984  virtual void lockApp(
985  char const *f, ///< string to indicate what is locked (the LOCK_APP macro passes __FILE__)
986  int l ///< a number to show something (the LOCK_APP macro passes __LINE__)
987  )
988  {
989  applicationLockMutex.lock();
990  std::cout << "### Rank" << this->getRank() << ":AppLocked! [" << f << ", " << l << "]" << std::endl;
991  }
992 
993  ///
994  /// Unlocks the UG application mutex (applicationLockMutex), which was locked by lockApp().
995  ///
996  virtual void unlockApp(
997  )
998  {
999  applicationLockMutex.unlock();
1000  // std::cout << "### Rank" << this->getRank() << ":AppUnlocked!" << std::endl;
1001  }
1002 
1003  ///
1004  /// Unlocks the UG application mutex and then reports the unlock on std::cout.
1005  /// (for debug) The message is printed after the mutex has been released.
1006  ///
1007  virtual void unlockApp(
1008  char const *f, ///< string to indicate what is unlocked
1009  int l ///< a number to show something
1010  )
1011  {
1012  applicationLockMutex.unlock();
1013  std::cout << "### Rank" << this->getRank() << ":AppUnlocked! [" << f << ", " << l << "]" << std::endl;
1014  }
1015 
1016  ///
1017  /// unlock rank
1018  /// (for debugging)
1019  ///
1021  char const *f, ///< string to indicate what is locked
1022  int l ///< a number to show something
1023  )
1024  {
1025  rankLockMutex.unlock();
1026  std::cout << "Rank Unlocked: " << f << ", " << l << std::endl;
1027  }
1028 
1029  ///
1030  /// create ParaTimer object
1031  /// @return pointer to ParaTimer object
1032  ///
1034  )
1035  {
1036  return new ParaTimerTh();
1037  }
1038 
1039  ///
1040  /// broadcast function for standard ParaData types
1041  /// @return always 0 (for future extensions)
1042  ///
1043  int bcast(
1044  void* buffer, ///< point to the head of sending message
1045  int count, ///< the number of data in the message
1046  const int datatypeId, ///< data type in the message
1047  int root ///< root rank for broadcasting
1048  );
1049 
1050  ///
1051  /// send function for standard ParaData types
1052  /// @return always 0 (for future extensions)
1053  ///
1054  int send(
1055  void* bufer, ///< point to the head of sending message
1056  int count, ///< the number of data in the message
1057  const int datatypeId, ///< data type in the message
1058  int dest, ///< destination to send the message
1059  const int tag ///< tag of this message
1060  );
1061 
1062  ///
1063  /// receive function for standard ParaData types
1064  /// @return always 0 (for future extensions)
1065  ///
1066  int receive(
1067  void* bufer, ///< point to the head of receiving message
1068  int count, ///< the number of data in the message
1069  const int datatypeId, ///< data type in the message
1070  int source, ///< source of the message coming from
1071  const int tag ///< tag of the message
1072  );
1073 
1074  ///
1075  /// wait function for a specific tag from a specific source coming from
1076  /// (no return value; the tag of the message which is arrived is stored in receivedTag)
1077  ///
1078  void waitSpecTagFromSpecSource(
1079  const int source, ///< source rank which the message should come from
1080  const int tag, ///< tag which the message should wait
1081  int *receivedTag ///< tag of the message which is arrived
1082  );
1083 
1084  ///
1085  /// probe function which waits a new message
1086  /// @return always true
1087  ///
1088  bool probe(
1089  int *source, ///< source rank of the message arrived
1090  int *tag ///< tag of the message arrived
1091  );
1092 
1093  ///
1094  /// iProbe function which checks if a new message is arrived or not
1095  /// @return true when a new message exists
1096  ///
1097  bool iProbe(
1098  int *source, ///< source rank of the message arrived
1099  int *tag ///< tag of the message arrived
1100  );
1101 
1102  ///
1103  /// User type send for created data type
1104  /// @return always 0 (for future extensions)
1105  ///
1106  int uTypeSend(
1107  void* bufer, ///< point to the head of sending message
1108  const int datatypeId, ///< created data type
1109  int dest, ///< destination rank
1110  int tag ///< tag of the message
1111  );
1112 
1113  ///
1114  /// User type receive for created data type
1115  /// @return always 0 (for future extensions)
1116  ///
1117  int uTypeReceive(
1118  void** bufer, ///< point to the head of receiving message
1119  const int datatypeId, ///< created data type
1120  int source, ///< source rank
1121  int tag ///< tag of the message
1122  );
1123 
1124  ///
1125  /// get Tag string for debugging
1126  /// @return string which shows Tag
1127  ///
1128  virtual const char *getTagString(
1129  int tag /// tag to be converted to string
1130  );
1131 
1132 };
1133 
1134 #define DEF_PARA_COMM( para_comm, comm ) ParaCommCPP11 *para_comm = dynamic_cast< ParaCommCPP11* >(comm) /* declares para_comm; it is null when comm is not a ParaCommCPP11 */
1135 #define LOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__) /* debug lock: reports file and line on std::cout */
1136 #define UNLOCK_APP( comm ) comm->unlockApp(__FILE__, __LINE__) /* fixed: previously expanded to lockApp(), which locked the non-recursive application mutex a second time */
1137 #define LOCK_RANK( comm ) comm->lockRank(__FILE__, __LINE__) /* NOTE(review): no two-argument lockRank is declared in ParaCommCPP11 in this header; confirm an overload exists before using */
1138 #define UNLOCK_RANK( comm ) comm->unlockRank(__FILE__, __LINE__) /* debug unlock: reports file and line on std::cout */
1139 }
1140 
1141 #endif // __PARA_COMM_CPP11_H__
int count
number of the data type elements
~MessageQueueElement()
destructor of MessageQueueElement
static const char * tagStringTable[]
mutex for interrupt message monitor
std::ostream * getOstream()
getter of tag trace stream of this rank
class ParaSolverState (ParaSolver state object for notification message)
int getDataTypeId()
getter of the data type id
static const int ParaCalculationStateType
MessageQueueElement * getHead()
getter of head
std::mutex applicationLockMutex
mutex for applications
ParaTimer extension for threads.
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for desctructor of ParaCommCPP11. No lock is necessary...
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or nor
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
MessageQueueElement()
default constructor of MessageQueueElement
virtual void setLocalRank(int inRank)
void unlockRank(char const *f, int l)
unlock rank (for debugging)
ParaTimer * createParaTimer()
create ParaTimer object
void * data
data of the message
int comSize
communicator size : number of threads joined in this system
static const int TagAny
Definition: paraTagDef.h:44
MessageQueueElement * tail
tail of the message queue
virtual void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debug)
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or nor
Base class of Calculation state in a ParaSolver.
static thread_local int localRank
local thread rank
int getCount()
getter of the number of the data type elements
virtual bool waitTerminatedMessage()
function to wait Terminated message (This function is not used currently)
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
Defines for UG Framework.
const char * getTagString(int tag)
get Tag string for debugging
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
class ParaTimerMpi (Timer used in thread communication)
Definition: paraTimerTh.h:49
MessageQueueElement * next
point to next message queue element
ThreadsTableElement()
default constructor of ThreadsTableElement
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
Class of MessageQueueTableElement.
static const int TYPE_LAST
Definition: paraComm.h:80
Class for message queue element.
std::mutex rankLockMutex
mutex to access rank
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
static const int TagTraceFileName
Definition: paraParamSet.h:126
int getSource()
getter of source rank
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
void abort()
abort. How it works sometimes depends on communicator used
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
double getStartTime()
get start time of this communicator (should not be used)
void unlockRank()
unlock rank
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
static const int ParaSolverTerminationStateType
Base class of communicator for UG Framework.
static const int TagTrace
Definition: paraParamSet.h:72
System timer.
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
std::condition_variable * sentMsg
condition variable for synchronization
bool isStringParamDefaultValue(int param)
check if string parameter is default value or not
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
virtual void unlockApp(char const *f, int l)
unlock UG application to synchronize with other threads (for debug)
int size
number of the messages in queue
virtual void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
class ParaParamSet
Definition: paraParamSet.h:850
int tag
tag of the message, -1 : in case of broadcast message
class for instance data
Definition: paraInstance.h:50
void lockRank()
lock rank
ParaSysTimer timer
system timer
void setRank(int r)
setter of this thread rank
Class for the difference between instance and subproblem.
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
int dataTypeId
data type id
const char * getStringParamValue(int param)
for char parameters
Class of ThreadsTableElement.
Class ParaSysTimer.
Definition: paraSysTimer.h:107
~ThreadsTableElement()
destructor of ThreadsTableElement
MessageQueueTableElement()
default constructor of MessageQueueTableElement
double getStartTime(void)
get start time
virtual void unlockApp()
unlock UG application to synchronize with other threads
int getTag()
getter of the message tag
void * getData()
getter of data
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int tag)
wait for a specified message coming to a queue
bool isEmpty()
check if the queue is empty or not
int source
source thread rank of this message
int getSize()
getter of size
bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
int rank
rank of this thread
bool tagTraceFlag
indicate if tags are traced or not
int getSize()
get size of this communicator, which indicates how many threads are in a UG process
static const int ParaTaskType
static const int UG_USER_TYPE_LAST
class ParaTimer
Definition: paraTimer.h:48
MessageQueueTableElement ** messageQueueTable
message queue table
static const int ParaSolverStateType
virtual void lockApp()
lock UG application to synchronize with other threads
int getRank()
getter of this thread rank
std::mutex * queueLockMutex
mutex for synchronization
ParaCommCPP11()
constructor of ParaCommCPP11
bool * sentMessage
sent message flag for synchronization
MessageQueueElement * head
head of the message queue
static const int ThreadTableSize
size of thread table : this limits the number of threads
static const int ParaParamSetType
Communicator object for C++11 thread communications.
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ParaRacingRampUpParamType
Base class of communicator object.
Definition: paraComm.h:101
class for solution
Definition: paraSolution.h:53
~MessageQueueTableElement()
destructor of MessageQueueTableElement
std::ostream * tos
tag trace stream for this thread
std::mutex * tokenAccessLockMutex
mutex to access token
class ParaTask
Definition: paraTask.h:541
bool getBoolParamValue(int param)
for bool parameters