37#ifndef __PARA_COMM_CPP11_H__
38#define __PARA_COMM_CPP11_H__
42#include <condition_variable>
58#define TAG_TRACE( call, fromTo, sourceDest, tag ) \
66 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
67 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
72 *getOstream() << timer.getRTimeInterval() << " [Rank = " << getRank() << "] " << #call << " " << #fromTo \
73 << " " << sourceDest << " as broadcast" << std::endl; \
80 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
81 << " " << sourceDest << " with Tag = " << getTagString(tag) << std::endl; \
85 std::cout << "### Before creating ostream, Rank = " << getRank() << " issues TAG_TRACE with " << #call << " " << #fromTo \
86 << " " << sourceDest << " as broadcast" << std::endl; \
287 if( current->getSource() == source
288 && current->getDataTypeId() == datatypeId
289 && current->getTag() == tag )
309 if( current->getTag() == tag )
338 && current->
getTag() == tag )
342 if( current ==
head )
344 if( current ==
tail )
348 *sentMessage =
false;
357 if( current ==
tail )
385 if(
head == 0 )
return 0;
390 if( current ==
tail )
394 *sentMessage =
false;
408 std::condition_variable& sentMsg,
409 std::mutex& queueLockMutex,
425 sentMsg.notify_one();
456 if(
head ) empty =
false;
457 assert( !empty ||
size == 0 );
465 std::condition_variable& sentMsg,
466 std::mutex& queueLockMutex,
470 std::unique_lock<std::mutex> lk(queueLockMutex);
471 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
478 std::condition_variable& sentMsg,
479 std::mutex& queueLockMutex,
487 std::unique_lock<std::mutex> lk(queueLockMutex);
488 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
494 && current->
getTag() == tag )
501 *sentMessage =
false;
509 std::condition_variable& sentMsg,
510 std::mutex& queueLockMutex,
519 std::unique_lock<std::mutex> lk(queueLockMutex);
520 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
527 && current->
getTag() == tag )
534 *sentMessage =
false;
542 std::condition_variable& sentMsg,
543 std::mutex& queueLockMutex,
551 std::unique_lock<std::mutex> lk(queueLockMutex);
552 sentMsg.wait(lk, [&sentMessage]{
return (*sentMessage ==
true);});
573 && current->
getTag() == (*tag) )
581 *sentMessage =
false;
626 std::ostringstream s;
628 std::ofstream *ofs =
new std::ofstream();
629 ofs->open(s.str().c_str());
643 if(
tos != &std::cout )
645 std::ofstream *ofs =
dynamic_cast<std::ofstream *
>(
tos);
683class ParaCalculationState;
685class ParaSolverState;
686class ParaSolverTerminationState;
687class ParaDiffSubproblem;
848 int nTotalMessages = 0;
849 for(
int i = 0; i < (
comSize + 1 ); i++ )
853 return nTotalMessages;
990 std::cout <<
"### Rank" << this->
getRank() <<
":AppLocked! [" << f <<
", " << l <<
"]" << std::endl;
1013 std::cout <<
"### Rank" << this->
getRank() <<
":AppUnlocked! [" << f <<
", " << l <<
"]" << std::endl;
1026 std::cout <<
"Rank Unlocked: " << f <<
", " << l << std::endl;
1046 const int datatypeId,
1057 const int datatypeId,
1069 const int datatypeId,
1107 const int datatypeId,
1118 const int datatypeId,
1133#define DEF_PARA_COMM( para_comm, comm ) ParaCommCPP11 *para_comm = dynamic_cast< ParaCommCPP11* >(comm)
1134#define LOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__)
1135#define UNLOCK_APP( comm ) comm->lockApp(__FILE__, __LINE__)
1136#define LOCK_RANK( comm ) comm->lockRank(__FILE__, __LINE__)
1137#define UNLOCK_RANK( comm ) comm->unlockRank(__FILE__, __LINE__)
Class for message queue element.
int source
source thread rank of this message
int dataTypeId
data type id
int getSource()
getter of source rank
MessageQueueElement()
default constructor of MessageQueueElement
MessageQueueElement * next
point to next message queue element
int getDataTypeId()
getter of the data type id
void link(MessageQueueElement *nextElement)
link to the next MessageQueueElement
MessageQueueElement(int inSource, int inCount, int inDataTypeId, int inTag, void *inData)
constructor of MessageQueueElement
int tag
tag of the message, -1 : in case of broadcast message
void * getData()
getter of data
void * data
data of the message
int count
number of the data type elements
int getTag()
getter of the message tag
~MessageQueueElement()
destructor of MessageQueueElement
int getCount()
getter of the number of the data type elements
MessageQueueElement * getNext()
getter of the pointer to the next MessageQueueElement
Class of MessageQueueTableElement.
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int *tag)
wait for a specified message coming to a queue
void enqueue(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, MessageQueueElement *newElement)
enqueue a message
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int datatypeId, int tag)
wait for a specified message coming to a queue
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage, int source, int tag)
wait for a specified message coming to a queue
MessageQueueElement * extarctElement(bool *sentMessage, int source, int datatypeId, int tag)
extracts a message
int getSize()
getter of size
int size
number of the messages in queue
MessageQueueElement * extarctElement(bool *sentMessage)
extracts a message (This method is only for desctructor of ParaCommCPP11. No lock is necessary....
MessageQueueElement * getHead()
getter of head
MessageQueueElement * tail
tail of the message queue
MessageQueueElement * checkElement(int source, int datatypeId, int tag)
check if the specified message exists or nor
~MessageQueueTableElement()
destructor of MessageQueueTableElement
MessageQueueTableElement()
default constructor of MessageQueueTableElement
void waitMessage(std::condition_variable &sentMsg, std::mutex &queueLockMutex, bool *sentMessage)
wait for a message coming to a queue
MessageQueueElement * head
head of the message queue
MessageQueueElement * checkElementWithTag(int tag)
check if the specified message with tag exists or nor
bool isEmpty()
check if the queue is empty or not
Communicator object for C++11 thread communications.
virtual void solverInit(ParaParamSet *paraParamSet)
initializer for Solvers
int comSize
communicator size : number of threads joined in this system
virtual void lockApp()
lock UG application to synchronize with other threads
bool probe(int *source, int *tag)
probe function which waits a new message
int send(void *bufer, int count, const int datatypeId, int dest, const int tag)
send function for standard ParaData types
void unlockRank()
unlock rank
int ** token
index 0: token index 1: token color -1: green > 0: yellow ( termination origin solver number ) -2: re...
bool iProbe(int *source, int *tag)
iProbe function which checks if a new message is arrived or not
std::mutex rankLockMutex
mutex to access rank
virtual void unlockApp(char const *f, int l)
unlock UG application to synchronize with other threads (for debug)
int receive(void *bufer, int count, const int datatypeId, int source, const int tag)
receive function for standard ParaData types
void freeMem(void *buffer, int count, const int datatypeId)
free memory
std::ostream * getOstream()
get ostream pointer
int getSize()
get size of this communicator, which indicates how many threads in a UG process
virtual bool tagStringTableIsSetUpCoorectly()
check if tag string table (for debugging) set up correctly
bool tagTraceFlag
indicate if tags are traced or not
void * allocateMemAndCopy(const void *buffer, int count, const int datatypeId)
allocate memory and copy message
virtual void setLocalRank(int inRank)
int uTypeSend(void *bufer, const int datatypeId, int dest, int tag)
User type send for created data type.
std::mutex applicationLockMutex
mutex for applications
virtual const char * getTagString(int tag)
get Tag string for debugging
ParaSysTimer timer
system timer
int uTypeReceive(void **bufer, const int datatypeId, int source, int tag)
User type receive for created data type.
virtual void passToken(int rank)
pass token to from the rank to the next
MessageQueueTableElement ** messageQueueTable
message queue table
static const char * tagStringTable[]
tag name string table
virtual void lockApp(char const *f, int l)
lock UG application to synchronize with other threads (for debug)
double getStartTime()
get start time of this communicator (should not be used)
int getRank()
get rank of caller's thread
virtual void init(int argc, char **argv)
initializer of this communicator
int getNumOfMessagesWaitingToSend(int dest=-1)
get size of the messageQueueTable
ParaTimer * createParaTimer()
create ParaTimer object
virtual void solverDel(int rank)
delete Solver from this communicator
bool freeStandardTypes(MessageQueueElement *elem)
free memory
virtual bool waitTerminatedMessage()
function to wait Terminated message (This function is not used currently)
virtual void unlockApp()
unlock UG application to synchronize with other threads
void abort()
abort. How it works sometimes depends on communicator used
bool * sentMessage
sent message flag for synchronization
void copy(void *dest, const void *src, int count, int datatypeId)
copy message
virtual ~ParaCommCPP11()
destructor of this communicator
static thread_local int localRank
local thread rank
virtual void solverReInit(int rank, ParaParamSet *paraParamSet)
reinitializer of a specific Solver
virtual bool passTermToken(int rank)
pass termination token from the rank to the next
virtual void setToken(int rank, int *inToken)
set received token to this communicator
void unlockRank(char const *f, int l)
unlock rank (for debugging)
static ThreadsTableElement * threadsTable[ThreadTableSize]
threads table: index is thread rank
int bcast(void *buffer, int count, const int datatypeId, int root)
broadcast function for standard ParaData types
void waitSpecTagFromSpecSource(const int source, const int tag, int *receivedTag)
wait function for a specific tag from a specific source coming from
virtual void lcInit(ParaParamSet *paraParamSet)
initializer for LoadCoordinator
std::mutex * queueLockMutex
mutex for synchronization
std::mutex * tokenAccessLockMutex
mutex to access token
std::condition_variable * sentMsg
condition variable for synchronization
ParaCommCPP11()
constructor of ParaComCPP11
virtual bool waitToken(int rank)
wait token when UG runs with deterministic mode
Base class of communicator object.
double getStartTime(void)
get start time
class ParaTimerMpi (Timer used in thread communication)
Class of ThreadsTableElement.
void setRank(int r)
setter of this thread rank
std::ostream * getOstream()
getter of tag trace stream of this rank
~ThreadsTableElement()
destructor of ThreadsTableElement
std::ostream * tos
tag trace stream for this thread
int rank
rank of this thread
int getRank()
getter of this thread rank
ThreadsTableElement(int inRank, ParaParamSet *paraParamSet)
constructor of ThreadsTableElement
ThreadsTableElement()
default constructor of ThreadsTableElement
static ScipParaParamSet * paraParamSet
static const int TYPE_LAST
static const int UG_USER_TYPE_LAST
static const int ParaTaskType
static const int ParaInstanceType
static const int ParaParamSetType
static const int ParaSolverStateType
static const int ParaCalculationStateType
static const int ParaSolverTerminationStateType
static const int TagTraceFileName
static const int UG_USER_TYPE_FIRST
user defined transfer data types
static const int ThreadTableSize
size of thread table : this limits the number of threads
static const int ParaSolutionType
static const int ParaRacingRampUpParamType
static const int TagTrace
Base class of communicator for UG Framework.
Defines for UG Framework.
ParaTimer extension for threads.