/*==LICENSE==*

CyanWorlds.com Engine - MMOG client, server and tools
Copyright (C) 2011 Cyan Worlds, Inc.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

Additional permissions under GNU GPL version 3 section 7

If you modify this Program, or any covered work, by linking or
combining it with any of RAD Game Tools Bink SDK, Autodesk 3ds Max SDK,
NVIDIA PhysX SDK, Microsoft DirectX SDK, OpenSSL library, Independent
JPEG Group JPEG library, Microsoft Windows Media SDK, or Apple QuickTime SDK
(or a modified version of those libraries),
containing parts covered by the terms of the Bink SDK EULA, 3ds Max EULA,
PhysX SDK EULA, DirectX SDK EULA, OpenSSL and SSLeay licenses, IJG
JPEG Library README, Windows Media SDK EULA, or QuickTime SDK EULA, the
licensors of this Program grant you additional
permission to convey the resulting work. Corresponding Source for a
non-source form of such a combination shall include the source code for
the parts of OpenSSL and IJG JPEG Library used as well as that of the covered
work.

You can contact Cyan Worlds, Inc. by email legal@cyan.com
 or by snail mail at:
      Cyan Worlds, Inc.
      14617 N Newport Hwy
      Mead, WA   99021

*==LICENSE==*/

#include "hsTypes.h"
#include "hsResMgr.h"
#include "plDispatch.h"
#define PLMESSAGE_PRIVATE
#include "../pnMessage/plMessage.h"
#include "../pnKeyedObject/hsKeyedObject.h"
#include "hsTimer.h"
#include "../pnMessage/plTimeMsg.h"
#include "../pnKeyedObject/plKey.h"
#include "plDispatchLogBase.h"
#include "../pnNetCommon/plNetApp.h"
#include "../pnNetCommon/plSynchedObject.h"
#include "../pnNetCommon/pnNetCommon.h"
#include "hsThread.h"
#include "plProfile.h"

// Profiling timers declared for message delivery. MsgReceive brackets each
// receiver's MsgReceive() call in plDispatch::IMsgDispatch(); the remaining
// timers are declared here but are not referenced by the dispatch code below.
plProfile_CreateTimer("MsgReceive", "Update", MsgReceive);
plProfile_CreateTimer("  TimeMsg", "Update", TimeMsg);
plProfile_CreateTimer("  EvalMsg", "Update", EvalMsg);
plProfile_CreateTimer("  TransformMsg", "Update", TransformMsg);
plProfile_CreateTimer("  CameraMsg", "Update", CameraMsg);

//
// plMsgWrap - intrusive queue node that pairs a message with the list of
// receiver keys it is to be delivered to. The node takes its own reference
// on the message at construction and releases it on destruction.
//
class plMsgWrap
{
public:
	// Address of the pointer that currently points at this node (either a
	// queue head or the previous node's fNext). Maintained by
	// plDispatch::IInsertToQueue / IDequeue; nil until the node is queued.
	plMsgWrap**						fBack;
	// Next node in the queue, nil at the end or while unqueued.
	plMsgWrap*						fNext;
	// Keys of the objects the wrapped message will be delivered to.
	hsTArray<plKey>					fReceivers;

	// The wrapped message (may be nil); one reference is held by this node.
	plMessage*						fMsg;

	// Link pointers start out nil so a wrap that has not been queued yet
	// never exposes garbage to code walking fNext.
	plMsgWrap(plMessage* msg) : fBack(nil), fNext(nil), fMsg(msg) { hsRefCnt_SafeRef(msg); }
	virtual ~plMsgWrap() { hsRefCnt_SafeUnRef(fMsg); }

	// Empties the receiver list; returns *this for chaining.
	plMsgWrap&						ClearReceivers() { fReceivers.SetCount(0); return *this; }
	// Appends a receiver key (asserts that it is non-nil); returns *this for chaining.
	plMsgWrap&						AddReceiver(const plKey& rcv) 
									{ 
										hsAssert(rcv, "Trying to send mail to nil receiver");
										fReceivers.Append(rcv); return *this;
									}
	// Receiver key at index i; no bounds checking is done here.
	const plKey&					GetReceiver(int i) const { return fReceivers[i]; }
	UInt32							GetNumReceivers() const { return fReceivers.GetCount(); }
};

// Shared (static) dispatch state.
// Number of outstanding SetMsgBuffering(true) requests.
Int32					plDispatch::fNumBufferReq = 0;
// True while IMsgDispatch() is delivering or while buffering is requested.
hsBool					plDispatch::fMsgActive = false;
// Wrap currently being delivered by IMsgDispatch(); nil when idle.
plMsgWrap*				plDispatch::fMsgCurrent = nil;
// Head and tail of the queue of wraps waiting for delivery.
plMsgWrap*				plDispatch::fMsgHead = nil;
plMsgWrap*				plDispatch::fMsgTail = nil;
// Debug aid: messages flagged kMsgWatch are recorded here at enqueue time
// (HS_DEBUGGING builds only) and trigger a break when they are dispatched.
hsTArray<plMessage*>	plDispatch::fMsgWatch;
// Optional hook invoked after each receiver has handled a message.
MsgRecieveCallback		plDispatch::fMsgRecieveCallback = nil;

hsMutex		plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
hsMutex		plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch


// Builds a dispatch with no owner, an empty deferred-message queue, and
// MsgQueue() queueing switched on.
plDispatch::plDispatch()
	: fOwner(nil)
	, fFutureMsgQueue(nil)
	, fQueuedMsgOn(true)
{
}

// Destructor. Expects every type registration to be gone already (see
// BeginShutdown(), which resets the table), then discards whatever messages
// are still waiting for delivery.
plDispatch::~plDispatch()
{
	hsAssert(fRegisteredExactTypes.GetCount() == 0, "registered type after Dispatch shutdown");
	ITrashUndelivered();
}

void plDispatch::BeginShutdown()
{
	fRegisteredExactTypes.Reset();
	ITrashUndelivered();
}

void plDispatch::ITrashUndelivered()
{
	while( fFutureMsgQueue )
	{
		plMsgWrap* nuke = fFutureMsgQueue;
		fFutureMsgQueue = fFutureMsgQueue->fNext;
		hsRefCnt_SafeUnRef(nuke->fMsg);
		delete nuke;
	}

	// If we're the main dispatch, any unsent messages at this
	// point are just trashed. Slave dispatches just go away and
	// leave their messages to be delivered when the main dispatch
	// gets around to it.
	if( this == plgDispatch::Dispatch() )
	{
		while( fMsgHead )
		{
			plMsgWrap* nuke = fMsgHead;
			fMsgHead = fMsgHead->fNext;
			// hsRefCnt_SafeUnRef(nuke->fMsg);		// MOOSE - done in plMsgWrap dtor
			delete nuke;
		}

		// reset static members which we just deleted - MOOSE
		fMsgCurrent=fMsgHead=fMsgTail=nil;

		fMsgActive = false;
	}
}

// Splices isert into a queue at the link *curr: isert takes the place of
// whatever *curr pointed at, and that node (if any) becomes isert's
// successor. Returns isert.
plMsgWrap* plDispatch::IInsertToQueue(plMsgWrap** curr, plMsgWrap* isert)
{
	plMsgWrap* displaced = *curr;

	isert->fBack = curr;
	isert->fNext = displaced;

	// The displaced node is now reached through isert's fNext link.
	if( displaced )
		displaced->fBack = &isert->fNext;

	*curr = isert;
	return isert;
}

// Unlinks and returns the first node of the queue rooted at *head (nil if
// the queue is empty). When a tail pointer is supplied and it referred to the
// removed node, it is updated to the new head.
plMsgWrap* plDispatch::IDequeue(plMsgWrap** head, plMsgWrap** tail)
{
	plMsgWrap* front = *head;

	if( front )
	{
		plMsgWrap* following = front->fNext;
		*head = following;
		// The new first node is now pointed at by the head itself.
		if( following )
			following->fBack = head;
	}

	if( tail && (*tail == front) )
		*tail = *head;

	return front;
}

// Wraps msg and inserts it into the deferred (future) queue, which is kept
// ordered by time stamp. When the queue goes from empty to non-empty and this
// dispatch has an owner, the owner is registered with the main dispatch for
// plTimeMsg. Always returns false.
hsBool plDispatch::ISortToDeferred(plMessage* msg)
{
	plMsgWrap* wrap = TRACKED_NEW plMsgWrap(msg);

	if( !fFutureMsgQueue && IGetOwner() )
		plgDispatch::Dispatch()->RegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());

	// Find the link to splice the new wrap into. The wrap goes in front of
	// the head only if the head is strictly later; otherwise walk past
	// every following node that is strictly earlier than the new message.
	plMsgWrap** slot = &fFutureMsgQueue;
	if( *slot && !((*slot)->fMsg->fTimeStamp > msg->fTimeStamp) )
	{
		slot = &(*slot)->fNext;
		while( *slot && ((*slot)->fMsg->fTimeStamp < msg->fTimeStamp) )
			slot = &(*slot)->fNext;
	}

	IInsertToQueue(slot, wrap);
	return false;
}

void plDispatch::ICheckDeferred(double secs)
{
	while( fFutureMsgQueue && (fFutureMsgQueue->fMsg->fTimeStamp < secs) )
	{
		plMsgWrap* send = IDequeue(&fFutureMsgQueue, nil);
		MsgSend(send->fMsg);
		delete send;
	}

	int timeIdx = plTimeMsg::Index();
	if( IGetOwner()
		&& !fFutureMsgQueue 
		&& 
			( 
				(timeIdx >= fRegisteredExactTypes.GetCount()) 
				|| 
				!fRegisteredExactTypes[plTimeMsg::Index()]
			)
	  )
		plgDispatch::Dispatch()->UnRegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
}

// Returns true only for plTimeMsg, and only while deferred messages are
// waiting in the future queue; false for every other class index.
hsBool plDispatch::IListeningForExactType(UInt16 hClass)
{
	const hsBool isTimeMsg = (hClass == plTimeMsg::Index());
	if( !isTimeMsg )
		return false;

	return fFutureMsgQueue ? true : false;
}

// Appends msgWrap to the tail of the shared delivery queue (under
// fMsgCurrentMutex) and, unless async is set, immediately attempts to
// dispatch the queue.
void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, hsBool async)
{
	fMsgCurrentMutex.Lock();

#ifdef HS_DEBUGGING
	// Remember watched messages so the dispatcher can break on them.
	if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) )
		fMsgWatch.Append(msgWrap->fMsg);
#endif // HS_DEBUGGING

	// Link after the current tail, or at the head when the queue is empty.
	plMsgWrap** link = fMsgTail ? &fMsgTail->fNext : &fMsgHead;
	fMsgTail = IInsertToQueue(link, msgWrap);

	fMsgCurrentMutex.Unlock();

	if( async )
		return;

	// fMsgActive is tested inside IMsgDispatch(), under its own lock.
	IMsgDispatch();
}

// Turns message buffering on or off. Requests nest: each call with on=true
// must be balanced by a call with on=false, and queued messages are only
// dispatched once the last outstanding request is released.
//
// Returns false if buffering is requested while messages are being delivered
// and no buffering request is already active; returns true otherwise.
hsBool plDispatch::SetMsgBuffering(hsBool on)
{
	fMsgCurrentMutex.Lock();
	if( on )
	{
		hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
		if( !fNumBufferReq && fMsgActive )
		{
			fMsgCurrentMutex.Unlock();
			return false;
		}

		fNumBufferReq++;
		// Marking the dispatch active makes IMsgDispatch() return early,
		// so messages pile up in the queue instead of being delivered.
		fMsgActive = true;
		fMsgCurrentMutex.Unlock();
	}
	else if( !--fNumBufferReq )
	{
		// Last request released: unlock first, then flush what was buffered.
		fMsgActive = false;
		fMsgCurrentMutex.Unlock();
		IMsgDispatch();
	}
	else
	{
		// Other buffering requests are still outstanding. The mutex must be
		// released on this path too; previously it stayed locked here.
		fMsgCurrentMutex.Unlock();
	}
	hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests");

	return true;
}

// Drains the shared delivery queue, handing each queued message to every one
// of its receivers in turn. Returns immediately if another dispatch pass
// holds fMsgDispatchLock or if fMsgActive is already set (a dispatch is in
// progress, or buffering was requested through SetMsgBuffering()).
void plDispatch::IMsgDispatch()
{
	// Only one dispatch pass at a time; do not block if one is running.
	if( !fMsgDispatchLock.TryLock() )
		return;

	if( fMsgActive )
	{
		fMsgDispatchLock.Unlock();
		return;
	}

	fMsgActive = true;
	// responseLevel and origTail are only consumed by the commented-out
	// message logging code further down.
	int responseLevel=0;

	fMsgCurrentMutex.Lock();

	plMsgWrap* origTail = fMsgTail;
	// Intentional assignment: take the current head as the message to
	// deliver and stop once the queue is empty.
	while( fMsgCurrent = fMsgHead )
	{
		IDequeue(&fMsgHead, &fMsgTail);
		// The queue mutex is released during delivery; IMsgEnqueue() takes
		// the same mutex, so receivers can send further messages.
		fMsgCurrentMutex.Unlock();

		plMessage* msg = fMsgCurrent->fMsg;
		hsBool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal);

#ifdef HS_DEBUGGING
		// Break into the debugger for messages that were flagged kMsgWatch
		// when they were enqueued.
		int watchIdx = fMsgWatch.Find(msg);
		if( fMsgWatch.kMissingIndex != watchIdx )
		{
			fMsgWatch.Remove(watchIdx);
#if HS_BUILD_FOR_WIN32
			__asm { int 3 }
#endif // HS_BUILD_FOR_WIN32
		}
#endif // HS_DEBUGGING

		// startTicks is only read by the commented-out logging code below.
		static UInt64 startTicks = 0;
		if (plDispatchLogBase::IsLogging())
			startTicks = hsTimer::GetFullTickCount();

		int i, numReceivers=0;
		// fMsgCurrent is re-checked on every pass because
		// ITrashUndelivered() resets it to nil.
		for( i = 0; fMsgCurrent && i < fMsgCurrent->GetNumReceivers(); i++ )
		{
			const plKey& rcvKey = fMsgCurrent->GetReceiver(i);
			// A nil key, or one that does not resolve to a plReceiver, is skipped.
			plReceiver* rcv = rcvKey ? plReceiver::ConvertNoRef(rcvKey->ObjectIsLoaded()) : nil;
			if( rcv )
			{
				if (nonLocalMsg)
				{
					// localOnly objects should not get remote messages
					plSynchedObject* synchedObj = plSynchedObject::ConvertNoRef(rcv);
					if (synchedObj && !synchedObj->IsNetSynched() )
					{
						continue;
					}

					if (plNetObjectDebuggerBase::GetInstance())
					{	// log net msg if this is a debug object
						hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
						if (plNetObjectDebuggerBase::GetInstance()->IsDebugObject(ko))
						{
							hsLogEntry(plNetObjectDebuggerBase::GetInstance()->LogMsg(
								xtl::format("<RCV> object:%s, GameMessage %s st=%.3f rt=%.3f", 
								ko->GetKeyName(), msg->ClassName(), hsTimer::GetSysSeconds(), hsTimer::GetSeconds()).c_str()));
						}
					}
				}

#ifndef PLASMA_EXTERNAL_RELEASE
				UInt32 rcvTicks = hsTimer::GetPrecTickCount();

				// Object could be deleted by this message, so we need to log this stuff now
				const char* keyname = "(unknown)";
				const char* className = "(unknown)";
				UInt32 clonePlayerID = 0;
				if (plDispatchLogBase::IsLoggingLong())
				{
					hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
					if (ko)
					{
						keyname = ko->GetKeyName();
						clonePlayerID = ko->GetKey()->GetUoid().GetClonePlayerID();
						className = ko->ClassName();
					}
				}
#endif // PLASMA_EXTERNAL_RELEASE

				#ifdef HS_DEBUGGING
				if (msg->GetBreakBeforeDispatch())
					DebugBreakIfDebuggerPresent();
				#endif
					
				// The actual delivery, timed by the MsgReceive profile timer.
				plProfile_BeginTiming(MsgReceive);
				rcv->MsgReceive(msg);
				plProfile_EndTiming(MsgReceive);

#ifndef PLASMA_EXTERNAL_RELEASE
				if (plDispatchLogBase::IsLoggingLong())
				{
					rcvTicks = hsTimer::GetPrecTickCount() - rcvTicks;

					float rcvTime = (float)(hsTimer::PrecTicksToSecs(rcvTicks) * 1000.f);
					// If the receiver takes more than 5 ms to process its message, log it
					if (rcvTime > 5.f)
						plDispatchLogBase::GetInstance()->LogLongReceive(keyname, className, clonePlayerID, msg, rcvTime);
				}
#endif // PLASMA_EXTERNAL_RELEASE

				numReceivers++;

				// Optional per-delivery hook.
				if (fMsgRecieveCallback != nil)
					fMsgRecieveCallback();
			}
		}

		// for message logging
//		if (plDispatchLogBase::IsLogging())
//		{
//			float sendTime = hsTimer::FullTicksToMs(hsTimer::GetFullTickCount() - startTicks);
//
//			plDispatchLogBase::GetInstance()->DumpMsg(msg, numReceivers, (int)sendTime, responseLevel*2 /* indent */);
//			if (origTail==fMsgCurrent)
//			{	// if we deliver more msgs after this, they must be response msgs
//				responseLevel++;
//				origTail = fMsgTail;
//			}
//		}

		// Re-take the queue mutex before freeing the wrap and reading the
		// head again in the loop condition.
		fMsgCurrentMutex.Lock();

		delete fMsgCurrent;
		// TEMP
		// Poison value so a stale use of the deleted wrap is recognizable;
		// the loop condition overwrites it straight away.
		fMsgCurrent = (class plMsgWrap *)0xdeadc0de;
	}
	fMsgCurrentMutex.Unlock();

	fMsgActive = false;
	fMsgDispatchLock.Unlock();
}

//
// Applies the network propagation rules to msg: inherits net flags from the
// message currently being dispatched, sends msg over the network when its
// flags call for it, and decides whether it should also be delivered locally.
//
// Returns true if msg has been consumed (its reference released) because it
// is not to be delivered locally; returns false if the caller should go on
// with local delivery.
//
hsBool plDispatch::IMsgNetPropagate(plMessage* msg)
{
	// Make sure cascaded messages all have the same net flags. fMsgCurrent
	// is read under its mutex.
	fMsgCurrentMutex.Lock();
	plNetClientApp::InheritNetMsgFlags(fMsgCurrent ? fMsgCurrent->fMsg : nil, msg, false);
	fMsgCurrentMutex.Unlock();

	// Decide if msg should go out over the network.
	// kNetForce means the message always goes out, even when it is already
	// part of a net cascade. The net status flags are still inherited (but
	// ignored) so that response messages obey the cascading rules: the
	// cascade is not halted, only the send rule for this message is overridden.
	if( msg->HasBCastFlag(plMessage::kNetPropagate) )
	{
		if( !msg->HasBCastFlag(plMessage::kNetSent)
			|| msg->HasBCastFlag(plMessage::kNetForce)
			|| msg->HasBCastFlag(plMessage::kNetNonDeterministic)
			|| msg->HasBCastFlag(plMessage::kCCRSendToAllPlayers) )
		{
			// send it off...
			hsAssert(!msg->HasBCastFlag(plMessage::kNetStartCascade), "initial net cascade msg getting sent over the net again?");
			if( plNetClientApp::GetInstance() )
			{
				if( plNetClientApp::GetInstance()->ISendGameMessage(msg) >= 0 )
					msg->SetBCastFlag(plMessage::kNetSent);
			}
		}
	}

	// Decide if msg should get sent locally.
	if( msg->HasBCastFlag(plMessage::kLocalPropagate) )
	{
		// This property has been checked and the msg will be dispatched
		// locally, so it should not start any more net cascades.
		msg->SetBCastFlag(plMessage::kNetStartCascade, false);
		return false;
	}

	// Not for local delivery: finished with the message.
	hsRefCnt_SafeUnRef(msg);
	return true;
}

// Sends a message.
//
// The message first goes through the network propagation rules
// (IMsgNetPropagate); if those consume it, nothing more is done. A message
// time-stamped in the future is parked in the deferred queue. Otherwise it is
// wrapped together with its receivers - the objects registered for its class
// when it is a broadcast, or its own receiver list when it is not - and
// placed on the delivery queue.
//
//   msg   - message to send; one reference on it is released by the dispatch
//           once it has been consumed or queued for delivery.
//   async - when set, the message is only queued; otherwise a dispatch pass
//           is attempted right away (see IMsgEnqueue).
//
// Returns true, except for deferred messages, where the result of
// ISortToDeferred() (always false) is returned.
hsBool plDispatch::MsgSend(plMessage* msg, hsBool async)
{
	if( IMsgNetPropagate(msg) )
		return true;

	if( msg->GetTimeStamp() > hsTimer::GetSysSeconds() )
		return ISortToDeferred(msg);

	// A time message is the cue to release deferred messages that are now due.
	plTimeMsg* timeMsg = plTimeMsg::ConvertNoRef(msg);
	if( timeMsg )
		ICheckDeferred(timeMsg->DSeconds());

	// The wrap takes its own reference, so the one handed in can be released.
	plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg);
	hsRefCnt_SafeUnRef(msg);

	// broadcast
	if( msg->HasBCastFlag(plMessage::kBCastByExactType) || msg->HasBCastFlag(plMessage::kBCastByType) )
	{
		int idx = msg->ClassIndex();
		if( idx < fRegisteredExactTypes.GetCount() )
		{
			plTypeFilter* filt = fRegisteredExactTypes[idx];
			if( filt )
			{
				int j;
				for( j = 0; j < filt->fReceivers.GetCount(); j++ )
				{
					msgWrap->AddReceiver(filt->fReceivers[j]);
				}
				// One-shot broadcast: drop every registration for this
				// class once the receivers have been copied into the wrap.
				if( msg->HasBCastFlag(plMessage::kClearAfterBCast) )
				{
					delete filt;
					fRegisteredExactTypes[idx] = nil;
				}
			}
		}
	}
	// Direct communique
	else
	if( msg->GetNumReceivers() )
	{
		msgWrap->fReceivers = msg->fReceivers;
	}
	IMsgEnqueue(msgWrap, async);

	return true;
}
// Switches MsgQueue() between queueing messages for a later
// MsgQueueProcess() (sw true) and sending them immediately (sw false).
void plDispatch::MsgQueueOnOff(hsBool sw)
{
	fQueuedMsgOn = sw;
}
// Queues msg for a later MsgQueueProcess() call. When queueing has been
// switched off via MsgQueueOnOff(), the message is sent right away instead.
void plDispatch::MsgQueue(plMessage* msg)
{
	if( !fQueuedMsgOn )
	{
		MsgSend(msg, false);
		return;
	}

	// The list is guarded by its own mutex.
	fQueuedMsgListMutex.Lock();
	hsAssert(msg,"Message missing");
	fQueuedMsgList.push_back(msg);
	fQueuedMsgListMutex.Unlock();
}

void plDispatch::MsgQueueProcess()
{
		// Process all messages on Queue, unlock while sending them
		// this would allow other threads to put new messages on the list while we send()
	while (1)
	{	
		plMessage * pMsg = nil;
		fQueuedMsgListMutex.Lock();
		int size = fQueuedMsgList.size();
		if (size)
		{	pMsg = fQueuedMsgList.front();
			fQueuedMsgList.pop_front();
		}
		fQueuedMsgListMutex.Unlock();
		if (pMsg)
		{	MsgSend(pMsg, false);
		}
		if (!size)
			break;
	}
}

// Registers receiver for every class index that plFactory::DerivesFrom()
// accepts for hClass, one exact-type registration per matching index.
void plDispatch::RegisterForType(UInt16 hClass, const plKey& receiver)
{
	int classIdx = 0;
	while( classIdx < plFactory::GetNumClasses() )
	{
		if( plFactory::DerivesFrom(hClass, classIdx) )
			RegisterForExactType(classIdx, receiver);
		++classIdx;
	}
}

// Registers receiver for messages of exactly class hClass. The filter table
// is grown as needed, the filter for the class is created on first use, and a
// receiver that is already registered is not added a second time.
void plDispatch::RegisterForExactType(UInt16 hClass, const plKey& receiver)
{
	const int slot = hClass;
	fRegisteredExactTypes.ExpandAndZero(slot + 1);

	plTypeFilter* filter = fRegisteredExactTypes[slot];
	if( !filter )
	{
		filter = TRACKED_NEW plTypeFilter;
		fRegisteredExactTypes[slot] = filter;
		filter->fHClass = hClass;
	}

	const int found = filter->fReceivers.Find(receiver);
	if( found == filter->fReceivers.kMissingIndex )
		filter->fReceivers.Append(receiver);
}

// Removes receiver from every filter table slot whose class index
// plFactory::DerivesFrom() accepts for hClass. Only slots that exist in the
// table are visited.
void plDispatch::UnRegisterForType(UInt16 hClass, const plKey& receiver)
{
	int classIdx = 0;
	while( classIdx < fRegisteredExactTypes.GetCount() )
	{
		if( plFactory::DerivesFrom(hClass, classIdx) )
			IUnRegisterForExactType(classIdx, receiver);
		++classIdx;
	}
}

// Removes the first occurrence of receiver from the filter stored at idx.
// The caller must pass an index inside the table. When the receiver was the
// only entry, the whole filter is deleted and its slot cleared. Always
// returns false, whether or not anything was removed.
hsBool plDispatch::IUnRegisterForExactType(int idx, const plKey& receiver)
{
	hsAssert(idx < fRegisteredExactTypes.GetCount(), "Out of range should be filtered before call to internal");

	plTypeFilter* filter = fRegisteredExactTypes[idx];
	if( !filter )
		return false;

	int slot;
	for( slot = 0; slot < filter->fReceivers.GetCount(); slot++ )
	{
		if( !(receiver == filter->fReceivers[slot]) )
			continue;

		const int last = filter->fReceivers.GetCount() - 1;
		if( last < 1 )
		{
			// Sole receiver: nothing is left to filter for this class.
			delete filter;
			fRegisteredExactTypes[idx] = nil;
		}
		else
		{
			// Order is not preserved: move the last entry into the hole,
			// release the key held in the last position, then shrink.
			if( slot < last )
				filter->fReceivers[slot] = filter->fReceivers[last];
			filter->fReceivers[last] = nil;
			filter->fReceivers.SetCount(last);
		}
		break;
	}

	return false;
}

void plDispatch::UnRegisterAll(const plKey& receiver)
{
	int i;
	for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ )
	{
		plTypeFilter* filt = fRegisteredExactTypes[i];
		if( filt )
		{
			int idx = filt->fReceivers.Find(receiver);
			if( idx != filt->fReceivers.kMissingIndex )
			{
				if( filt->fReceivers.GetCount() > 1 )
				{
					if( idx < filt->fReceivers.GetCount() - 1 )
						filt->fReceivers[idx] = filt->fReceivers[filt->fReceivers.GetCount() - 1];
					filt->fReceivers[filt->fReceivers.GetCount()-1] = nil;
					filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1);
				}
				else
				{
					delete filt;
					fRegisteredExactTypes[i] = nil;
				}
			}
		}
	}
}

// Removes receiver from the filter for exactly class hClass. Does nothing
// when the class index lies outside the table or has no filter.
void plDispatch::UnRegisterForExactType(UInt16 hClass, const plKey& receiver)
{
	const int slot = hClass;

	// The range check comes first so the table is never indexed out of bounds.
	if( (slot < fRegisteredExactTypes.GetCount()) && fRegisteredExactTypes[slot] )
		IUnRegisterForExactType(slot, receiver);
}