631 lines
19 KiB

/*==LICENSE==*
CyanWorlds.com Engine - MMOG client, server and tools
Copyright (C) 2011 Cyan Worlds, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
You can contact Cyan Worlds, Inc. by email legal@cyan.com
or by snail mail at:
Cyan Worlds, Inc.
14617 N Newport Hwy
Mead, WA 99021
*==LICENSE==*/
#include "hsTypes.h"
#include "hsResMgr.h"
#include "plDispatch.h"
#define PLMESSAGE_PRIVATE
#include "pnMessage/plMessage.h"
#include "pnKeyedObject/hsKeyedObject.h"
#include "hsTimer.h"
#include "pnMessage/plTimeMsg.h"
#include "pnKeyedObject/plKey.h"
#include "plDispatchLogBase.h"
#include "pnNetCommon/plNetApp.h"
#include "pnNetCommon/plSynchedObject.h"
#include "pnNetCommon/pnNetCommon.h"
#include "hsThread.h"
#include "plProfile.h"
plProfile_CreateTimer("MsgReceive", "Update", MsgReceive);
plProfile_CreateTimer(" TimeMsg", "Update", TimeMsg);
plProfile_CreateTimer(" EvalMsg", "Update", EvalMsg);
plProfile_CreateTimer(" TransformMsg", "Update", TransformMsg);
plProfile_CreateTimer(" CameraMsg", "Update", CameraMsg);
class plMsgWrap
{
public:
plMsgWrap** fBack;
plMsgWrap* fNext;
hsTArray<plKey> fReceivers;
plMessage* fMsg;
plMsgWrap(plMessage* msg) : fMsg(msg) { hsRefCnt_SafeRef(msg); }
virtual ~plMsgWrap() { hsRefCnt_SafeUnRef(fMsg); }
plMsgWrap& ClearReceivers() { fReceivers.SetCount(0); return *this; }
plMsgWrap& AddReceiver(const plKey& rcv)
{
hsAssert(rcv, "Trying to send mail to nil receiver");
fReceivers.Append(rcv); return *this;
}
const plKey& GetReceiver(int i) const { return fReceivers[i]; }
UInt32 GetNumReceivers() const { return fReceivers.GetCount(); }
};
Int32 plDispatch::fNumBufferReq = 0;
hsBool plDispatch::fMsgActive = false;
plMsgWrap* plDispatch::fMsgCurrent = nil;
plMsgWrap* plDispatch::fMsgHead = nil;
plMsgWrap* plDispatch::fMsgTail = nil;
hsTArray<plMessage*> plDispatch::fMsgWatch;
MsgRecieveCallback plDispatch::fMsgRecieveCallback = nil;
hsMutex plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
hsMutex plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch
plDispatch::plDispatch()
: fOwner(nil), fFutureMsgQueue(nil), fQueuedMsgOn(true)
{
}
plDispatch::~plDispatch()
{
int i;
for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ )
delete fRegisteredExactTypes[i];
ITrashUndelivered();
}
void plDispatch::ITrashUndelivered()
{
while( fFutureMsgQueue )
{
plMsgWrap* nuke = fFutureMsgQueue;
fFutureMsgQueue = fFutureMsgQueue->fNext;
hsRefCnt_SafeUnRef(nuke->fMsg);
delete nuke;
}
// If we're the main dispatch, any unsent messages at this
// point are just trashed. Slave dispatches just go away and
// leave their messages to be delivered when the main dispatch
// gets around to it.
if( this == plgDispatch::Dispatch() )
{
while( fMsgHead )
{
plMsgWrap* nuke = fMsgHead;
fMsgHead = fMsgHead->fNext;
// hsRefCnt_SafeUnRef(nuke->fMsg); // MOOSE - done in plMsgWrap dtor
delete nuke;
}
// reset static members which we just deleted - MOOSE
fMsgCurrent=fMsgHead=fMsgTail=nil;
fMsgActive = false;
}
}
plMsgWrap* plDispatch::IInsertToQueue(plMsgWrap** curr, plMsgWrap* isert)
{
isert->fNext = *curr;
isert->fBack = curr;
if( *curr )
(*curr)->fBack = &isert->fNext;
*curr = isert;
return isert;
}
plMsgWrap* plDispatch::IDequeue(plMsgWrap** head, plMsgWrap** tail)
{
plMsgWrap* retVal = *head;
if( *head )
{
*head = (*head)->fNext;
if( *head )
(*head)->fBack = head;
}
if( tail && (*tail == retVal) )
*tail = *head;
return retVal;
}
hsBool plDispatch::ISortToDeferred(plMessage* msg)
{
plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg);
if( !fFutureMsgQueue )
{
if( IGetOwner() )
plgDispatch::Dispatch()->RegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
IInsertToQueue(&fFutureMsgQueue, msgWrap);
return false;
}
if( fFutureMsgQueue->fMsg->fTimeStamp > msgWrap->fMsg->fTimeStamp )
{
IInsertToQueue(&fFutureMsgQueue, msgWrap);
return false;
}
plMsgWrap* after = fFutureMsgQueue;
while( after->fNext && (after->fNext->fMsg->fTimeStamp < msgWrap->fMsg->fTimeStamp) )
after = after->fNext;
IInsertToQueue(&after->fNext, msgWrap);
return false;
}
void plDispatch::ICheckDeferred(double secs)
{
while( fFutureMsgQueue && (fFutureMsgQueue->fMsg->fTimeStamp < secs) )
{
plMsgWrap* send = IDequeue(&fFutureMsgQueue, nil);
MsgSend(send->fMsg);
delete send;
}
int timeIdx = plTimeMsg::Index();
if( IGetOwner()
&& !fFutureMsgQueue
&&
(
(timeIdx >= fRegisteredExactTypes.GetCount())
||
!fRegisteredExactTypes[plTimeMsg::Index()]
)
)
plgDispatch::Dispatch()->UnRegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
}
hsBool plDispatch::IListeningForExactType(UInt16 hClass)
{
if( (hClass == plTimeMsg::Index()) && fFutureMsgQueue )
return true;
return false;
}
void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, hsBool async)
{
fMsgCurrentMutex.Lock();
#ifdef HS_DEBUGGING
if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) )
fMsgWatch.Append(msgWrap->fMsg);
#endif // HS_DEBUGGING
if( fMsgTail )
fMsgTail = IInsertToQueue(&fMsgTail->fNext, msgWrap);
else
fMsgTail = IInsertToQueue(&fMsgHead, msgWrap);
fMsgCurrentMutex.Unlock();
if( !async )
// Test for fMsgActive in IMsgDispatch(), properly wrapped inside a mutex -mcn
IMsgDispatch();
}
// On starts deferring msg delivery until buffering is set to off again.
hsBool plDispatch::SetMsgBuffering(hsBool on)
{
fMsgCurrentMutex.Lock();
if( on )
{
hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
if( !fNumBufferReq && fMsgActive )
{
fMsgCurrentMutex.Unlock();
return false;
}
fNumBufferReq++;
fMsgActive = true;
fMsgCurrentMutex.Unlock();
}
else if( !--fNumBufferReq )
{
fMsgActive = false;
fMsgCurrentMutex.Unlock();
IMsgDispatch();
}
hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests");
return true;
}
void plDispatch::IMsgDispatch()
{
if( !fMsgDispatchLock.TryLock() )
return;
if( fMsgActive )
{
fMsgDispatchLock.Unlock();
return;
}
fMsgActive = true;
int responseLevel=0;
fMsgCurrentMutex.Lock();
plMsgWrap* origTail = fMsgTail;
while( fMsgCurrent = fMsgHead )
{
IDequeue(&fMsgHead, &fMsgTail);
fMsgCurrentMutex.Unlock();
plMessage* msg = fMsgCurrent->fMsg;
hsBool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal);
#ifdef HS_DEBUGGING
int watchIdx = fMsgWatch.Find(msg);
if( fMsgWatch.kMissingIndex != watchIdx )
{
fMsgWatch.Remove(watchIdx);
#if HS_BUILD_FOR_WIN32
__asm { int 3 }
#endif // HS_BUILD_FOR_WIN32
}
#endif // HS_DEBUGGING
static UInt64 startTicks = 0;
if (plDispatchLogBase::IsLogging())
startTicks = hsTimer::GetFullTickCount();
int i, numReceivers=0;
for( i = 0; fMsgCurrent && i < fMsgCurrent->GetNumReceivers(); i++ )
{
const plKey& rcvKey = fMsgCurrent->GetReceiver(i);
plReceiver* rcv = rcvKey ? plReceiver::ConvertNoRef(rcvKey->ObjectIsLoaded()) : nil;
if( rcv )
{
if (nonLocalMsg)
{
// localOnly objects should not get remote messages
plSynchedObject* synchedObj = plSynchedObject::ConvertNoRef(rcv);
if (synchedObj && !synchedObj->IsNetSynched() )
{
continue;
}
if (plNetObjectDebuggerBase::GetInstance())
{ // log net msg if this is a debug object
hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
if (plNetObjectDebuggerBase::GetInstance()->IsDebugObject(ko))
{
hsLogEntry(plNetObjectDebuggerBase::GetInstance()->LogMsg(
xtl::format("<RCV> object:%s, GameMessage %s st=%.3f rt=%.3f",
ko->GetKeyName(), msg->ClassName(), hsTimer::GetSysSeconds(), hsTimer::GetSeconds()).c_str()));
}
}
}
#ifndef PLASMA_EXTERNAL_RELEASE
UInt32 rcvTicks = hsTimer::GetPrecTickCount();
// Object could be deleted by this message, so we need to log this stuff now
const char* keyname = "(unknown)";
const char* className = "(unknown)";
UInt32 clonePlayerID = 0;
if (plDispatchLogBase::IsLoggingLong())
{
hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
if (ko)
{
keyname = ko->GetKeyName();
clonePlayerID = ko->GetKey()->GetUoid().GetClonePlayerID();
className = ko->ClassName();
}
}
#endif // PLASMA_EXTERNAL_RELEASE
#ifdef HS_DEBUGGING
if (msg->GetBreakBeforeDispatch())
DebugBreakIfDebuggerPresent();
#endif
plProfile_BeginTiming(MsgReceive);
rcv->MsgReceive(msg);
plProfile_EndTiming(MsgReceive);
#ifndef PLASMA_EXTERNAL_RELEASE
if (plDispatchLogBase::IsLoggingLong())
{
rcvTicks = hsTimer::GetPrecTickCount() - rcvTicks;
float rcvTime = (float)(hsTimer::PrecTicksToSecs(rcvTicks) * 1000.f);
// If the receiver takes more than 5 ms to process its message, log it
if (rcvTime > 5.f)
plDispatchLogBase::GetInstance()->LogLongReceive(keyname, className, clonePlayerID, msg, rcvTime);
}
#endif // PLASMA_EXTERNAL_RELEASE
numReceivers++;
if (fMsgRecieveCallback != nil)
fMsgRecieveCallback();
}
}
// for message logging
// if (plDispatchLogBase::IsLogging())
// {
// float sendTime = hsTimer::FullTicksToMs(hsTimer::GetFullTickCount() - startTicks);
//
// plDispatchLogBase::GetInstance()->DumpMsg(msg, numReceivers, (int)sendTime, responseLevel*2 /* indent */);
// if (origTail==fMsgCurrent)
// { // if we deliver more msgs after this, they must be response msgs
// responseLevel++;
// origTail = fMsgTail;
// }
// }
fMsgCurrentMutex.Lock();
delete fMsgCurrent;
// TEMP
fMsgCurrent = (class plMsgWrap *)0xdeadc0de;
}
fMsgCurrentMutex.Unlock();
fMsgActive = false;
fMsgDispatchLock.Unlock();
}
//
// returns true if msg has been consumed and deleted
//
hsBool plDispatch::IMsgNetPropagate(plMessage* msg)
{
fMsgCurrentMutex.Lock();
// Make sure cascaded messages all have the same net flags
plNetClientApp::InheritNetMsgFlags(fMsgCurrent ? fMsgCurrent->fMsg : nil, msg, false);
fMsgCurrentMutex.Unlock();
// Decide if msg should go out over the network.
// If kNetForce is used, this message should always go out over the network, even if it's already
// part of a net cascade. We still want to inherit net status flags (but ignore them)
// so that response messages obey cascading rules. In other words, we are not
// halting the cascade, just overriding the send rule for this message.
if( msg->HasBCastFlag(plMessage::kNetPropagate) &&
(!msg->HasBCastFlag(plMessage::kNetSent) ||
msg->HasBCastFlag(plMessage::kNetForce) ||
msg->HasBCastFlag(plMessage::kNetNonDeterministic) ||
msg->HasBCastFlag(plMessage::kCCRSendToAllPlayers )) )
{
// send it off...
hsAssert(!msg->HasBCastFlag(plMessage::kNetStartCascade), "initial net cascade msg getting sent over the net again?");
if (plNetClientApp::GetInstance() && plNetClientApp::GetInstance()->ISendGameMessage(msg)>=0)
msg->SetBCastFlag(plMessage::kNetSent);
}
// Decide if msg should get sent locally
if (!msg->HasBCastFlag(plMessage::kLocalPropagate))
{
hsRefCnt_SafeUnRef(msg);
return true;
}
// since we've already checked this property, and the msg will be dispatched locally,
// it should not start any more net cascades.
msg->SetBCastFlag(plMessage::kNetStartCascade, false);
return false;
}
hsBool plDispatch::MsgSend(plMessage* msg, hsBool async)
{
if( IMsgNetPropagate(msg) )
return true;
plTimeMsg* timeMsg;
if( msg->GetTimeStamp() > hsTimer::GetSysSeconds() )
return ISortToDeferred(msg);
else if( timeMsg = plTimeMsg::ConvertNoRef(msg) )
ICheckDeferred(timeMsg->DSeconds());
plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg);
hsRefCnt_SafeUnRef(msg);
// broadcast
if( msg->HasBCastFlag(plMessage::kBCastByExactType) | msg->HasBCastFlag(plMessage::kBCastByType) )
{
int idx = msg->ClassIndex();
if( idx < fRegisteredExactTypes.GetCount() )
{
plTypeFilter* filt = fRegisteredExactTypes[idx];
if( filt )
{
int j;
for( j = 0; j < filt->fReceivers.GetCount(); j++ )
{
msgWrap->AddReceiver(filt->fReceivers[j]);
}
if( msg->HasBCastFlag(plMessage::kClearAfterBCast) )
{
delete filt;
fRegisteredExactTypes[idx] = nil;
}
}
}
}
// Direct communique
else
if( msg->GetNumReceivers() )
{
msgWrap->fReceivers = msg->fReceivers;
}
IMsgEnqueue(msgWrap, async);
return true;
}
void plDispatch::MsgQueueOnOff(hsBool sw)
{
fQueuedMsgOn = sw;
}
void plDispatch::MsgQueue(plMessage* msg)
{
if (fQueuedMsgOn)
{
fQueuedMsgListMutex.Lock();
hsAssert(msg,"Message missing");
fQueuedMsgList.push_back(msg);
fQueuedMsgListMutex.Unlock();
}
else
MsgSend(msg, false);
}
void plDispatch::MsgQueueProcess()
{
// Process all messages on Queue, unlock while sending them
// this would allow other threads to put new messages on the list while we send()
while (1)
{
plMessage * pMsg = nil;
fQueuedMsgListMutex.Lock();
int size = fQueuedMsgList.size();
if (size)
{ pMsg = fQueuedMsgList.front();
fQueuedMsgList.pop_front();
}
fQueuedMsgListMutex.Unlock();
if (pMsg)
{ MsgSend(pMsg, false);
}
if (!size)
break;
}
}
void plDispatch::RegisterForType(UInt16 hClass, const plKey& receiver)
{
int i;
for( i = 0; i < plFactory::GetNumClasses(); i++ )
{
if( plFactory::DerivesFrom(hClass, i) )
RegisterForExactType(i, receiver);
}
}
void plDispatch::RegisterForExactType(UInt16 hClass, const plKey& receiver)
{
int idx = hClass;
fRegisteredExactTypes.ExpandAndZero(idx+1);
plTypeFilter* filt = fRegisteredExactTypes[idx];
if( !filt )
{
filt = TRACKED_NEW plTypeFilter;
fRegisteredExactTypes[idx] = filt;
filt->fHClass = hClass;
}
if( filt->fReceivers.kMissingIndex == filt->fReceivers.Find(receiver) )
filt->fReceivers.Append(receiver);
}
void plDispatch::UnRegisterForType(UInt16 hClass, const plKey& receiver)
{
int i;
for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ )
{
if( plFactory::DerivesFrom(hClass, i) )
IUnRegisterForExactType(i , receiver);
}
}
hsBool plDispatch::IUnRegisterForExactType(int idx, const plKey& receiver)
{
hsAssert(idx < fRegisteredExactTypes.GetCount(), "Out of range should be filtered before call to internal");
plTypeFilter* filt = fRegisteredExactTypes[idx];
if (!filt)
return false;
int j;
for( j = 0; j < filt->fReceivers.GetCount(); j++ )
{
if( receiver == filt->fReceivers[j] )
{
if( filt->fReceivers.GetCount() > 1 )
{
if( j < filt->fReceivers.GetCount() - 1 )
filt->fReceivers[j] = filt->fReceivers[filt->fReceivers.GetCount() - 1];
filt->fReceivers[filt->fReceivers.GetCount()-1] = nil;
filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1);
}
else
{
delete filt;
fRegisteredExactTypes[idx] = nil;
}
break;
}
}
return false;
}
void plDispatch::UnRegisterAll(const plKey& receiver)
{
int i;
for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ )
{
plTypeFilter* filt = fRegisteredExactTypes[i];
if( filt )
{
int idx = filt->fReceivers.Find(receiver);
if( idx != filt->fReceivers.kMissingIndex )
{
if( filt->fReceivers.GetCount() > 1 )
{
if( idx < filt->fReceivers.GetCount() - 1 )
filt->fReceivers[idx] = filt->fReceivers[filt->fReceivers.GetCount() - 1];
filt->fReceivers[filt->fReceivers.GetCount()-1] = nil;
filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1);
}
else
{
delete filt;
fRegisteredExactTypes[i] = nil;
}
}
}
}
}
void plDispatch::UnRegisterForExactType(UInt16 hClass, const plKey& receiver)
{
int idx = hClass;
if( idx >= fRegisteredExactTypes.GetCount() )
return;
plTypeFilter* filt = fRegisteredExactTypes[idx];
if( !filt )
return;
IUnRegisterForExactType(idx, receiver);
}