Scippy

UG

Ubiquity Generator framework

paraCommPth.h
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraCommPth.h
27  * @brief ParaComm extension for Pthreads communication
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 
37 #ifndef __PARA_COMM_PTH_H__
38 #define __PARA_COMM_PTH_H__
39 
40 #include <pthread.h>
41 #include <stdexcept>
42 #include <iostream>
43 #include <ostream>
44 #include <fstream>
45 #include <sstream>
46 #include <string>
47 #include <iomanip>
48 #include <cassert>
49 #include "paraDef.h"
50 #include "paraComm.h"
51 #include "paraSysTimer.h"
52 #include "paraParamSetTh.h"
53 #include "paraTimerTh.h"
54 #include "paraPthLock.h"
55 #include "paraPthCondVar.h"
56 
57 #define HashEntry( tid ) ( hashCode(tid) % UG::HashTableSize )
58 /// when tagTraceFlag is set, write one trace line for a communication call to the stream returned by getOstream()
59 #define TAG_TRACE( call, fromTo, sourceDest, tag ) \
60  if( tagTraceFlag ) \
61  { \
62  if( (tag) >= 0 ) \
63  { \
64  /* non-negative tag: print the tag name obtained from getTagString() */ \
65  *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
66  << " " << (sourceDest) << " with Tag = " << getTagString(tag) << std::endl; \
67  } \
68  else \
69  { \
70  /* negative tag: the message is reported as a broadcast, no tag name is printed */ \
71  *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
72  << " " << (sourceDest) << " as broadcast" << std::endl; \
73  } \
74 }
75 
76 namespace UG
77 {
78 
79 ///
80 /// user defined transfer data types
81 ///
82 static const int UG_USER_TYPE_FIRST = TYPE_LAST + 1;
83 static const int ParaInstanceType = UG_USER_TYPE_FIRST + 0;
84 static const int ParaSolutionType = UG_USER_TYPE_FIRST + 1;
85 static const int ParaParamSetType = UG_USER_TYPE_FIRST + 2;
86 static const int ParaTaskType = UG_USER_TYPE_FIRST + 3;
87 static const int ParaSolverStateType = UG_USER_TYPE_FIRST + 4;
88 static const int ParaCalculationStateType = UG_USER_TYPE_FIRST + 5;
89 static const int ParaSolverTerminationStateType = UG_USER_TYPE_FIRST + 6;
90 static const int ParaRacingRampUpParamType = UG_USER_TYPE_FIRST + 7;
91 // static const int ParaSolverDiffParamType = USER_TYPE_FIRST + 8;
92 static const int UG_USER_TYPE_LAST = UG_USER_TYPE_FIRST + 7;
93 //static const int ParaInitialStatType = USER_TYPE_FIRST + 9;
94 
95 static const int HashTableSize = 751; ///< size of thread table : this limits the number of threads
96 
97 ///
98 /// Class for message queue element
99 ///
100 /// NOTE : For basic data types, this is copy of sender side memory.
101 /// When the memory is copied at receive function, the memory
102 /// have to be freed.
103 /// For user defined data type, this is the receiver side memory,
104 /// because it is better to allocate memory in the sender side for
105 /// mutex locking. Sender side functions have to allocate memory.
106 /// In this case, memory do not have to be freed. The memory is for
107 /// receiver side.
108 ///
110 {
111  int source; ///< source thread rank of this message
112  int count; ///< number of elements of the data type
113  int dataTypeId; ///< data type id
114  int tag; ///< -1 : in case of broadcast message
115  void *data; ///< NOTE : For basic data types, this is copy of sender side memory.
116  ///< When the memory is copied at receive function, the memory
117  ///< have to be freed.
118  ///< For user defined data type, this is the receiver side memory,
119  ///< because it is better to allocate memory in the sender side for
120  ///< mutex locking. Sender side functions have to allocate memory.
121  ///< In this case, memory does not have to be freed. The memory is for
122  ///< receiver side.
123  MessageQueueElement *next; ///< point to next message queue element
124 
125 public:
126 
127  ///
128  /// default constructor of MessageQueueElement
129  ///
131  ) : source(-1), count(0), dataTypeId(-1), tag(-1), data(0), next(0)
132  {
133  }
134 
135  ///
136  /// constructor of MessageQueueElement
137  ///
139  int inSource, ///< source thread rank of this message
140  int inCount, ///< number of the data type elements
141  int inDataTypeId, ///< data type id
142  int inTag, ///< tag of the message, -1 : in case of broadcast message
143  void *inData ///< data of the message
144  ) : source(inSource), count(inCount), dataTypeId(inDataTypeId), tag(inTag), data(inData), next(0)
145  {
146  }
147 
148  ///
149  /// destructor of MessageQueueElement
150  ///
152  )
153  {
154  }
155 
156  ///
157  /// getter of source rank
158  /// @return rank of the source
159  ///
161  )
162  {
163  return source;
164  }
165 
166  ///
167  /// getter of the number of the data type elements
168  /// @return the number of the data type elements
169  ///
170  int getCount(
171  )
172  {
173  return count;
174  }
175 
176  ///
177  /// getter of the data type id
178  /// @return data type id
179  ///
181  )
182  {
183  return dataTypeId;
184  }
185 
186  ///
187  /// getter of the message tag
188  /// @return tag of the message
189  ///
190  int getTag(
191  )
192  {
193  return tag;
194  }
195 
196  ///
197  /// getter of data
198  /// @return pointer to the data
199  ///
200  void *getData(
201  )
202  {
203  return data;
204  }
205 
206  ///
207  /// getter of the pointer to the next MessageQueueElement
208  /// @return pointer to MessageQueueElement
209  ///
211  )
212  {
213  return next;
214  }
215 
216  ///
217  /// link to the next MessageQueueElement
218  ///
219  void link(
220  MessageQueueElement *nextElement ///< pointer to MessageQueueElement
221  )
222  {
223  next = nextElement;
224  }
225 
226 };
227 
228 ///
229 /// Class of MessageQueueTableElement
230 ///
232 {
233  // bool sentMessage;
234  // Lock queueLock;
235  // ConditionVariable sentMsg;
236  MessageQueueElement *head; ///< head of the message queue
237  MessageQueueElement *tail; ///< tail of the message queue
238  int size; ///< number of the messages in queue
239 
240 public:
241 
242  ///
243  /// default constructor of MessageQueueTableElement
244  ///
246  ) : head(0), tail(0), size(0)
247  // ) : sentMessage(false), head(0), tail(0)
248  {
249  // sentMsg.setLock(&queueLock);
250  }
251 
252  ///
253  /// destructor of MessageQueueTableElement
254  ///
256  )
257  {
258  // LOCKED (&queueLock) {
259  while( head )
260  {
261  MessageQueueElement *next = head->getNext();
262  delete head;
263  head = next;
264  }
265  //}
266  }
267 
268  ///
269  /// check if the specified message exists or not
270  /// @return pointer to MessageQueueElement, 0: no element
271  ///
273  int source, ///< source rank
274  int datatypeId, ///< data type id
275  int tag ///< tag of the message
276  )
277  {
278  MessageQueueElement *ret = 0;
279  // LOCKED (&queueLock) {
280  for( MessageQueueElement *current = head; current; current = current->getNext() )
281  {
282  if( current->getSource() == source
283  && current->getDataTypeId() == datatypeId
284  && current->getTag() == tag )
285  {
286  ret = current;
287  break;
288  }
289  }
290  // }
291  return ret;
292  }
293 
294  ///
295  /// check if the specified message with tag exists or not
296  /// @return pointer to MessageQueueElement, 0: no element
297  ///
299  int tag ///< tag of the message
300  )
301  {
302  MessageQueueElement *ret = 0;
303  // LOCKED (&queueLock) {
304  for( MessageQueueElement *current = head; current; current = current->getNext() )
305  {
306  if( current->getTag() == tag )
307  {
308  ret = current;
309  break;
310  }
311  }
312  // }
313  return ret;
314  }
315 
316  ///
317  /// extracts a message
318  /// @return pointer to the message
319  ///
321  bool *sentMessage, ///< for synchronization
322  int source, ///< source rank
323  int datatypeId, ///< data type id
324  int tag ///< tag of the message
325  )
326  {
327  MessageQueueElement *ret = 0;
328 
329  // LOCKED (&queueLock) {
330  MessageQueueElement *prev = head;
331  MessageQueueElement *current = head;
332  while( current )
333  {
334  MessageQueueElement *next = current->getNext();
335  if( current->getSource() == source
336  && current->getDataTypeId() == datatypeId
337  && current->getTag() == tag )
338  {
339  ret = current;
340  ret->link(0);
341  if( current == head )
342  {
343  if( current == tail )
344  {
345  head = 0;
346  tail = 0;
347  *sentMessage = false;
348  }
349  else
350  {
351  head=next;
352  }
353  }
354  else
355  {
356  if( current == tail )
357  {
358  tail = prev;
359  }
360  prev->link(next);
361  }
362  size--;
363  break;
364  }
365  else
366  {
367  prev = current;
368  current = next;
369  }
370  }
371  // }
372  return ret;
373  }
374 
375  ///
376  /// extracts a message
377  /// (This method is only for destructor of ParaCommCPP11. No lock is necessary.)
378  ///
379  /// @return pointer to the message
380  ///
382  bool *sentMessage ///< for synchronization
383  )
384  {
385  if( head == 0 ) return 0;
386 
387  MessageQueueElement *current = head;
388  MessageQueueElement *next = current->getNext();
389  current->link(0);
390  if( current == tail )
391  {
392  head = 0;
393  tail = 0;
394  *sentMessage = false;
395  }
396  else
397  {
398  head=next;
399  }
400  size--;
401  return current;
402  }
403 
404  ///
405  /// append a message to the tail of this queue, raise the sent flag and signal the condition variable
406  ///
407  void enqueue(
408  ConditionVariable *sentMsg, ///< condition variable signalled after the element is linked in
409  bool *sentMessage, ///< flag for synchronization, set to true by this call
410  MessageQueueElement *newElement ///< message queue element to enter
411  )
412  {
413  // LOCKED (&queueLock) {   (no lock is taken here; presumably the caller holds it - TODO confirm)
414  if( tail == 0 ) // queue has no tail yet: the new element becomes both head and tail
415  {
416  head = tail = newElement;
417  }
418  else
419  {
420  tail->link(newElement); // chain the new element behind the current tail
421  tail = newElement;
422  }
423  size++;
424  *sentMessage = true; // this flag is the condition tested by waitMessage()
425  (*sentMsg).signal();
426  // }
427  }
428 
429  ///
430  /// getter of head
431  /// @return head element pointer
432  ///
434  )
435  {
436  return head;
437  }
438 
439  ///
440  /// getter of size
441  /// @return size of the message queue
442  ///
443  int getSize(
444  )
445  {
446  return size;
447  }
448 
449  ///
450  /// tell whether this queue currently holds no message
451  /// @return true when the queue has no head element, false otherwise
452  ///
453  bool isEmpty(
454  )
455  {
456  // LOCKED (&queueLock) {
457  const bool noElement = ( head == 0 );
458  // }
459  // a queue without head element must also report size zero
460  assert( !noElement || size == 0 );
461  return noElement;
462  }
463 
464  ///
465  /// wait for a message coming to a queue
466  ///
468  ConditionVariable *sentMsg, ///< condition variable for synchronization
469  bool *sentMessage ///< flag for synchronization
470  )
471  {
472  CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
473  }
474 
475  ///
476  /// wait for a specified message coming to a queue
477  ///
479  ConditionVariable *sentMsg, ///< condition variable for synchronization
480  bool *sentMessage, ///< flag for synchronization
481  int source, ///< source rank of the message
482  int datatypeId, ///< data type id of the message
483  int tag ///< tag of the message
484  )
485  {
486  for(;;)
487  {
488  // CONDITIONVARIABLE_WAIT (&sentMsg, sentMessage == true);
489  CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
490  MessageQueueElement *current = head;
491  while( current )
492  {
493  MessageQueueElement *next = current->getNext();
494  if( current->getSource() == source
495  && current->getDataTypeId() == datatypeId
496  && current->getTag() == tag )
497  {
498  break;
499  }
500  current = next;
501  }
502  if( current ) break;
503  *sentMessage = false;
504  }
505  }
506 
507  ///
508  /// wait for a specified message coming to a queue
509  ///
511  ConditionVariable *sentMsg, ///< condition variable for synchronization
512  bool *sentMessage, ///< flag for synchronization
513  int source, ///< source rank of the message
514  int *tag ///< tag of the message
515  )
516  {
517  for(;;)
518  {
519  // CONDITIONVARIABLE_WAIT (&sentMsg, sentMessage == true);
520  CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
521  MessageQueueElement *current = head;
522  if( *tag == TagAny )
523  {
524  while( current )
525  {
526  MessageQueueElement *next = current->getNext();
527  if( current->getSource() == source )
528  {
529  *tag = current->getTag();
530  break;
531  }
532  current = next;
533  }
534  }
535  else
536  {
537  while( current )
538  {
539  MessageQueueElement *next = current->getNext();
540  if( current->getSource() == source
541  && current->getTag() == (*tag) )
542  {
543  break;
544  }
545  current = next;
546  }
547  }
548  if( current ) break;
549  *sentMessage = false;
550  }
551  }
552 
553 };
554 
555 ///
556 /// Class of ThreadsTableElement
557 ///
559 {
560 
561 #ifdef _UG_NO_THREAD_LOCAL_STATIC
562  pthread_t tid; ///< thread id of this thread
563 #endif
564  int rank; ///< rank of this thread
565  std::ostream *tos; ///< tag trace stream for this thread
566  ThreadsTableElement *next; ///< next ThradsTableElement pointer
567 
568 public:
569 
570  ///
571  /// default constructor of ThreadsTableElement
572  ///
574  ) :
575 #ifdef _UG_NO_THREAD_LOCAL_STATIC
576  tid(0),
577 #endif
578  rank(0), tos(0), next(0)
579  {
580  }
581 
582  ///
583  /// constructor of ThreadsTableElement
584  ///
586 #ifdef _UG_NO_THREAD_LOCAL_STATIC
587  pthread_t inTid, ///< thread id
588 #endif
589  int inRank, ///< thread rank
590  ParaParamSet *paraParamSet ///< UG parameter set
591  ) :
592 #ifdef _UG_NO_THREAD_LOCAL_STATIC
593  tid(inTid),
594 #endif
595  rank(inRank), tos(0), next(0)
596  {
597  if( paraParamSet->getBoolParamValue(TagTrace) )
598  {
599  if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
600  {
601  tos = &std::cout;
602  }
603  else
604  {
605  std::ostringstream s;
606  s << paraParamSet->getStringParamValue(TagTraceFileName) << inRank;
607  std::ofstream *ofs = new std::ofstream();
608  ofs->open(s.str().c_str());
609  tos = ofs;
610  }
611  }
612  }
613 
614  ///
615  /// destructor of ThreadsTableElement
616  ///
618  )
619  {
620  if( tos )
621  {
622  if( tos != &std::cout )
623  {
624  std::ofstream *ofs = dynamic_cast<std::ofstream *>(tos);
625  ofs->close();
626  delete ofs;
627  }
628  }
629  }
630 
631 #ifdef _UG_NO_THREAD_LOCAL_STATIC
632  ///
633  /// get thread id
634  /// @return thread id
635  ///
636  pthread_t getTid(
637  )
638  {
639  return tid;
640  }
641 #endif
642 
643  ///
644  /// getter of this thread rank
645  /// @return the thread rank
646  ///
647  int getRank(
648  )
649  {
650  return rank;
651  }
652 
653  ///
654  /// setter of this thread rank
655  ///
656  void setRank(
657  int r ///< rank to be set
658  )
659  {
660  rank = r;
661  }
662 
663  ///
664  /// getter of tag trace stream of this rank
665  /// @return pointer to the tag trace stream
666  ///
667  std::ostream *getOstream(
668  )
669  {
670  return tos;
671  }
672 
673  ///
674  /// get next element
675  /// @return next element
676  ///
678  )
679  {
680  return next;
681  }
682 
683 #ifdef _UG_NO_THREAD_LOCAL_STATIC
684  void link(
685  ThreadsTableElement *nextElement
686  )
687  {
688  next = nextElement;
689  }
690 #endif
691 
692 };
693 
695 class ParaTask;
696 class ParaSolverState;
698 class ParaDiffSubproblem;
699 class ParaInstance;
700 class ParaSolution;
701 
702 ///
703 /// Communicator object for pthreads thread communications
704 ///
705 class ParaCommPth : public ParaComm
706 {
707 protected:
708  int comSize; ///< communicator size : number of threads joined in this system
709  bool tagTraceFlag; ///< indicate if tags are traced or not
710  int **token; ///< index 0: token
711  ///< index 1: token color
712  ///< -1: green
713  ///< > 0: yellow ( termination origin solver number )
714  ///< -2: red ( means the solver can terminate )
715  ParaSysTimer timer; ///< system timer
716  static const char *tagStringTable[]; ///< tag name string table
717  static ThreadsTableElement *threadsTable[HashTableSize]; ///< threads table: index is thread rank
718 #ifndef _UG_NO_THREAD_LOCAL_STATIC
719  static __thread int localRank; ///< local thread rank
720 #endif
721  MessageQueueTableElement **messageQueueTable; ///< message queue table
722  bool *sentMessage; ///< sent message flag for synchronization
723  Lock *queueLock; ///< Lock for synchronization
724  ConditionVariable *sentMsg; ///< Condition variable for synchronization
725 
726  Lock *tokenAccessLock; ///< lock to access token
727  Lock applicationLock; ///< lock for application
728  Lock rankLock; ///< lock to access rank
729 
730  ///
731  /// get hash code from thread id
732  /// @return hash code
733  ///
734  unsigned int hashCode(
735  pthread_t tid ///< thread id
736  );
737 
738  ///
739  /// allocate memory and copy message
740  ///
741  void *allocateMemAndCopy(
742  const void* buffer, ///< pointer to buffer of the message
743  int count, ///< the number of data element in the message
744  const int datatypeId ///< data type of each element in the message
745  );
746 
747  ///
748  /// copy message
749  ///
750  void copy(
751  void *dest, ///< destination to copy the data
752  const void *src, ///< source of the data
753  int count, ///< the number of data element
754  int datatypeId ///< data type of each element
755  );
756 
757  ///
758  /// free memory
759  ///
760  void freeMem(
761  void* buffer, ///< pointer to buffer of the message
762  int count, ///< the number of data element
763  const int datatypeId ///< data type of each element
764  );
765 
766  ///
767  /// free memory
768  /// @return
769  ///
770  bool freeStandardTypes(
771  MessageQueueElement *elem ///< pointer to a message queue element
772  );
773 
774  ///
775  /// check if tag string table (for debugging) set up correctly
776  /// @return true if tag string table is set up correctly, false otherwise
777  ///
778  virtual bool tagStringTableIsSetUpCoorectly(
779  );
780 
781 public:
782 
783  ///
784  /// constructor of ParaCommPth
785  ///
787  )
788  : comSize(-1),
789  tagTraceFlag(false),
790  token(0),
791  messageQueueTable(0),
792  sentMessage(0),
793  queueLock(0),
794  sentMsg(0),
795  tokenAccessLock(0)
796  {
797  }
798 
799  ///
800  /// destructor of this communicator
801  ///
802  virtual ~ParaCommPth(
803  );
804 
805  ///
806  /// initializer of this communicator
807  ///
808  virtual void init(
809  int argc, ///< the number of arguments
810  char **argv ///< pointers to the arguments
811  );
812 
813  ///
814  /// get start time of this communicator
815  /// (should not be used)
816  /// @return start time
817  ///
818  double getStartTime(
819  )
820  {
821  return timer.getStartTime();
822  }
823 
824  ///
825  /// get rank of caller's thread
826  /// @return rank of caller's thread
827  ///
828  int getRank(
829  );
830 
831  // int getRank( pthread_t tid);
832 
833  ///
834  /// get size of this communicator, which indicates how many threads in a UG process
835  /// @return the number of threads
836  ///
837  int getSize(
838  )
839  {
840  return comSize;
841  }
842 
843  ///
844  /// get the number of messages held in the messageQueueTable
845  /// @return if dest >= 0 then return the number of only messages waiting to send to dest,
846  /// otherwise return the number of all messages waiting to send (queues 0 .. comSize).
847  ///
848  int getNumOfMessagesWaitingToSend(
849  int dest=-1 ///< destination rank; a negative value (the default) selects all queues
850  )
851  {
852  if( dest >= 0 )
853  {
854  return messageQueueTable[dest]->getSize();
855  }
856  else
857  {
858  int nTotalMessages = 0; // accumulator must start at zero; it was read uninitialized before
859  for( int i = 0; i < ( comSize + 1 ); i++ )
860  {
861  nTotalMessages += messageQueueTable[i]->getSize();
862  }
863  return nTotalMessages;
864  }
865  }
866 
867  ///
868  /// initializer for LoadCoordinator
869  ///
870  void lcInit(
871  ParaParamSet *paraParamSet ///< UG parameter set
872  );
873 
874  ///
875  /// initializer for Solvers
876  ///
878  ParaParamSet *paraParamSet ///< UG parameter set
879  )
880  {
881  }
882 
883  ///
884  /// initializer for a specific Solver
885  ///
886  void solverInit(
887  int rank, ///< rank of the Solver
888  ParaParamSet *paraParamSet ///< UG parameter set
889  );
890 
891  ///
892  /// reinitializer of a specific Solver
893  ///
894  void solverReInit(
895  int rank, ///< rank of the Solver
896  ParaParamSet *paraParamSet ///< UG parameter set
897  );
898 
899  ///
900  /// delete Solver from this communicator
901  ///
902  void solverDel(
903  int rank ///< rank of the Solver
904  );
905 
906  ///
907  /// abort. How it works sometimes depends on communicator used
908  ///
909  void abort(
910  )
911  {
912  std::abort();
913  }
914 
915  ///
916  /// function to wait Terminated message
917  /// (This function is not used currently)
918  /// @return true when MPI communication is used, false when thread communication used
919  ///
921  )
922  {
923  return false;
924  }
925 
926  ///
927  /// wait token when UG runs with deterministic mode
928  /// @return true, when token is arrived to the rank
929  ///
930  bool waitToken(
931  int rank ///< rank to check if token is arrived
932  );
933 
934  ///
935  /// pass token from the rank to the next
936  ///
937  void passToken(
938  int rank ///< from this rank, the token is passed
939  );
940 
941  ///
942  /// pass termination token from the rank to the next
943  /// @return true, when the termination token is passed from this rank, false otherwise
944  ///
945  bool passTermToken(
946  int rank ///< from this rank, the termination token is passed
947  );
948 
949  ///
950  /// set received token to this communicator
951  ///
952  void setToken(
953  int rank, ///< rank to set the token
954  int *inToken ///< token to be set
955  );
956 
957  ///
958  /// wait until thread id is registered to thread table
959  ///
960  void waitUntilRegistered(
961  );
962 
963  ///
964  /// notify that all solvers are registered
965  ///
966  void registedAllSolvers(
967  );
968 
969  ///
970  /// get ostream pointer
971  /// @return pointer to ostream
972  ///
973  std::ostream *getOstream(
974  );
975 
976  ///
977  /// lock UG application to synchronize with other threads
978  ///
979  void lockApp(
980  )
981  {
982  applicationLock.lock();
983  }
984 
985  ///
986  /// unlock UG application to synchronize with other threads
987  ///
988  void unlockApp(
989  )
990  {
991  applicationLock.unlock();
992  }
993 
994  ///
995  /// lock rank
996  ///
997  void lockRank(
998  )
999  {
1000  rankLock.lock();
1001  }
1002 
1003  ///
1004  /// unlock rank
1005  ///
1007  )
1008  {
1009  rankLock.unlock();
1010  }
1011 
1012  ///
1013  /// lock UG application to synchronize with other threads
1014  /// (for debugging)
1015  ///
1016  void lockApp(
1017  char const *f, ///< string to indicate what is locked
1018  int l ///< a number to show something
1019  )
1020  {
1021  applicationLock.lock(f,l);
1022  }
1023 
1024  ///
1025  /// lock rank
1026  /// (for debugging)
1027  ///
1028  void lockRank(
1029  char const *f, ///< string to indicate what is locked
1030  int l ///< a number to show something
1031  )
1032  {
1033  rankLock.lock(f,l);
1034  // std::cout << "Rank Locked: " << f << ", " << l << std::endl;
1035  }
1036 
1037  ///
1038  /// unlock rank
1039  /// (for debugging)
1040  ///
1042  char const *f, ///< string to indicate what is locked
1043  int l ///< a number to show something
1044  )
1045  {
1046  rankLock.unlock();
1047  // std::cout << "Rank Unlocked: " << f << ", " << l << std::endl;
1048  }
1049 
1050  ///
1051  /// create ParaTimer object
1052  /// @return pointer to ParaTimer object
1053  ///
1055  )
1056  {
1057  return new ParaTimerTh();
1058  }
1059 
1060  ///
1061  /// broadcast function for standard ParaData types
1062  /// @return always 0 (for future extensions)
1063  ///
1064  int bcast(
1065  void* buffer, ///< point to the head of sending message
1066  int count, ///< the number of data in the message
1067  const int datatypeId, ///< data type in the message
1068  int root ///< root rank for broadcasting
1069  );
1070 
1071  ///
1072  /// send function for standard ParaData types
1073  /// @return always 0 (for future extensions)
1074  ///
1075  int send(
1076  void* bufer, ///< point to the head of sending message
1077  int count, ///< the number of data in the message
1078  const int datatypeId, ///< data type in the message
1079  int dest, ///< destination to send the message
1080  const int tag ///< tag of this message
1081  );
1082 
1083  ///
1084  /// receive function for standard ParaData types
1085  /// @return always 0 (for future extensions)
1086  ///
1087  int receive(
1088  void* bufer, ///< point to the head of receiving message
1089  int count, ///< the number of data in the message
1090  const int datatypeId, ///< data type in the message
1091  int source, ///< source of the message coming from
1092  const int tag ///< tag of the message
1093  );
1094 
1095  ///
1096  /// wait function for a specific tag from a specific source coming from
1097  /// (declared void: nothing is returned; the tag of the arrived message is reported through receivedTag)
1098  ///
1099  void waitSpecTagFromSpecSource(
1100  const int source, ///< source rank which the message should come from
1101  const int tag, ///< tag which the message should wait
1102  int *receivedTag ///< tag of the message which is arrived
1103  );
1104 
1105  ///
1106  /// probe function which waits a new message
1107  /// @return always true
1108  ///
1109  bool probe(
1110  int *source, ///< source rank of the message arrived
1111  int *tag ///< tag of the message arrived
1112  );
1113 
1114  ///
1115  /// iProbe function which checks if a new message is arrived or not
1116  /// @return true when a new message exists
1117  ///
1118  bool iProbe(
1119  int *source, ///< source rank of the message arrived
1120  int *tag ///< tag of the message arrived
1121  );
1122 
1123  ///
1124  /// User type send for created data type
1125  /// @return always 0 (for future extensions)
1126  ///
1127  int uTypeSend(
1128  void* bufer, ///< point to the head of sending message
1129  const int datatypeId, ///< created data type
1130  int dest, ///< destination rank
1131  int tag ///< tag of the message
1132  );
1133 
1134  ///
1135  /// User type receive for created data type
1136  /// @return always 0 (for future extensions)
1137  ///
1138  int uTypeReceive(
1139  void** bufer, ///< point to the head of receiving message
1140  const int datatypeId, ///< created data type
1141  int source, ///< source rank
1142  int tag ///< tag of the message
1143  );
1144 
1145  ///
1146  /// get Tag string for debugging
1147  /// @return string which shows Tag
1148  ///
1149  virtual const char *getTagString(
1150  int tag /// tag to be converted to string
1151  );
1152 
1153 };
1154 
1155 #define DEF_PARA_COMM( para_comm, comm ) ParaCommPth *para_comm = dynamic_cast< ParaCommPth* >(comm)
1156 #define LOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__)
1157 #define LOCK_RANK( comm ) comm->lockRank(__FILE__, __LINE__)
1158 #define UNLOCK_RANK( comm ) comm->unlockRank(__FILE__, __LINE__)
1159 }
1160 
1161 #endif // __PARA_COMM_PTH_H__
int count
number of the data type elements
~MessageQueueElement()
destructor of MessageQueueElement
Definition: paraCommPth.h:151
static const char * tagStringTable[]
mutex for interrupt message monitor
Condition variable.
void unlockApp()
unlock UG application to synchronize with other threads
Definition: paraCommPth.h:988
std::ostream * getOstream()
getter of tag trace stream of this rank
Definition: paraCommPth.h:667
class ParaSolverState (ParaSolver state object for notification message)
int getDataTypeId()
getter of the data type id
Definition: paraCommPth.h:180
void lockApp()
lock UG application to synchronize with other threads
Definition: paraCommPth.h:979
static const int ParaCalculationStateType
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommPth.h:710
MessageQueueElement * getHead()
getter of head
Definition: paraCommPth.h:433
ParaTimer extension for threads.
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for destructor of ParaCommCPP11. No lock is necessary...
Definition: paraCommPth.h:381
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or not
Definition: paraCommPth.h:298
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
MessageQueueElement()
default constructor of MessageQueueElement
Definition: paraCommPth.h:130
Pthread condition variable for UG Framework.
void * data
data of the message
int getSize()
get size of this communicator, which indicates how many threads in a UG process
Definition: paraCommPth.h:837
void lock()
Acquire this lock. The function sets the internal file/line (debugging) fields to generic values...
Definition: paraPthLock.h:133
void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debugging)
Definition: paraCommPth.h:1016
void abort()
abort. How it works sometimes depends on communicator used
Definition: paraCommPth.h:909
static const int TagAny
Definition: paraTagDef.h:44
static const int HashTableSize
size of thread table : this limits the number of threads
Definition: paraCommPth.h:95
static __thread int localRank
local thread rank
Definition: paraCommPth.h:719
ParaTimer * createParaTimer()
create ParaTimer object
Definition: paraCommPth.h:1054
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or not
Definition: paraCommPth.h:272
Base class of Calculation state in a ParaSolver.
int getCount()
getter of the number of the data type elements
Definition: paraCommPth.h:170
void lockRank()
lock rank
Definition: paraCommPth.h:997
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
Defines for UG Framework.
const char * getTagString(int tag)
get Tag string for debugging
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
Definition: paraCommPth.h:210
#define CONDITIONVARIABLE_WAIT(var, cond)
Perform exception-safe waiting on a condition variable. This macro waits on VAR until COND is true...
class ParaTimerMpi (Timer used in thread communication)
Definition: paraTimerTh.h:49
Lock * queueLock
Lock for synchronization.
Definition: paraCommPth.h:723
MessageQueueElement * next
point to next message queue element
ThreadsTableElement()
default constructor of ThreadsTableElement
Definition: paraCommPth.h:573
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
Definition: paraCommPth.h:585
Class of MessageQueueTableElement.
Lock applicationLock
lock for application
Definition: paraCommPth.h:727
static const int TYPE_LAST
Definition: paraComm.h:80
Class for message queue element.
void unlockRank(char const *f, int l)
unlock rank (for debugging)
Definition: paraCommPth.h:1041
ConditionVariable * sentMsg
Condition variable for synchronization.
Definition: paraCommPth.h:724
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
Definition: paraCommPth.h:138
static const int TagTraceFileName
Definition: paraParamSet.h:126
int getSource()
getter of source rank
Definition: paraCommPth.h:160
ThreadsTableElement * next
next ThreadsTableElement pointer
Definition: paraCommPth.h:566
Pthread lock for UG Framework.
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
Definition: paraCommPth.h:320
Communicator object for pthreads thread communications.
Definition: paraCommPth.h:705
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
Definition: paraCommPth.h:848
int comSize
communicator size : number of threads joined in this system
Definition: paraCommPth.h:708
bool waitTerminatedMessage()
function to wait for the Terminated message (this function is currently not used)
Definition: paraCommPth.h:920
void unlock()
Release this lock.
Definition: paraPthLock.h:161
static const int ParaSolverTerminationStateType
Lock * tokenAccessLock
lock to access token
Definition: paraCommPth.h:726
Base class of communicator for UG Framework.
static const int TagTrace
Definition: paraParamSet.h:72
System timer.
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
Definition: paraCommPth.h:219
bool isStringParamDefaultValue(int param)
check if string parameter is default value or not
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
Definition: paraCommPth.h:510
ParaSysTimer timer
system timer
Definition: paraCommPth.h:715
void unlockRank()
unlock rank
Definition: paraCommPth.h:1006
class ParaParamSet
Definition: paraParamSet.h:850
ParaCommPth()
constructor of ParaCommPth
Definition: paraCommPth.h:786
int tag
tag of the message, -1 : in case of broadcast message
class for instance data
Definition: paraInstance.h:50
void setRank(int r)
setter of this thread rank
Definition: paraCommPth.h:656
Class for the difference between instance and subproblem.
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
int dataTypeId
data type id
const char * getStringParamValue(int param)
for char parameters
Class that implements a lock. The class wraps around pthread_mutex_t and adds some safeguards...
Definition: paraPthLock.h:82
Class of ThreadsTableElement.
MessageQueueTableElement ** messageQueueTable
message queue table
Definition: paraCommPth.h:721
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
Definition: paraCommPth.h:478
Class ParaSysTimer.
Definition: paraSysTimer.h:107
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage)
wait for a message coming to a queue
Definition: paraCommPth.h:467
~ThreadsTableElement()
destructor of ThreadsTableElement
Definition: paraCommPth.h:617
MessageQueueTableElement()
default constructor of MessageQueueTableElement
Definition: paraCommPth.h:245
void lockRank(char const *f, int l)
lock rank (for debugging)
Definition: paraCommPth.h:1028
double getStartTime(void)
get start time
int getTag()
getter of the message tag
Definition: paraCommPth.h:190
void * getData()
getter of data
Definition: paraCommPth.h:200
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommPth.h:709
bool isEmpty()
check if the queue is empty or not
Definition: paraCommPth.h:453
int source
source thread rank of this message
int getSize()
getter of size
Definition: paraCommPth.h:443
double getStartTime()
get start time of this communicator (should not be used)
Definition: paraCommPth.h:818
bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
static const int ParaTaskType
Lock rankLock
lock to access rank
Definition: paraCommPth.h:728
static const int UG_USER_TYPE_LAST
class ParaTimer
Definition: paraTimer.h:48
bool * sentMessage
sent message flag for synchronization
Definition: paraCommPth.h:722
static const int ParaSolverStateType
int getRank()
getter of this thread rank
Definition: paraCommPth.h:647
ThreadsTableElement * getNext()
get next element
Definition: paraCommPth.h:677
static const int ParaParamSetType
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ParaRacingRampUpParamType
Base class of communicator object.
Definition: paraComm.h:101
void enqueue(ConditionVariable *sentMsg, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
Definition: paraCommPth.h:407
class for solution
Definition: paraSolution.h:53
~MessageQueueTableElement()
destructor of MessageQueueTableElement
Definition: paraCommPth.h:255
class ParaTask
Definition: paraTask.h:541
bool getBoolParamValue(int param)
for bool parameters
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommPth.h:877