/*==LICENSE==* CyanWorlds.com Engine - MMOG client, server and tools Copyright (C) 2011 Cyan Worlds, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . You can contact Cyan Worlds, Inc. by email legal@cyan.com or by snail mail at: Cyan Worlds, Inc. 14617 N Newport Hwy Mead, WA 99021 *==LICENSE==*/ #include "hsTypes.h" #include "hsResMgr.h" #include "plDispatch.h" #define PLMESSAGE_PRIVATE #include "../pnMessage/plMessage.h" #include "../pnKeyedObject/hsKeyedObject.h" #include "hsTimer.h" #include "../pnMessage/plTimeMsg.h" #include "../pnKeyedObject/plKey.h" #include "plDispatchLogBase.h" #include "../pnNetCommon/plNetApp.h" #include "../pnNetCommon/plSynchedObject.h" #include "../pnNetCommon/pnNetCommon.h" #include "hsThread.h" #include "plProfile.h" plProfile_CreateTimer("MsgReceive", "Update", MsgReceive); plProfile_CreateTimer(" TimeMsg", "Update", TimeMsg); plProfile_CreateTimer(" EvalMsg", "Update", EvalMsg); plProfile_CreateTimer(" TransformMsg", "Update", TransformMsg); plProfile_CreateTimer(" CameraMsg", "Update", CameraMsg); class plMsgWrap { public: plMsgWrap** fBack; plMsgWrap* fNext; hsTArray fReceivers; plMessage* fMsg; plMsgWrap(plMessage* msg) : fMsg(msg) { hsRefCnt_SafeRef(msg); } virtual ~plMsgWrap() { hsRefCnt_SafeUnRef(fMsg); } plMsgWrap& ClearReceivers() { fReceivers.SetCount(0); return *this; } plMsgWrap& AddReceiver(const plKey& rcv) { hsAssert(rcv, "Trying to send mail to nil receiver"); fReceivers.Append(rcv); return *this; } const plKey& GetReceiver(int i) const { return fReceivers[i]; } UInt32 GetNumReceivers() const { return fReceivers.GetCount(); } }; Int32 plDispatch::fNumBufferReq = 0; hsBool plDispatch::fMsgActive = false; plMsgWrap* plDispatch::fMsgCurrent = nil; plMsgWrap* plDispatch::fMsgHead = nil; plMsgWrap* plDispatch::fMsgTail = nil; hsTArray plDispatch::fMsgWatch; MsgRecieveCallback plDispatch::fMsgRecieveCallback = nil; hsMutex plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent hsMutex plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch plDispatch::plDispatch() : fOwner(nil), fFutureMsgQueue(nil), fQueuedMsgOn(true) { } plDispatch::~plDispatch() { int i; for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ ) delete fRegisteredExactTypes[i]; ITrashUndelivered(); } void plDispatch::ITrashUndelivered() { while( fFutureMsgQueue ) { plMsgWrap* nuke = fFutureMsgQueue; fFutureMsgQueue = fFutureMsgQueue->fNext; hsRefCnt_SafeUnRef(nuke->fMsg); delete nuke; } // If we're the main dispatch, any unsent messages at this // point are just trashed. Slave dispatches just go away and // leave their messages to be delivered when the main dispatch // gets around to it. if( this == plgDispatch::Dispatch() ) { while( fMsgHead ) { plMsgWrap* nuke = fMsgHead; fMsgHead = fMsgHead->fNext; // hsRefCnt_SafeUnRef(nuke->fMsg); // MOOSE - done in plMsgWrap dtor delete nuke; } // reset static members which we just deleted - MOOSE fMsgCurrent=fMsgHead=fMsgTail=nil; fMsgActive = false; } } plMsgWrap* plDispatch::IInsertToQueue(plMsgWrap** curr, plMsgWrap* isert) { isert->fNext = *curr; isert->fBack = curr; if( *curr ) (*curr)->fBack = &isert->fNext; *curr = isert; return isert; } plMsgWrap* plDispatch::IDequeue(plMsgWrap** head, plMsgWrap** tail) { plMsgWrap* retVal = *head; if( *head ) { *head = (*head)->fNext; if( *head ) (*head)->fBack = head; } if( tail && (*tail == retVal) ) *tail = *head; return retVal; } hsBool plDispatch::ISortToDeferred(plMessage* msg) { plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg); if( !fFutureMsgQueue ) { if( IGetOwner() ) plgDispatch::Dispatch()->RegisterForExactType(plTimeMsg::Index(), IGetOwnerKey()); IInsertToQueue(&fFutureMsgQueue, msgWrap); return false; } if( fFutureMsgQueue->fMsg->fTimeStamp > msgWrap->fMsg->fTimeStamp ) { IInsertToQueue(&fFutureMsgQueue, msgWrap); return false; } plMsgWrap* after = fFutureMsgQueue; while( after->fNext && (after->fNext->fMsg->fTimeStamp < msgWrap->fMsg->fTimeStamp) ) after = after->fNext; IInsertToQueue(&after->fNext, msgWrap); return false; } void plDispatch::ICheckDeferred(double secs) { while( fFutureMsgQueue && (fFutureMsgQueue->fMsg->fTimeStamp < secs) ) { plMsgWrap* send = IDequeue(&fFutureMsgQueue, nil); MsgSend(send->fMsg); delete send; } int timeIdx = plTimeMsg::Index(); if( IGetOwner() && !fFutureMsgQueue && ( (timeIdx >= fRegisteredExactTypes.GetCount()) || !fRegisteredExactTypes[plTimeMsg::Index()] ) ) plgDispatch::Dispatch()->UnRegisterForExactType(plTimeMsg::Index(), IGetOwnerKey()); } hsBool plDispatch::IListeningForExactType(UInt16 hClass) { if( (hClass == plTimeMsg::Index()) && fFutureMsgQueue ) return true; return false; } void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, hsBool async) { fMsgCurrentMutex.Lock(); #ifdef HS_DEBUGGING if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) ) fMsgWatch.Append(msgWrap->fMsg); #endif // HS_DEBUGGING if( fMsgTail ) fMsgTail = IInsertToQueue(&fMsgTail->fNext, msgWrap); else fMsgTail = IInsertToQueue(&fMsgHead, msgWrap); fMsgCurrentMutex.Unlock(); if( !async ) // Test for fMsgActive in IMsgDispatch(), properly wrapped inside a mutex -mcn IMsgDispatch(); } // On starts deferring msg delivery until buffering is set to off again. hsBool plDispatch::SetMsgBuffering(hsBool on) { fMsgCurrentMutex.Lock(); if( on ) { hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf"); if( !fNumBufferReq && fMsgActive ) { fMsgCurrentMutex.Unlock(); return false; } fNumBufferReq++; fMsgActive = true; fMsgCurrentMutex.Unlock(); } else if( !--fNumBufferReq ) { fMsgActive = false; fMsgCurrentMutex.Unlock(); IMsgDispatch(); } hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests"); return true; } void plDispatch::IMsgDispatch() { if( !fMsgDispatchLock.TryLock() ) return; if( fMsgActive ) { fMsgDispatchLock.Unlock(); return; } fMsgActive = true; int responseLevel=0; fMsgCurrentMutex.Lock(); plMsgWrap* origTail = fMsgTail; while( fMsgCurrent = fMsgHead ) { IDequeue(&fMsgHead, &fMsgTail); fMsgCurrentMutex.Unlock(); plMessage* msg = fMsgCurrent->fMsg; hsBool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal); #ifdef HS_DEBUGGING int watchIdx = fMsgWatch.Find(msg); if( fMsgWatch.kMissingIndex != watchIdx ) { fMsgWatch.Remove(watchIdx); #if HS_BUILD_FOR_WIN32 __asm { int 3 } #endif // HS_BUILD_FOR_WIN32 } #endif // HS_DEBUGGING static UInt64 startTicks = 0; if (plDispatchLogBase::IsLogging()) startTicks = hsTimer::GetFullTickCount(); int i, numReceivers=0; for( i = 0; fMsgCurrent && i < fMsgCurrent->GetNumReceivers(); i++ ) { const plKey& rcvKey = fMsgCurrent->GetReceiver(i); plReceiver* rcv = rcvKey ? plReceiver::ConvertNoRef(rcvKey->ObjectIsLoaded()) : nil; if( rcv ) { if (nonLocalMsg) { // localOnly objects should not get remote messages plSynchedObject* synchedObj = plSynchedObject::ConvertNoRef(rcv); if (synchedObj && !synchedObj->IsNetSynched() ) { continue; } if (plNetObjectDebuggerBase::GetInstance()) { // log net msg if this is a debug object hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv); if (plNetObjectDebuggerBase::GetInstance()->IsDebugObject(ko)) { hsLogEntry(plNetObjectDebuggerBase::GetInstance()->LogMsg( xtl::format(" object:%s, GameMessage %s st=%.3f rt=%.3f", ko->GetKeyName(), msg->ClassName(), hsTimer::GetSysSeconds(), hsTimer::GetSeconds()).c_str())); } } } #ifndef PLASMA_EXTERNAL_RELEASE UInt32 rcvTicks = hsTimer::GetPrecTickCount(); // Object could be deleted by this message, so we need to log this stuff now const char* keyname = "(unknown)"; const char* className = "(unknown)"; UInt32 clonePlayerID = 0; if (plDispatchLogBase::IsLoggingLong()) { hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv); if (ko) { keyname = ko->GetKeyName(); clonePlayerID = ko->GetKey()->GetUoid().GetClonePlayerID(); className = ko->ClassName(); } } #endif // PLASMA_EXTERNAL_RELEASE #ifdef HS_DEBUGGING if (msg->GetBreakBeforeDispatch()) DebugBreakIfDebuggerPresent(); #endif plProfile_BeginTiming(MsgReceive); rcv->MsgReceive(msg); plProfile_EndTiming(MsgReceive); #ifndef PLASMA_EXTERNAL_RELEASE if (plDispatchLogBase::IsLoggingLong()) { rcvTicks = hsTimer::GetPrecTickCount() - rcvTicks; float rcvTime = (float)(hsTimer::PrecTicksToSecs(rcvTicks) * 1000.f); // If the receiver takes more than 5 ms to process its message, log it if (rcvTime > 5.f) plDispatchLogBase::GetInstance()->LogLongReceive(keyname, className, clonePlayerID, msg, rcvTime); } #endif // PLASMA_EXTERNAL_RELEASE numReceivers++; if (fMsgRecieveCallback != nil) fMsgRecieveCallback(); } } // for message logging // if (plDispatchLogBase::IsLogging()) // { // float sendTime = hsTimer::FullTicksToMs(hsTimer::GetFullTickCount() - startTicks); // // plDispatchLogBase::GetInstance()->DumpMsg(msg, numReceivers, (int)sendTime, responseLevel*2 /* indent */); // if (origTail==fMsgCurrent) // { // if we deliver more msgs after this, they must be response msgs // responseLevel++; // origTail = fMsgTail; // } // } fMsgCurrentMutex.Lock(); delete fMsgCurrent; // TEMP fMsgCurrent = (class plMsgWrap *)0xdeadc0de; } fMsgCurrentMutex.Unlock(); fMsgActive = false; fMsgDispatchLock.Unlock(); } // // returns true if msg has been consumed and deleted // hsBool plDispatch::IMsgNetPropagate(plMessage* msg) { fMsgCurrentMutex.Lock(); // Make sure cascaded messages all have the same net flags plNetClientApp::InheritNetMsgFlags(fMsgCurrent ? fMsgCurrent->fMsg : nil, msg, false); fMsgCurrentMutex.Unlock(); // Decide if msg should go out over the network. // If kNetForce is used, this message should always go out over the network, even if it's already // part of a net cascade. We still want to inherit net status flags (but ignore them) // so that response messages obey cascading rules. In other words, we are not // halting the cascade, just overriding the send rule for this message. if( msg->HasBCastFlag(plMessage::kNetPropagate) && (!msg->HasBCastFlag(plMessage::kNetSent) || msg->HasBCastFlag(plMessage::kNetForce) || msg->HasBCastFlag(plMessage::kNetNonDeterministic) || msg->HasBCastFlag(plMessage::kCCRSendToAllPlayers )) ) { // send it off... hsAssert(!msg->HasBCastFlag(plMessage::kNetStartCascade), "initial net cascade msg getting sent over the net again?"); if (plNetClientApp::GetInstance() && plNetClientApp::GetInstance()->ISendGameMessage(msg)>=0) msg->SetBCastFlag(plMessage::kNetSent); } // Decide if msg should get sent locally if (!msg->HasBCastFlag(plMessage::kLocalPropagate)) { hsRefCnt_SafeUnRef(msg); return true; } // since we've already checked this property, and the msg will be dispatched locally, // it should not start any more net cascades. msg->SetBCastFlag(plMessage::kNetStartCascade, false); return false; } hsBool plDispatch::MsgSend(plMessage* msg, hsBool async) { if( IMsgNetPropagate(msg) ) return true; plTimeMsg* timeMsg; if( msg->GetTimeStamp() > hsTimer::GetSysSeconds() ) return ISortToDeferred(msg); else if( timeMsg = plTimeMsg::ConvertNoRef(msg) ) ICheckDeferred(timeMsg->DSeconds()); plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg); hsRefCnt_SafeUnRef(msg); // broadcast if( msg->HasBCastFlag(plMessage::kBCastByExactType) | msg->HasBCastFlag(plMessage::kBCastByType) ) { int idx = msg->ClassIndex(); if( idx < fRegisteredExactTypes.GetCount() ) { plTypeFilter* filt = fRegisteredExactTypes[idx]; if( filt ) { int j; for( j = 0; j < filt->fReceivers.GetCount(); j++ ) { msgWrap->AddReceiver(filt->fReceivers[j]); } if( msg->HasBCastFlag(plMessage::kClearAfterBCast) ) { delete filt; fRegisteredExactTypes[idx] = nil; } } } } // Direct communique else if( msg->GetNumReceivers() ) { msgWrap->fReceivers = msg->fReceivers; } IMsgEnqueue(msgWrap, async); return true; } void plDispatch::MsgQueueOnOff(hsBool sw) { fQueuedMsgOn = sw; } void plDispatch::MsgQueue(plMessage* msg) { if (fQueuedMsgOn) { fQueuedMsgListMutex.Lock(); hsAssert(msg,"Message missing"); fQueuedMsgList.push_back(msg); fQueuedMsgListMutex.Unlock(); } else MsgSend(msg, false); } void plDispatch::MsgQueueProcess() { // Process all messages on Queue, unlock while sending them // this would allow other threads to put new messages on the list while we send() while (1) { plMessage * pMsg = nil; fQueuedMsgListMutex.Lock(); int size = fQueuedMsgList.size(); if (size) { pMsg = fQueuedMsgList.front(); fQueuedMsgList.pop_front(); } fQueuedMsgListMutex.Unlock(); if (pMsg) { MsgSend(pMsg, false); } if (!size) break; } } void plDispatch::RegisterForType(UInt16 hClass, const plKey& receiver) { int i; for( i = 0; i < plFactory::GetNumClasses(); i++ ) { if( plFactory::DerivesFrom(hClass, i) ) RegisterForExactType(i, receiver); } } void plDispatch::RegisterForExactType(UInt16 hClass, const plKey& receiver) { int idx = hClass; fRegisteredExactTypes.ExpandAndZero(idx+1); plTypeFilter* filt = fRegisteredExactTypes[idx]; if( !filt ) { filt = TRACKED_NEW plTypeFilter; fRegisteredExactTypes[idx] = filt; filt->fHClass = hClass; } if( filt->fReceivers.kMissingIndex == filt->fReceivers.Find(receiver) ) filt->fReceivers.Append(receiver); } void plDispatch::UnRegisterForType(UInt16 hClass, const plKey& receiver) { int i; for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ ) { if( plFactory::DerivesFrom(hClass, i) ) IUnRegisterForExactType(i , receiver); } } hsBool plDispatch::IUnRegisterForExactType(int idx, const plKey& receiver) { hsAssert(idx < fRegisteredExactTypes.GetCount(), "Out of range should be filtered before call to internal"); plTypeFilter* filt = fRegisteredExactTypes[idx]; if (!filt) return false; int j; for( j = 0; j < filt->fReceivers.GetCount(); j++ ) { if( receiver == filt->fReceivers[j] ) { if( filt->fReceivers.GetCount() > 1 ) { if( j < filt->fReceivers.GetCount() - 1 ) filt->fReceivers[j] = filt->fReceivers[filt->fReceivers.GetCount() - 1]; filt->fReceivers[filt->fReceivers.GetCount()-1] = nil; filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1); } else { delete filt; fRegisteredExactTypes[idx] = nil; } break; } } return false; } void plDispatch::UnRegisterAll(const plKey& receiver) { int i; for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ ) { plTypeFilter* filt = fRegisteredExactTypes[i]; if( filt ) { int idx = filt->fReceivers.Find(receiver); if( idx != filt->fReceivers.kMissingIndex ) { if( filt->fReceivers.GetCount() > 1 ) { if( idx < filt->fReceivers.GetCount() - 1 ) filt->fReceivers[idx] = filt->fReceivers[filt->fReceivers.GetCount() - 1]; filt->fReceivers[filt->fReceivers.GetCount()-1] = nil; filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1); } else { delete filt; fRegisteredExactTypes[i] = nil; } } } } } void plDispatch::UnRegisterForExactType(UInt16 hClass, const plKey& receiver) { int idx = hClass; if( idx >= fRegisteredExactTypes.GetCount() ) return; plTypeFilter* filt = fRegisteredExactTypes[idx]; if( !filt ) return; IUnRegisterForExactType(idx, receiver); }