|
|
|
@@ -84,7 +84,7 @@ public:
     uint32_t GetNumReceivers() const { return fReceivers.GetCount(); }
 };
 
-int32_t plDispatch::fNumBufferReq = 0;
+int32_t plDispatch::fNumBufferReq = 0;
 bool plDispatch::fMsgActive = false;
 plMsgWrap* plDispatch::fMsgCurrent = nil;
 plMsgWrap* plDispatch::fMsgHead = nil;
@@ -92,8 +92,8 @@ plMsgWrap* plDispatch::fMsgTail = nil;
 hsTArray<plMessage*> plDispatch::fMsgWatch;
 MsgRecieveCallback plDispatch::fMsgRecieveCallback = nil;
 
-hsMutex plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
-hsMutex plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch
+std::recursive_mutex plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
+std::mutex plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch
 
 
 plDispatch::plDispatch()
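
Context for the hunks that follow: the two hsMutex members above become standard mutexes,
and the Lock()/Unlock() pairs in the rest of the file are rewritten around them, mostly as
scoped std::lock_guard objects. fMsgCurrentMutex is made a std::recursive_mutex, presumably
because the same thread can end up taking it again (for example, SetMsgBuffering() below can
reach IMsgDispatch(), which locks the same mutex); fMsgDispatchLock never needs that, so a
plain std::mutex suffices. A minimal standalone sketch of the RAII idiom, with illustrative
names that are not part of plDispatch:

    #include <mutex>

    class ExampleCounter
    {
        std::recursive_mutex fLock; // hypothetical stand-in for fMsgCurrentMutex
        int fCount = 0;

    public:
        void Bump()
        {
            // The guard locks in its constructor and unlocks in its destructor,
            // so every early return still releases the mutex; the manual
            // Lock()/Unlock() pairs being removed below did not guarantee that.
            std::lock_guard<std::recursive_mutex> lock(fLock);
            ++fCount;
        }

        void BumpTwice()
        {
            std::lock_guard<std::recursive_mutex> lock(fLock);
            Bump(); // legal only because the mutex is recursive
            Bump();
        }
    };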
|
|
|
@@ -227,18 +227,19 @@ bool plDispatch::IListeningForExactType(uint16_t hClass)
 
 void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, bool async)
 {
-    fMsgCurrentMutex.Lock();
+    {
+        std::lock_guard<std::recursive_mutex> lock(fMsgCurrentMutex);
 
 #ifdef HS_DEBUGGING
-    if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) )
-        fMsgWatch.Append(msgWrap->fMsg);
+        if (msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch))
+            fMsgWatch.Append(msgWrap->fMsg);
 #endif // HS_DEBUGGING
 
-    if( fMsgTail )
-        fMsgTail = IInsertToQueue(&fMsgTail->fNext, msgWrap);
-    else
-        fMsgTail = IInsertToQueue(&fMsgHead, msgWrap);
-    fMsgCurrentMutex.Unlock();
+        if (fMsgTail)
+            fMsgTail = IInsertToQueue(&fMsgTail->fNext, msgWrap);
+        else
+            fMsgTail = IInsertToQueue(&fMsgHead, msgWrap);
+    }
 
     if( !async )
         // Test for fMsgActive in IMsgDispatch(), properly wrapped inside a mutex -mcn
@@ -248,25 +249,22 @@ void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, bool async)
 // On starts deferring msg delivery until buffering is set to off again.
 bool plDispatch::SetMsgBuffering(bool on)
 {
-    fMsgCurrentMutex.Lock();
-    if( on )
-    {
-        hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
-        if( !fNumBufferReq && fMsgActive )
-        {
-            fMsgCurrentMutex.Unlock();
-            return false;
-        }
+    {
+        std::lock_guard<std::recursive_mutex> lock(fMsgCurrentMutex);
+        if (on)
+        {
+            hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
+            if (!fNumBufferReq && fMsgActive)
+                return false;
 
-        fNumBufferReq++;
-        fMsgActive = true;
-        fMsgCurrentMutex.Unlock();
-    }
-    else if( !--fNumBufferReq )
-    {
-        fMsgActive = false;
-        fMsgCurrentMutex.Unlock();
-        IMsgDispatch();
+            fNumBufferReq++;
+            fMsgActive = true;
+        }
+        else if (!--fNumBufferReq)
+        {
+            fMsgActive = false;
+            IMsgDispatch();
+        }
     }
     hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests");
 
@@ -275,25 +273,25 @@ bool plDispatch::SetMsgBuffering(bool on)
 
 void plDispatch::IMsgDispatch()
 {
-    if( !fMsgDispatchLock.TryLock() )
+    if (!fMsgDispatchLock.try_lock())
         return;
 
     if( fMsgActive )
     {
-        fMsgDispatchLock.Unlock();
+        fMsgDispatchLock.unlock();
         return;
     }
 
     fMsgActive = true;
     int responseLevel=0;
 
-    fMsgCurrentMutex.Lock();
+    fMsgCurrentMutex.lock();
 
     plMsgWrap* origTail = fMsgTail;
     while((fMsgCurrent = fMsgHead))
     {
         IDequeue(&fMsgHead, &fMsgTail);
-        fMsgCurrentMutex.Unlock();
+        fMsgCurrentMutex.unlock();
 
         plMessage* msg = fMsgCurrent->fMsg;
         bool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal);
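
The dispatch path above keeps explicit try_lock()/lock()/unlock() calls rather than guards:
fMsgDispatchLock is released on two different exit paths, and fMsgCurrentMutex is deliberately
dropped in the middle of each loop iteration (after IDequeue) and re-taken before the next one,
which does not map onto a single scope. For the non-blocking entry check alone, the same effect
could be had with RAII; this is only a sketch of that alternative, not what the patch does:

    #include <mutex>

    std::mutex gDispatchLock; // illustrative stand-in for fMsgDispatchLock

    void Dispatch()
    {
        // Try to become the dispatching thread; if someone else already is,
        // return immediately instead of blocking, like try_lock() above.
        std::unique_lock<std::mutex> guard(gDispatchLock, std::try_to_lock);
        if (!guard.owns_lock())
            return;

        // ... drain the message queue ...
    }   // guard's destructor releases the mutex on every return path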
|
|
|
@@ -402,16 +400,16 @@ void plDispatch::IMsgDispatch()
 //          }
 //      }
 
-        fMsgCurrentMutex.Lock();
+        fMsgCurrentMutex.lock();
 
         delete fMsgCurrent;
         // TEMP
         fMsgCurrent = (class plMsgWrap *)0xdeadc0de;
     }
-    fMsgCurrentMutex.Unlock();
+    fMsgCurrentMutex.unlock();
 
     fMsgActive = false;
-    fMsgDispatchLock.Unlock();
+    fMsgDispatchLock.unlock();
 }
 
@@ -419,12 +417,12 @@ void plDispatch::IMsgDispatch()
 //
 bool plDispatch::IMsgNetPropagate(plMessage* msg)
 {
-    fMsgCurrentMutex.Lock();
-
-    // Make sure cascaded messages all have the same net flags
-    plNetClientApp::InheritNetMsgFlags(fMsgCurrent ? fMsgCurrent->fMsg : nil, msg, false);
-
-    fMsgCurrentMutex.Unlock();
+    {
+        std::lock_guard<std::recursive_mutex> lock(fMsgCurrentMutex);
+
+        // Make sure cascaded messages all have the same net flags
+        plNetClientApp::InheritNetMsgFlags(fMsgCurrent ? fMsgCurrent->fMsg : nil, msg, false);
+    }
 
     // Decide if msg should go out over the network.
     // If kNetForce is used, this message should always go out over the network, even if it's already
@@ -511,10 +509,9 @@ void plDispatch::MsgQueue(plMessage* msg)
 {
     if (fQueuedMsgOn)
     {
-        fQueuedMsgListMutex.Lock();
+        std::lock_guard<std::mutex> lock(fQueuedMsgListMutex);
         hsAssert(msg,"Message missing");
         fQueuedMsgList.push_back(msg);
-        fQueuedMsgListMutex.Unlock();
     }
     else
         MsgSend(msg, false);
@@ -522,23 +519,23 @@ void plDispatch::MsgQueue(plMessage* msg)
 
 void plDispatch::MsgQueueProcess()
 {
-    // Process all messages on Queue, unlock while sending them
-    // this would allow other threads to put new messages on the list while we send()
-    while (1)
-    {
-        plMessage * pMsg = nil;
-        fQueuedMsgListMutex.Lock();
-        int size = fQueuedMsgList.size();
-        if (size)
-        {   pMsg = fQueuedMsgList.front();
-            fQueuedMsgList.pop_front();
-        }
-        fQueuedMsgListMutex.Unlock();
-
-        if (pMsg)
-        {   MsgSend(pMsg, false);
-        }
-        if (!size)
-            break;
+    // Process all messages on Queue, unlock while sending them
+    // this would allow other threads to put new messages on the list while we send()
+    bool empty = false;
+    while (!empty)
+    {
+        plMessage * pMsg = nullptr;
+        {
+            std::lock_guard<std::mutex> lock(fQueuedMsgListMutex);
+            empty = fQueuedMsgList.empty();
+            if (!empty)
+            {
+                pMsg = fQueuedMsgList.front();
+                fQueuedMsgList.pop_front();
+            }
+        }
+
+        if (pMsg)
+            MsgSend(pMsg, false);
     }
 }
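
The new loop above keeps the intent spelled out in the retained comments: the lock around
fQueuedMsgList is held only long enough to take the front element, and MsgSend() runs with
the lock released so other threads can keep queueing while a message is being delivered. A
minimal standalone sketch of that drain pattern with placeholder types (nothing here is
plDispatch API):

    #include <list>
    #include <mutex>

    std::mutex gQueueLock;   // illustrative stand-in for fQueuedMsgListMutex
    std::list<int> gQueue;   // illustrative stand-in for fQueuedMsgList

    void Send(int /*msg*/) { /* deliver the message; may take a while */ }

    void DrainQueue()
    {
        bool empty = false;
        while (!empty)
        {
            int msg = 0;
            bool haveMsg = false;
            {
                // Hold the lock only long enough to pop one item...
                std::lock_guard<std::mutex> lock(gQueueLock);
                empty = gQueue.empty();
                if (!empty)
                {
                    msg = gQueue.front();
                    gQueue.pop_front();
                    haveMsg = true;
                }
            }
            // ...so producers can push while this item is handled outside it.
            if (haveMsg)
                Send(msg);
        }
    }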
|
|
|
|
|
|
|
|
|