Scippy

UG

Ubiquity Generator framework

paraCommCPP11.h
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubiquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraCommCPP11.h
27 * @brief ParaComm extension for C++11 thread communication
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36
37#ifndef __PARA_COMM_CPP11_H__
38#define __PARA_COMM_CPP11_H__
39
40#include <thread>
41#include <mutex>
42#include <condition_variable>
43#include <stdexcept>
44#include <iostream>
45#include <ostream>
46#include <fstream>
47#include <sstream>
48#include <string>
49#include <iomanip>
50#include <cassert>
51#include "paraDef.h"
52#include "paraComm.h"
53#include "paraSysTimer.h"
54#include "paraParamSetTh.h"
55#include "paraTimerTh.h"
56
57
///
/// TAG_TRACE( call, fromTo, sourceDest, tag ): emit one tag-trace line for a communication call.
///
/// Does nothing unless tagTraceFlag is true. When getOstream() returns a non-null stream, the
/// line is written there, prefixed with timer.getRTimeInterval() and the caller's rank; otherwise
/// the line goes to std::cout with a "### Before creating ostream" prefix.
/// `call` and `fromTo` are stringized (#call, #fromTo); `sourceDest` is streamed as a value.
/// A tag >= 0 is printed via getTagString(tag); a negative tag is reported "as broadcast".
///
/// Usage notes:
///  - The expansion refers to tagTraceFlag, getOstream(), getRank(), getTagString() and timer,
///    so these names must be resolvable at the point of use.
///  - The macro expands to a plain `if( ... ) { ... }` statement (no do/while(0) wrapper), so
///    take care when using it as the body of an unbraced if/else.
///  - `tag` is substituted more than once and without parentheses; pass a simple,
///    side-effect-free expression.
///
58#define TAG_TRACE( call, fromTo, sourceDest, tag ) \
59 if( tagTraceFlag ) \
60 { \
61 if( getOstream() ) \
62 { \
63 if( tag >= 0 ) \
64 { \
65 /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << ", " << tagStringTable[tag] << std::endl; */ \
66 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
67 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
68 } \
69 else \
70 { \
71 /* std::cout << " call = " << #call << ", Rank = " << getRank() << ", tag = " << tag << std::endl; */ \
72 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
73 << " " << sourceDest << " as broadcast" << std::endl; \
74 } \
75 } \
76 else \
77 { \
78 if( tag >= 0 ) \
79 { \
80 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
81 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
82 } \
83 else \
84 { \
85 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
86 << " " << sourceDest << " as broadcast" << std::endl; \
87 } \
88 } \
89}
90
91namespace UG
92{
93
94///
95/// user defined transfer data types
96///
97static const int UG_USER_TYPE_FIRST = TYPE_LAST + 1;
98static const int ParaInstanceType = UG_USER_TYPE_FIRST + 0;
99static const int ParaSolutionType = UG_USER_TYPE_FIRST + 1;
101static const int ParaTaskType = UG_USER_TYPE_FIRST + 3;
107
108static const int ThreadTableSize = 751; ///< size of thread table : this limits the number of threads
109
110///
111/// Class for message queue element
112///
113/// NOTE : For basic data types, this is a copy of the sender side memory.
114/// After the memory has been copied in the receive function, the copy
115/// has to be freed.
116/// For user defined data types, this is the receiver side memory,
117/// because it is better to allocate the memory on the sender side for
118/// mutex locking. Sender side functions have to allocate the memory.
119/// In this case, the memory does not have to be freed. The memory is for
120/// the receiver side.
121///
123{
124
125 int source; ///< source thread rank of this message
126 int count; ///< number of the data type elements
127 int dataTypeId; ///< data type id
128 int tag; ///< tag of the message, -1 : in case of broadcast message
129 void *data; ///< data of the message
130 MessageQueueElement *next; ///< point to next message queue element
131
132public:
133
134 ///
135 /// default constructor of MessageQueueElement
136 ///
138 )
139 : source(-1),
140 count(0),
141 dataTypeId(-1),
142 tag(-1),
143 data(0),
144 next(0)
145 {
146 }
147
148 ///
149 /// constructor of MessageQueueElement
150 ///
152 int inSource, ///< source thread rank of this message
153 int inCount, ///< number of the data type elements
154 int inDataTypeId, ///< data type id
155 int inTag, ///< tag of the message, -1 : in case of broadcast message
156 void *inData ///< data of the message
157 ) : source(inSource), count(inCount), dataTypeId(inDataTypeId), tag(inTag), data(inData), next(0)
158 {
159 }
160
161 ///
162 /// destructor of MessageQueueElement
163 ///
165 )
166 {
167 }
168
169 ///
170 /// getter of source rank
171 /// @return rank of the source
172 ///
174 )
175 {
176 return source;
177 }
178
179 ///
180 /// getter of the number of the data type elements
181 /// @return the number of the data type elements
182 ///
184 )
185 {
186 return count;
187 }
188
189 ///
190 /// getter of the data type id
191 /// @return data type id
192 ///
194 )
195 {
196 return dataTypeId;
197 }
198
199 ///
200 /// getter of the message tag
201 /// @return tag of the message
202 ///
204 )
205 {
206 return tag;
207 }
208
209 ///
210 /// getter of data
211 /// @return pointer to the data
212 ///
213 void *getData(
214 )
215 {
216 return data;
217 }
218
219 ///
220 /// getter of the pointer to the next MessageQueueElement
221 /// @return pointer to MessageQueueElement
222 ///
224 )
225 {
226 return next;
227 }
228
229 ///
230 /// link to the next MessageQueueElement
231 ///
232 void link(
233 MessageQueueElement *nextElement ///< pointer to MessageQueueElement
234 )
235 {
236 next = nextElement;
237 }
238};
239
240///
241/// Class of MessageQueueTableElement
242///
244{
245
246 MessageQueueElement *head; ///< head of the message queue
247 MessageQueueElement *tail; ///< tail of the message queue
248 int size; ///< number of the messages in queue
249
250public:
251
252 ///
253 /// default constructor of MessageQueueTableElement
254 ///
256 ) : head(0), tail(0), size(0)
257 {
258 }
259
260 ///
261 /// destructor of MessageQueueTableElement
262 ///
264 )
265 {
266 while( head )
267 {
269 delete head;
270 head = next;
271 }
272 }
273
274 ///
275 /// check if the specified message exists or not
276 /// @return pointer to MessageQueueElement, 0: no element
277 ///
279 int source, ///< source rank
280 int datatypeId, ///< data type id
281 int tag ///< tag of the message
282 )
283 {
284 MessageQueueElement *ret = 0;
285 for( MessageQueueElement *current = head; current; current = current->getNext() )
286 {
287 if( current->getSource() == source
288 && current->getDataTypeId() == datatypeId
289 && current->getTag() == tag )
290 {
291 ret = current;
292 break;
293 }
294 }
295 return ret;
296 }
297
298 ///
299 /// check if the specified message with tag exists or not
300 /// @return pointer to MessageQueueElement, 0: no element
301 ///
303 int tag ///< tag of the message
304 )
305 {
306 MessageQueueElement *ret = 0;
307 for( MessageQueueElement *current = head; current; current = current->getNext() )
308 {
309 if( current->getTag() == tag )
310 {
311 ret = current;
312 break;
313 }
314 }
315 return ret;
316 }
317
318 ///
319 /// extracts a message
320 /// @return pointer to the message
321 ///
323 bool *sentMessage, ///< for synchronization
324 int source, ///< source rank
325 int datatypeId, ///< data type id
326 int tag ///< tag of the message
327 )
328 {
329 MessageQueueElement *ret = 0;
330
332 MessageQueueElement *current = head;
333 while( current )
334 {
335 MessageQueueElement *next = current->getNext();
336 if( current->getSource() == source
337 && current->getDataTypeId() == datatypeId
338 && current->getTag() == tag )
339 {
340 ret = current;
341 ret->link(0);
342 if( current == head )
343 {
344 if( current == tail )
345 {
346 head = 0;
347 tail = 0;
348 *sentMessage = false;
349 }
350 else
351 {
352 head=next;
353 }
354 }
355 else
356 {
357 if( current == tail )
358 {
359 tail = prev;
360 }
361 prev->link(next);
362 }
363 size--;
364 break;
365 }
366 else
367 {
368 prev = current;
369 current = next;
370 }
371 }
372 return ret;
373 }
374
375 ///
376 /// extracts a message
377 /// (This method is only for the destructor of ParaCommCPP11. No lock is necessary.)
378 ///
379 /// @return pointer to the message
380 ///
382 bool *sentMessage ///< for synchronization
383 )
384 {
385 if( head == 0 ) return 0;
386
387 MessageQueueElement *current = head;
388 MessageQueueElement *next = current->getNext();
389 current->link(0);
390 if( current == tail )
391 {
392 head = 0;
393 tail = 0;
394 *sentMessage = false;
395 }
396 else
397 {
398 head=next;
399 }
400 size--;
401 return current;
402 }
403
404 ///
405 /// enqueue a message
406 ///
408 std::condition_variable& sentMsg, ///< condition variable for synchronization
409 std::mutex& queueLockMutex, ///< mutex for synchronization
410 bool *sentMessage, ///< flag for synchronization
411 MessageQueueElement *newElement ///< message queue element to enter
412 )
413 {
414 if( tail == 0 )
415 {
416 head = tail = newElement;
417 }
418 else
419 {
420 tail->link(newElement);
421 tail = newElement;
422 }
423 size++;
424 *sentMessage = true;
425 sentMsg.notify_one();
426 }
427
428 ///
429 /// getter of head
430 /// @return head element pointer
431 ///
433 )
434 {
435 return head;
436 }
437
438 ///
439 /// getter of size
440 /// @return size of the message queue
441 ///
443 )
444 {
445 return size;
446 }
447
448 ///
449 /// check if the queue is empty or not
450 /// @return true, if it is empty, else false
451 ///
453 )
454 {
455 bool empty = true;
456 if( head ) empty = false;
457 assert( !empty || size == 0 );
458 return empty;
459 }
460
461 ///
462 /// wait for a message coming to a queue
463 ///
465 std::condition_variable& sentMsg, ///< condition variable for synchronization
466 std::mutex& queueLockMutex, ///< mutex for synchronization
467 bool *sentMessage ///< flag for synchronization
468 )
469 {
470 std::unique_lock<std::mutex> lk(queueLockMutex);
471 sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
472 }
473
474 ///
475 /// wait for a specified message coming to a queue
476 ///
478 std::condition_variable& sentMsg, ///< condition variable for synchronization
479 std::mutex& queueLockMutex, ///< mutex for synchronization
480 bool *sentMessage, ///< flag for synchronization
481 int source, ///< source rank of the message
482 int tag ///< tag of the message
483 )
484 {
485 for(;;)
486 {
487 std::unique_lock<std::mutex> lk(queueLockMutex);
488 sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
489 MessageQueueElement *current = head;
490 while( current )
491 {
492 MessageQueueElement *next = current->getNext();
493 if( current->getSource() == source
494 && current->getTag() == tag )
495 {
496 break;
497 }
498 current = next;
499 }
500 if( current ) break;
501 *sentMessage = false;
502 }
503 }
504
505 ///
506 /// wait for a specified message coming to a queue
507 ///
509 std::condition_variable& sentMsg, ///< condition variable for synchronization
510 std::mutex& queueLockMutex, ///< mutex for synchronization
511 bool *sentMessage, ///< flag for synchronization
512 int source, ///< source rank of the message
513 int datatypeId, ///< data type id of the message
514 int tag ///< tag of the message
515 )
516 {
517 for(;;)
518 {
519 std::unique_lock<std::mutex> lk(queueLockMutex);
520 sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
521 MessageQueueElement *current = head;
522 while( current )
523 {
524 MessageQueueElement *next = current->getNext();
525 if( current->getSource() == source
526 && current->getDataTypeId() == datatypeId
527 && current->getTag() == tag )
528 {
529 break;
530 }
531 current = next;
532 }
533 if( current ) break;
534 *sentMessage = false;
535 }
536 }
537
538 ///
539 /// wait for a specified message coming to a queue
540 ///
542 std::condition_variable& sentMsg, ///< condition variable for synchronization
543 std::mutex& queueLockMutex, ///< mutex for synchronization
544 bool *sentMessage, ///< flag for synchronization
545 int source, ///< source rank of the message
546 int *tag ///< tag of the message
547 )
548 {
549 for(;;)
550 {
551 std::unique_lock<std::mutex> lk(queueLockMutex);
552 sentMsg.wait(lk, [&sentMessage]{return (*sentMessage == true);});
553 MessageQueueElement *current = head;
554 if( *tag == TagAny )
555 {
556 while( current )
557 {
558 MessageQueueElement *next = current->getNext();
559 if( current->getSource() == source )
560 {
561 *tag = current->getTag();
562 break;
563 }
564 current = next;
565 }
566 }
567 else
568 {
569 while( current )
570 {
571 MessageQueueElement *next = current->getNext();
572 if( current->getSource() == source
573 && current->getTag() == (*tag) )
574 {
575 break;
576 }
577 current = next;
578 }
579 }
580 if( current ) break;
581 *sentMessage = false;
582 }
583 }
584
585};
586
587///
588/// Class of ThreadsTableElement
589///
591{
592
593 int rank; ///< rank of this thread
594 std::ostream *tos; ///< tag trace stream for this thread
595
596public:
597
598 ///
599 /// default constructor of ThreadsTableElement
600 ///
602 )
603 : rank(0),
604 tos(0)
605 {
606 }
607
608 ///
609 /// constructor of ThreadsTableElement
610 ///
612 int inRank, ///< rank of this thread
613 ParaParamSet *paraParamSet ///< UG parameter set
614 )
615 : rank(inRank),
616 tos(0)
617 {
618 if( paraParamSet->getBoolParamValue(TagTrace) )
619 {
620 if( paraParamSet->isStringParamDefaultValue(TagTraceFileName) )
621 {
622 tos = &std::cout;
623 }
624 else
625 {
626 std::ostringstream s;
627 s << paraParamSet->getStringParamValue(TagTraceFileName) << inRank;
628 std::ofstream *ofs = new std::ofstream();
629 ofs->open(s.str().c_str());
630 tos = ofs;
631 }
632 }
633 }
634
635 ///
636 /// destructor of ThreadsTableElement
637 ///
639 )
640 {
641 if( tos )
642 {
643 if( tos != &std::cout )
644 {
645 std::ofstream *ofs = dynamic_cast<std::ofstream *>(tos);
646 ofs->close();
647 delete ofs;
648 }
649 }
650 }
651
652 ///
653 /// getter of this thread rank
654 /// @return the thread rank
655 ///
657 )
658 {
659 return rank;
660 }
661
662 ///
663 /// setter of this thread rank
664 ///
666 int r ///< rank to be set
667 )
668 {
669 rank = r;
670 }
671
672 ///
673 /// getter of tag trace stream of this rank
674 /// @return pointer to the tag trace stream
675 ///
676 std::ostream *getOstream(
677 )
678 {
679 return tos;
680 }
681};
682
683class ParaCalculationState;
684class ParaTask;
685class ParaSolverState;
686class ParaSolverTerminationState;
687class ParaDiffSubproblem;
688class ParaInstance;
689class ParaSolution;
690
691///
692/// Communicator object for C++11 thread communications
693///
695{
696protected:
697 int comSize; ///< communicator size : number of threads joined in this system
698 bool tagTraceFlag; ///< indicate if tags are traced or not
699 int **token; ///< index 0: token
700 ///< index 1: token color
701 ///< -1: green
702 ///< > 0: yellow ( termination origin solver number )
703 ///< -2: red ( means the solver can terminate )
704 ParaSysTimer timer; ///< system timer
705 static const char *tagStringTable[]; ///< tag name string table
706 static ThreadsTableElement *threadsTable[ThreadTableSize]; ///< threads table: index is thread rank
707 static thread_local int localRank; ///< local thread rank
708 MessageQueueTableElement **messageQueueTable; ///< message queue table
709 bool *sentMessage; ///< sent message flag for synchronization
710 std::mutex *queueLockMutex; ///< mutex for synchronization
711 std::condition_variable *sentMsg; ///< condition variable for synchronization
712
713 std::mutex *tokenAccessLockMutex; ///< mutex to access token
714 std::mutex rankLockMutex; ///< mutex to access rank
715 std::mutex applicationLockMutex; ///< mutex for applications
716
717 ///
718 /// allocate memory and copy message
719 ///
720 void *allocateMemAndCopy(
721 const void* buffer, ///< pointer to buffer of the message
722 int count, ///< the number of data element in the message
723 const int datatypeId ///< data type of each element in the message
724 );
725
726 ///
727 /// copy message
728 ///
729 void copy(
730 void *dest, ///< destination to copy the data
731 const void *src, ///< source of the data
732 int count, ///< the number of data element
733 int datatypeId ///< data type of each element
734 );
735
736 ///
737 /// free memory
738 ///
739 void freeMem(
740 void* buffer, ///< pointer to buffer of the message
741 int count, ///< the number of data element
742 const int datatypeId ///< data type of each element
743 );
744
745 ///
746 /// free memory
747 /// @return
748 ///
750 MessageQueueElement *elem ///< pointer to a message queue element
751 );
752
753 ///
754 /// check if tag string table (for debugging) set up correctly
755 /// @return true if tag string table is set up correctly, false otherwise
756 ///
758 );
759
760public:
761
762 // using UG::ParaComm::lockApp;
763 // using UG::ParaComm::unlockApp;
764
765 ///
766 /// constructor of ParaCommCPP11
767 ///
769 )
770 : comSize(-1),
771 tagTraceFlag(false),
772 token(0),
774 sentMessage(0),
776 sentMsg(0),
778 {
779 }
780
781 ///
782 /// destructor of this communicator
783 ///
784 virtual ~ParaCommCPP11(
785 );
786
787 ///
788 /// initializer of this communicator
789 ///
790 virtual void init(
791 int argc, ///< the number of arguments
792 char **argv ///< pointers to the arguments
793 );
794
795 ///
796 // set local rank in case of using a communicator for shared memory
797 //
798 virtual void setLocalRank(
799 int inRank
800 )
801 {
802 localRank = inRank;
803 }
804
805 ///
806 /// get start time of this communicator
807 /// (should not be used)
808 /// @return start time
809 ///
811 )
812 {
813 return timer.getStartTime();
814 }
815
816 ///
817 /// get rank of caller's thread
818 /// @return rank of caller's thread
819 ///
820 int getRank(
821 );
822
823 ///
824 /// get size of this communicator, which indicates how many threads in a UG process
825 /// @return the number of threads
826 ///
828 )
829 {
830 return comSize;
831 }
832
833 ///
834 /// get size of the messageQueueTable
835 /// @return if dest >= 0 then return only the number of messages waiting to be sent to dest,
836 /// otherwise return the number of all messages waiting to be sent.
837 ///
839 int dest=-1
840 )
841 {
842 if( dest >= 0 )
843 {
844 return messageQueueTable[dest]->getSize();
845 }
846 else
847 {
848 int nTotalMessages = 0;
849 for( int i = 0; i < ( comSize + 1 ); i++ )
850 {
851 nTotalMessages += messageQueueTable[i]->getSize();
852 }
853 return nTotalMessages;
854 }
855 }
856
857 ///
858 /// initializer for LoadCoordinator
859 ///
860 virtual void lcInit(
861 ParaParamSet *paraParamSet ///< UG parameter set
862 );
863
864 ///
865 /// initializer for Solvers
866 ///
867 virtual void solverInit(
868 ParaParamSet *paraParamSet ///< UG parameter set
869 )
870 {
871 }
872
873 ///
874 /// initializer for a specific Solver
875 ///
876 virtual void solverInit(
877 int rank, ///< rank of the Solver
878 ParaParamSet *paraParamSet ///< UG parameter set
879 );
880
881 ///
882 /// reinitializer of a specific Solver
883 ///
884 virtual void solverReInit(
885 int rank, ///< rank of the Solver
886 ParaParamSet *paraParamSet ///< UG parameter set
887 );
888
889 ///
890 /// delete Solver from this communicator
891 ///
892 virtual void solverDel(
893 int rank ///< rank of the Solver
894 );
895
896 ///
897 /// abort. How it works sometimes depends on communicator used
898 ///
899 void abort(
900 )
901 {
902 std::abort();
903 }
904
905 ///
906 /// function to wait Terminated message
907 /// (This function is not used currently)
908 /// @return true when MPI communication is used, false when thread communication is used
909 ///
911 )
912 {
913 return false;
914 }
915
916 ///
917 /// wait token when UG runs with deterministic mode
918 /// @return true, when token is arrived to the rank
919 ///
920 virtual bool waitToken(
921 int rank ///< rank to check if token is arrived
922 );
923
924 ///
925 /// pass token from the rank to the next
926 ///
927 virtual void passToken(
928 int rank ///< from this rank, the token is passed
929 );
930
931 ///
932 /// pass termination token from the rank to the next
933 /// @return true, when the termination token is passed from this rank, false otherwise
934 ///
935 virtual bool passTermToken(
936 int rank ///< from this rank, the termination token is passed
937 );
938
939 ///
940 /// set received token to this communicator
941 ///
942 virtual void setToken(
943 int rank, ///< rank to set the token
944 int *inToken ///< token to be set
945 );
946
947 ///
948 /// get ostream pointer
949 /// @return pointer to ostream
950 ///
951 std::ostream *getOstream(
952 );
953
954 ///
955 /// lock rank
956 ///
957 void lockRank()
958 {
959 rankLockMutex.lock();
960 }
961
962 ///
963 /// unlock rank
964 ///
966 {
967 rankLockMutex.unlock();
968 }
969
970 ///
971 /// lock UG application to synchronize with other threads
972 ///
973 virtual void lockApp(
974 )
975 {
977 // std::cout << "### Rank" << this->getRank() << ":AppLocked!" << std::endl;
978 }
979
980 ///
981 /// lock UG application to synchronize with other threads
982 /// (for debug)
983 ///
984 virtual void lockApp(
985 char const *f, ///< string to indicate what is locked
986 int l ///< a number to show something
987 )
988 {
990 std::cout << "### Rank" << this->getRank() << ":AppLocked! [" << f << ", " << l << "]" << std::endl;
991 }
992
993 ///
994 /// unlock UG application to synchronize with other threads
995 ///
996 virtual void unlockApp(
997 )
998 {
999 applicationLockMutex.unlock();
1000 // std::cout << "### Rank" << this->getRank() << ":AppUnlocked!" << std::endl;
1001 }
1002
1003 ///
1004 /// unlock UG application to synchronize with other threads
1005 /// (for debug)
1006 ///
1007 virtual void unlockApp(
1008 char const *f, ///< string to indicate what is locked
1009 int l ///< a number to show something
1010 )
1011 {
1012 applicationLockMutex.unlock();
1013 std::cout << "### Rank" << this->getRank() << ":AppUnlocked! [" << f << ", " << l << "]" << std::endl;
1014 }
1015
1016 ///
1017 /// unlock rank
1018 /// (for debugging)
1019 ///
1021 char const *f, ///< string to indicate what is locked
1022 int l ///< a number to show something
1023 )
1024 {
1025 rankLockMutex.unlock();
1026 std::cout << "Rank Unlocked: " << f << ", " << l << std::endl;
1027 }
1028
1029 ///
1030 /// create ParaTimer object
1031 /// @return pointer to ParaTimer object
1032 ///
1034 )
1035 {
1036 return new ParaTimerTh();
1037 }
1038
1039 ///
1040 /// broadcast function for standard ParaData types
1041 /// @return always 0 (for future extensions)
1042 ///
1043 int bcast(
1044 void* buffer, ///< point to the head of sending message
1045 int count, ///< the number of data in the message
1046 const int datatypeId, ///< data type in the message
1047 int root ///< root rank for broadcasting
1048 );
1049
1050 ///
1051 /// send function for standard ParaData types
1052 /// @return always 0 (for future extensions)
1053 ///
1054 int send(
1055 void* bufer, ///< point to the head of sending message
1056 int count, ///< the number of data in the message
1057 const int datatypeId, ///< data type in the message
1058 int dest, ///< destination to send the message
1059 const int tag ///< tag of this message
1060 );
1061
1062 ///
1063 /// receive function for standard ParaData types
1064 /// @return always 0 (for future extensions)
1065 ///
1066 int receive(
1067 void* bufer, ///< point to the head of receiving message
1068 int count, ///< the number of data in the message
1069 const int datatypeId, ///< data type in the message
1070 int source, ///< source of the message coming from
1071 const int tag ///< tag of the message
1072 );
1073
1074 ///
1075 /// wait for a message with a specific tag coming from a specific source
1076 ///
1078 const int source, ///< source rank which the message should come from
1079 const int tag, ///< tag which the message should wait
1080 int *receivedTag ///< tag of the message which is arrived
1081 );
1082
1083 ///
1084 /// probe function which waits a new message
1085 /// @return always true
1086 ///
1087 bool probe(
1088 int *source, ///< source rank of the message arrived
1089 int *tag ///< tag of the message arrived
1090 );
1091
1092 ///
1093 /// iProbe function which checks if a new message is arrived or not
1094 /// @return true when a new message exists
1095 ///
1096 bool iProbe(
1097 int *source, ///< source rank of the message arrived
1098 int *tag ///< tag of the message arrived
1099 );
1100
1101 ///
1102 /// User type send for created data type
1103 /// @return always 0 (for future extensions)
1104 ///
1105 int uTypeSend(
1106 void* bufer, ///< point to the head of sending message
1107 const int datatypeId, ///< created data type
1108 int dest, ///< destination rank
1109 int tag ///< tag of the message
1110 );
1111
1112 ///
1113 /// User type receive for created data type
1114 /// @return always 0 (for future extensions)
1115 ///
1116 int uTypeReceive(
1117 void** bufer, ///< point to the head of receiving message
1118 const int datatypeId, ///< created data type
1119 int source, ///< source rank
1120 int tag ///< tag of the message
1121 );
1122
1123 ///
1124 /// get Tag string for debugging
1125 /// @return string which shows Tag
1126 ///
1127 virtual const char *getTagString(
1128 int tag /// tag to be converted to string
1129 );
1130
1131};
1132
///
/// Convenience macros for code that only holds a generic communicator pointer.
///
/// DEF_PARA_COMM declares a ParaCommCPP11 pointer named `para_comm`, obtained from `comm`
/// by dynamic_cast (the result is null if `comm` does not point to a ParaCommCPP11).
#define DEF_PARA_COMM( para_comm, comm ) ParaCommCPP11 *para_comm = dynamic_cast< ParaCommCPP11* >(comm)
/// LOCK_APP calls the debug overload lockApp(file, line) with the call-site location.
#define LOCK_APP( comm ) (comm)->lockApp(__FILE__, __LINE__)
/// UNLOCK_APP calls the debug overload unlockApp(file, line) with the call-site location.
/// It must release the application lock; calling lockApp here would try to re-acquire a
/// mutex that the calling thread already holds.
#define UNLOCK_APP( comm ) (comm)->unlockApp(__FILE__, __LINE__)
/// LOCK_RANK / UNLOCK_RANK forward the call-site location to lockRank / unlockRank.
/// NOTE(review): only a zero-argument lockRank() is visible in this listing; confirm that a
/// lockRank(char const *, int) overload exists before using LOCK_RANK.
#define LOCK_RANK( comm ) (comm)->lockRank(__FILE__, __LINE__)
#define UNLOCK_RANK( comm ) (comm)->unlockRank(__FILE__, __LINE__)
1138}
1139
1140#endif // __PARA_COMM_CPP11_H__
Class for message queue element.
int source
source thread rank of this message
int dataTypeId
data type id
int getSource()
getter of source rank
MessageQueueElement()
default constructor of MessageQueueElement
MessageQueueElement * next
point to next message queue element
int getDataTypeId()
getter of the data type id
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
int tag
tag of the message, -1 : in case of broadcast message
void * getData()
getter of data
void * data
data of the message
int count
number of the data type elements
int getTag()
getter of the message tag
~MessageQueueElement()
destructor of MessageQueueElement
int getCount()
getter of the number of the data type elements
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
Class of MessageQueueTableElement.
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int tag)
wait for a specified message coming to a queue
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
int getSize()
getter of size
int size
number of the messages in queue
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for the destructor of ParaCommCPP11. No lock is necessary....
MessageQueueElement * getHead()
getter of head
MessageQueueElement * tail
tail of the message queue
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or not
~MessageQueueTableElement()
destructor of MessageQueueTableElement
MessageQueueTableElement()
default constructor of MessageQueueTableElement
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
MessageQueueElement * head
head of the message queue
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or not
bool isEmpty()
check if the queue is empty or not
Communicator object for C++11 thread communications.
virtual void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
int comSize
communicator size : number of threads joined in this system
virtual void lockApp()
lock UG application to synchronize with other threads
bool probe(int *source, int *tag)
probe function which waits a new message
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
void unlockRank()
unlock rank
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
std::mutex rankLockMutex
mutex to access rank
virtual void unlockApp(char const *f, int l)
unlock UG application to synchronize with other threads (for debug)
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
void freeMem(void *buffer, int count, const int datatypeId)
free memory
std::ostream * getOstream()
get ostream pointer
int getSize()
get size of this communicator, which indicates how many threads in a UG process
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
bool tagTraceFlag
indicate if tags are traced or not
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
virtual void setLocalRank(int inRank)
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
std::mutex applicationLockMutex
mutex for applications
virtual const char * getTagString(int tag)
get Tag string for debugging
ParaSysTimer timer
system timer
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
virtual void passToken(int rank)
pass token from the rank to the next
MessageQueueTableElement ** messageQueueTable
message queue table
static const char * tagStringTable[]
tag name string table
virtual void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debug)
double getStartTime()
get start time of this communicator (should not be used)
int getRank()
get rank of caller's thread
virtual void init(int argc, char **argv)
initializer of this communicator
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
ParaTimer * createParaTimer()
create ParaTimer object
virtual void solverDel(int rank)
delete Solver from this communicator
bool freeStandardTypes(MessageQueueElement *elem)
free memory
virtual bool waitTerminatedMessage()
function to wait Terminated message (This function is not used currently)
virtual void unlockApp()
unlock UG application to synchronize with other threads
void lockRank()
lock rank
void abort()
abort. How it works sometimes depends on communicator used
bool * sentMessage
sent message flag for synchronization
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
virtual ~ParaCommCPP11()
destructor of this communicator
static thread_local int localRank
local thread rank
virtual void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
virtual bool passTermToken(int rank)
pass termination token from the rank to the next
virtual void setToken(int rank, int *inToken)
set received token to this communicator
void unlockRank(char const *f, int l)
unlock rank (for debugging)
static ThreadsTableElement * threadsTable[ThreadTableSize]
threads table: index is thread rank
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait for a message with a specific tag coming from a specific source
virtual void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
std::mutex * queueLockMutex
mutex for synchronization
std::mutex * tokenAccessLockMutex
mutex to access token
std::condition_variable * sentMsg
condition variable for synchronization
ParaCommCPP11()
constructor of ParaCommCPP11
virtual bool waitToken(int rank)
wait token when UG runs with deterministic mode
Base class of communicator object.
Definition: paraComm.h:102
class ParaParamSet
Definition: paraParamSet.h:850
Class ParaSysTimer.
Definition: paraSysTimer.h:107
double getStartTime(void)
get start time
class ParaTimerMpi (Timer used in thread communication)
Definition: paraTimerTh.h:50
class ParaTimer
Definition: paraTimer.h:49
Class of ThreadsTableElement.
void setRank(int r)
setter of this thread rank
std::ostream * getOstream()
getter of tag trace stream of this rank
~ThreadsTableElement()
destructor of ThreadsTableElement
std::ostream * tos
tag trace stream for this thread
int rank
rank of this thread
int getRank()
getter of this thread rank
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
ThreadsTableElement()
default constructor of ThreadsTableElement
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
static const int TYPE_LAST
Definition: paraComm.h:80
static const int UG_USER_TYPE_LAST
static const int ParaTaskType
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
static const int ParaParamSetType
static const int ParaSolverStateType
static const int ParaCalculationStateType
static const int TagAny
Definition: paraTagDef.h:44
static const int ParaSolverTerminationStateType
static const int TagTraceFileName
Definition: paraParamSet.h:126
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ThreadTableSize
size of thread table : this limits the number of threads
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
static const int ParaRacingRampUpParamType
static const int TagTrace
Definition: paraParamSet.h:72
Base class of communicator for UG Framework.
Defines for UG Framework.
System timer.
ParaTimer extension for threads.