37 #ifndef __PARA_COMM_CPP11_H__ 38 #define __PARA_COMM_CPP11_H__ 42 #include <condition_variable> 58 #define TAG_TRACE( call, fromTo, sourceDest, tag ) \ 66 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \ 67 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \ 72 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \ 73 << " " << sourceDest << " as broadcast" << std::endl; \ 80 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \ 81 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \ 85 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \ 86 << " " << sourceDest << " as broadcast" << std::endl; \ 157 ) : source(inSource), count(inCount), dataTypeId(inDataTypeId), tag(inTag), data(inData), next(0)
256 ) : head(0), tail(0), size(0)
287 if( current->getSource() == source
288 && current->getDataTypeId() == datatypeId
289 && current->getTag() ==
tag )
309 if( current->getTag() ==
tag )
342 if( current == head )
344 if( current == tail )
348 *sentMessage =
false;
357 if( current == tail )
385 if( head == 0 )
return 0;
390 if( current == tail )
394 *sentMessage =
false;
408 std::condition_variable& sentMsg,
409 std::mutex& queueLockMutex,
416 head = tail = newElement;
420 tail->
link(newElement);
425 sentMsg.notify_one();
456 if( head ) empty =
false;
457 assert( !empty || size == 0 );
465 std::condition_variable& sentMsg,
466 std::mutex& queueLockMutex,
470 std::unique_lock<std::mutex> lk(queueLockMutex);
471 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
478 std::condition_variable& sentMsg,
479 std::mutex& queueLockMutex,
487 std::unique_lock<std::mutex> lk(queueLockMutex);
488 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
501 *sentMessage =
false;
509 std::condition_variable& sentMsg,
510 std::mutex& queueLockMutex,
519 std::unique_lock<std::mutex> lk(queueLockMutex);
520 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
534 *sentMessage =
false;
542 std::condition_variable& sentMsg,
543 std::mutex& queueLockMutex,
551 std::unique_lock<std::mutex> lk(queueLockMutex);
552 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
573 && current->
getTag() == (*tag) )
581 *sentMessage =
false;
626 std::ostringstream s;
628 std::ofstream *ofs =
new std::ofstream();
629 ofs->open(s.str().c_str());
643 if( tos != &std::cout )
645 std::ofstream *ofs =
dynamic_cast<std::ofstream *
>(tos);
720 void *allocateMemAndCopy(
749 bool freeStandardTypes(
773 messageQueueTable(0),
777 tokenAccessLockMutex(0)
844 return messageQueueTable[dest]->
getSize();
848 int nTotalMessages = 0;
849 for(
int i = 0; i < ( comSize + 1 ); i++ )
851 nTotalMessages += messageQueueTable[i]->
getSize();
853 return nTotalMessages;
876 virtual void solverInit(
884 virtual void solverReInit(
892 virtual void solverDel(
920 virtual bool waitToken(
927 virtual void passToken(
935 virtual bool passTermToken(
942 virtual void setToken(
951 std::ostream *getOstream(
959 rankLockMutex.lock();
967 rankLockMutex.unlock();
976 applicationLockMutex.lock();
989 applicationLockMutex.lock();
990 std::cout <<
"### Rank" << this->getRank() <<
":AppLocked! [" << f <<
", " << l <<
"]" << std::endl;
999 applicationLockMutex.unlock();
1012 applicationLockMutex.unlock();
1013 std::cout <<
"### Rank" << this->getRank() <<
":AppUnlocked! [" << f <<
", " << l <<
"]" << std::endl;
1025 rankLockMutex.unlock();
1026 std::cout <<
"Rank Unlocked: " << f <<
", " << l << std::endl;
1046 const int datatypeId,
1057 const int datatypeId,
1069 const int datatypeId,
1078 void waitSpecTagFromSpecSource(
1108 const int datatypeId,
1119 const int datatypeId,
1134 #define DEF_PARA_COMM( para_comm, comm ) ParaCommCPP11 *para_comm = dynamic_cast< ParaCommCPP11* >(comm) 1135 #define LOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__) 1136 #define UNLOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__) 1137 #define LOCK_RANK( comm ) comm->lockRank(__FILE__, __LINE__) 1138 #define UNLOCK_RANK( comm ) comm->unlockRank(__FILE__, __LINE__) 1141 #endif // __PARA_COMM_CPP11_H__ int count
number of the data type elements
~MessageQueueElement()
destructor of MessageQueueElement
static const char * tagStringTable[]
mutex for interrupt message monitor
std::ostream * getOstream()
getter of tag trace stream of this rank
class ParaSolverState (ParaSolver state object for notification message)
int getDataTypeId()
getter of the data type id
static const int ParaCalculationStateType
MessageQueueElement * getHead()
getter of head
std::mutex applicationLockMutex
mutex for applications
ParaTimer extension for threads.
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for desctructor of ParaCommCPP11. No lock is necessary...
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or nor
static ScipParaParamSet * paraParamSet
MessageQueueElement()
default constructor of MessageQueueElement
virtual void setLocalRank(int inRank)
void unlockRank(char const *f, int l)
unlock rank (for debugging)
ParaTimer * createParaTimer()
create ParaTimer object
void * data
data of the message
int comSize
communicator size : number of threads joined in this system
MessageQueueElement * tail
tail of the message queue
virtual void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debug)
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or nor
Base class of Calculation state in a ParaSolver.
static thread_local int localRank
local thread rank
int getCount()
getter of the number of the data type elements
virtual bool waitTerminatedMessage()
function to wait Terminated message (This function is not used currently)
static const int ParaInstanceType
Defines for UG Framework.
const char * getTagString(int tag)
get Tag string for debugging
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
class ParaTimerMpi (Timer used in thread communication)
MessageQueueElement * next
point to next message queue element
ThreadsTableElement()
default constructor of ThreadsTableElement
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
Class of MessageQueueTableElement.
static const int TYPE_LAST
Class for message queue element.
std::mutex rankLockMutex
mutex to access rank
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
static const int TagTraceFileName
int getSource()
getter of source rank
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
void abort()
abort. How it works sometimes depends on communicator used
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
double getStartTime()
get start time of this communicator (should not be used)
void unlockRank()
unlock rank
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
static const int ParaSolverTerminationStateType
Base class of communicator for UG Framework.
static const int TagTrace
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
std::condition_variable * sentMsg
condition variable for synchronization
bool isStringParamDefaultValue(int param)
check if string parameter is default value or not
class ParaSolverTerminationState (Solver termination state in a ParaSolver)
virtual void unlockApp(char const *f, int l)
unlock UG application to synchronize with other threads (for debug)
int size
number of the messages in queue
virtual void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
int tag
tag of the message, -1 : in case of broadcast message
ParaSysTimer timer
system timer
void setRank(int r)
setter of this thread rank
Class for the difference between instance and subproblem.
static const int ParaSolutionType
int dataTypeId
data type id
const char * getStringParamValue(int param)
for char parameters
Class of ThreadsTableElement.
~ThreadsTableElement()
destructor of ThreadsTableElement
MessageQueueTableElement()
default constructor of MessageQueueTableElement
double getStartTime(void)
get start time
virtual void unlockApp()
unlock UG application to synchronize with other threads
int getTag()
getter of the message tag
void * getData()
getter of data
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int tag)
wait for a specified message coming to a queue
bool isEmpty()
check if the queue is empty or not
int source
source thread rank of this message
int getSize()
getter of size
bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
int rank
rank of this thread
bool tagTraceFlag
indicate if tags are traced or not
int getSize()
get size of this communicator, which indicates how many threads in a UG process
static const int ParaTaskType
static const int UG_USER_TYPE_LAST
MessageQueueTableElement ** messageQueueTable
message queue table
static const int ParaSolverStateType
virtual void lockApp()
lock UG application to synchronize with other threads
int getRank()
getter of this thread rank
std::mutex * queueLockMutex
mutex for synchronization
ParaCommCPP11()
constructor of ParaComCPP11
bool * sentMessage
sent message flag for synchronization
MessageQueueElement * head
head of the message queue
static const int ThreadTableSize
size of thread table : this limits the number of threads
static const int ParaParamSetType
Communicator object for C++11 thread communications.
static const int UG_USER_TYPE_FIRST
user defined transfer data types
static const int ParaRacingRampUpParamType
Base class of communicator object.
~MessageQueueTableElement()
destructor of MessageQueueTableElement
std::ostream * tos
tag trace stream for this thread
std::mutex * tokenAccessLockMutex
mutex to access token
bool getBoolParamValue(int param)
for bool parameters