Scippy

UG

Ubiquity Generator framework

paraCommPth.cpp
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraCommPth.cpp
27  * @brief ParaComm extension for Pthreads communication
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 
37 #include <cstring>
38 #include <cstdlib>
39 #ifndef _MSC_VER
40 #include <unistd.h>
41 #endif
42 #include "paraCommPth.h"
43 #include "paraTask.h"
44 #include "paraSolution.h"
46 #include "paraCalculationState.h"
47 #include "paraSolverState.h"
49 #include "paraInitialStat.h"
50 
51 using namespace UG;
52 
// Condition variable bound to rankGenLock: waitUntilRegistered() waits on it and
// registedAllSolvers() broadcasts it.
// NOTE(review): listing lines 53 and 55-56 are absent from this extract.
54 ConditionVariable solverRanksGenerated(&rankGenLock);
57 
58 #ifndef _UG_NO_THREAD_LOCAL_STATIC
// Definition of the per-thread rank. The init routines assert it is -1 before
// assigning a rank, and solverDel() resets it to -1.
59 __thread int ParaCommPth::localRank = -1; /*< local thread rank */
60 #endif
61 
// Tag-name strings returned by getTagString(); tagStringTableIsSetUpCoorectly()
// checks that the number of entries equals N_TH_TAGS.
// NOTE(review): the declarator and most entries (listing lines 63-75 and 77-80)
// are absent from this extract.
62 const char *
76  TAG_STR(TagRacingRampUpParamSets),
81  TAG_STR(TagParaInstance)
82 };
83 
84 void
85 ParaCommPth::init( int argc, char **argv )
86 {
87  // don't have to take any lock, because only LoadCoordinator call this function
88 
89  timer.start();
90  comSize = 0;
91 
92  for( int i = 1; i < argc; i++ )
93  {
94  if( strcmp(argv[i], "-sth") == 0 )
95  {
96  i++;
97  if( i < argc )
98  comSize = atoi(argv[i]); // if -sth 0, then it is considered as use the number of cores system has
99  else
100  {
101  std::cerr << "missing the number of solver threads after parameter '-sth'" << std::endl;
102  exit(1);
103  }
104  }
105  }
106 
107  if( comSize > 0 )
108  {
109  comSize++;
110  }
111  else
112  {
113  comSize = sysconf(_SC_NPROCESSORS_CONF) + 1;
114  }
115 
117  token = new int*[comSize];
118  for( int i = 0; i < comSize; i++ )
119  {
120  token[i] = new int[2];
121  token[i][0] = 0;
122  token[i][1] = -1;
123  }
124 
125  /** if you add tag, you should add tagStringTale too */
126  // assert( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
127  assert( tagStringTableIsSetUpCoorectly() );
128 
129  /** initialize hashtable */
130  for(int i = 0; i < HashTableSize; i++ )
131  {
132  threadsTable[i] = 0;
133  }
134 
135  messageQueueTable = new MessageQueueTableElement *[comSize + 1]; // +1 for TimeLimitMonitor
136  sentMessage = new bool[comSize + 1];
137  queueLock = new Lock[comSize + 1];
138  sentMsg = new ConditionVariable[comSize + 1];
139  for( int i = 0; i < ( comSize + 1 ); i++ )
140  {
142  sentMsg[i].setLock(&queueLock[i]);
143  sentMessage[i] = false;
144  }
145 
146 }
147 
148 void
151  )
152 {
153 
154  // pthread_t tid = pthread_self();
155 
156  // don't have to take any lock, because only LoadCoordinator call this function
157  LOCKED (&rankGenLock ) {
158 #ifndef _UG_NO_THREAD_LOCAL_STATIC
159  assert( localRank == -1 );
160  assert( threadsTable[0] == 0 );
161  localRank = 0;
162  threadsTable[0] = new ThreadsTableElement(0, paraParamSet);
163 #else
164  pthread_t tid = pthread_self();
165  threadsTable[HashEntry(tid)] = new ThreadsTableElement(tid, 0, paraParamSet);
166 #endif
167  }
168  tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
169 }
170 
171 void
173  int rank,
175  )
176 {
177  // don't have to take any lock, because only LoadCoordinator call this function
178  // CHANGED in multi-threaded solver case
179  LOCKED (&rankGenLock ) {
180 #ifndef _UG_NO_THREAD_LOCAL_STATIC
181  assert( localRank == -1 );
182  assert( threadsTable[rank] == 0 );
183  localRank = rank;
184  threadsTable[localRank] = new ThreadsTableElement(localRank, paraParamSet);
185  // std::cout << "tid = " << pthread_self() << " is initialized as Rank = " << localRank << std::endl;
186 #else
187  pthread_t tid = pthread_self();
188  int index = HashEntry(tid);
189  if( threadsTable[index] == 0 )
190  {
191  threadsTable[index] = new ThreadsTableElement(tid, rank, paraParamSet);
192  }
193  else
194  {
195  ThreadsTableElement *elem = threadsTable[index];
196  while( elem->getNext() != 0 )
197  {
198  if( pthread_equal( elem->getTid(), tid ) )
199  {
200  THROW_LOGICAL_ERROR4("Invalid solver tid is registered. Rank = ", rank, ", tid = ", tid );
201  }
202  elem = elem->getNext();
203  }
204  elem->link(new ThreadsTableElement(tid, rank, paraParamSet));
205  }
206  // std::cout << "tid = " << tid << " for Rank = " << rank << " is added to table" << std::endl;
207 #endif
208  }
209 }
210 
211 void
213  int rank,
215  )
216 {
217  // don't have to take any lock, because only LoadCoordinator call this function
218  // CHANGED in multi-threaded solver case
219  LOCKED (&rankGenLock ) {
220 #ifndef _UG_NO_THREAD_LOCAL_STATIC
221  assert( localRank == -1 );
222  assert( threadsTable[rank] != 0 );
223  localRank = rank;
224  // threadsTable[localRank] = new ThreadsTableElement(localRank, paraParamSet);
225 #else
226  // ****** CAUTION *******
227  // Should not use this, since no chance to release ThreadsTableElement entry.
228  // This means that new entries are always added and shuld not use.
229  // ***********************
230  THROW_LOGICAL_ERROR1("solverReInit only can work with thread local variable. The following routine should not be used!");
231  pthread_t tid = pthread_self();
232  int index = HashEntry(tid);
233  if( threadsTable[index] == 0 )
234  {
235  threadsTable[index] = new ThreadsTableElement(tid, rank, paraParamSet);
236  }
237  else
238  {
239  ThreadsTableElement *elem = threadsTable[index];
240  while( elem != 0 )
241  {
242  if( pthread_equal( elem->getTid(), tid ) )
243  {
244  elem->setRank(rank);
245  break;
246  }
247  elem = elem->getNext();
248  }
249  if( elem == 0 )
250  {
251  THROW_LOGICAL_ERROR4("Invalid solver tid is reInit. Rank = ", rank, ", tid = ", tid );
252  }
253  }
254  // std::cout << "tid = " << tid << " for Rank = " << rank << " is added to table" << std::endl;
255 #endif
256  }
257 
258 }
259 
260 void
262  int rank
263  )
264 {
265  LOCKED (&rankGenLock ) {
266 #ifndef _UG_NO_THREAD_LOCAL_STATIC
267  assert(rank == localRank);
268  if( threadsTable[rank] == 0 )
269  {
270  THROW_LOGICAL_ERROR2("Invalid remove thread. Rank = ", rank);
271  }
272  else
273  {
274  ThreadsTableElement *elem = threadsTable[rank];
275  delete elem;
276  threadsTable[rank] = 0;
277  localRank = -1;
278  }
279 #else
280  pthread_t tid = pthread_self();
281  int index = HashEntry(tid);
282  if( threadsTable[index] == 0 )
283  {
284  THROW_LOGICAL_ERROR4("Invalid remove thread. Rank = ", rank, ", tid = ", tid );
285  }
286  else
287  {
288  ThreadsTableElement *elem = threadsTable[index];
289  ThreadsTableElement *pre = elem;
290  while( elem && !pthread_equal( elem->getTid(), tid ) )
291  {
292  pre = elem;
293  elem = elem->getNext();
294  }
295  if( !elem || !pthread_equal( elem->getTid(), tid ) )
296  {
297  THROW_LOGICAL_ERROR4("Invalid remove thread. Rank = ", rank, ", tid = ", tid );
298  }
299  if( elem == threadsTable[index] )
300  {
301  threadsTable[index] = elem->getNext();
302  }
303  else
304  {
305  pre->link(elem->getNext());
306  }
307  delete elem;
308  }
309  // std::cout << "tid = " << tid << " is deleleted from table" << std::endl;
310 #endif
311  }
312 }
313 
314 void
316  )
317 {
318 #ifdef _UG_NO_THREAD_LOCAL_STATIC
319  pthread_t tid = pthread_self();
320 #endif
321 
322  bool registered = false;
323  LOCKED(&rankGenLock){
324  while( !registered )
325  {
326 #ifndef _UG_NO_THREAD_LOCAL_STATIC
327  if( threadsTable[localRank] != 0 )
328  {
329  registered = true;
330  break;
331  }
332 #else
334  while( elem && !pthread_equal( elem->getTid(), tid ) )
335  {
336  elem = elem->getNext();
337  }
338  if( elem )
339  {
340  assert( pthread_equal( elem->getTid(), tid ) );
341  // std::cout << "tid = " << tid << ", hash = " << HashEntry(tid) << std::endl;
342  registered = true;
343  break;
344  }
345 #endif
346  solverRanksGenerated.wait();
347  }
348  }
349 }
350 
351 void
353  )
354 {
355  solverRanksGenerated.broadcast();
356 }
357 
358 bool
360  int rank
361  )
362 {
363  // int rank = getRank(); // multi-thread solver may change rank here
364  LOCK_RAII(&tokenAccessLock[rank]);
365  if( token[rank][0] == rank )
366  {
367  return true;
368  }
369  else
370  {
371  int receivedTag;
372  int source;
373  probe(&source, &receivedTag);
374  TAG_TRACE (Probe, From, source, receivedTag);
375  if( source == 0 && receivedTag == TagToken )
376  {
377  receive(token[rank], 2, ParaINT, 0, TagToken);
378  assert( token[rank][0] == rank );
379  return true;
380  }
381  else
382  {
383  return false;
384  }
385  }
386 }
387 
388 void
390  int rank
391  )
392 {
393  // int rank = getRank(); // multi-thread solver may change rank here
394  LOCK_RAII(&tokenAccessLock[rank]);
395  assert( token[rank][0] == rank && rank != 0 );
396  token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
397  token[rank][1] = -1;
398  send(token[rank], 2, ParaINT, 0, TagToken);
399 }
400 
401 bool
403  int rank
404  )
405 {
406  // int rank = getRank(); // multi-thread solver may change rank here
407  LOCK_RAII(&tokenAccessLock[rank]);
408  if( rank == token[rank][0] )
409  {
410  if( token[rank][1] == token[rank][0] ) token[rank][1] = -2;
411  else if( token[rank][1] == -1 ) token[rank][1] = token[rank][0];
412  token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
413  }
414  else
415  {
416  THROW_LOGICAL_ERROR4("Invalid token update. Rank = ", getRank(), ", token = ", token[0] );
417  }
418  send(token[rank], 2, ParaINT, 0, TagToken);
419  if( token[rank][1] == -2 )
420  {
421  return true;
422  }
423  else
424  {
425  return false;
426  }
427 }
428 
429 void
431  int rank,
432  int *inToken
433  )
434 {
435  // int rank = getRank();
436  LOCK_RAII(&tokenAccessLock[rank]);
437  assert( rank == 0 || ( rank != 0 && inToken[0] == rank ) );
438  token[rank][0] = inToken[0];
439  token[rank][1] = inToken[1];
440 }
441 
/**
 * Destructor of this communicator: releases the threads table entries, the
 * token table, the token access locks, the message queues and the
 * synchronization arrays.
 *
 * NOTE(review): the declarator (listing line 442) and listing lines 451, 467
 * and 478 are absent from this extract, so `next` and `elem` appear here
 * without their defining statements.
 */
443 {
444  LOCK_RAII(&rankGenLock); // rankGenLock is not good name
// free every chained entry of every threads-table slot
445  for(int i = 0; i < HashTableSize; i++ )
446  {
447  if( threadsTable[i] )
448  {
449  while( threadsTable[i] )
450  {
452  delete threadsTable[i];
453  threadsTable[i] = next;
454  }
455  }
456  }
457 
// free the two-int token of each rank, then the table and the per-rank locks
458  for( int i = 0; i < comSize; i++ )
459  {
460  delete [] token[i];
461  }
462  delete [] token;
463  delete [] tokenAccessLock;
464 
// comSize + 1 queues exist; free the payload of any remaining message first
465  for(int i = 0; i < (comSize + 1); i++)
466  {
468  while( elem )
469  {
470  if( elem->getData() )
471  {
// freeStandardTypes() returns false for a data type id it does not handle
472  if( !freeStandardTypes(elem) )
473  {
474  ABORT_LOGICAL_ERROR2("Requested type is not implemented. Type = ", elem->getDataTypeId() );
475  }
476  }
477  delete elem;
479  }
480  delete messageQueueTable[i];
481  }
482  delete [] messageQueueTable;
483 
484  if( sentMessage ) delete [] sentMessage;
485  if( queueLock ) delete [] queueLock;
486  if( sentMsg ) delete [] sentMsg;
487 
488 }
489 
490 unsigned int
492  pthread_t tid
493  )
494 {
495  union {
496  pthread_t tid;
497  unsigned char cTid[sizeof(pthread_t)];
498  } reinterpret;
499 
500  reinterpret.tid = tid;
501  unsigned int h = 0;
502  for (unsigned int i = 0; i < sizeof(pthread_t); i++) {
503  h = 31*h + reinterpret.cTid[i];
504  }
505  return h;
506 }
507 
508 int
510  )
511 {
512  LOCK_RAII(&rankGenLock);
513 #ifndef _UG_NO_THREAD_LOCAL_STATIC
514  if( localRank >= 0 ) return localRank;
515  else return -1;
516 #else
517  pthread_t tid = pthread_self();
518  // std::cout << "gerRank tid = " << tid << std::endl;
520  while( elem && !pthread_equal( elem->getTid(), tid ) )
521  {
522  elem = elem->getNext();
523  }
524  if( elem )
525  {
526  return elem->getRank();
527  }
528  else
529  {
530  return -1; // No ug threads
531  }
532 #endif
533 }
534 
535 std::ostream *
537  )
538 {
539  LOCK_RAII(&rankGenLock);
540 #ifdef _UG_NO_THREAD_LOCAL_STATIC
541  pthread_t tid = pthread_self();
543  assert(elem);
544  while( !pthread_equal( elem->getTid(), tid ) )
545  {
546  elem = elem->getNext();
547  assert(elem);
548  }
549  return elem->getOstream();
550 #else
551  return threadsTable[localRank]->getOstream();
552 #endif
553 }
554 
555 void *
557  const void* buffer,
558  int count,
559  const int datatypeId
560  )
561 {
562  void *newBuf = 0;
563  if( count == 0 ) return newBuf;
564 
565  switch(datatypeId)
566  {
567  case ParaCHAR :
568  {
569  newBuf = new char[count];
570  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
571  break;
572  }
573  case ParaSHORT :
574  {
575  newBuf = new short[count];
576  memcpy(newBuf, buffer, (unsigned long int)sizeof(short)*count);
577  break;
578  }
579  case ParaINT :
580  {
581  newBuf = new int[count];
582  memcpy(newBuf, buffer, (unsigned long int)sizeof(int)*count);
583  break;
584  }
585  case ParaLONG :
586  {
587  newBuf = new long[count];
588  memcpy(newBuf, buffer, (unsigned long int)sizeof(long)*count);
589  break;
590  }
591  case ParaUNSIGNED_CHAR :
592  {
593  newBuf = new unsigned char[count];
594  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned char)*count);
595  break;
596  }
597  case ParaUNSIGNED_SHORT :
598  {
599  newBuf = new unsigned short[count];
600  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned short)*count);
601  break;
602  }
603  case ParaUNSIGNED :
604  {
605  newBuf = new unsigned int[count];
606  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned int)*count);
607  break;
608  }
609  case ParaUNSIGNED_LONG :
610  {
611  newBuf = new unsigned long[count];
612  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long)*count);
613  break;
614  }
615  case ParaFLOAT :
616  {
617  newBuf = new float[count];
618  memcpy(newBuf, buffer, (unsigned long int)sizeof(float)*count);
619  break;
620  }
621  case ParaDOUBLE :
622  {
623  newBuf = new double[count];
624  memcpy(newBuf, buffer, (unsigned long int)sizeof(double)*count);
625  break;
626  }
627  case ParaLONG_DOUBLE :
628  {
629  newBuf = new long double[count];
630  memcpy(newBuf, buffer, (unsigned long int)sizeof(long double)*count);
631  break;
632  }
633  case ParaBYTE :
634  {
635  newBuf = new char[count];
636  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
637  break;
638  }
639  case ParaSIGNED_CHAR :
640  {
641  newBuf = new char[count];
642  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
643  break;
644  }
645  case ParaLONG_LONG :
646  {
647  newBuf = new long long[count];
648  memcpy(newBuf, buffer, (unsigned long int)sizeof(long long)*count);
649  break;
650  }
652  {
653  newBuf = new unsigned long long[count];
654  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long long)*count);
655  break;
656  }
657  case ParaBOOL :
658  {
659  newBuf = new bool[count];
660  memcpy(newBuf, buffer, (unsigned long int)sizeof(bool)*count);
661  break;
662  }
663  default :
664  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
665  }
666 
667  return newBuf;
668 }
669 
670 void
672  void *dest, const void *src, int count, int datatypeId
673  )
674 {
675 
676  if( count == 0 ) return;
677 
678  switch(datatypeId)
679  {
680  case ParaCHAR :
681  {
682  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
683  break;
684  }
685  case ParaSHORT :
686  {
687  memcpy(dest, src, (unsigned long int)sizeof(short)*count);
688  break;
689  }
690  case ParaINT :
691  {
692  memcpy(dest, src, (unsigned long int)sizeof(int)*count);
693  break;
694  }
695  case ParaLONG :
696  {
697  memcpy(dest, src, (unsigned long int)sizeof(long)*count);
698  break;
699  }
700  case ParaUNSIGNED_CHAR :
701  {
702  memcpy(dest, src, (unsigned long int)sizeof(unsigned char)*count);
703  break;
704  }
705  case ParaUNSIGNED_SHORT :
706  {
707  memcpy(dest, src, (unsigned long int)sizeof(unsigned short)*count);
708  break;
709  }
710  case ParaUNSIGNED :
711  {
712  memcpy(dest, src, (unsigned long int)sizeof(unsigned int)*count);
713  break;
714  }
715  case ParaUNSIGNED_LONG :
716  {
717  memcpy(dest, src, (unsigned long int)sizeof(unsigned long)*count);
718  break;
719  }
720  case ParaFLOAT :
721  {
722  memcpy(dest, src, (unsigned long int)sizeof(float)*count);
723  break;
724  }
725  case ParaDOUBLE :
726  {
727  memcpy(dest, src, (unsigned long int)sizeof(double)*count);
728  break;
729  }
730  case ParaLONG_DOUBLE :
731  {
732  memcpy(dest, src, (unsigned long int)sizeof(long double)*count);
733  break;
734  }
735  case ParaBYTE :
736  {
737  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
738  break;
739  }
740  case ParaSIGNED_CHAR :
741  {
742  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
743  break;
744  }
745  case ParaLONG_LONG :
746  {
747  memcpy(dest, src, (unsigned long int)sizeof(long long)*count);
748  break;
749  }
751  {
752  memcpy(dest, src, (unsigned long int)sizeof(unsigned long long)*count);
753  break;
754  }
755  case ParaBOOL :
756  {
757  memcpy(dest, src, (unsigned long int)sizeof(bool)*count);
758  break;
759  }
760  default :
761  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
762  }
763 
764 }
765 
766 void
768  void* buffer,
769  int count,
770  const int datatypeId
771  )
772 {
773 
774  if( count == 0 ) return;
775 
776  switch(datatypeId)
777  {
778  case ParaCHAR :
779  {
780  delete [] static_cast<char *>(buffer);
781  break;
782  }
783  case ParaSHORT :
784  {
785  delete [] static_cast<short *>(buffer);
786  break;
787  }
788  case ParaINT :
789  {
790  delete [] static_cast<int *>(buffer);
791  break;
792  }
793  case ParaLONG :
794  {
795  delete [] static_cast<long *>(buffer);
796  break;
797  }
798  case ParaUNSIGNED_CHAR :
799  {
800  delete [] static_cast<unsigned char *>(buffer);
801  break;
802  }
803  case ParaUNSIGNED_SHORT :
804  {
805  delete [] static_cast<unsigned short *>(buffer);
806  break;
807  }
808  case ParaUNSIGNED :
809  {
810  delete [] static_cast<unsigned int *>(buffer);
811  break;
812  }
813  case ParaUNSIGNED_LONG :
814  {
815  delete [] static_cast<unsigned long *>(buffer);
816  break;
817  }
818  case ParaFLOAT :
819  {
820  delete [] static_cast<float *>(buffer);
821  break;
822  }
823  case ParaDOUBLE :
824  {
825  delete [] static_cast<double *>(buffer);
826  break;
827  }
828  case ParaLONG_DOUBLE :
829  {
830  delete [] static_cast<long double *>(buffer);
831  break;
832  }
833  case ParaBYTE :
834  {
835  delete [] static_cast<char *>(buffer);
836  break;
837  }
838  case ParaSIGNED_CHAR :
839  {
840  delete [] static_cast<char *>(buffer);
841  break;
842  }
843  case ParaLONG_LONG :
844  {
845  delete [] static_cast<long long *>(buffer);
846  break;
847  }
849  {
850  delete [] static_cast<unsigned long long *>(buffer);
851  break;
852  }
853  case ParaBOOL :
854  {
855  delete [] static_cast<bool *>(buffer);;
856  break;
857  }
858  default :
859  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
860  }
861 
862 }
863 
864 bool
866  MessageQueueElement *elem ///< pointer to a message queue element
867  )
868 {
869  if( elem->getDataTypeId() < UG_USER_TYPE_FIRST )
870  {
871  freeMem(elem->getData(), elem->getCount(), elem->getDataTypeId() );
872  }
873  else
874  {
875  switch( elem->getDataTypeId())
876  {
877  case ParaInstanceType:
878  {
879  delete reinterpret_cast<ParaInstance *>(elem->getData());
880  break;
881  }
882  case ParaSolutionType:
883  {
884  delete reinterpret_cast<ParaSolution *>(elem->getData());
885  break;
886  }
887  case ParaParamSetType:
888  {
889  delete reinterpret_cast<ParaParamSet *>(elem->getData());
890  break;
891  }
892  case ParaTaskType:
893  {
894  delete reinterpret_cast<ParaTask *>(elem->getData());
895  break;
896  }
897  case ParaSolverStateType:
898  {
899  delete reinterpret_cast<ParaSolverState *>(elem->getData());
900  break;
901  }
903  {
904  delete reinterpret_cast<ParaCalculationState *>(elem->getData());
905  break;
906  }
908  {
909  delete reinterpret_cast<ParaSolverTerminationState *>(elem->getData());
910  break;
911  }
913  {
914  delete reinterpret_cast<ParaRacingRampUpParamSet *>(elem->getData());
915  break;
916  }
917  default:
918  {
919  return false;
920  }
921  }
922  }
923  return true;
924 }
925 
926 bool
928  )
929 {
930  // std::cout << "size = " << sizeof(tagStringTable)/sizeof(char*) << ", N_TH_TAGS = " << N_TH_TAGS << std::endl;
931  return ( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
932 }
933 
934 const char *
936  int tag /// tag to be converted to string
937  )
938 {
939  assert( tag >= 0 && tag < N_TH_TAGS );
940  return tagStringTable[tag];
941 }
942 
943 int
945  void* buffer,
946  int count,
947  const int datatypeId,
948  int root
949  )
950 {
951  if( getRank() == root )
952  {
953  for(int i=0; i < comSize; i++)
954  {
955  if( i != root )
956  {
957  send(buffer, count, datatypeId, i, -1);
958  }
959  }
960  }
961  else
962  {
963  receive(buffer, count, datatypeId, root, -1);
964  }
965  return 0;
966 }
967 
968 int
970  void* buffer,
971  int count,
972  const int datatypeId,
973  int dest,
974  const int tag
975  )
976 {
977  LOCKED ( &queueLock[dest] )
978  {
979  messageQueueTable[dest]->enqueue(&sentMsg[dest],&sentMessage[dest],
980  new MessageQueueElement(getRank(), count, datatypeId, tag,
981  allocateMemAndCopy(buffer, count, datatypeId) ) );
982  }
983  TAG_TRACE (Send, To, dest, tag);
984  return 0;
985 }
986 
987 int
989  void* buffer,
990  int count,
991  const int datatypeId,
992  int source,
993  const int tag
994  )
995 {
996  int qRank = getRank();
997  MessageQueueElement *elem = 0;
998  if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
999  {
1000  messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, datatypeId, tag);
1001  }
1002  LOCKED ( &queueLock[qRank] )
1003  {
1004  elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank],source, datatypeId, tag);
1005  }
1006  assert(elem);
1007  copy( buffer, elem->getData(), count, datatypeId );
1008  freeMem(elem->getData(), count, datatypeId );
1009  delete elem;
1010  TAG_TRACE (Recv, From, source, tag);
1011  return 0;
1012 }
1013 
1014 void
1016  const int source,
1017  const int tag,
1018  int *receivedTag
1019  )
1020 {
1021  /*
1022  // Just wait, iProbe and receive will be performed after this call
1023  messageQueueTable[getRank()]->waitMessage(source, datatypeId, tag);
1024  TAG_TRACE (Probe, From, source, tag);
1025  return 0;
1026  */
1027  int qRank = getRank();
1028  // LOCKED ( &queueLock[getRank()] )
1029  // {
1030  (*receivedTag) = tag;
1031  messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, receivedTag);
1032  // }
1033  TAG_TRACE (Probe, From, source, *receivedTag);
1034  return;
1035 }
1036 
1037 bool
1039  int* source,
1040  int* tag
1041  )
1042 {
1043  int qRank = getRank();
1044  messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank]);
1045  MessageQueueElement *elem = messageQueueTable[qRank]->getHead();
1046  *source = elem->getSource();
1047  *tag = elem->getTag();
1048  TAG_TRACE (Probe, From, *source, *tag);
1049  return true;
1050 }
1051 
1052 bool
1054  int* source,
1055  int* tag
1056  )
1057 {
1058  bool flag = false;
1059  int qRank = getRank();
1060  LOCKED ( &queueLock[qRank] )
1061  {
1062  if( *tag == TagAny )
1063  {
1064  flag = !(messageQueueTable[qRank]->isEmpty());
1065  if( flag )
1066  {
1067  MessageQueueElement *elem = messageQueueTable[qRank]->getHead();
1068  *source = elem->getSource();
1069  *tag = elem->getTag();
1070  TAG_TRACE (Iprobe, From, *source, *tag);
1071  }
1072  }
1073  else
1074  {
1076  if( elem )
1077  {
1078  *source = elem->getSource();
1079  *tag = elem->getTag();
1080  TAG_TRACE (Iprobe, From, *source, *tag);
1081  flag = true;
1082  }
1083  }
1084  }
1085  return flag;
1086 }
1087 
1088 int
1090  void* buffer,
1091  const int datatypeId,
1092  int dest,
1093  const int tag
1094  )
1095 {
1096  LOCKED ( &queueLock[dest] )
1097  {
1098  messageQueueTable[dest]->enqueue(&sentMsg[dest],&sentMessage[dest],
1099  new MessageQueueElement(getRank(), 1, datatypeId, tag, buffer ) );
1100  }
1101  TAG_TRACE (Send, To, dest, tag);
1102  return 0;
1103 }
1104 
1105 int
1107  void** buffer,
1108  const int datatypeId,
1109  int source,
1110  const int tag
1111  )
1112 {
1113  int qRank = getRank();
1114  if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
1115  {
1116  messageQueueTable[qRank]->waitMessage(&sentMsg[qRank], &sentMessage[qRank], source, datatypeId, tag);
1117  }
1118  MessageQueueElement *elem = 0;
1119  LOCKED ( &queueLock[qRank] )
1120  {
1121 
1122  elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank], source, datatypeId, tag);
1123  }
1124  assert(elem);
1125  *buffer = elem->getData();
1126  delete elem;
1127  TAG_TRACE (Recv, From, source, tag);
1128  return 0;
1129 }
Base class for calculation state.
static const int ParaSHORT
Definition: paraComm.h:65
Condition variable.
std::ostream * getOstream()
getter of tag trace stream of this rank
class ParaSolverState (ParaSolver state object for notification message)
int getDataTypeId()
getter of the data type id
static const int TagNotificationId
Definition: paraTagDef.h:55
#define THROW_LOGICAL_ERROR4(msg1, msg2, msg3, msg4)
Definition: paraDef.h:103
static const int ParaCalculationStateType
static const int ParaCHAR
Definition: paraComm.h:64
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
Definition: paraCommPth.h:710
MessageQueueElement * getHead()
getter of head
This class has solver state to be transferred.
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
static const int TagIncumbentValue
Definition: paraTagDef.h:52
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or not
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
void setLock(Lock *l)
set Lock from outside.
ParaComm extension for Pthreads communication.
int getRank()
get rank of caller's thread
static const int TagTerminateRequest
Definition: paraTagDef.h:56
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
virtual void init(int argc, char **argv)
initializer of this communicator
Definition: paraCommPth.cpp:85
Lock rankGenLock
Definition: paraCommPth.cpp:53
void setToken(int rank, int *inToken)
set received token to this communicator
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagAny
Definition: paraTagDef.h:44
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
Base class for ParaTask.
static const int ParaLONG_LONG
Definition: paraComm.h:68
static const int ParaUNSIGNED_LONG
Definition: paraComm.h:73
static const int HashTableSize
size of thread table : this limits the number of threads
Definition: paraCommPth.h:95
static __thread int localRank
local thread rank
Definition: paraCommPth.h:719
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
void waitUntilRegistered()
wait until thread id is registered to thread table
bool waitToken(int rank)
wait token when UG runs with deterministic mode
Base class of Calculation state in a ParaSolver.
int getCount()
getter of the number of the data type elements
static const int ParaBOOL
Definition: paraComm.h:78
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
bool passTermToken(int rank)
pass termination token from the rank to the next
static ThreadsTableElement * threadsTable[HashTableSize]
threads table: index is thread rank
Definition: paraCommPth.h:717
virtual ~ParaCommPth()
destructor of this communicator
Lock * queueLock
Lock for synchronization.
Definition: paraCommPth.h:723
Class of MessageQueueTableElement.
static const int ParaUNSIGNED
Definition: paraComm.h:72
Base class for initial statistics collecting class.
Class for message queue element.
ConditionVariable * sentMsg
Condition variable for synchronization.
Definition: paraCommPth.h:724
void passToken(int rank)
pass token from the rank to the next
static const int ParaLONG
Definition: paraComm.h:67
int getSource()
getter of source rank
static const int ParaLONG_DOUBLE
Definition: paraComm.h:77
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
static const int ParaDOUBLE
Definition: paraComm.h:76
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
int comSize
communicator size : number of threads joined in this system
Definition: paraCommPth.h:708
void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
static const int TagTaskReceived
Definition: paraTagDef.h:48
static const int TagSolverState
Definition: paraTagDef.h:53
static const int ParaSIGNED_CHAR
Definition: paraComm.h:69
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
static const int TagSolution
Definition: paraTagDef.h:51
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
static const int TagToken
Definition: paraTagDef.h:63
#define THROW_LOGICAL_ERROR1(msg1)
Definition: paraDef.h:52
static const int ParaSolverTerminationStateType
Lock * tokenAccessLock
lock to access token
Definition: paraCommPth.h:726
static const int TagTrace
Definition: paraParamSet.h:72
This class contains solver termination state which is transferred from Solver to LC.
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
Base class for racing ramp-up parameter set.
ParaSysTimer timer
system timer
Definition: paraCommPth.h:715
static const char * tagStringTable[]
tag name string table
Definition: paraCommPth.h:716
static const int ParaUNSIGNED_CHAR
Definition: paraComm.h:70
class ParaParamSet
Definition: paraParamSet.h:850
class for instance data
Definition: paraInstance.h:50
void registedAllSolvers()
notify that all solvers are registered
void setRank(int r)
setter of this thread rank
#define THROW_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:69
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
void freeMem(void *buffer, int count, const int datatypeId)
free memory
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
Class that implements a lock. The class wraps around pthread_mutex_t and adds some safeguards...
Definition: paraPthLock.h:82
Class of ThreadsTableElement.
MessageQueueTableElement ** messageQueueTable
message queue table
Definition: paraCommPth.h:721
static const int ParaBYTE
Definition: paraComm.h:79
static const int TagTerminated
Definition: paraTagDef.h:58
#define HashEntry(tid)
Definition: paraCommPth.h:57
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
void start(void)
start timer
static const int ParaUNSIGNED_LONG_LONG
Definition: paraComm.h:74
#define TAG_TRACE(call, fromTo, sourceDest, tag)
Definition: paraCommCPP11.h:58
static const int TagDiffSubproblem
Definition: paraTagDef.h:49
int getTag()
getter of the message tag
void * getData()
getter of data
#define LOCK_RAII(lck)
Definition: paraPthLock.h:295
static const int ParaFLOAT
Definition: paraComm.h:75
bool freeStandardTypes(MessageQueueElement *elem)
free memory
static const int TagInterruptRequest
Definition: paraTagDef.h:57
#define TAG_STR(tag)
Definition: paraTagDef.h:40
bool tagTraceFlag
indicate if tags are traced or not
Definition: paraCommPth.h:709
bool isEmpty()
check if the queue is empty or not
#define LOCKED(lck)
Definition: paraPthLock.h:292
unsigned int hashCode(pthread_t tid)
get hash code from thread id
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
static const int TagTask
Definition: paraTagDef.h:47
static const int ParaTaskType
std::ostream * getOstream()
get ostream pointer
bool probe(int *source, int *tag)
probe function which waits a new message
static const int ParaINT
Definition: paraComm.h:66
bool * sentMessage
sent message flag for synchronization
Definition: paraCommPth.h:722
static const int ParaSolverStateType
class ParaRacingRampUpParamSet (parameter set for racing ramp-up)
int getRank()
getter of this thread rank
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
ThreadsTableElement * getNext()
get next element
Definition: paraCommPth.h:677
static const int ParaUNSIGNED_SHORT
Definition: paraComm.h:71
#define ABORT_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:78
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
static const int ParaParamSetType
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ParaRacingRampUpParamType
virtual const char * getTagString(int tag)
get Tag string for debugging
void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
class for solution
Definition: paraSolution.h:53
Base class for solution.
void solverDel(int rank)
delete Solver from this communicator
static const int TagWinner
Definition: paraTagDef.h:60
static const int TagAckCompletion
Definition: paraTagDef.h:62
class ParaTask
Definition: paraTask.h:541
bool getBoolParamValue(int param)
for bool parameters
void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
Definition: paraCommPth.h:877