Scippy

UG

Ubiquity Generator framework

paraCommPth.cpp
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and software framework */
4/* UG --- Ubquity Generator Framework */
5/* */
6/* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7/* Copyright (C) 2021-2024 by Zuse Institute Berlin, */
8/* licensed under LGPL version 3 or later. */
9/* Commercial licenses are available through <licenses@zib.de> */
10/* */
11/* This code is free software; you can redistribute it and/or */
12/* modify it under the terms of the GNU Lesser General Public License */
13/* as published by the Free Software Foundation; either version 3 */
14/* of the License, or (at your option) any later version. */
15/* */
16/* This program is distributed in the hope that it will be useful, */
17/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19/* GNU Lesser General Public License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23/* */
24/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25
26/**@file paraCommPth.cpp
27 * @brief ParaComm extension for Pthreads communication
28 * @author Yuji Shinano
29 *
30 *
31 *
32 */
33
34/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35
36
37#include <cstring>
38#include <cstdlib>
39#ifndef _MSC_VER
40#include <unistd.h>
41#endif
42#include "paraCommPth.h"
43#include "paraTask.h"
44#include "paraSolution.h"
47#include "paraSolverState.h"
49#include "paraInitialStat.h"
50
51using namespace UG;
52
54ConditionVariable solverRanksGenerated(&rankGenLock);
56ParaCommPth::threadsTable[HashTableSize];
57
58#ifndef _UG_NO_THREAD_LOCAL_STATIC
59__thread int ParaCommPth::localRank = -1; /*< local thread rank */
60#endif
61
62const char *
63ParaCommPth::tagStringTable[] = {
76 TAG_STR(TagRacingRampUpParamSets),
82};
83
84void
85ParaCommPth::init( int argc, char **argv )
86{
87 // don't have to take any lock, because only LoadCoordinator call this function
88
89 timer.start();
90 comSize = 0;
91
92 for( int i = 1; i < argc; i++ )
93 {
94 if( strcmp(argv[i], "-sth") == 0 )
95 {
96 i++;
97 if( i < argc )
98 comSize = atoi(argv[i]); // if -sth 0, then it is considered as use the number of cores system has
99 else
100 {
101 std::cerr << "missing the number of solver threads after parameter '-sth'" << std::endl;
102 exit(1);
103 }
104 }
105 }
106
107 if( comSize > 0 )
108 {
109 comSize++;
110 }
111 else
112 {
113 comSize = sysconf(_SC_NPROCESSORS_CONF) + 1;
114 }
115
117 token = new int*[comSize];
118 for( int i = 0; i < comSize; i++ )
119 {
120 token[i] = new int[2];
121 token[i][0] = 0;
122 token[i][1] = -1;
123 }
124
125 /** if you add tag, you should add tagStringTale too */
126 // assert( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
128
129 /** initialize hashtable */
130 for(int i = 0; i < HashTableSize; i++ )
131 {
132 threadsTable[i] = 0;
133 }
134
135 messageQueueTable = new MessageQueueTableElement *[comSize + 1]; // +1 for TimeLimitMonitor
136 sentMessage = new bool[comSize + 1];
137 queueLock = new Lock[comSize + 1];
139 for( int i = 0; i < ( comSize + 1 ); i++ )
140 {
142 sentMsg[i].setLock(&queueLock[i]);
143 sentMessage[i] = false;
144 }
145
146}
147
148void
151 )
152{
153
154 // pthread_t tid = pthread_self();
155
156 // don't have to take any lock, because only LoadCoordinator call this function
157 LOCKED (&rankGenLock ) {
158#ifndef _UG_NO_THREAD_LOCAL_STATIC
159 assert( localRank == -1 );
160 assert( threadsTable[0] == 0 );
161 localRank = 0;
163#else
164 pthread_t tid = pthread_self();
166#endif
167 }
168 tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
169}
170
171void
173 int rank,
175 )
176{
177 // don't have to take any lock, because only LoadCoordinator call this function
178 // CHANGED in multi-threaded solver case
179 LOCKED (&rankGenLock ) {
180#ifndef _UG_NO_THREAD_LOCAL_STATIC
181 assert( localRank == -1 );
182 assert( threadsTable[rank] == 0 );
183 localRank = rank;
185 // std::cout << "tid = " << pthread_self() << " is initialized as Rank = " << localRank << std::endl;
186#else
187 pthread_t tid = pthread_self();
188 int index = HashEntry(tid);
189 if( threadsTable[index] == 0 )
190 {
191 threadsTable[index] = new ThreadsTableElement(tid, rank, paraParamSet);
192 }
193 else
194 {
195 ThreadsTableElement *elem = threadsTable[index];
196 while( elem->getNext() != 0 )
197 {
198 if( pthread_equal( elem->getTid(), tid ) )
199 {
200 THROW_LOGICAL_ERROR4("Invalid solver tid is registered. Rank = ", rank, ", tid = ", tid );
201 }
202 elem = elem->getNext();
203 }
204 elem->link(new ThreadsTableElement(tid, rank, paraParamSet));
205 }
206 // std::cout << "tid = " << tid << " for Rank = " << rank << " is added to table" << std::endl;
207#endif
208 }
209}
210
211void
213 int rank,
215 )
216{
217 // don't have to take any lock, because only LoadCoordinator call this function
218 // CHANGED in multi-threaded solver case
219 LOCKED (&rankGenLock ) {
220#ifndef _UG_NO_THREAD_LOCAL_STATIC
221 assert( localRank == -1 );
222 assert( threadsTable[rank] != 0 );
223 localRank = rank;
224 // threadsTable[localRank] = new ThreadsTableElement(localRank, paraParamSet);
225#else
226 // ****** CAUTION *******
227 // Should not use this, since no chance to release ThreadsTableElement entry.
228 // This means that new entries are always added and shuld not use.
229 // ***********************
230 THROW_LOGICAL_ERROR1("solverReInit only can work with thread local variable. The following routine should not be used!");
231 pthread_t tid = pthread_self();
232 int index = HashEntry(tid);
233 if( threadsTable[index] == 0 )
234 {
235 threadsTable[index] = new ThreadsTableElement(tid, rank, paraParamSet);
236 }
237 else
238 {
239 ThreadsTableElement *elem = threadsTable[index];
240 while( elem != 0 )
241 {
242 if( pthread_equal( elem->getTid(), tid ) )
243 {
244 elem->setRank(rank);
245 break;
246 }
247 elem = elem->getNext();
248 }
249 if( elem == 0 )
250 {
251 THROW_LOGICAL_ERROR4("Invalid solver tid is reInit. Rank = ", rank, ", tid = ", tid );
252 }
253 }
254 // std::cout << "tid = " << tid << " for Rank = " << rank << " is added to table" << std::endl;
255#endif
256 }
257
258}
259
260void
262 int rank
263 )
264{
265 LOCKED (&rankGenLock ) {
266#ifndef _UG_NO_THREAD_LOCAL_STATIC
267 assert(rank == localRank);
268 if( threadsTable[rank] == 0 )
269 {
270 THROW_LOGICAL_ERROR2("Invalid remove thread. Rank = ", rank);
271 }
272 else
273 {
274 ThreadsTableElement *elem = threadsTable[rank];
275 delete elem;
276 threadsTable[rank] = 0;
277 localRank = -1;
278 }
279#else
280 pthread_t tid = pthread_self();
281 int index = HashEntry(tid);
282 if( threadsTable[index] == 0 )
283 {
284 THROW_LOGICAL_ERROR4("Invalid remove thread. Rank = ", rank, ", tid = ", tid );
285 }
286 else
287 {
288 ThreadsTableElement *elem = threadsTable[index];
289 ThreadsTableElement *pre = elem;
290 while( elem && !pthread_equal( elem->getTid(), tid ) )
291 {
292 pre = elem;
293 elem = elem->getNext();
294 }
295 if( !elem || !pthread_equal( elem->getTid(), tid ) )
296 {
297 THROW_LOGICAL_ERROR4("Invalid remove thread. Rank = ", rank, ", tid = ", tid );
298 }
299 if( elem == threadsTable[index] )
300 {
301 threadsTable[index] = elem->getNext();
302 }
303 else
304 {
305 pre->link(elem->getNext());
306 }
307 delete elem;
308 }
309 // std::cout << "tid = " << tid << " is deleleted from table" << std::endl;
310#endif
311 }
312}
313
314void
316 )
317{
318#ifdef _UG_NO_THREAD_LOCAL_STATIC
319 pthread_t tid = pthread_self();
320#endif
321
322 bool registered = false;
324 while( !registered )
325 {
326#ifndef _UG_NO_THREAD_LOCAL_STATIC
327 if( threadsTable[localRank] != 0 )
328 {
329 registered = true;
330 break;
331 }
332#else
334 while( elem && !pthread_equal( elem->getTid(), tid ) )
335 {
336 elem = elem->getNext();
337 }
338 if( elem )
339 {
340 assert( pthread_equal( elem->getTid(), tid ) );
341 // std::cout << "tid = " << tid << ", hash = " << HashEntry(tid) << std::endl;
342 registered = true;
343 break;
344 }
345#endif
346 solverRanksGenerated.wait();
347 }
348 }
349}
350
351void
353 )
354{
355 solverRanksGenerated.broadcast();
356}
357
358bool
360 int rank
361 )
362{
363 // int rank = getRank(); // multi-thread solver may change rank here
365 if( token[rank][0] == rank )
366 {
367 return true;
368 }
369 else
370 {
371 int receivedTag;
372 int source;
373 probe(&source, &receivedTag);
374 TAG_TRACE (Probe, From, source, receivedTag);
375 if( source == 0 && receivedTag == TagToken )
376 {
377 receive(token[rank], 2, ParaINT, 0, TagToken);
378 assert( token[rank][0] == rank );
379 return true;
380 }
381 else
382 {
383 return false;
384 }
385 }
386}
387
388void
390 int rank
391 )
392{
393 // int rank = getRank(); // multi-thread solver may change rank here
395 assert( token[rank][0] == rank && rank != 0 );
396 token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
397 token[rank][1] = -1;
398 send(token[rank], 2, ParaINT, 0, TagToken);
399}
400
401bool
403 int rank
404 )
405{
406 // int rank = getRank(); // multi-thread solver may change rank here
408 if( rank == token[rank][0] )
409 {
410 if( token[rank][1] == token[rank][0] ) token[rank][1] = -2;
411 else if( token[rank][1] == -1 ) token[rank][1] = token[rank][0];
412 token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
413 }
414 else
415 {
416 THROW_LOGICAL_ERROR4("Invalid token update. Rank = ", getRank(), ", token = ", token[0] );
417 }
418 send(token[rank], 2, ParaINT, 0, TagToken);
419 if( token[rank][1] == -2 )
420 {
421 return true;
422 }
423 else
424 {
425 return false;
426 }
427}
428
429void
431 int rank,
432 int *inToken
433 )
434{
435 // int rank = getRank();
437 assert( rank == 0 || ( rank != 0 && inToken[0] == rank ) );
438 token[rank][0] = inToken[0];
439 token[rank][1] = inToken[1];
440}
441
443{
444 LOCK_RAII(&rankGenLock); // rankGenLock is not good name
445 for(int i = 0; i < HashTableSize; i++ )
446 {
447 if( threadsTable[i] )
448 {
449 while( threadsTable[i] )
450 {
452 delete threadsTable[i];
453 threadsTable[i] = next;
454 }
455 }
456 }
457
458 for( int i = 0; i < comSize; i++ )
459 {
460 delete [] token[i];
461 }
462 delete [] token;
463 delete [] tokenAccessLock;
464
465 for(int i = 0; i < (comSize + 1); i++)
466 {
468 while( elem )
469 {
470 if( elem->getData() )
471 {
472 if( !freeStandardTypes(elem) )
473 {
474 ABORT_LOGICAL_ERROR2("Requested type is not implemented. Type = ", elem->getDataTypeId() );
475 }
476 }
477 delete elem;
479 }
480 delete messageQueueTable[i];
481 }
482 delete [] messageQueueTable;
483
484 if( sentMessage ) delete [] sentMessage;
485 if( queueLock ) delete [] queueLock;
486 if( sentMsg ) delete [] sentMsg;
487
488}
489
490unsigned int
492 pthread_t tid
493 )
494{
495 union {
496 pthread_t tid;
497 unsigned char cTid[sizeof(pthread_t)];
498 } reinterpret;
499
500 reinterpret.tid = tid;
501 unsigned int h = 0;
502 for (unsigned int i = 0; i < sizeof(pthread_t); i++) {
503 h = 31*h + reinterpret.cTid[i];
504 }
505 return h;
506}
507
508int
510 )
511{
513#ifndef _UG_NO_THREAD_LOCAL_STATIC
514 if( localRank >= 0 ) return localRank;
515 else return -1;
516#else
517 pthread_t tid = pthread_self();
518 // std::cout << "gerRank tid = " << tid << std::endl;
520 while( elem && !pthread_equal( elem->getTid(), tid ) )
521 {
522 elem = elem->getNext();
523 }
524 if( elem )
525 {
526 return elem->getRank();
527 }
528 else
529 {
530 return -1; // No ug threads
531 }
532#endif
533}
534
535std::ostream *
537 )
538{
540#ifdef _UG_NO_THREAD_LOCAL_STATIC
541 pthread_t tid = pthread_self();
543 assert(elem);
544 while( !pthread_equal( elem->getTid(), tid ) )
545 {
546 elem = elem->getNext();
547 assert(elem);
548 }
549 return elem->getOstream();
550#else
552#endif
553}
554
555void *
557 const void* buffer,
558 int count,
559 const int datatypeId
560 )
561{
562 void *newBuf = 0;
563 if( count == 0 ) return newBuf;
564
565 switch(datatypeId)
566 {
567 case ParaCHAR :
568 {
569 newBuf = new char[count];
570 memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
571 break;
572 }
573 case ParaSHORT :
574 {
575 newBuf = new short[count];
576 memcpy(newBuf, buffer, (unsigned long int)sizeof(short)*count);
577 break;
578 }
579 case ParaINT :
580 {
581 newBuf = new int[count];
582 memcpy(newBuf, buffer, (unsigned long int)sizeof(int)*count);
583 break;
584 }
585 case ParaLONG :
586 {
587 newBuf = new long[count];
588 memcpy(newBuf, buffer, (unsigned long int)sizeof(long)*count);
589 break;
590 }
591 case ParaUNSIGNED_CHAR :
592 {
593 newBuf = new unsigned char[count];
594 memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned char)*count);
595 break;
596 }
597 case ParaUNSIGNED_SHORT :
598 {
599 newBuf = new unsigned short[count];
600 memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned short)*count);
601 break;
602 }
603 case ParaUNSIGNED :
604 {
605 newBuf = new unsigned int[count];
606 memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned int)*count);
607 break;
608 }
609 case ParaUNSIGNED_LONG :
610 {
611 newBuf = new unsigned long[count];
612 memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long)*count);
613 break;
614 }
615 case ParaFLOAT :
616 {
617 newBuf = new float[count];
618 memcpy(newBuf, buffer, (unsigned long int)sizeof(float)*count);
619 break;
620 }
621 case ParaDOUBLE :
622 {
623 newBuf = new double[count];
624 memcpy(newBuf, buffer, (unsigned long int)sizeof(double)*count);
625 break;
626 }
627 case ParaLONG_DOUBLE :
628 {
629 newBuf = new long double[count];
630 memcpy(newBuf, buffer, (unsigned long int)sizeof(long double)*count);
631 break;
632 }
633 case ParaBYTE :
634 {
635 newBuf = new char[count];
636 memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
637 break;
638 }
639 case ParaSIGNED_CHAR :
640 {
641 newBuf = new char[count];
642 memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
643 break;
644 }
645 case ParaLONG_LONG :
646 {
647 newBuf = new long long[count];
648 memcpy(newBuf, buffer, (unsigned long int)sizeof(long long)*count);
649 break;
650 }
652 {
653 newBuf = new unsigned long long[count];
654 memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long long)*count);
655 break;
656 }
657 case ParaBOOL :
658 {
659 newBuf = new bool[count];
660 memcpy(newBuf, buffer, (unsigned long int)sizeof(bool)*count);
661 break;
662 }
663 default :
664 THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
665 }
666
667 return newBuf;
668}
669
670void
672 void *dest, const void *src, int count, int datatypeId
673 )
674{
675
676 if( count == 0 ) return;
677
678 switch(datatypeId)
679 {
680 case ParaCHAR :
681 {
682 memcpy(dest, src, (unsigned long int)sizeof(char)*count);
683 break;
684 }
685 case ParaSHORT :
686 {
687 memcpy(dest, src, (unsigned long int)sizeof(short)*count);
688 break;
689 }
690 case ParaINT :
691 {
692 memcpy(dest, src, (unsigned long int)sizeof(int)*count);
693 break;
694 }
695 case ParaLONG :
696 {
697 memcpy(dest, src, (unsigned long int)sizeof(long)*count);
698 break;
699 }
700 case ParaUNSIGNED_CHAR :
701 {
702 memcpy(dest, src, (unsigned long int)sizeof(unsigned char)*count);
703 break;
704 }
705 case ParaUNSIGNED_SHORT :
706 {
707 memcpy(dest, src, (unsigned long int)sizeof(unsigned short)*count);
708 break;
709 }
710 case ParaUNSIGNED :
711 {
712 memcpy(dest, src, (unsigned long int)sizeof(unsigned int)*count);
713 break;
714 }
715 case ParaUNSIGNED_LONG :
716 {
717 memcpy(dest, src, (unsigned long int)sizeof(unsigned long)*count);
718 break;
719 }
720 case ParaFLOAT :
721 {
722 memcpy(dest, src, (unsigned long int)sizeof(float)*count);
723 break;
724 }
725 case ParaDOUBLE :
726 {
727 memcpy(dest, src, (unsigned long int)sizeof(double)*count);
728 break;
729 }
730 case ParaLONG_DOUBLE :
731 {
732 memcpy(dest, src, (unsigned long int)sizeof(long double)*count);
733 break;
734 }
735 case ParaBYTE :
736 {
737 memcpy(dest, src, (unsigned long int)sizeof(char)*count);
738 break;
739 }
740 case ParaSIGNED_CHAR :
741 {
742 memcpy(dest, src, (unsigned long int)sizeof(char)*count);
743 break;
744 }
745 case ParaLONG_LONG :
746 {
747 memcpy(dest, src, (unsigned long int)sizeof(long long)*count);
748 break;
749 }
751 {
752 memcpy(dest, src, (unsigned long int)sizeof(unsigned long long)*count);
753 break;
754 }
755 case ParaBOOL :
756 {
757 memcpy(dest, src, (unsigned long int)sizeof(bool)*count);
758 break;
759 }
760 default :
761 THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
762 }
763
764}
765
766void
768 void* buffer,
769 int count,
770 const int datatypeId
771 )
772{
773
774 if( count == 0 ) return;
775
776 switch(datatypeId)
777 {
778 case ParaCHAR :
779 {
780 delete [] static_cast<char *>(buffer);
781 break;
782 }
783 case ParaSHORT :
784 {
785 delete [] static_cast<short *>(buffer);
786 break;
787 }
788 case ParaINT :
789 {
790 delete [] static_cast<int *>(buffer);
791 break;
792 }
793 case ParaLONG :
794 {
795 delete [] static_cast<long *>(buffer);
796 break;
797 }
798 case ParaUNSIGNED_CHAR :
799 {
800 delete [] static_cast<unsigned char *>(buffer);
801 break;
802 }
803 case ParaUNSIGNED_SHORT :
804 {
805 delete [] static_cast<unsigned short *>(buffer);
806 break;
807 }
808 case ParaUNSIGNED :
809 {
810 delete [] static_cast<unsigned int *>(buffer);
811 break;
812 }
813 case ParaUNSIGNED_LONG :
814 {
815 delete [] static_cast<unsigned long *>(buffer);
816 break;
817 }
818 case ParaFLOAT :
819 {
820 delete [] static_cast<float *>(buffer);
821 break;
822 }
823 case ParaDOUBLE :
824 {
825 delete [] static_cast<double *>(buffer);
826 break;
827 }
828 case ParaLONG_DOUBLE :
829 {
830 delete [] static_cast<long double *>(buffer);
831 break;
832 }
833 case ParaBYTE :
834 {
835 delete [] static_cast<char *>(buffer);
836 break;
837 }
838 case ParaSIGNED_CHAR :
839 {
840 delete [] static_cast<char *>(buffer);
841 break;
842 }
843 case ParaLONG_LONG :
844 {
845 delete [] static_cast<long long *>(buffer);
846 break;
847 }
849 {
850 delete [] static_cast<unsigned long long *>(buffer);
851 break;
852 }
853 case ParaBOOL :
854 {
855 delete [] static_cast<bool *>(buffer);;
856 break;
857 }
858 default :
859 THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
860 }
861
862}
863
864bool
866 MessageQueueElement *elem ///< pointer to a message queue element
867 )
868{
869 if( elem->getDataTypeId() < UG_USER_TYPE_FIRST )
870 {
871 freeMem(elem->getData(), elem->getCount(), elem->getDataTypeId() );
872 }
873 else
874 {
875 switch( elem->getDataTypeId())
876 {
877 case ParaInstanceType:
878 {
879 delete reinterpret_cast<ParaInstance *>(elem->getData());
880 break;
881 }
882 case ParaSolutionType:
883 {
884 delete reinterpret_cast<ParaSolution *>(elem->getData());
885 break;
886 }
887 case ParaParamSetType:
888 {
889 delete reinterpret_cast<ParaParamSet *>(elem->getData());
890 break;
891 }
892 case ParaTaskType:
893 {
894 delete reinterpret_cast<ParaTask *>(elem->getData());
895 break;
896 }
898 {
899 delete reinterpret_cast<ParaSolverState *>(elem->getData());
900 break;
901 }
903 {
904 delete reinterpret_cast<ParaCalculationState *>(elem->getData());
905 break;
906 }
908 {
909 delete reinterpret_cast<ParaSolverTerminationState *>(elem->getData());
910 break;
911 }
913 {
914 delete reinterpret_cast<ParaRacingRampUpParamSet *>(elem->getData());
915 break;
916 }
917 default:
918 {
919 return false;
920 }
921 }
922 }
923 return true;
924}
925
926bool
928 )
929{
930 // std::cout << "size = " << sizeof(tagStringTable)/sizeof(char*) << ", N_TH_TAGS = " << N_TH_TAGS << std::endl;
931 return ( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
932}
933
934const char *
936 int tag /// tag to be converted to string
937 )
938{
939 assert( tag >= 0 && tag < N_TH_TAGS );
940 return tagStringTable[tag];
941}
942
943int
945 void* buffer,
946 int count,
947 const int datatypeId,
948 int root
949 )
950{
951 if( getRank() == root )
952 {
953 for(int i=0; i < comSize; i++)
954 {
955 if( i != root )
956 {
957 send(buffer, count, datatypeId, i, -1);
958 }
959 }
960 }
961 else
962 {
963 receive(buffer, count, datatypeId, root, -1);
964 }
965 return 0;
966}
967
968int
970 void* buffer,
971 int count,
972 const int datatypeId,
973 int dest,
974 const int tag
975 )
976{
977 LOCKED ( &queueLock[dest] )
978 {
979 messageQueueTable[dest]->enqueue(&sentMsg[dest],&sentMessage[dest],
980 new MessageQueueElement(getRank(), count, datatypeId, tag,
981 allocateMemAndCopy(buffer, count, datatypeId) ) );
982 }
983 TAG_TRACE (Send, To, dest, tag);
984 return 0;
985}
986
987int
989 void* buffer,
990 int count,
991 const int datatypeId,
992 int source,
993 const int tag
994 )
995{
996 int qRank = getRank();
997 MessageQueueElement *elem = 0;
998 if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
999 {
1000 messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, datatypeId, tag);
1001 }
1002 LOCKED ( &queueLock[qRank] )
1003 {
1004 elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank],source, datatypeId, tag);
1005 }
1006 assert(elem);
1007 copy( buffer, elem->getData(), count, datatypeId );
1008 freeMem(elem->getData(), count, datatypeId );
1009 delete elem;
1010 TAG_TRACE (Recv, From, source, tag);
1011 return 0;
1012}
1013
1014void
1016 const int source,
1017 const int tag,
1018 int *receivedTag
1019 )
1020{
1021 /*
1022 // Just wait, iProbe and receive will be performed after this call
1023 messageQueueTable[getRank()]->waitMessage(source, datatypeId, tag);
1024 TAG_TRACE (Probe, From, source, tag);
1025 return 0;
1026 */
1027 int qRank = getRank();
1028 // LOCKED ( &queueLock[getRank()] )
1029 // {
1030 (*receivedTag) = tag;
1031 messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, receivedTag);
1032 // }
1033 TAG_TRACE (Probe, From, source, *receivedTag);
1034 return;
1035}
1036
1037bool
1039 int* source,
1040 int* tag
1041 )
1042{
1043 int qRank = getRank();
1044 messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank]);
1046 *source = elem->getSource();
1047 *tag = elem->getTag();
1048 TAG_TRACE (Probe, From, *source, *tag);
1049 return true;
1050}
1051
1052bool
1054 int* source,
1055 int* tag
1056 )
1057{
1058 bool flag = false;
1059 int qRank = getRank();
1060 LOCKED ( &queueLock[qRank] )
1061 {
1062 if( *tag == TagAny )
1063 {
1064 flag = !(messageQueueTable[qRank]->isEmpty());
1065 if( flag )
1066 {
1068 *source = elem->getSource();
1069 *tag = elem->getTag();
1070 TAG_TRACE (Iprobe, From, *source, *tag);
1071 }
1072 }
1073 else
1074 {
1076 if( elem )
1077 {
1078 *source = elem->getSource();
1079 *tag = elem->getTag();
1080 TAG_TRACE (Iprobe, From, *source, *tag);
1081 flag = true;
1082 }
1083 }
1084 }
1085 return flag;
1086}
1087
1088int
1090 void* buffer,
1091 const int datatypeId,
1092 int dest,
1093 const int tag
1094 )
1095{
1096 LOCKED ( &queueLock[dest] )
1097 {
1098 messageQueueTable[dest]->enqueue(&sentMsg[dest],&sentMessage[dest],
1099 new MessageQueueElement(getRank(), 1, datatypeId, tag, buffer ) );
1100 }
1101 TAG_TRACE (Send, To, dest, tag);
1102 return 0;
1103}
1104
1105int
1107 void** buffer,
1108 const int datatypeId,
1109 int source,
1110 const int tag
1111 )
1112{
1113 int qRank = getRank();
1114 if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
1115 {
1116 messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, datatypeId, tag);
1117 }
1118 MessageQueueElement *elem = 0;
1119 LOCKED ( &queueLock[qRank] )
1120 {
1121
1122 elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank], source, datatypeId, tag);
1123 }
1124 assert(elem);
1125 *buffer = elem->getData();
1126 delete elem;
1127 TAG_TRACE (Recv, From, source, tag);
1128 return 0;
1129}
Condition variable.
void setLock(Lock *l)
set Lock from outside.
Class that implements a lock. The class wraps around pthread_mutex_t and adds some safeguards.
Definition: paraPthLock.h:82
Class for message queue element.
int getSource()
getter of source rank
int getDataTypeId()
getter of the data type id
void * getData()
getter of data
int getTag()
getter of the message tag
int getCount()
getter of the number of the data type elements
Class of MessageQueueTableElement.
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
MessageQueueElement * getHead()
getter of head
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or nor
bool isEmpty()
check if the queue is empty or not
Base class of Calculation state in a ParaSolver.
int comSize
communicator size : number of threads joined in this system
Definition: paraCommPth.h:708
bool probe(int *source, int *tag)
probe function which waits a new message
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommPth.h:710
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
Lock * queueLock
Lock for synchronization.
Definition: paraCommPth.h:723
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommPth.h:877
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
void freeMem(void *buffer, int count, const int datatypeId)
free memory
std::ostream * getOstream()
get ostream pointer
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommPth.h:709
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
virtual const char * getTagString(int tag)
get Tag string for debugging
ParaSysTimer timer
system timer
Definition: paraCommPth.h:715
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
void passToken(int rank)
pass token to from the rank to the next
Lock * tokenAccessLock
lock to access token
Definition: paraCommPth.h:726
void registedAllSolvers()
notify that all solvers are registered
MessageQueueTableElement ** messageQueueTable
message queue table
Definition: paraCommPth.h:721
unsigned int hashCode(pthread_t tid)
get hash code from thread id
static const char * tagStringTable[]
tag name string table
Definition: paraCommPth.h:716
void waitUntilRegistered()
wait until thread id is registered to thread table
virtual ~ParaCommPth()
destructor of this communicator
static ThreadsTableElement * threadsTable[HashTableSize]
threads table: index is thread rank
Definition: paraCommPth.h:717
int getRank()
get rank of caller's thread
void solverDel(int rank)
delete Solver from this communicator
static __thread int localRank
local thread rank
Definition: paraCommPth.h:719
bool freeStandardTypes(MessageQueueElement *elem)
free memory
bool * sentMessage
sent message flag for synchronization
Definition: paraCommPth.h:722
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
ConditionVariable * sentMsg
Condition variable for synchronization.
Definition: paraCommPth.h:724
void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
bool passTermToken(int rank)
pass termination token from the rank to the next
void setToken(int rank, int *inToken)
set received token to this communicator
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
bool waitToken(int rank)
wait token when UG runs with deterministic mode
class for instance data
Definition: paraInstance.h:51
class ParaParamSet
Definition: paraParamSet.h:850
class ParaRacingRampUpParamSet (parameter set for racing ramp-up)
class for solution
Definition: paraSolution.h:54
class ParaSolverState (ParaSolver state object for notification message)
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
void start(void)
start timer
class ParaTask
Definition: paraTask.h:542
Class of ThreadsTableElement.
void setRank(int r)
setter of this thread rank
std::ostream * getOstream()
getter of tag trace stream of this rank
int getRank()
getter of this thread rank
ThreadsTableElement * getNext()
get next element
Definition: paraCommPth.h:677
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
static const int ParaUNSIGNED_LONG
Definition: paraComm.h:73
static const int TagAckCompletion
Definition: paraTagDef.h:62
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
static const int TagWinner
Definition: paraTagDef.h:60
static const int ParaTaskType
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
static const int TagSolution
Definition: paraTagDef.h:51
static const int ParaUNSIGNED_SHORT
Definition: paraComm.h:71
static const int TagToken
Definition: paraTagDef.h:63
static const int TagTaskReceived
Definition: paraTagDef.h:48
static const int N_TH_TAGS
Definition: paraTagDef.h:85
static const int TagInterruptRequest
Definition: paraTagDef.h:57
static const int TagNotificationId
Definition: paraTagDef.h:55
static const int ParaParamSetType
static const int TagIncumbentValue
Definition: paraTagDef.h:52
static const int HashTableSize
size of thread table : this limits the number of threads
Definition: paraCommPth.h:95
static const int ParaLONG_DOUBLE
Definition: paraComm.h:77
static const int ParaINT
Definition: paraComm.h:66
static const int TagTerminated
Definition: paraTagDef.h:58
static const int ParaSolverStateType
static const int ParaCalculationStateType
static const int ParaLONG
Definition: paraComm.h:67
static const int TagTerminateRequest
Definition: paraTagDef.h:56
static const int TagAny
Definition: paraTagDef.h:44
static const int ParaBYTE
Definition: paraComm.h:79
static const int ParaSolverTerminationStateType
static const int ParaUNSIGNED
Definition: paraComm.h:72
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagSolverState
Definition: paraTagDef.h:53
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
static const int ParaFLOAT
Definition: paraComm.h:75
static const int ParaBOOL
Definition: paraComm.h:78
static const int ParaCHAR
Definition: paraComm.h:64
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int TagDiffSubproblem
Definition: paraTagDef.h:49
static const int TagTask
Definition: paraTagDef.h:47
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
static const int ParaRacingRampUpParamType
static const int TagParaInstance
Definition: paraTagDef.h:82
static const int ParaSHORT
Definition: paraComm.h:65
static const int TagTrace
Definition: paraParamSet.h:72
static const int ParaUNSIGNED_LONG_LONG
Definition: paraComm.h:74
static const int ParaLONG_LONG
Definition: paraComm.h:68
static const int ParaUNSIGNED_CHAR
Definition: paraComm.h:70
static const int ParaDOUBLE
Definition: paraComm.h:76
static const int ParaSIGNED_CHAR
Definition: paraComm.h:69
Base class for calculation state.
#define TAG_TRACE(call, fromTo, sourceDest, tag)
Definition: paraCommCPP11.h:58
Lock rankGenLock
Definition: paraCommPth.cpp:53
ParaComm extension for Pthreads communication.
#define HashEntry(tid)
Definition: paraCommPth.h:57
#define THROW_LOGICAL_ERROR1(msg1)
Definition: paraDef.h:52
#define ABORT_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:78
#define THROW_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:69
#define THROW_LOGICAL_ERROR4(msg1, msg2, msg3, msg4)
Definition: paraDef.h:103
Base class for initial statistics collecting class.
#define LOCKED(lck)
Definition: paraPthLock.h:292
#define LOCK_RAII(lck)
Definition: paraPthLock.h:295
Base class for racing ramp-up parameter set.
Base class for solution.
This class has solver state to be transferred.
This class contains solver termination state which is transferred form Solver to LC.
#define TAG_STR(tag)
Definition: paraTagDef.h:40
Base class for ParaTask.