Scippy

UG

Ubiquity Generator framework

paraCommCPP11.cpp
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and software framework */
4 /* UG --- Ubiquity Generator Framework */
5 /* */
6 /* Copyright Written by Yuji Shinano <shinano@zib.de>, */
7 /* Copyright (C) 2021 by Zuse Institute Berlin, */
8 /* licensed under LGPL version 3 or later. */
9 /* Commercial licenses are available through <licenses@zib.de> */
10 /* */
11 /* This code is free software; you can redistribute it and/or */
12 /* modify it under the terms of the GNU Lesser General Public License */
13 /* as published by the Free Software Foundation; either version 3 */
14 /* of the License, or (at your option) any later version. */
15 /* */
16 /* This program is distributed in the hope that it will be useful, */
17 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
18 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
19 /* GNU Lesser General Public License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 /* */
24 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
25 
26 /**@file paraCommCPP11.cpp
27  * @brief ParaComm extension for C++11 thread communication
28  * @author Yuji Shinano
29  *
30  *
31  *
32  */
33 
34 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
35 
36 
37 #include <cstring>
38 #include <cstdlib>
39 #ifndef _MSC_VER
40 #include <unistd.h>
41 #endif
42 #include "paraCommCPP11.h"
43 #include "paraTask.h"
44 #include "paraSolution.h"
46 #include "paraCalculationState.h"
47 #include "paraSolverState.h"
49 #include "paraInitialStat.h"
50 
51 using namespace UG;
52 
53 std::mutex rankLockMutex;
54 
57 
58 thread_local int
59 ParaCommCPP11::localRank = -1; /*< local thread rank */
60 
61 const char *
75  TAG_STR(TagRacingRampUpParamSets),
80  TAG_STR(TagParaInstance)
81 };
82 
83 void
84 ParaCommCPP11::init( int argc, char **argv )
85 {
86  // don't have to take any lock, because only LoadCoordinator call this function
87 
88  timer.start();
89  comSize = 0;
90 
91  for( int i = 1; i < argc; i++ )
92  {
93  if( strcmp(argv[i], "-sth") == 0 )
94  {
95  i++;
96  if( i < argc )
97  comSize = atoi(const_cast<const char*>(argv[i])); // if -sth 0, then it is considered as use the number of cores system has
98  else
99  {
100  std::cerr << "missing the number of solver threads after parameter '-sth'" << std::endl;
101  exit(1);
102  }
103  }
104  }
105 
106  if( comSize > 0 )
107  {
108  comSize++;
109  }
110  else
111  {
112 
113 #ifdef _MSC_VER
114  SYSTEM_INFO sysinfo;
115  GetSystemInfo(&sysinfo);
116  comSize = sysinfo.dwNumberOfProcessors + 1; //includes logical cpu
117 #else
118  comSize = sysconf(_SC_NPROCESSORS_CONF) + 1;
119 #endif
120  }
121 
122  tokenAccessLockMutex = new std::mutex[comSize];
123  token = new int*[comSize];
124  for( int i = 0; i < comSize; i++ )
125  {
126  token[i] = new int[2];
127  token[i][0] = 0;
128  token[i][1] = -1;
129  }
130 
131  /** if you add tag, you should add tagStringTale too */
132  // assert( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
133  assert( tagStringTableIsSetUpCoorectly() );
134 
135  /** initialize hashtable */
136  for(int i = 0; i < ThreadTableSize; i++ )
137  {
138  threadsTable[i] = 0;
139  }
140 
141  messageQueueTable = new MessageQueueTableElement *[comSize + 1]; // +1 for TimeLimitMonitor
142  sentMessage = new bool[comSize + 1];
143  queueLockMutex = new std::mutex[comSize + 1];
144  sentMsg = new std::condition_variable[comSize + 1];
145  for( int i = 0; i < ( comSize + 1 ); i++ )
146  {
148  sentMessage[i] = false;
149  }
150 
151 }
152 
153 void
156  )
157 {
158  std::lock_guard<std::mutex> lock(rankLockMutex);
159  assert( localRank == -1 );
160  assert( threadsTable[0] == 0 );
161  localRank = 0;
162  threadsTable[0] = new ThreadsTableElement(0, paraParamSet);
163  tagTraceFlag = paraParamSet->getBoolParamValue(TagTrace);
164 }
165 
166 void
168  int rank,
170  )
171 {
172  std::lock_guard<std::mutex> lock(rankLockMutex);
173  assert( localRank == -1 );
174  assert( threadsTable[rank] == 0 );
175  localRank = rank;
176  threadsTable[localRank] = new ThreadsTableElement(localRank, paraParamSet);
177 }
178 
179 void
181  int rank,
183  )
184 {
185  std::lock_guard<std::mutex> lock(rankLockMutex);
186  assert( localRank == -1 );
187  assert( threadsTable[rank] != 0 );
188  localRank = rank;
189  // threadsTable[localRank] = new ThreadsTableElement(localRank, paraParamSet);
190 }
191 
192 void
194  int rank
195  )
196 {
197  std::lock_guard<std::mutex> lock(rankLockMutex);
198  assert(rank == localRank);
199  if( threadsTable[rank] == 0 )
200  {
201  THROW_LOGICAL_ERROR2("Invalid remove thread. Rank = ", rank);
202  }
203  else
204  {
205  ThreadsTableElement *elem = threadsTable[rank];
206  delete elem;
207  threadsTable[rank] = 0;
208  localRank = -1;
209  }
210 }
211 
212 bool
214  int rank
215  )
216 {
217  // int rank = getRank(); // multi-thread solver may change rank here
218  std::lock_guard<std::mutex> lock(tokenAccessLockMutex[rank]);
219  if( token[rank][0] == rank )
220  {
221  return true;
222  }
223  else
224  {
225  int receivedTag;
226  int source;
227  probe(&source, &receivedTag);
228  TAG_TRACE (Probe, From, source, receivedTag);
229  if( source == 0 && receivedTag == TagToken )
230  {
231  receive(token[rank], 2, ParaINT, 0, TagToken);
232  assert( token[rank][0] == rank );
233  return true;
234  }
235  else
236  {
237  return false;
238  }
239  }
240 }
241 
242 void
244  int rank
245  )
246 {
247  // int rank = getRank(); // multi-thread solver may change rank here
248  std::lock_guard<std::mutex> lock(tokenAccessLockMutex[rank]);
249  assert( token[rank][0] == rank && rank != 0 );
250  token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
251  token[rank][1] = -1;
252  send(token[rank], 2, ParaINT, 0, TagToken);
253 }
254 
255 bool
257  int rank
258  )
259 {
260  // int rank = getRank(); // multi-thread solver may change rank here
261  std::lock_guard<std::mutex> lock(tokenAccessLockMutex[rank]);
262  if( rank == token[rank][0] )
263  {
264  if( token[rank][1] == token[rank][0] ) token[rank][1] = -2;
265  else if( token[rank][1] == -1 ) token[rank][1] = token[rank][0];
266  token[rank][0] = ( token[rank][0] % (comSize - 1) ) + 1;
267  }
268  else
269  {
270  THROW_LOGICAL_ERROR4("Invalid token update. Rank = ", getRank(), ", token = ", token[0] );
271  }
272  send(token[rank], 2, ParaINT, 0, TagToken);
273  if( token[rank][1] == -2 )
274  {
275  return true;
276  }
277  else
278  {
279  return false;
280  }
281 }
282 
283 void
285  int rank,
286  int *inToken
287  )
288 {
289  // int rank = getRank();
290  std::lock_guard<std::mutex> lock(tokenAccessLockMutex[rank]);
291  assert( rank == 0 || ( rank != 0 && inToken[0] == rank ) );
292  token[rank][0] = inToken[0];
293  token[rank][1] = inToken[1];
294 }
295 
296 
297 
299 {
300  std::lock_guard<std::mutex> lock(rankLockMutex);
301  for(int i = 0; i < ThreadTableSize; i++ )
302  {
303  if( threadsTable[i] )
304  {
305  delete threadsTable[i];
306  }
307  }
308 
309  for( int i = 0; i < comSize; i++ )
310  {
311  delete [] token[i];
312  }
313  delete [] token;
314  delete [] tokenAccessLockMutex;
315 
316  for(int i = 0; i < (comSize + 1); i++)
317  {
319  while( elem )
320  {
321  if( elem->getData() )
322  {
323  if( !freeStandardTypes(elem) )
324  {
325  ABORT_LOGICAL_ERROR2("Requested type is not implemented. Type = ", elem->getDataTypeId() );
326  }
327  }
328  delete elem;
330  }
331  delete messageQueueTable[i];
332  }
333  delete [] messageQueueTable;
334 
335  if( sentMessage ) delete [] sentMessage;
336  if( queueLockMutex ) delete [] queueLockMutex;
337  if( sentMsg ) delete [] sentMsg;
338 
339 }
340 
341 int
343  )
344 {
345  std::lock_guard<std::mutex> lock(rankLockMutex);
346  if( localRank >= 0 ) return localRank;
347  else return -1; // No ug threads
348 }
349 
350 std::ostream *
352  )
353 {
354  std::lock_guard<std::mutex> lock(rankLockMutex);
355  if( !threadsTable[localRank] ) return 0;
356  // assert( threadsTable[localRank] );
357  return threadsTable[localRank]->getOstream();
358 }
359 
360 void *
362  const void* buffer,
363  int count,
364  const int datatypeId
365  )
366 {
367  void *newBuf = 0;
368  if( count == 0 ) return newBuf;
369 
370  switch(datatypeId)
371  {
372  case ParaCHAR :
373  {
374  newBuf = new char[count];
375  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
376  break;
377  }
378  case ParaSHORT :
379  {
380  newBuf = new short[count];
381  memcpy(newBuf, buffer, (unsigned long int)sizeof(short)*count);
382  break;
383  }
384  case ParaINT :
385  {
386  newBuf = new int[count];
387  memcpy(newBuf, buffer, (unsigned long int)sizeof(int)*count);
388  break;
389  }
390  case ParaLONG :
391  {
392  newBuf = new long[count];
393  memcpy(newBuf, buffer, (unsigned long int)sizeof(long)*count);
394  break;
395  }
396  case ParaUNSIGNED_CHAR :
397  {
398  newBuf = new unsigned char[count];
399  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned char)*count);
400  break;
401  }
402  case ParaUNSIGNED_SHORT :
403  {
404  newBuf = new unsigned short[count];
405  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned short)*count);
406  break;
407  }
408  case ParaUNSIGNED :
409  {
410  newBuf = new unsigned int[count];
411  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned int)*count);
412  break;
413  }
414  case ParaUNSIGNED_LONG :
415  {
416  newBuf = new unsigned long[count];
417  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long)*count);
418  break;
419  }
420  case ParaFLOAT :
421  {
422  newBuf = new float[count];
423  memcpy(newBuf, buffer, (unsigned long int)sizeof(float)*count);
424  break;
425  }
426  case ParaDOUBLE :
427  {
428  newBuf = new double[count];
429  memcpy(newBuf, buffer, (unsigned long int)sizeof(double)*count);
430  break;
431  }
432  case ParaLONG_DOUBLE :
433  {
434  newBuf = new long double[count];
435  memcpy(newBuf, buffer, (unsigned long int)sizeof(long double)*count);
436  break;
437  }
438  case ParaBYTE :
439  {
440  newBuf = new char[count];
441  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
442  break;
443  }
444  case ParaSIGNED_CHAR :
445  {
446  newBuf = new char[count];
447  memcpy(newBuf, buffer, (unsigned long int)sizeof(char)*count);
448  break;
449  }
450  case ParaLONG_LONG :
451  {
452  newBuf = new long long[count];
453  memcpy(newBuf, buffer, (unsigned long int)sizeof(long long)*count);
454  break;
455  }
457  {
458  newBuf = new unsigned long long[count];
459  memcpy(newBuf, buffer, (unsigned long int)sizeof(unsigned long long)*count);
460  break;
461  }
462  case ParaBOOL :
463  {
464  newBuf = new bool[count];
465  memcpy(newBuf, buffer, (unsigned long int)sizeof(bool)*count);
466  break;
467  }
468  default :
469  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
470  }
471 
472  return newBuf;
473 }
474 
475 void
477  void *dest, const void *src, int count, int datatypeId
478  )
479 {
480 
481  if( count == 0 ) return;
482 
483  switch(datatypeId)
484  {
485  case ParaCHAR :
486  {
487  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
488  break;
489  }
490  case ParaSHORT :
491  {
492  memcpy(dest, src, (unsigned long int)sizeof(short)*count);
493  break;
494  }
495  case ParaINT :
496  {
497  memcpy(dest, src, (unsigned long int)sizeof(int)*count);
498  break;
499  }
500  case ParaLONG :
501  {
502  memcpy(dest, src, (unsigned long int)sizeof(long)*count);
503  break;
504  }
505  case ParaUNSIGNED_CHAR :
506  {
507  memcpy(dest, src, (unsigned long int)sizeof(unsigned char)*count);
508  break;
509  }
510  case ParaUNSIGNED_SHORT :
511  {
512  memcpy(dest, src, (unsigned long int)sizeof(unsigned short)*count);
513  break;
514  }
515  case ParaUNSIGNED :
516  {
517  memcpy(dest, src, (unsigned long int)sizeof(unsigned int)*count);
518  break;
519  }
520  case ParaUNSIGNED_LONG :
521  {
522  memcpy(dest, src, (unsigned long int)sizeof(unsigned long)*count);
523  break;
524  }
525  case ParaFLOAT :
526  {
527  memcpy(dest, src, (unsigned long int)sizeof(float)*count);
528  break;
529  }
530  case ParaDOUBLE :
531  {
532  memcpy(dest, src, (unsigned long int)sizeof(double)*count);
533  break;
534  }
535  case ParaLONG_DOUBLE :
536  {
537  memcpy(dest, src, (unsigned long int)sizeof(long double)*count);
538  break;
539  }
540  case ParaBYTE :
541  {
542  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
543  break;
544  }
545  case ParaSIGNED_CHAR :
546  {
547  memcpy(dest, src, (unsigned long int)sizeof(char)*count);
548  break;
549  }
550  case ParaLONG_LONG :
551  {
552  memcpy(dest, src, (unsigned long int)sizeof(long long)*count);
553  break;
554  }
556  {
557  memcpy(dest, src, (unsigned long int)sizeof(unsigned long long)*count);
558  break;
559  }
560  case ParaBOOL :
561  {
562  memcpy(dest, src, (unsigned long int)sizeof(bool)*count);
563  break;
564  }
565  default :
566  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
567  }
568 
569 }
570 
571 void
573  void* buffer,
574  int count,
575  const int datatypeId
576  )
577 {
578 
579  if( count == 0 ) return;
580 
581  switch(datatypeId)
582  {
583  case ParaCHAR :
584  {
585  delete [] static_cast<char *>(buffer);
586  break;
587  }
588  case ParaSHORT :
589  {
590  delete [] static_cast<short *>(buffer);
591  break;
592  }
593  case ParaINT :
594  {
595  delete [] static_cast<int *>(buffer);
596  break;
597  }
598  case ParaLONG :
599  {
600  delete [] static_cast<long *>(buffer);
601  break;
602  }
603  case ParaUNSIGNED_CHAR :
604  {
605  delete [] static_cast<unsigned char *>(buffer);
606  break;
607  }
608  case ParaUNSIGNED_SHORT :
609  {
610  delete [] static_cast<unsigned short *>(buffer);
611  break;
612  }
613  case ParaUNSIGNED :
614  {
615  delete [] static_cast<unsigned int *>(buffer);
616  break;
617  }
618  case ParaUNSIGNED_LONG :
619  {
620  delete [] static_cast<unsigned long *>(buffer);
621  break;
622  }
623  case ParaFLOAT :
624  {
625  delete [] static_cast<float *>(buffer);
626  break;
627  }
628  case ParaDOUBLE :
629  {
630  delete [] static_cast<double *>(buffer);
631  break;
632  }
633  case ParaLONG_DOUBLE :
634  {
635  delete [] static_cast<long double *>(buffer);
636  break;
637  }
638  case ParaBYTE :
639  {
640  delete [] static_cast<char *>(buffer);
641  break;
642  }
643  case ParaSIGNED_CHAR :
644  {
645  delete [] static_cast<char *>(buffer);
646  break;
647  }
648  case ParaLONG_LONG :
649  {
650  delete [] static_cast<long long *>(buffer);
651  break;
652  }
654  {
655  delete [] static_cast<unsigned long long *>(buffer);
656  break;
657  }
658  case ParaBOOL :
659  {
660  delete [] static_cast<bool *>(buffer);;
661  break;
662  }
663  default :
664  THROW_LOGICAL_ERROR2("This type is not implemented. Type = ", datatypeId);
665  }
666 
667 }
668 
669 bool
671  MessageQueueElement *elem ///< pointer to a message queue element
672  )
673 {
674  if( elem->getDataTypeId() < UG_USER_TYPE_FIRST )
675  {
676  freeMem(elem->getData(), elem->getCount(), elem->getDataTypeId() );
677  }
678  else
679  {
680  switch( elem->getDataTypeId())
681  {
682  case ParaInstanceType:
683  {
684  delete reinterpret_cast<ParaInstance *>(elem->getData());
685  break;
686  }
687  case ParaSolutionType:
688  {
689  delete reinterpret_cast<ParaSolution *>(elem->getData());
690  break;
691  }
692  case ParaParamSetType:
693  {
694  delete reinterpret_cast<ParaParamSet *>(elem->getData());
695  break;
696  }
697  case ParaTaskType:
698  {
699  delete reinterpret_cast<ParaTask *>(elem->getData());
700  break;
701  }
702  case ParaSolverStateType:
703  {
704  delete reinterpret_cast<ParaSolverState *>(elem->getData());
705  break;
706  }
708  {
709  delete reinterpret_cast<ParaCalculationState *>(elem->getData());
710  break;
711  }
713  {
714  delete reinterpret_cast<ParaSolverTerminationState *>(elem->getData());
715  break;
716  }
718  {
719  delete reinterpret_cast<ParaRacingRampUpParamSet *>(elem->getData());
720  break;
721  }
722  default:
723  {
724  return false;
725  }
726  }
727  }
728  return true;
729 }
730 
731 bool
733  )
734 {
735  // std::cout << "size = " << sizeof(tagStringTable)/sizeof(char*) << ", N_TH_TAGS = " << N_TH_TAGS << std::endl;
736  return ( sizeof(tagStringTable)/sizeof(char*) == N_TH_TAGS );
737 }
738 
739 const char *
741  int tag /// tag to be converted to string
742  )
743 {
744  assert( tag >= 0 && tag < N_TH_TAGS );
745  return tagStringTable[tag];
746 }
747 
748 
749 int
751  void* buffer,
752  int count,
753  const int datatypeId,
754  int root
755  )
756 {
757  if( getRank() == root )
758  {
759  for(int i=0; i < comSize; i++)
760  {
761  if( i != root )
762  {
763  send(buffer, count, datatypeId, i, -1);
764  }
765  }
766  }
767  else
768  {
769  receive(buffer, count, datatypeId, root, -1);
770  }
771  return 0;
772 }
773 
774 int
776  void* buffer,
777  int count,
778  const int datatypeId,
779  int dest,
780  const int tag
781  )
782 {
783  {
784  std::lock_guard<std::mutex> lock(queueLockMutex[dest]);
785  messageQueueTable[dest]->enqueue(sentMsg[dest], queueLockMutex[dest], &sentMessage[dest],
786  new MessageQueueElement(getRank(), count, datatypeId, tag,
787  allocateMemAndCopy(buffer, count, datatypeId) ) );
788  }
789  TAG_TRACE (Send, To, dest, tag);
790  return 0;
791 }
792 
793 int
795  void* buffer,
796  int count,
797  const int datatypeId,
798  int source,
799  const int tag
800  )
801 {
802  int qRank = getRank();
803  MessageQueueElement *elem = 0;
804  if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
805  {
806  messageQueueTable[qRank]->waitMessage(sentMsg[qRank], queueLockMutex[qRank], &sentMessage[qRank], source, datatypeId, tag);
807  }
808  {
809  std::lock_guard<std::mutex> lock(queueLockMutex[qRank]);
810  elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank],source, datatypeId, tag);
811  }
812  assert(elem);
813  copy( buffer, elem->getData(), count, datatypeId );
814  freeMem(elem->getData(), count, datatypeId );
815  delete elem;
816  TAG_TRACE (Recv, From, source, tag);
817  return 0;
818 }
819 
820 void
822  const int source,
823  const int tag,
824  int *receivedTag
825  )
826 {
827  /*
828  // Just wait, iProbe and receive will be performed after this call
829  messageQueueTable[getRank()]->waitMessage(source, datatypeId, tag);
830  TAG_TRACE (Probe, From, source, tag);
831  return 0;
832  */
833  int qRank = getRank();
834  // LOCKED ( &queueLock[getRank()] )
835  // {
836  // messageQueueTable[qRank]->waitMessage(sentMsg[qRank], queueLockMutex[qRank], &sentMessage[qRank], source, receivedTag);
837  // }
838  (*receivedTag) = tag;
839  messageQueueTable[qRank]->waitMessage(sentMsg[qRank], queueLockMutex[qRank], &sentMessage[qRank], source, receivedTag);
840  TAG_TRACE (Probe, From, source, *receivedTag);
841  return;
842 }
843 
844 bool
846  int* source,
847  int* tag
848  )
849 {
850  int qRank = getRank();
851  messageQueueTable[qRank]->waitMessage(sentMsg[qRank], queueLockMutex[qRank], &sentMessage[qRank]);
853  *source = elem->getSource();
854  *tag = elem->getTag();
855  TAG_TRACE (Probe, From, *source, *tag);
856  return true;
857 }
858 
859 bool
861  int* source,
862  int* tag
863  )
864 {
865  bool flag = false;
866  int qRank = getRank();
867  {
868  std::lock_guard<std::mutex> lock(queueLockMutex[qRank]);
869  flag = !(messageQueueTable[qRank]->isEmpty());
870  if( flag )
871  {
872  if( *tag == TagAny )
873  {
875  *source = elem->getSource();
876  *tag = elem->getTag();
877  TAG_TRACE (Iprobe, From, *source, *tag);
878  }
879  else
880  {
882  if( elem )
883  {
884  *source = elem->getSource();
885  *tag = elem->getTag();
886  TAG_TRACE (Iprobe, From, *source, *tag);
887  // flag = true;
888  }
889  else
890  {
891  flag = false;
892  }
893  }
894  }
895  }
896  return flag;
897 }
898 
899 int
901  void* buffer,
902  const int datatypeId,
903  int dest,
904  const int tag
905  )
906 {
907  {
908  std::lock_guard<std::mutex> lock(queueLockMutex[dest]);
909  messageQueueTable[dest]->enqueue(sentMsg[dest], queueLockMutex[dest], &sentMessage[dest],
910  new MessageQueueElement(getRank(), 1, datatypeId, tag, buffer ) );
911  }
912  TAG_TRACE (Send, To, dest, tag);
913  return 0;
914 }
915 
916 int
918  void** buffer,
919  const int datatypeId,
920  int source,
921  const int tag
922  )
923 {
924  int qRank = getRank();
925  if( !messageQueueTable[qRank]->checkElement(source, datatypeId, tag) )
926  {
927  messageQueueTable[qRank]->waitMessage(sentMsg[qRank], queueLockMutex[qRank], &sentMessage[qRank], source, datatypeId, tag);
928  }
929  MessageQueueElement *elem = 0;
930  {
931  std::lock_guard<std::mutex> lock(queueLockMutex[qRank]);
932  elem = messageQueueTable[qRank]->extarctElement(&sentMessage[qRank], source, datatypeId, tag);
933  }
934  assert(elem);
935  *buffer = elem->getData();
936  delete elem;
937  TAG_TRACE (Recv, From, source, tag);
938  return 0;
939 }
Base class for calculation state.
static const int ParaSHORT
Definition: paraComm.h:65
std::ostream * getOstream()
getter of tag trace stream of this rank
class ParaSolverState (ParaSolver state object for notification message)
int getDataTypeId()
getter of the data type id
static const int TagNotificationId
Definition: paraTagDef.h:55
virtual void setToken(int rank, int *inToken)
set received token to this communicator
#define THROW_LOGICAL_ERROR4(msg1, msg2, msg3, msg4)
Definition: paraDef.h:103
static const int ParaCalculationStateType
static const int ParaCHAR
Definition: paraComm.h:64
MessageQueueElement * getHead()
getter of head
This class has solver state to be transferred.
static const int TagIncumbentValue
Definition: paraTagDef.h:52
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or not
static ScipParaParamSet * paraParamSet
Definition: fscip.cpp:74
virtual void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
static const int TagTerminateRequest
Definition: paraTagDef.h:56
int comSize
communicator size : number of threads joined in this system
static const int TagRampUp
Definition: paraTagDef.h:50
static const int TagAny
Definition: paraTagDef.h:44
Base class for ParaTask.
static const int ParaLONG_LONG
Definition: paraComm.h:68
static const int ParaUNSIGNED_LONG
Definition: paraComm.h:73
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
static const int TagHardTimeLimit
Definition: paraTagDef.h:61
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
virtual void passToken(int rank)
pass token from the rank to the next
Base class of Calculation state in a ParaSolver.
static thread_local int localRank
local thread rank
int getCount()
getter of the number of the data type elements
static const int ParaBOOL
Definition: paraComm.h:78
static const int ParaInstanceType
Definition: paraCommCPP11.h:98
virtual ~ParaCommCPP11()
destructor of this communicator
virtual bool waitToken(int rank)
wait token when UG runs with deterministic mode
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
bool freeStandardTypes(MessageQueueElement *elem)
free memory
Class of MessageQueueTableElement.
void freeMem(void *buffer, int count, const int datatypeId)
free memory
static const int ParaUNSIGNED
Definition: paraComm.h:72
Base class for initial statistics collecting class.
Class for message queue element.
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
std::mutex rankLockMutex
mutex to access rank
std::ostream * getOstream()
get ostream pointer
static const int ParaLONG
Definition: paraComm.h:67
int getSource()
getter of source rank
static const int ParaLONG_DOUBLE
Definition: paraComm.h:77
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
static const int ParaDOUBLE
Definition: paraComm.h:76
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
static ThreadsTableElement * threadsTable[ThreadTableSize]
threads table: index is thread rank
static const int TagTaskReceived
Definition: paraTagDef.h:48
static const int TagSolverState
Definition: paraTagDef.h:53
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
static const int ParaSIGNED_CHAR
Definition: paraComm.h:69
static const int TagSolution
Definition: paraTagDef.h:51
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
static const int TagToken
Definition: paraTagDef.h:63
int getRank()
get rank of caller's thread
static const int ParaSolverTerminationStateType
static const int TagTrace
Definition: paraParamSet.h:72
This class contains solver termination state which is transferred from Solver to LC.
std::condition_variable * sentMsg
condition variable for synchronization
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
Base class for racing ramp-up parameter set.
virtual void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
static const int ParaUNSIGNED_CHAR
Definition: paraComm.h:70
virtual void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
class ParaParamSet
Definition: paraParamSet.h:850
class for instance data
Definition: paraInstance.h:50
ParaSysTimer timer
system timer
#define THROW_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:69
static const int ParaSolutionType
Definition: paraCommCPP11.h:99
std::mutex rankLockMutex
Class of ThreadsTableElement.
static const int ParaBYTE
Definition: paraComm.h:79
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
static const int TagTerminated
Definition: paraTagDef.h:58
virtual const char * getTagString(int tag)
get Tag string for debugging
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
void start(void)
start timer
static const int ParaUNSIGNED_LONG_LONG
Definition: paraComm.h:74
#define TAG_TRACE(call, fromTo, sourceDest, tag)
Definition: paraCommCPP11.h:58
static const int TagDiffSubproblem
Definition: paraTagDef.h:49
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
int getTag()
getter of the message tag
void * getData()
getter of data
static const int ParaFLOAT
Definition: paraComm.h:75
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
static const int TagInterruptRequest
Definition: paraTagDef.h:57
#define TAG_STR(tag)
Definition: paraTagDef.h:40
bool isEmpty()
check if the queue is empty or not
virtual void solverDel(int rank)
delete Solver from this communicator
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
bool tagTraceFlag
indicate if tags are traced or not
static const int TagTask
Definition: paraTagDef.h:47
static const int ParaTaskType
static const int ParaINT
Definition: paraComm.h:66
virtual bool passTermToken(int rank)
pass termination token from the rank to the next
MessageQueueTableElement ** messageQueueTable
message queue table
ParaComm extension for C++11 thread communication.
virtual void init(int argc, char **argv)
initializer of this communicator
static const int ParaSolverStateType
class ParaRacingRampUpParamSet (parameter set for racing ramp-up)
std::mutex * queueLockMutex
mutex for synchronization
static const int ParaUNSIGNED_SHORT
Definition: paraComm.h:71
#define ABORT_LOGICAL_ERROR2(msg1, msg2)
Definition: paraDef.h:78
static const int TagCompletionOfCalculation
Definition: paraTagDef.h:54
bool * sentMessage
sent message flag for synchronization
static const int ThreadTableSize
size of thread table : this limits the number of threads
static const int ParaParamSetType
static const char * tagStringTable[]
tag name string table
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
static const int UG_USER_TYPE_FIRST
user defined transfer data types
Definition: paraCommCPP11.h:97
static const int ParaRacingRampUpParamType
class for solution
Definition: paraSolution.h:53
Base class for solution.
std::mutex * tokenAccessLockMutex
mutex to access token
bool probe(int *source, int *tag)
probe function which waits a new message
static const int TagWinner
Definition: paraTagDef.h:60
static const int TagAckCompletion
Definition: paraTagDef.h:62
class ParaTask
Definition: paraTask.h:541
bool getBoolParamValue(int param)
for bool parameters