/*==LICENSE==*
CyanWorlds.com Engine - MMOG client, server and tools
Copyright (C) 2011 Cyan Worlds, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Additional permissions under GNU GPL version 3 section 7
If you modify this Program, or any covered work, by linking or
combining it with any of RAD Game Tools Bink SDK, Autodesk 3ds Max SDK,
NVIDIA PhysX SDK, Microsoft DirectX SDK, OpenSSL library, Independent
JPEG Group JPEG library, Microsoft Windows Media SDK, or Apple QuickTime SDK
(or a modified version of those libraries),
containing parts covered by the terms of the Bink SDK EULA, 3ds Max EULA,
PhysX SDK EULA, DirectX SDK EULA, OpenSSL and SSLeay licenses, IJG
JPEG Library README, Windows Media SDK EULA, or QuickTime SDK EULA, the
licensors of this Program grant you additional
permission to convey the resulting work. Corresponding Source for a
non-source form of such a combination shall include the source code for
the parts of OpenSSL and IJG JPEG Library used as well as that of the covered
work.
You can contact Cyan Worlds, Inc. by email legal@cyan.com
or by snail mail at:
Cyan Worlds, Inc.
14617 N Newport Hwy
Mead, WA 99021
*==LICENSE==*/
#include "hsTypes.h"
#include "hsResMgr.h"
#include "plDispatch.h"
#define PLMESSAGE_PRIVATE
#include "../pnMessage/plMessage.h"
#include "../pnKeyedObject/hsKeyedObject.h"
#include "hsTimer.h"
#include "../pnMessage/plTimeMsg.h"
#include "../pnKeyedObject/plKey.h"
#include "plDispatchLogBase.h"
#include "../pnNetCommon/plNetApp.h"
#include "../pnNetCommon/plSynchedObject.h"
#include "../pnNetCommon/pnNetCommon.h"
#include "hsThread.h"
#include "plProfile.h"
// Profiling timers declared for this file. Of these, only MsgReceive is
// referenced in the visible part of this file (it brackets each
// rcv->MsgReceive() call in plDispatch::IMsgDispatch).
// NOTE(review): the argument order looks like (display name, group, identifier)
// and the remaining timers are presumably referenced from elsewhere - confirm
// against plProfile.h.
plProfile_CreateTimer("MsgReceive", "Update", MsgReceive);
plProfile_CreateTimer(" TimeMsg", "Update", TimeMsg);
plProfile_CreateTimer(" EvalMsg", "Update", EvalMsg);
plProfile_CreateTimer(" TransformMsg", "Update", TransformMsg);
plProfile_CreateTimer(" CameraMsg", "Update", CameraMsg);
// Queue node used by plDispatch. It pairs one message with the list of
// receiver keys the message is to be delivered to, and carries the links
// that chain it into a dispatch queue.
//
// The wrapper takes its own reference on the message at construction and
// releases that reference when it is destroyed.
class plMsgWrap
{
public:
    // Address of the pointer that points at this node (either the queue's
    // head pointer or the previous node's fNext). Maintained by
    // plDispatch::IInsertToQueue / IDequeue.
    plMsgWrap**         fBack;
    // Next node in the queue, or nil at the end.
    plMsgWrap*          fNext;
    // Keys of the objects this message will be handed to.
    hsTArray<plKey>     fReceivers;
    // The wrapped message (ref-counted; may be nil).
    plMessage*          fMsg;

    // Links start out nil so a wrapper that has not yet been inserted into a
    // queue never exposes indeterminate pointers.
    plMsgWrap(plMessage* msg) : fBack(nil), fNext(nil), fMsg(msg) { hsRefCnt_SafeRef(msg); }
    virtual ~plMsgWrap() { hsRefCnt_SafeUnRef(fMsg); }

    // Empties the receiver list; returns *this to allow chaining.
    plMsgWrap& ClearReceivers() { fReceivers.SetCount(0); return *this; }

    // Appends a receiver key (asserts that it is non-nil); returns *this to
    // allow chaining.
    plMsgWrap& AddReceiver(const plKey& rcv)
    {
        hsAssert(rcv, "Trying to send mail to nil receiver");
        fReceivers.Append(rcv);
        return *this;
    }

    // Returns the i-th receiver key; no bounds check is done here.
    const plKey& GetReceiver(int i) const { return fReceivers[i]; }
    // Number of receivers currently attached to this message.
    UInt32 GetNumReceivers() const { return fReceivers.GetCount(); }
};
// Storage for plDispatch's static members.

// Number of outstanding SetMsgBuffering(true) requests.
Int32               plDispatch::fNumBufferReq = 0;
// True while messages are being delivered or while buffering is switched on;
// IMsgDispatch() does nothing when this is set.
hsBool              plDispatch::fMsgActive = false;
// The wrapper currently being delivered by IMsgDispatch(), nil otherwise.
plMsgWrap*          plDispatch::fMsgCurrent = nil;
// Head and tail of the queue of messages awaiting delivery.
plMsgWrap*          plDispatch::fMsgHead = nil;
plMsgWrap*          plDispatch::fMsgTail = nil;
// Messages flagged kMsgWatch; only filled in and consulted in HS_DEBUGGING builds.
hsTArray<plMessage*> plDispatch::fMsgWatch;
// Optional hook called after each receiver has handled a message.
MsgRecieveCallback  plDispatch::fMsgRecieveCallback = nil;
hsMutex             plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
hsMutex             plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch
// Builds a dispatcher with no owner, an empty deferred-message queue, and
// MsgQueue() queuing switched on.
plDispatch::plDispatch()
:   fOwner(nil),
    fFutureMsgQueue(nil),
    fQueuedMsgOn(true)
{
}
// Frees every registered type filter, then discards whatever messages are
// still waiting for delivery.
plDispatch::~plDispatch()
{
    int typeIdx = 0;
    while( typeIdx < fRegisteredExactTypes.GetCount() )
    {
        delete fRegisteredExactTypes[typeIdx];
        typeIdx++;
    }

    ITrashUndelivered();
}
void plDispatch::ITrashUndelivered()
{
while( fFutureMsgQueue )
{
plMsgWrap* nuke = fFutureMsgQueue;
fFutureMsgQueue = fFutureMsgQueue->fNext;
hsRefCnt_SafeUnRef(nuke->fMsg);
delete nuke;
}
// If we're the main dispatch, any unsent messages at this
// point are just trashed. Slave dispatches just go away and
// leave their messages to be delivered when the main dispatch
// gets around to it.
if( this == plgDispatch::Dispatch() )
{
while( fMsgHead )
{
plMsgWrap* nuke = fMsgHead;
fMsgHead = fMsgHead->fNext;
// hsRefCnt_SafeUnRef(nuke->fMsg); // MOOSE - done in plMsgWrap dtor
delete nuke;
}
// reset static members which we just deleted - MOOSE
fMsgCurrent=fMsgHead=fMsgTail=nil;
fMsgActive = false;
}
}
// Splices `isert` into a queue at the link `curr`: the node previously
// referenced by *curr (if any) becomes isert's successor, and *curr is
// redirected to isert. Returns the inserted node.
plMsgWrap* plDispatch::IInsertToQueue(plMsgWrap** curr, plMsgWrap* isert)
{
    plMsgWrap* displaced = *curr;

    isert->fBack = curr;
    isert->fNext = displaced;

    // The displaced node is now reached through isert's fNext link.
    if( displaced )
        displaced->fBack = &isert->fNext;

    *curr = isert;
    return isert;
}
// Pops the first node off the queue rooted at *head and returns it (nil if
// the queue is empty). When a tail pointer is supplied and it referenced the
// popped node, it is updated to the new head.
plMsgWrap* plDispatch::IDequeue(plMsgWrap** head, plMsgWrap** tail)
{
    plMsgWrap* const popped = *head;

    if( popped )
    {
        plMsgWrap* const successor = popped->fNext;
        *head = successor;
        if( successor )
            successor->fBack = head;
    }

    if( tail && (*tail == popped) )
        *tail = *head;

    return popped;
}
// Wraps `msg` and files it in the deferred queue, which is kept ordered by
// ascending timestamp. If the queue was empty and this dispatcher has an
// owner, the owner is first registered with the main dispatcher for
// plTimeMsg. Always returns false.
hsBool plDispatch::ISortToDeferred(plMessage* msg)
{
    plMsgWrap* wrap = TRACKED_NEW plMsgWrap(msg);

    // Link at which the new wrapper will be spliced in; defaults to the front.
    plMsgWrap** slot = &fFutureMsgQueue;

    if( !fFutureMsgQueue )
    {
        if( IGetOwner() )
            plgDispatch::Dispatch()->RegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
    }
    else if( !(fFutureMsgQueue->fMsg->fTimeStamp > wrap->fMsg->fTimeStamp) )
    {
        // Not earlier than the current front: walk to the last node whose
        // successor is stamped earlier than the new message.
        plMsgWrap* prev = fFutureMsgQueue;
        while( prev->fNext && (prev->fNext->fMsg->fTimeStamp < wrap->fMsg->fTimeStamp) )
            prev = prev->fNext;
        slot = &prev->fNext;
    }

    IInsertToQueue(slot, wrap);
    return false;
}
void plDispatch::ICheckDeferred(double secs)
{
while( fFutureMsgQueue && (fFutureMsgQueue->fMsg->fTimeStamp < secs) )
{
plMsgWrap* send = IDequeue(&fFutureMsgQueue, nil);
MsgSend(send->fMsg);
delete send;
}
int timeIdx = plTimeMsg::Index();
if( IGetOwner()
&& !fFutureMsgQueue
&&
(
(timeIdx >= fRegisteredExactTypes.GetCount())
||
!fRegisteredExactTypes[plTimeMsg::Index()]
)
)
plgDispatch::Dispatch()->UnRegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
}
// Returns true only when `hClass` is plTimeMsg's class index and there are
// deferred messages waiting; false in every other case.
hsBool plDispatch::IListeningForExactType(UInt16 hClass)
{
    if( hClass != plTimeMsg::Index() )
        return false;
    if( !fFutureMsgQueue )
        return false;
    return true;
}
// Appends `msgWrap` to the end of the pending queue under fMsgCurrentMutex.
// Unless `async` is set, a dispatch pass is attempted right afterwards.
void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, hsBool async)
{
    fMsgCurrentMutex.Lock();

#ifdef HS_DEBUGGING
    if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) )
        fMsgWatch.Append(msgWrap->fMsg);
#endif // HS_DEBUGGING

    // Splice after the current tail, or at the head when the queue is empty.
    plMsgWrap** slot = fMsgTail ? &fMsgTail->fNext : &fMsgHead;
    fMsgTail = IInsertToQueue(slot, msgWrap);

    fMsgCurrentMutex.Unlock();

    if( async )
        return;

    // Test for fMsgActive in IMsgDispatch(), properly wrapped inside a mutex -mcn
    IMsgDispatch();
}
// Turns message buffering on or off. Requests nest: each call with `on`
// true bumps a counter and each call with `on` false decrements it.
//
// on == true : starts deferring delivery. Fails (returns false) when no
//              buffering request is outstanding and messages are currently
//              being delivered.
// on == false: drops one buffering request; when the last one is released,
//              delivery is re-enabled and the queue is dispatched.
//
// Returns true unless the request to start buffering was refused.
hsBool plDispatch::SetMsgBuffering(hsBool on)
{
    fMsgCurrentMutex.Lock();
    if( on )
    {
        hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
        if( !fNumBufferReq && fMsgActive )
        {
            fMsgCurrentMutex.Unlock();
            return false;
        }

        fNumBufferReq++;
        fMsgActive = true;
        fMsgCurrentMutex.Unlock();
    }
    else
    {
        const hsBool lastRequest = (--fNumBufferReq == 0);
        if( lastRequest )
            fMsgActive = false;

        // Release the mutex on every path. Previously it stayed locked when
        // other buffering requests were still outstanding.
        fMsgCurrentMutex.Unlock();

        // Dispatch only after the mutex is released; IMsgDispatch takes it itself.
        if( lastRequest )
            IMsgDispatch();
    }
    hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests");
    return true;
}
// Drains the pending message queue, handing each message to every loaded
// receiver attached to its wrapper.
//
// Returns immediately, without delivering anything, when fMsgDispatchLock
// cannot be acquired or when fMsgActive is already set (a dispatch is in
// progress or buffering has been switched on via SetMsgBuffering).
// fMsgCurrentMutex is held while the queue and fMsgCurrent are manipulated
// and released while receivers run.
void plDispatch::IMsgDispatch()
{
// Non-blocking acquire: if the lock is unavailable, give up rather than wait.
if( !fMsgDispatchLock.TryLock() )
return;
if( fMsgActive )
{
fMsgDispatchLock.Unlock();
return;
}
fMsgActive = true;
// responseLevel and origTail are only consumed by the commented-out
// message-logging code further down.
int responseLevel=0;
fMsgCurrentMutex.Lock();
plMsgWrap* origTail = fMsgTail;
// Intentional assignment: take the head as the current message and loop
// until the queue is empty (which also leaves fMsgCurrent nil on exit).
while( fMsgCurrent = fMsgHead )
{
IDequeue(&fMsgHead, &fMsgTail);
// Receivers run without the queue mutex held.
fMsgCurrentMutex.Unlock();
plMessage* msg = fMsgCurrent->fMsg;
hsBool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal);
#ifdef HS_DEBUGGING
// Debug aid: break into the debugger when a watched message is about to be
// delivered (Win32 builds only), and stop watching it.
int watchIdx = fMsgWatch.Find(msg);
if( fMsgWatch.kMissingIndex != watchIdx )
{
fMsgWatch.Remove(watchIdx);
#if HS_BUILD_FOR_WIN32
__asm { int 3 }
#endif // HS_BUILD_FOR_WIN32
}
#endif // HS_DEBUGGING
// startTicks is recorded here but only read by the commented-out logging below.
static UInt64 startTicks = 0;
if (plDispatchLogBase::IsLogging())
startTicks = hsTimer::GetFullTickCount();
int i, numReceivers=0;
for( i = 0; fMsgCurrent && i < fMsgCurrent->GetNumReceivers(); i++ )
{
const plKey& rcvKey = fMsgCurrent->GetReceiver(i);
// Receivers whose key is nil, or for which ObjectIsLoaded() yields no
// plReceiver, are skipped.
plReceiver* rcv = rcvKey ? plReceiver::ConvertNoRef(rcvKey->ObjectIsLoaded()) : nil;
if( rcv )
{
if (nonLocalMsg)
{
// localOnly objects should not get remote messages
plSynchedObject* synchedObj = plSynchedObject::ConvertNoRef(rcv);
if (synchedObj && !synchedObj->IsNetSynched() )
{
continue;
}
if (plNetObjectDebuggerBase::GetInstance())
{ // log net msg if this is a debug object
hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
if (plNetObjectDebuggerBase::GetInstance()->IsDebugObject(ko))
{
hsLogEntry(plNetObjectDebuggerBase::GetInstance()->LogMsg(
xtl::format(" object:%s, GameMessage %s st=%.3f rt=%.3f",
ko->GetKeyName(), msg->ClassName(), hsTimer::GetSysSeconds(), hsTimer::GetSeconds()).c_str()));
}
}
}
#ifndef PLASMA_EXTERNAL_RELEASE
UInt32 rcvTicks = hsTimer::GetPrecTickCount();
// Object could be deleted by this message, so we need to log this stuff now
const char* keyname = "(unknown)";
const char* className = "(unknown)";
UInt32 clonePlayerID = 0;
if (plDispatchLogBase::IsLoggingLong())
{
hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
if (ko)
{
keyname = ko->GetKeyName();
clonePlayerID = ko->GetKey()->GetUoid().GetClonePlayerID();
className = ko->ClassName();
}
}
#endif // PLASMA_EXTERNAL_RELEASE
#ifdef HS_DEBUGGING
if (msg->GetBreakBeforeDispatch())
DebugBreakIfDebuggerPresent();
#endif
// The actual delivery, bracketed by the MsgReceive profiling timer.
plProfile_BeginTiming(MsgReceive);
rcv->MsgReceive(msg);
plProfile_EndTiming(MsgReceive);
#ifndef PLASMA_EXTERNAL_RELEASE
if (plDispatchLogBase::IsLoggingLong())
{
rcvTicks = hsTimer::GetPrecTickCount() - rcvTicks;
float rcvTime = (float)(hsTimer::PrecTicksToSecs(rcvTicks) * 1000.f);
// If the receiver takes more than 5 ms to process its message, log it
if (rcvTime > 5.f)
plDispatchLogBase::GetInstance()->LogLongReceive(keyname, className, clonePlayerID, msg, rcvTime);
}
#endif // PLASMA_EXTERNAL_RELEASE
numReceivers++;
// Optional hook invoked after each successful delivery.
if (fMsgRecieveCallback != nil)
fMsgRecieveCallback();
}
}
// for message logging
// if (plDispatchLogBase::IsLogging())
// {
// float sendTime = hsTimer::FullTicksToMs(hsTimer::GetFullTickCount() - startTicks);
//
// plDispatchLogBase::GetInstance()->DumpMsg(msg, numReceivers, (int)sendTime, responseLevel*2 /* indent */);
// if (origTail==fMsgCurrent)
// { // if we deliver more msgs after this, they must be response msgs
// responseLevel++;
// origTail = fMsgTail;
// }
// }
// Re-take the queue mutex before destroying the wrapper; it stays held
// through the loop condition at the top of the next iteration.
fMsgCurrentMutex.Lock();
delete fMsgCurrent;
// TEMP
// Poison value marking the just-deleted wrapper; the loop condition
// overwrites it with fMsgHead while the mutex is still held.
fMsgCurrent = (class plMsgWrap *)0xdeadc0de;
}
fMsgCurrentMutex.Unlock();
fMsgActive = false;
fMsgDispatchLock.Unlock();
}
//
// Applies the network-propagation rules to `msg`: inherits net flags from the
// message currently being delivered (if any), sends the message over the
// network when its flags call for it, and decides whether it should also be
// delivered locally.
//
// Returns true when the message is not to be delivered locally; in that case
// it has been unref'd here and the caller must not touch it again. Returns
// false when local delivery should go ahead.
//
hsBool plDispatch::IMsgNetPropagate(plMessage* msg)
{
    // Make sure cascaded messages all have the same net flags
    fMsgCurrentMutex.Lock();
    plMessage* parentMsg = fMsgCurrent ? fMsgCurrent->fMsg : nil;
    plNetClientApp::InheritNetMsgFlags(parentMsg, msg, false);
    fMsgCurrentMutex.Unlock();

    // Decide if msg should go out over the network.
    // kNetForce always sends, even inside an existing net cascade. The net
    // status flags are still inherited (but ignored for this decision) so
    // that response messages keep obeying cascade rules: the cascade is not
    // halted, only the send rule for this one message is overridden.
    if( msg->HasBCastFlag(plMessage::kNetPropagate) )
    {
        if( !msg->HasBCastFlag(plMessage::kNetSent)
            || msg->HasBCastFlag(plMessage::kNetForce)
            || msg->HasBCastFlag(plMessage::kNetNonDeterministic)
            || msg->HasBCastFlag(plMessage::kCCRSendToAllPlayers) )
        {
            // send it off...
            hsAssert(!msg->HasBCastFlag(plMessage::kNetStartCascade), "initial net cascade msg getting sent over the net again?");
            if( plNetClientApp::GetInstance() && plNetClientApp::GetInstance()->ISendGameMessage(msg) >= 0 )
                msg->SetBCastFlag(plMessage::kNetSent);
        }
    }

    // Decide if msg should get sent locally
    if( msg->HasBCastFlag(plMessage::kLocalPropagate) )
    {
        // The net decision has been made and the message will be dispatched
        // locally, so it must not start any further net cascades.
        msg->SetBCastFlag(plMessage::kNetStartCascade, false);
        return false;
    }

    hsRefCnt_SafeUnRef(msg);
    return true;
}
// Main entry point for sending a message.
//
// The message first goes through net propagation; if that consumes it,
// nothing more happens. A message stamped in the future is parked in the
// deferred queue. Otherwise the receiver list is built - from the type
// registrations for broadcast messages, or from the message's own receiver
// list for directed ones - and the message is queued for delivery
// (dispatched immediately unless `async` is set).
//
// Returns true, except for deferred messages where ISortToDeferred's result
// is returned.
hsBool plDispatch::MsgSend(plMessage* msg, hsBool async)
{
    if( IMsgNetPropagate(msg) )
        return true;

    if( msg->GetTimeStamp() > hsTimer::GetSysSeconds() )
        return ISortToDeferred(msg);

    // A time message is also the cue to release deferred messages that are due.
    plTimeMsg* asTimeMsg = plTimeMsg::ConvertNoRef(msg);
    if( asTimeMsg )
        ICheckDeferred(asTimeMsg->DSeconds());

    // The wrapper takes its own reference, so the incoming one is dropped here.
    plMsgWrap* wrap = TRACKED_NEW plMsgWrap(msg);
    hsRefCnt_SafeUnRef(msg);

    const hsBool isBroadcast = msg->HasBCastFlag(plMessage::kBCastByExactType)
                            || msg->HasBCastFlag(plMessage::kBCastByType);
    if( isBroadcast )
    {
        // broadcast: everyone registered for this exact class gets a copy
        const int classIdx = msg->ClassIndex();
        plTypeFilter* filt = nil;
        if( classIdx < fRegisteredExactTypes.GetCount() )
            filt = fRegisteredExactTypes[classIdx];

        if( filt )
        {
            for( int r = 0; r < filt->fReceivers.GetCount(); r++ )
                wrap->AddReceiver(filt->fReceivers[r]);

            if( msg->HasBCastFlag(plMessage::kClearAfterBCast) )
            {
                delete filt;
                fRegisteredExactTypes[classIdx] = nil;
            }
        }
    }
    else if( msg->GetNumReceivers() )
    {
        // Direct communique
        wrap->fReceivers = msg->fReceivers;
    }

    IMsgEnqueue(wrap, async);
    return true;
}
// Selects whether MsgQueue() stores messages for a later MsgQueueProcess()
// (sw true) or sends them straight away (sw false).
void plDispatch::MsgQueueOnOff(hsBool sw)
{
    fQueuedMsgOn = sw;
}
// Adds `msg` to the queued-message list (under its mutex) when queuing is
// enabled; otherwise sends it synchronously right away.
void plDispatch::MsgQueue(plMessage* msg)
{
    if( !fQueuedMsgOn )
    {
        MsgSend(msg, false);
        return;
    }

    fQueuedMsgListMutex.Lock();
    hsAssert(msg,"Message missing");
    fQueuedMsgList.push_back(msg);
    fQueuedMsgListMutex.Unlock();
}
void plDispatch::MsgQueueProcess()
{
// Process all messages on Queue, unlock while sending them
// this would allow other threads to put new messages on the list while we send()
while (1)
{
plMessage * pMsg = nil;
fQueuedMsgListMutex.Lock();
int size = fQueuedMsgList.size();
if (size)
{ pMsg = fQueuedMsgList.front();
fQueuedMsgList.pop_front();
}
fQueuedMsgListMutex.Unlock();
if (pMsg)
{ MsgSend(pMsg, false);
}
if (!size)
break;
}
}
// Registers `receiver` for every factory class index for which
// plFactory::DerivesFrom(hClass, index) holds.
void plDispatch::RegisterForType(UInt16 hClass, const plKey& receiver)
{
    int classIdx = 0;
    while( classIdx < plFactory::GetNumClasses() )
    {
        if( plFactory::DerivesFrom(hClass, classIdx) )
            RegisterForExactType(classIdx, receiver);
        classIdx++;
    }
}
// Registers `receiver` for broadcasts of exactly class `hClass`. The filter
// table is grown as needed, the class's filter is created on first use, and
// a receiver that is already listed is not added a second time.
void plDispatch::RegisterForExactType(UInt16 hClass, const plKey& receiver)
{
    const int slot = hClass;
    fRegisteredExactTypes.ExpandAndZero(slot + 1);

    if( !fRegisteredExactTypes[slot] )
    {
        plTypeFilter* fresh = TRACKED_NEW plTypeFilter;
        fRegisteredExactTypes[slot] = fresh;
        fresh->fHClass = hClass;
    }

    plTypeFilter* filt = fRegisteredExactTypes[slot];
    const hsBool notListed = ( filt->fReceivers.kMissingIndex == filt->fReceivers.Find(receiver) );
    if( notListed )
        filt->fReceivers.Append(receiver);
}
// Removes `receiver` from every registered class index for which
// plFactory::DerivesFrom(hClass, index) holds.
void plDispatch::UnRegisterForType(UInt16 hClass, const plKey& receiver)
{
    int classIdx = 0;
    while( classIdx < fRegisteredExactTypes.GetCount() )
    {
        if( plFactory::DerivesFrom(hClass, classIdx) )
            IUnRegisterForExactType(classIdx, receiver);
        classIdx++;
    }
}
// Removes the first occurrence of `receiver` from the filter at `idx`.
// Removing the last remaining receiver deletes the filter and clears its
// slot. `idx` must already be within range. Always returns false.
hsBool plDispatch::IUnRegisterForExactType(int idx, const plKey& receiver)
{
    hsAssert(idx < fRegisteredExactTypes.GetCount(), "Out of range should be filtered before call to internal");

    plTypeFilter* filt = fRegisteredExactTypes[idx];
    if( !filt )
        return false;

    // Locate the receiver.
    const int count = filt->fReceivers.GetCount();
    int found = -1;
    for( int j = 0; j < count; j++ )
    {
        if( receiver == filt->fReceivers[j] )
        {
            found = j;
            break;
        }
    }
    if( found < 0 )
        return false;

    if( count > 1 )
    {
        // Fill the hole with the last entry, then shrink the list by one.
        const int last = count - 1;
        if( found < last )
            filt->fReceivers[found] = filt->fReceivers[last];
        filt->fReceivers[last] = nil;
        filt->fReceivers.SetCount(last);
    }
    else
    {
        delete filt;
        fRegisteredExactTypes[idx] = nil;
    }

    return false;
}
// Removes `receiver` from every type filter it is listed in. A filter left
// with no receivers is deleted and its slot cleared.
void plDispatch::UnRegisterAll(const plKey& receiver)
{
    for( int typeIdx = 0; typeIdx < fRegisteredExactTypes.GetCount(); typeIdx++ )
    {
        plTypeFilter* filt = fRegisteredExactTypes[typeIdx];
        if( !filt )
            continue;

        const int at = filt->fReceivers.Find(receiver);
        if( at == filt->fReceivers.kMissingIndex )
            continue;

        if( filt->fReceivers.GetCount() > 1 )
        {
            // Fill the hole with the last entry, then shrink the list by one.
            const int last = filt->fReceivers.GetCount() - 1;
            if( at < last )
                filt->fReceivers[at] = filt->fReceivers[last];
            filt->fReceivers[last] = nil;
            filt->fReceivers.SetCount(last);
        }
        else
        {
            delete filt;
            fRegisteredExactTypes[typeIdx] = nil;
        }
    }
}
// Removes `receiver` from the filter for exactly class `hClass`. Does
// nothing when the class index is beyond the table or has no filter.
void plDispatch::UnRegisterForExactType(UInt16 hClass, const plKey& receiver)
{
    const int idx = hClass;
    if( (idx < fRegisteredExactTypes.GetCount()) && fRegisteredExactTypes[idx] )
        IUnRegisterForExactType(idx, receiver);
}