Scippy

UG

Ubiquity Generator framework

paraCommPth.h
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubiquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraCommPth.h
27 * @brief ParaComm extension for Pthreads communication
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36
37#ifndef __PARA_COMM_PTH_H__
38#define __PARA_COMM_PTH_H__
39
40#include <pthread.h>
41#include <stdexcept>
42#include <iostream>
43#include <ostream>
44#include <fstream>
45#include <sstream>
46#include <string>
47#include <iomanip>
48#include <cassert>
49#include "paraDef.h"
50#include "paraComm.h"
51#include "paraSysTimer.h"
52#include "paraParamSetTh.h"
53#include "paraTimerTh.h"
54#include "paraPthLock.h"
55#include "paraPthCondVar.h"
56
57#define HashEntry( tid ) ( hashCode(tid) % UG::HashTableSize )
58
59#define TAG_TRACE( call, fromTo, sourceDest, tag ) \
60 if( tagTraceFlag ) \
61 { \
62 if( tag >= 0 ) \
63 { \
64 /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << ", " << tagStringTable[tag] << std::endl; */ \
65 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
66 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
67 } \
68 else \
69 { \
70 /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << std::endl; */ \
71 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
72 << " " << sourceDest << " as broadcast" << std::endl; \
73 } \
74}
75
76namespace UG
77{
78
79///
80/// user defined transfer data types
81///
82static const int UG_USER_TYPE_FIRST = TYPE_LAST + 1;
83static const int ParaInstanceType = UG_USER_TYPE_FIRST + 0;
84static const int ParaSolutionType = UG_USER_TYPE_FIRST + 1;
85static const int ParaParamSetType = UG_USER_TYPE_FIRST + 2;
86static const int ParaTaskType = UG_USER_TYPE_FIRST + 3;
91// static const int ParaSolverDiffParamType = USER_TYPE_FIRST + 8;
93//static const int ParaInitialStatType = USER_TYPE_FIRST + 9;
94
95static const int HashTableSize = 751; ///< size of thread table : this limits the number of threads
96
///
/// Class for message queue element
///
/// One node of a singly linked message queue. It records who sent the
/// message, how many elements of which data type it carries, its tag and
/// a pointer to the payload.
///
/// NOTE : For basic data types, the payload is a copy of the sender side
///        memory. Once the receive function has copied it, that memory
///        has to be freed.
///        For user defined data types, the payload is the receiver side
///        memory, because it is better to allocate memory on the sender
///        side for mutex locking. Sender side functions have to allocate
///        the memory. In this case, the memory does not have to be freed;
///        it belongs to the receiver side.
///
class MessageQueueElement
{
   int source;                   ///< source thread rank of this message
   int count;                    ///< number of elements of the data type
   int dataTypeId;               ///< data type id
   int tag;                      ///< tag of the message, -1 : in case of broadcast message
   void *data;                   ///< payload; see the ownership NOTE in the class description
   MessageQueueElement *next;    ///< pointer to the next message queue element, 0 if none

public:

   ///
   /// default constructor of MessageQueueElement
   /// (source, dataTypeId and tag are set to -1, count to 0, pointers to 0)
   ///
   MessageQueueElement(
         ) : source(-1), count(0), dataTypeId(-1), tag(-1), data(0), next(0)
   {
   }

   ///
   /// constructor of MessageQueueElement
   /// (the new element is not linked to any successor)
   ///
   MessageQueueElement(
         int inSource,          ///< source thread rank of this message
         int inCount,           ///< number of the data type elements
         int inDataTypeId,      ///< data type id
         int inTag,             ///< tag of the message, -1 : in case of broadcast message
         void *inData           ///< data of the message
         ) : source(inSource), count(inCount), dataTypeId(inDataTypeId), tag(inTag), data(inData), next(0)
   {
   }

   ///
   /// destructor of MessageQueueElement
   /// (the payload pointed to by data is NOT released here)
   ///
   ~MessageQueueElement(
         )
   {
   }

   ///
   /// getter of source rank
   /// @return rank of the source
   ///
   int getSource(
         ) const
   {
      return source;
   }

   ///
   /// getter of the number of the data type elements
   /// @return the number of the data type elements
   ///
   int getCount(
         ) const
   {
      return count;
   }

   ///
   /// getter of the data type id
   /// @return data type id
   ///
   int getDataTypeId(
         ) const
   {
      return dataTypeId;
   }

   ///
   /// getter of the message tag
   /// @return tag of the message
   ///
   int getTag(
         ) const
   {
      return tag;
   }

   ///
   /// getter of data
   /// @return pointer to the data
   ///
   void *getData(
         ) const
   {
      return data;
   }

   ///
   /// getter of the pointer to the next MessageQueueElement
   /// @return pointer to MessageQueueElement, 0 if this is the last element
   ///
   MessageQueueElement *getNext(
         ) const
   {
      return next;
   }

   ///
   /// link to the next MessageQueueElement
   ///
   void link(
         MessageQueueElement *nextElement  ///< pointer to MessageQueueElement (0 unlinks)
         )
   {
      next = nextElement;
   }

};
227
228///
229/// Class of MessageQueueTableElement
230///
231class MessageQueueTableElement
232{
233 // bool sentMessage;
234 // Lock queueLock;
235 // ConditionVariable sentMsg;
236 MessageQueueElement *head; ///< head of the message queue
237 MessageQueueElement *tail; ///< tail of the message queue
238 int size; ///< number of the messages in queue
239
240public:
241
242 ///
243 /// default constructor of MessageQueueTableElement
244 ///
246 ) : head(0), tail(0), size(0)
247 // ) : sentMessage(false), head(0), tail(0)
248 {
249 // sentMsg.setLock(&queueLock);
250 }
251
252 ///
253 /// destructor of MessageQueueTableElement
254 ///
256 )
257 {
258 // LOCKED (&queueLock) {
259 while( head )
260 {
262 delete head;
263 head = next;
264 }
265 //}
266 }
267
268 ///
269 /// check if the specified message exists or nor
270 /// @return pointer to MessageQueueElement, 0: no element
271 ///
273 int source, ///< source rank
274 int datatypeId, ///< data type id
275 int tag ///< tag of the message
276 )
277 {
278 MessageQueueElement *ret = 0;
279 // LOCKED (&queueLock) {
280 for( MessageQueueElement *current = head; current; current = current->getNext() )
281 {
282 if( current->getSource() == source
283 && current->getDataTypeId() == datatypeId
284 && current->getTag() == tag )
285 {
286 ret = current;
287 break;
288 }
289 }
290 // }
291 return ret;
292 }
293
294 ///
295 /// check if the specified message with tag exists or nor
296 /// @return pointer to MessageQueueElement, 0: no element
297 ///
299 int tag ///< tag of the message
300 )
301 {
302 MessageQueueElement *ret = 0;
303 // LOCKED (&queueLock) {
304 for( MessageQueueElement *current = head; current; current = current->getNext() )
305 {
306 if( current->getTag() == tag )
307 {
308 ret = current;
309 break;
310 }
311 }
312 // }
313 return ret;
314 }
315
316 ///
317 /// extracts a message
318 /// @return pointer to the message
319 ///
321 bool *sentMessage, ///< for synchronization
322 int source, ///< source rank
323 int datatypeId, ///< data type id
324 int tag ///< tag of the message
325 )
326 {
327 MessageQueueElement *ret = 0;
328
329 // LOCKED (&queueLock) {
331 MessageQueueElement *current = head;
332 while( current )
333 {
334 MessageQueueElement *next = current->getNext();
335 if( current->getSource() == source
336 && current->getDataTypeId() == datatypeId
337 && current->getTag() == tag )
338 {
339 ret = current;
340 ret->link(0);
341 if( current == head )
342 {
343 if( current == tail )
344 {
345 head = 0;
346 tail = 0;
347 *sentMessage = false;
348 }
349 else
350 {
351 head=next;
352 }
353 }
354 else
355 {
356 if( current == tail )
357 {
358 tail = prev;
359 }
360 prev->link(next);
361 }
362 size--;
363 break;
364 }
365 else
366 {
367 prev = current;
368 current = next;
369 }
370 }
371 // }
372 return ret;
373 }
374
375 ///
376 /// extracts a message
377 /// (This method is only for desctructor of ParaCommCPP11. No lock is necessary.)
378 ///
379 /// @return pointer to the message
380 ///
382 bool *sentMessage ///< for synchronization
383 )
384 {
385 if( head == 0 ) return 0;
386
387 MessageQueueElement *current = head;
388 MessageQueueElement *next = current->getNext();
389 current->link(0);
390 if( current == tail )
391 {
392 head = 0;
393 tail = 0;
394 *sentMessage = false;
395 }
396 else
397 {
398 head=next;
399 }
400 size--;
401 return current;
402 }
403
404 ///
405 /// enqueue a message
406 ///
408 ConditionVariable *sentMsg, ///< condition variable for synchronization
409 bool *sentMessage, ///< mutex for synchronization
410 MessageQueueElement *newElement ///< message queue element to enter
411 )
412 {
413 // LOCKED (&queueLock) {
414 if( tail == 0 )
415 {
416 head = tail = newElement;
417 }
418 else
419 {
420 tail->link(newElement);
421 tail = newElement;
422 }
423 size++;
424 *sentMessage = true;
425 (*sentMsg).signal();
426 // }
427 }
428
429 ///
430 /// getter of head
431 /// @return head element pointer
432 ///
434 )
435 {
436 return head;
437 }
438
439 ///
440 /// getter of size
441 /// @return size of the message queue
442 ///
444 )
445 {
446 return size;
447 }
448
449 ///
450 /// check if the queue is empty or not
451 /// @return true, if it is empty, else false
452 ///
454 )
455 {
456 bool empty = true;
457 // LOCKED (&queueLock) {
458 if( head ) empty = false;
459 // }
460 assert( !empty || size == 0 );
461 return empty;
462 }
463
464 ///
465 /// wait for a message coming to a queue
466 ///
468 ConditionVariable *sentMsg, ///< condition variable for synchronization
469 bool *sentMessage ///< flag for synchronization
470 )
471 {
472 CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
473 }
474
475 ///
476 /// wait for a specified message coming to a queue
477 ///
479 ConditionVariable *sentMsg, ///< condition variable for synchronization
480 bool *sentMessage, ///< flag for synchronization
481 int source, ///< source rank of the message
482 int datatypeId, ///< data type id of the message
483 int tag ///< tag of the message
484 )
485 {
486 for(;;)
487 {
488 // CONDITIONVARIABLE_WAIT (&sentMsg, sentMessage == true);
489 CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
490 MessageQueueElement *current = head;
491 while( current )
492 {
493 MessageQueueElement *next = current->getNext();
494 if( current->getSource() == source
495 && current->getDataTypeId() == datatypeId
496 && current->getTag() == tag )
497 {
498 break;
499 }
500 current = next;
501 }
502 if( current ) break;
503 *sentMessage = false;
504 }
505 }
506
507 ///
508 /// wait for a specified message coming to a queue
509 ///
511 ConditionVariable *sentMsg, ///< condition variable for synchronization
512 bool *sentMessage, ///< flag for synchronization
513 int source, ///< source rank of the message
514 int *tag ///< tag of the message
515 )
516 {
517 for(;;)
518 {
519 // CONDITIONVARIABLE_WAIT (&sentMsg, sentMessage == true);
520 CONDITIONVARIABLE_WAIT (sentMsg, *sentMessage == true);
521 MessageQueueElement *current = head;
522 if( *tag == TagAny )
523 {
524 while( current )
525 {
526 MessageQueueElement *next = current->getNext();
527 if( current->getSource() == source )
528 {
529 *tag = current->getTag();
530 break;
531 }
532 current = next;
533 }
534 }
535 else
536 {
537 while( current )
538 {
539 MessageQueueElement *next = current->getNext();
540 if( current->getSource() == source
541 && current->getTag() == (*tag) )
542 {
543 break;
544 }
545 current = next;
546 }
547 }
548 if( current ) break;
549 *sentMessage = false;
550 }
551 }
552
553};
554
555///
556/// Class of ThreadsTableElement
557///
558class ThreadsTableElement
559{
560
561#ifdef _UG_NO_THREAD_LOCAL_STATIC
562 pthread_t tid; ///< thread id of this thread
563#endif
564 int rank; ///< rank of this thread
565 std::ostream *tos; ///< tag trace stream for this thread
566 ThreadsTableElement *next; ///< next ThreadsTableElement pointer
567
568public:
569
570 ///
571 /// default constructor of ThreadsTableElement
572 ///
574 ) :
575#ifdef _UG_NO_THREAD_LOCAL_STATIC
576 tid(0),
577#endif
578 rank(0), tos(0), next(0)
579 {
580 }
581
582 ///
583 /// constructor of ThreadsTableElement
584 ///
586#ifdef _UG_NO_THREAD_LOCAL_STATIC
587 pthread_t inTid, ///< thread id
588#endif
589 int inRank, ///< thread rank
590 ParaParamSet *paraParamSet ///< UG parameter set
591 ) :
592#ifdef _UG_NO_THREAD_LOCAL_STATIC
593 tid(inTid),
594#endif
595 rank(inRank), tos(0), next(0)
596 {
597 if( paraParamSet->getBoolParamValue(TagTrace) )
598 {
599 if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
600 {
601 tos = &std::cout;
602 }
603 else
604 {
605 std::ostringstream s;
606 s << paraParamSet->getStringParamValue(TagTraceFileName) << inRank;
607 std::ofstream *ofs = new std::ofstream();
608 ofs->open(s.str().c_str());
609 tos = ofs;
610 }
611 }
612 }
613
614 ///
615 /// destructor of ThreadsTableElement
616 ///
618 )
619 {
620 if( tos )
621 {
622 if( tos != &std::cout )
623 {
624 std::ofstream *ofs = dynamic_cast<std::ofstream *>(tos);
625 ofs->close();
626 delete ofs;
627 }
628 }
629 }
630
631#ifdef _UG_NO_THREAD_LOCAL_STATIC
632 ///
633 /// get thread id
634 /// @return thread id
635 ///
636 pthread_t getTid(
637 )
638 {
639 return tid;
640 }
641#endif
642
643 ///
644 /// getter of this thread rank
645 /// @return the thread rank
646 ///
648 )
649 {
650 return rank;
651 }
652
653 ///
654 /// setter of this thread rank
655 ///
657 int r ///< rank to be set
658 )
659 {
660 rank = r;
661 }
662
663 ///
664 /// getter of tag trace stream of this rank
665 /// @return pointer to the tag trace stream
666 ///
667 std::ostream *getOstream(
668 )
669 {
670 return tos;
671 }
672
673 ///
674 /// get next element
675 /// @return next element
676 ///
678 )
679 {
680 return next;
681 }
682
683#ifdef _UG_NO_THREAD_LOCAL_STATIC
684 void link(
685 ThreadsTableElement *nextElement
686 )
687 {
688 next = nextElement;
689 }
690#endif
691
692};
693
694class ParaCalculationState;
695class ParaTask;
696class ParaSolverState;
697class ParaSolverTerminationState;
698class ParaDiffSubproblem;
699class ParaInstance;
700class ParaSolution;
701
702///
703/// Communicator object for pthreads thread communications
704///
705class ParaCommPth : public ParaComm
706{
707protected:
708 int comSize; ///< communicator size : number of threads joined in this system
709 bool tagTraceFlag; ///< indicate if tags are traced or not
710 int **token; ///< index 0: token
711 ///< index 1: token color
712 ///< -1: green
713 ///< > 0: yellow ( termination origin solver number )
714 ///< -2: red ( means the solver can terminate )
715 ParaSysTimer timer; ///< system timer
716 static const char *tagStringTable[]; ///< tag name string table
717 static ThreadsTableElement *threadsTable[HashTableSize]; ///< threads table: index is thread rank
718#ifndef _UG_NO_THREAD_LOCAL_STATIC
719 static __thread int localRank; ///< local thread rank
720#endif
721 MessageQueueTableElement **messageQueueTable; ///< message queue table
722 bool *sentMessage; ///< sent message flag for synchronization
723 Lock *queueLock; ///< Lock for synchronization
724 ConditionVariable *sentMsg; ///< Condition variable for synchronization
725
726 Lock *tokenAccessLock; ///< lock to access token
727 Lock applicationLock; ///< lock for application
728 Lock rankLock; ///< lock to access rank
729
730 ///
731 /// get hash code from thread id
732 /// @return hash code
733 ///
734 unsigned int hashCode(
735 pthread_t tid ///< thread id
736 );
737
738 ///
739 /// allocate memory and copy message
740 ///
741 void *allocateMemAndCopy(
742 const void* buffer, ///< pointer to buffer of the message
743 int count, ///< the number of data element in the message
744 const int datatypeId ///< data type of each element in the message
745 );
746
747 ///
748 /// copy message
749 ///
750 void copy(
751 void *dest, ///< destination to copy the data
752 const void *src, ///< source of the data
753 int count, ///< the number of data element
754 int datatypeId ///< data type of each element
755 );
756
757 ///
758 /// free memory
759 ///
760 void freeMem(
761 void* buffer, ///< pointer to buffer of the message
762 int count, ///< the number of data element
763 const int datatypeId ///< data type of each element
764 );
765
766 ///
767 /// free memory
768 /// @return
769 ///
771 MessageQueueElement *elem ///< pointer to a message queue element
772 );
773
774 ///
775 /// check if tag string table (for debugging) set up correctly
776 /// @return true if tag string table is set up correctly, false otherwise
777 ///
779 );
780
781public:
782
783 ///
784 /// constructor of ParaCommPth
785 ///
787 )
788 : comSize(-1),
789 tagTraceFlag(false),
790 token(0),
792 sentMessage(0),
793 queueLock(0),
794 sentMsg(0),
796 {
797 }
798
799 ///
800 /// destructor of this communicator
801 ///
802 virtual ~ParaCommPth(
803 );
804
805 ///
806 /// initializer of this communicator
807 ///
808 virtual void init(
809 int argc, ///< the number of arguments
810 char **argv ///< pointers to the arguments
811 );
812
813 ///
814 /// get start time of this communicator
815 /// (should not be used)
816 /// @return start time
817 ///
819 )
820 {
821 return timer.getStartTime();
822 }
823
824 ///
825 /// get rank of caller's thread
826 /// @return rank of caller's thread
827 ///
828 int getRank(
829 );
830
831 // int getRank( pthread_t tid);
832
833 ///
834 /// get size of this communicator, which indicates how many threads in a UG process
835 /// @return the number of threads
836 ///
838 )
839 {
840 return comSize;
841 }
842
843 ///
844 /// get size of the messageQueueTable
845 /// @return if dest >= 0 then return the number of only messages waiting to send to dest,
846 /// othrewise return the number of all messages waiting to send.
847 ///
849 int dest=-1
850 )
851 {
852 if( dest >= 0 )
853 {
854 return messageQueueTable[dest]->getSize();
855 }
856 else
857 {
858 int nTotalMessages;
859 for( int i = 0; i < ( comSize + 1 ); i++ )
860 {
861 nTotalMessages += messageQueueTable[i]->getSize();
862 }
863 return nTotalMessages;
864 }
865 }
866
867 ///
868 /// initializer for LoadCoordinator
869 ///
870 void lcInit(
871 ParaParamSet *paraParamSet ///< UG parameter set
872 );
873
874 ///
875 /// initializer for Solvers
876 ///
878 ParaParamSet *paraParamSet ///< UG parameter set
879 )
880 {
881 }
882
883 ///
884 /// initializer for a specific Solver
885 ///
886 void solverInit(
887 int rank, ///< rank of the Solver
888 ParaParamSet *paraParamSet ///< UG parameter set
889 );
890
891 ///
892 /// reinitializer of a specific Solver
893 ///
894 void solverReInit(
895 int rank, ///< rank of the Solver
896 ParaParamSet *paraParamSet ///< UG parameter set
897 );
898
899 ///
900 /// delete Solver from this communicator
901 ///
902 void solverDel(
903 int rank ///< rank of the Solver
904 );
905
906 ///
907 /// abort. How it works sometimes depends on communicator used
908 ///
909 void abort(
910 )
911 {
912 std::abort();
913 }
914
///
/// function to wait Terminated message
/// (This function is not used currently)
/// @return true when MPI communication is used, false when thread communication used
///         (this communicator always returns false)
///
bool waitTerminatedMessage(
      )
{
   return false;
}
925
926 ///
927 /// wait token when UG runs with deterministic mode
928 /// @return true, when token is arrived to the rank
929 ///
930 bool waitToken(
931 int rank ///< rank to check if token is arrived
932 );
933
934 ///
935 /// pass token from the rank to the next
936 ///
937 void passToken(
938 int rank ///< from this rank, the token is passed
939 );
940
941 ///
942 /// pass termination token from the rank to the next
943 /// @return true, when the termination token is passed from this rank, false otherwise
944 ///
945 bool passTermToken(
946 int rank ///< from this rank, the termination token is passed
947 );
948
949 ///
950 /// set received token to this communicator
951 ///
952 void setToken(
953 int rank, ///< rank to set the token
954 int *inToken ///< token to be set
955 );
956
957 ///
958 /// wait until thread id is registered to thread table
959 ///
961 );
962
963 ///
964 /// notify that all solvers are registered
965 ///
967 );
968
969 ///
970 /// get ostream pointer
971 /// @return pointer to ostream
972 ///
973 std::ostream *getOstream(
974 );
975
976 ///
977 /// lock UG application to synchronize with other threads
978 ///
980 )
981 {
983 }
984
985 ///
986 /// unlock UG application to synchronize with other threads
987 ///
989 )
990 {
992 }
993
994 ///
995 /// lock rank
996 ///
998 )
999 {
1000 rankLock.lock();
1001 }
1002
1003 ///
1004 /// unlock rank
1005 ///
1007 )
1008 {
1009 rankLock.unlock();
1010 }
1011
1012 ///
1013 /// lock UG application to synchronize with other threads
1014 /// (for debugging)
1015 ///
1017 char const *f, ///< string to indicate what is locked
1018 int l ///< a number to show something
1019 )
1020 {
1021 applicationLock.lock(f,l);
1022 }
1023
1024 ///
1025 /// lock rank
1026 /// (for debugging)
1027 ///
1029 char const *f, ///< string to indicate what is locked
1030 int l ///< a number to show something
1031 )
1032 {
1033 rankLock.lock(f,l);
1034 // std::cout << "Rank Locked: " << f << ", " << l << std::endl;
1035 }
1036
1037 ///
1038 /// unlock rank
1039 /// (for debugging)
1040 ///
1042 char const *f, ///< string to indicate what is locked
1043 int l ///< a number to show something
1044 )
1045 {
1046 rankLock.unlock();
1047 // std::cout << "Rank Unlocked: " << f << ", " << l << std::endl;
1048 }
1049
1050 ///
1051 /// create ParaTimer object
1052 /// @return pointer to ParaTimer object
1053 ///
1055 )
1056 {
1057 return new ParaTimerTh();
1058 }
1059
1060 ///
1061 /// broadcast function for standard ParaData types
1062 /// @return always 0 (for future extensions)
1063 ///
1064 int bcast(
1065 void* buffer, ///< point to the head of sending message
1066 int count, ///< the number of data in the message
1067 const int datatypeId, ///< data type in the message
1068 int root ///< root rank for broadcasting
1069 );
1070
1071 ///
1072 /// send function for standard ParaData types
1073 /// @return always 0 (for future extensions)
1074 ///
1075 int send(
1076 void* bufer, ///< point to the head of sending message
1077 int count, ///< the number of data in the message
1078 const int datatypeId, ///< data type in the message
1079 int dest, ///< destination to send the message
1080 const int tag ///< tag of this message
1081 );
1082
1083 ///
1084 /// receive function for standard ParaData types
1085 /// @return always 0 (for future extensions)
1086 ///
1087 int receive(
1088 void* bufer, ///< point to the head of receiving message
1089 int count, ///< the number of data in the message
1090 const int datatypeId, ///< data type in the message
1091 int source, ///< source of the message coming from
1092 const int tag ///< tag of the message
1093 );
1094
1095 ///
1096 /// wait function for a specific tag from a specific source coming from
1097 ///
1099 const int source, ///< source rank which the message should come from
1100 const int tag, ///< tag which the message should wait
1101 int *receivedTag ///< tag of the message which is arrived
1102 );
1103
1104 ///
1105 /// probe function which waits a new message
1106 /// @return always true
1107 ///
1108 bool probe(
1109 int *source, ///< source rank of the message arrived
1110 int *tag ///< tag of the message arrived
1111 );
1112
1113 ///
1114 /// iProbe function which checks if a new message is arrived or not
1115 /// @return true when a new message exists
1116 ///
1117 bool iProbe(
1118 int *source, ///< source rank of the message arrived
1119 int *tag ///< tag of the message arrived
1120 );
1121
1122 ///
1123 /// User type send for created data type
1124 /// @return always 0 (for future extensions)
1125 ///
1126 int uTypeSend(
1127 void* bufer, ///< point to the head of sending message
1128 const int datatypeId, ///< created data type
1129 int dest, ///< destination rank
1130 int tag ///< tag of the message
1131 );
1132
1133 ///
1134 /// User type receive for created data type
1135 /// @return always 0 (for future extensions)
1136 ///
1137 int uTypeReceive(
1138 void** bufer, ///< point to the head of receiving message
1139 const int datatypeId, ///< created data type
1140 int source, ///< source rank
1141 int tag ///< tag of the message
1142 );
1143
1144 ///
1145 /// get Tag string for debugging
1146 /// @return string which shows Tag
1147 ///
1148 virtual const char *getTagString(
1149 int tag ///< tag to be converted to string
1150 );
1151
1152};
1153
1154#define DEF_PARA_COMM( para_comm, comm ) ParaCommPth *para_comm = dynamic_cast< ParaCommPth* >(comm)
1155#define LOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__)
1156#define LOCK_RANK( comm ) comm->lockRank(__FILE__, __LINE__)
1157#define UNLOCK_RANK( comm ) comm->unlockRank(__FILE__, __LINE__)
1158}
1159
1160#endif // __PARA_COMM_PTH_H__
Condition variable.
Class that implements a lock. The class wraps around pthread_mutex_t and adds some safeguards.
Definition: paraPthLock.h:82
void unlock()
Release this lock.
Definition: paraPthLock.h:161
void lock()
Acquire this lock. The function sets the internal file/line (debugging) fields to generic values.
Definition: paraPthLock.h:133
Class for message queue element.
int source
source thread rank of this message
int dataTypeId
data type id
int getSource()
getter of source rank
Definition: paraCommPth.h:160
MessageQueueElement()
default constructor of MessageQueueElement
Definition: paraCommPth.h:130
MessageQueueElement * next
point to next message queue element
int getDataTypeId()
getter of the data type id
Definition: paraCommPth.h:180
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
Definition: paraCommPth.h:219
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
Definition: paraCommPth.h:138
int tag
tag of the message, -1 : in case of broadcast message
void * getData()
getter of data
Definition: paraCommPth.h:200
void * data
data of the message
int count
number of the data type elements
int getTag()
getter of the message tag
Definition: paraCommPth.h:190
~MessageQueueElement()
destructor of MessageQueueElement
Definition: paraCommPth.h:151
int getCount()
getter of the number of the data type elements
Definition: paraCommPth.h:170
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
Definition: paraCommPth.h:210
Class of MessageQueueTableElement.
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
Definition: paraCommPth.h:320
int getSize()
getter of size
Definition: paraCommPth.h:443
int size
number of the messages in queue
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
Definition: paraCommPth.h:478
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for destructor of ParaCommCPP11. No lock is necessary....
Definition: paraCommPth.h:381
MessageQueueElement * getHead()
getter of head
Definition: paraCommPth.h:433
MessageQueueElement * tail
tail of the message queue
void enqueue(ConditionVariable *sentMsg, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
Definition: paraCommPth.h:407
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or not
Definition: paraCommPth.h:272
~MessageQueueTableElement()
destructor of MessageQueueTableElement
Definition: paraCommPth.h:255
MessageQueueTableElement()
default constructor of MessageQueueTableElement
Definition: paraCommPth.h:245
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage)
wait for a message coming to a queue
Definition: paraCommPth.h:467
MessageQueueElement * head
head of the message queue
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or nor
Definition: paraCommPth.h:298
void waitMessage(ConditionVariable *sentMsg, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
Definition: paraCommPth.h:510
bool isEmpty()
check if the queue is empty or not
Definition: paraCommPth.h:453
Communicator object for pthreads thread communications.
Definition: paraCommPth.h:706
int comSize
communicator size : number of threads joined in this system
Definition: paraCommPth.h:708
bool probe(int *source, int *tag)
probe function which waits a new message
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
void unlockRank()
unlock rank
Definition: paraCommPth.h:1006
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommPth.h:710
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
Lock * queueLock
Lock for synchronization.
Definition: paraCommPth.h:723
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommPth.h:877
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
void freeMem(void *buffer, int count, const int datatypeId)
free memory
std::ostream * getOstream()
get ostream pointer
int getSize()
get size of this communicator, which indicates how many threads in a UG process
Definition: paraCommPth.h:837
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommPth.h:709
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
bool waitTerminatedMessage()
function to wait Terminated message (This function is not used currently)
Definition: paraCommPth.h:920
void lockApp()
lock UG application to synchronize with other threads
Definition: paraCommPth.h:979
virtual const char * getTagString(int tag)
get Tag string for debugging
ParaSysTimer timer
system timer
Definition: paraCommPth.h:715
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
void passToken(int rank)
pass token to from the rank to the next
Lock * tokenAccessLock
lock to access token
Definition: paraCommPth.h:726
void registedAllSolvers()
notify that all solvers are registered
MessageQueueTableElement ** messageQueueTable
message queue table
Definition: paraCommPth.h:721
unsigned int hashCode(pthread_t tid)
get hash code from thread id
static const char * tagStringTable[]
tag name string table
Definition: paraCommPth.h:716
void waitUntilRegistered()
wait until thread id is registered to thread table
virtual ~ParaCommPth()
destructor of this communicator
void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debugging)
Definition: paraCommPth.h:1016
static ThreadsTableElement * threadsTable[HashTableSize]
threads table: index is thread rank
Definition: paraCommPth.h:717
double getStartTime()
get start time of this communicator (should not be used)
Definition: paraCommPth.h:818
int getRank()
get rank of caller's thread
virtual void init(int argc, char **argv)
initializer of this communicator
Definition: paraCommPth.cpp:85
void unlockApp()
unlock UG application to synchronize with other threads
Definition: paraCommPth.h:988
Lock rankLock
lock to access rank
Definition: paraCommPth.h:728
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
Definition: paraCommPth.h:848
ParaTimer * createParaTimer()
create ParaTimer object
Definition: paraCommPth.h:1054
void solverDel(int rank)
delete Solver from this communicator
static __thread int localRank
local thread rank
Definition: paraCommPth.h:719
bool freeStandardTypes(MessageQueueElement *elem)
free memory
void lockRank()
lock rank
Definition: paraCommPth.h:997
void abort()
abort. How it works sometimes depends on communicator used
Definition: paraCommPth.h:909
bool * sentMessage
sent message flag for synchronization
Definition: paraCommPth.h:722
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
ConditionVariable * sentMsg
Condition variable for synchronization.
Definition: paraCommPth.h:724
void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
bool passTermToken(int rank)
pass termination token from the rank to the next
void lockRank(char const *f, int l)
lock rank (for debugging)
Definition: paraCommPth.h:1028
void setToken(int rank, int *inToken)
set received token to this communicator
ParaCommPth()
constructor of ParaCommPth
Definition: paraCommPth.h:786
void unlockRank(char const *f, int l)
unlock rank (for debugging)
Definition: paraCommPth.h:1041
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag coming from a specific source
void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
Lock applicationLock
lock for application
Definition: paraCommPth.h:727
bool waitToken(int rank)
wait for token when UG runs in deterministic mode
Base class of communicator object.
Definition: paraComm.h:102
class ParaParamSet
Definition: paraParamSet.h:850
Class ParaSysTimer.
Definition: paraSysTimer.h:107
double getStartTime(void)
get start time
class ParaTimerTh (Timer used in thread communication)
Definition: paraTimerTh.h:50
class ParaTimer
Definition: paraTimer.h:49
Class of ThreadsTableElement.
void setRank(int r)
setter of this thread rank
Definition: paraCommPth.h:656
ThreadsTableElement * next
next ThreadsTableElement pointer
Definition: paraCommPth.h:566
std::ostream * getOstream()
getter of tag trace stream of this rank
Definition: paraCommPth.h:667
~ThreadsTableElement()
destructor of ThreadsTableElement
Definition: paraCommPth.h:617
std::ostream * tos
tag trace stream for this thread
int rank
rank of this thread
int getRank()
getter of this thread rank
Definition: paraCommPth.h:647
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
Definition: paraCommPth.h:585
ThreadsTableElement * getNext()
get next element
Definition: paraCommPth.h:677
ThreadsTableElement()
default constructor of ThreadsTableElement
Definition: paraCommPth.h:573
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
static const int TYPE_LAST
Definition: paraComm.h:80
static const int UG_USER_TYPE_LAST
static const int ParaTaskType
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
static const int ParaParamSetType
static const int HashTableSize
size of thread table : this limits the number of threads
Definition: paraCommPth.h:95
static const int ParaSolverStateType
static const int ParaCalculationStateType
static const int TagAny
Definition: paraTagDef.h:44
static const int ParaSolverTerminationStateType
static const int TagTraceFileName
Definition: paraParamSet.h:126
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
static const int ParaRacingRampUpParamType
static const int TagTrace
Definition: paraParamSet.h:72
Base class of communicator for UG Framework.
Defines for UG Framework.
Pthread condition variable for UG Framework.
#define CONDITIONVARIABLE_WAIT(var, cond)
Perform exception-safe waiting on a condition variable. This macro waits on VAR until COND is true.
Pthread lock for UG Framework.
System timer.
ParaTimer extension for threads.