/*==LICENSE==*

CyanWorlds.com Engine - MMOG client, server and tools
Copyright (C) 2011  Cyan Worlds, Inc.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

You can contact Cyan Worlds, Inc. by email legal@cyan.com
 or by snail mail at:
      Cyan Worlds, Inc.
      14617 N Newport Hwy
      Mead, WA   99021

*==LICENSE==*/

#include "hsTypes.h"
#include "hsResMgr.h"
#include "plDispatch.h"
#define PLMESSAGE_PRIVATE
#include "../pnMessage/plMessage.h"
#include "../pnKeyedObject/hsKeyedObject.h"
#include "hsTimer.h"
#include "../pnMessage/plTimeMsg.h"
#include "../pnKeyedObject/plKey.h"
#include "plDispatchLogBase.h"
#include "../pnNetCommon/plNetApp.h"
#include "../pnNetCommon/plSynchedObject.h"
#include "../pnNetCommon/pnNetCommon.h"
#include "hsThread.h"
#include "plProfile.h"

// Profiling timers declared for this file. Within the code visible here only
// MsgReceive is actually used (it brackets each rcv->MsgReceive() call in
// plDispatch::IMsgDispatch); the others are declared but not referenced here.
// NOTE(review): the leading spaces in the names presumably indent these
// entries beneath "MsgReceive" in the profiler display - confirm in plProfile.h.
plProfile_CreateTimer("MsgReceive", "Update", MsgReceive);
plProfile_CreateTimer("  TimeMsg", "Update", TimeMsg);
plProfile_CreateTimer("  EvalMsg", "Update", EvalMsg);
plProfile_CreateTimer("  TransformMsg", "Update", TransformMsg);
plProfile_CreateTimer("  CameraMsg", "Update", CameraMsg);

// Queue node used by plDispatch: pairs one message with the list of receiver
// keys it is to be delivered to. Holds a reference on the message for the
// lifetime of the node (taken in the constructor, released in the destructor).
class plMsgWrap
{
public:
	// Address of the pointer that points at this node (the list head or the
	// previous node's fNext). Maintained by plDispatch::IInsertToQueue/IDequeue.
	plMsgWrap**						fBack;
	// Next node in the queue, or null at the end of the list.
	plMsgWrap*						fNext;
	// Keys of the objects this message will be delivered to.
	hsTArray<plKey>					fReceivers;

	// The wrapped message; ref'd by this node.
	plMessage*						fMsg;

	// Link pointers start out null so a node that has not yet been inserted
	// into a queue never carries indeterminate pointers.
	plMsgWrap(plMessage* msg) : fBack(0), fNext(0), fMsg(msg) { hsRefCnt_SafeRef(msg); }
	virtual ~plMsgWrap() { hsRefCnt_SafeUnRef(fMsg); }

	// Empties the receiver list. Returns *this so calls can be chained.
	plMsgWrap&						ClearReceivers() { fReceivers.SetCount(0); return *this; }
	// Appends a receiver key (asserts that it is non-nil). Returns *this.
	plMsgWrap&						AddReceiver(const plKey& rcv) 
									{ 
										hsAssert(rcv, "Trying to send mail to nil receiver");
										fReceivers.Append(rcv); return *this;
									}
	// Returns the i'th receiver key; i is not range checked here.
	const plKey&					GetReceiver(int i) const { return fReceivers[i]; }
	// Number of receivers currently attached to this message.
	UInt32							GetNumReceivers() const { return fReceivers.GetCount(); }
};

// Static state shared by every plDispatch instance.

// Count of outstanding SetMsgBuffering(true) requests.
Int32					plDispatch::fNumBufferReq = 0;
// True while messages are being delivered (IMsgDispatch) or while buffering is requested.
hsBool					plDispatch::fMsgActive = false;
// Wrapper of the message currently being delivered by IMsgDispatch, if any.
plMsgWrap*				plDispatch::fMsgCurrent = nil;
// Head and tail of the queue of messages waiting for delivery.
plMsgWrap*				plDispatch::fMsgHead = nil;
plMsgWrap*				plDispatch::fMsgTail = nil;
// Messages flagged kMsgWatch; only populated and consulted under HS_DEBUGGING.
hsTArray<plMessage*>	plDispatch::fMsgWatch;
// Optional callback invoked by IMsgDispatch after each receiver has handled a message.
MsgRecieveCallback		plDispatch::fMsgRecieveCallback = nil;

hsMutex		plDispatch::fMsgCurrentMutex; // mutex for fMsgCurrent
hsMutex		plDispatch::fMsgDispatchLock; // mutex for IMsgDispatch


// Builds a dispatcher with no owner, an empty deferred-message queue, and
// MsgQueue() queuing switched on.
plDispatch::plDispatch()
	: fOwner(nil)
	, fFutureMsgQueue(nil)
	, fQueuedMsgOn(true)
{
}

// Frees every per-class receiver filter owned by this dispatcher, then
// discards any messages that were never delivered.
plDispatch::~plDispatch()
{
	for( int typeIdx = 0; typeIdx < fRegisteredExactTypes.GetCount(); ++typeIdx )
	{
		delete fRegisteredExactTypes[typeIdx];
	}

	ITrashUndelivered();
}

void plDispatch::ITrashUndelivered()
{
	while( fFutureMsgQueue )
	{
		plMsgWrap* nuke = fFutureMsgQueue;
		fFutureMsgQueue = fFutureMsgQueue->fNext;
		hsRefCnt_SafeUnRef(nuke->fMsg);
		delete nuke;
	}

	// If we're the main dispatch, any unsent messages at this
	// point are just trashed. Slave dispatches just go away and
	// leave their messages to be delivered when the main dispatch
	// gets around to it.
	if( this == plgDispatch::Dispatch() )
	{
		while( fMsgHead )
		{
			plMsgWrap* nuke = fMsgHead;
			fMsgHead = fMsgHead->fNext;
			// hsRefCnt_SafeUnRef(nuke->fMsg);		// MOOSE - done in plMsgWrap dtor
			delete nuke;
		}

		// reset static members which we just deleted - MOOSE
		fMsgCurrent=fMsgHead=fMsgTail=nil;

		fMsgActive = false;
	}
}

// Splices isert into a list in front of whatever *curr currently points at.
//   curr  - address of the link to insert at (a list head or some node's fNext)
//   isert - node to insert
// Returns isert.
plMsgWrap* plDispatch::IInsertToQueue(plMsgWrap** curr, plMsgWrap* isert)
{
	plMsgWrap* following = *curr;

	isert->fNext = following;
	isert->fBack = curr;

	// The displaced node is now reached through the new node's fNext link.
	if( following )
		following->fBack = &isert->fNext;

	*curr = isert;
	return isert;
}

// Unlinks and returns the first node of a list.
//   head - address of the list's head pointer
//   tail - address of the list's tail pointer, or nil if no tail is tracked
// Returns the removed node, or nil when the list was empty. The removed
// node's own fNext/fBack are left untouched.
plMsgWrap* plDispatch::IDequeue(plMsgWrap** head, plMsgWrap** tail)
{
	plMsgWrap* const front = *head;

	if( front )
	{
		plMsgWrap* const successor = front->fNext;
		*head = successor;
		if( successor )
			successor->fBack = head;
	}

	// If the node we removed was also the tail, the tail follows the head.
	if( tail && (*tail == front) )
		*tail = *head;

	return front;
}

// Wraps msg and inserts it into the deferred queue, which is kept ordered by
// time stamp. When the queue was empty and this dispatcher has an owner, the
// owner is first registered with the main dispatcher for plTimeMsg.
// Always returns false.
hsBool plDispatch::ISortToDeferred(plMessage* msg)
{
	plMsgWrap* wrapped = TRACKED_NEW plMsgWrap(msg);

	// Link at which the new node is inserted; defaults to the queue head.
	plMsgWrap** slot = &fFutureMsgQueue;

	if( !fFutureMsgQueue )
	{
		if( IGetOwner() )
			plgDispatch::Dispatch()->RegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
	}
	else if( !(fFutureMsgQueue->fMsg->fTimeStamp > wrapped->fMsg->fTimeStamp) )
	{
		// Not earlier than the current head: walk forward past every node
		// whose time stamp is strictly less than the new message's.
		plMsgWrap* prev = fFutureMsgQueue;
		while( prev->fNext && (prev->fNext->fMsg->fTimeStamp < wrapped->fMsg->fTimeStamp) )
			prev = prev->fNext;
		slot = &prev->fNext;
	}

	IInsertToQueue(slot, wrapped);
	return false;
}

void plDispatch::ICheckDeferred(double secs)
{
	while( fFutureMsgQueue && (fFutureMsgQueue->fMsg->fTimeStamp < secs) )
	{
		plMsgWrap* send = IDequeue(&fFutureMsgQueue, nil);
		MsgSend(send->fMsg);
		delete send;
	}

	int timeIdx = plTimeMsg::Index();
	if( IGetOwner()
		&& !fFutureMsgQueue 
		&& 
			( 
				(timeIdx >= fRegisteredExactTypes.GetCount()) 
				|| 
				!fRegisteredExactTypes[plTimeMsg::Index()]
			)
	  )
		plgDispatch::Dispatch()->UnRegisterForExactType(plTimeMsg::Index(), IGetOwnerKey());
}

// Returns true only when hClass is the plTimeMsg class index and this
// dispatcher still holds deferred messages; false in every other case.
hsBool plDispatch::IListeningForExactType(UInt16 hClass)
{
	const bool isTimeMsg = (hClass == plTimeMsg::Index());
	return isTimeMsg && fFutureMsgQueue;
}

// Appends msgWrap to the tail of the shared delivery queue and, unless async
// is set, immediately attempts to run the dispatch loop.
void plDispatch::IMsgEnqueue(plMsgWrap* msgWrap, hsBool async)
{
	fMsgCurrentMutex.Lock();

#ifdef HS_DEBUGGING
	if( msgWrap->fMsg->HasBCastFlag(plMessage::kMsgWatch) )
		fMsgWatch.Append(msgWrap->fMsg);
#endif // HS_DEBUGGING

	// Append after the current tail, or start the list when it is empty.
	plMsgWrap** slot = fMsgTail ? &fMsgTail->fNext : &fMsgHead;
	fMsgTail = IInsertToQueue(slot, msgWrap);

	fMsgCurrentMutex.Unlock();

	if( async )
		return;

	// IMsgDispatch() itself checks fMsgActive, so no test is needed here.
	IMsgDispatch();
}

// Turns message buffering on or off. Requests are counted: every call with
// on == true must be matched by a call with on == false, and delivery only
// resumes (via IMsgDispatch) once the last outstanding request is released.
//
// Returns false if buffering is requested while messages are being
// delivered and no buffering request is already outstanding; true otherwise.
hsBool plDispatch::SetMsgBuffering(hsBool on)
{
	fMsgCurrentMutex.Lock();
	if( on )
	{
		hsAssert(fNumBufferReq || !fMsgActive, "Can't start deferring message delivery while delivering messages. See mf");
		if( !fNumBufferReq && fMsgActive )
		{
			fMsgCurrentMutex.Unlock();
			return false;
		}

		fNumBufferReq++;
		fMsgActive = true;
		fMsgCurrentMutex.Unlock();
	}
	else if( !--fNumBufferReq )
	{
		// Last outstanding request released: resume delivery. The mutex must
		// be dropped first because IMsgDispatch takes it itself.
		fMsgActive = false;
		fMsgCurrentMutex.Unlock();
		IMsgDispatch();
	}
	else
	{
		// Other buffering requests are still outstanding. Previously this
		// path returned with fMsgCurrentMutex still held.
		fMsgCurrentMutex.Unlock();
	}
	hsAssert(fNumBufferReq >= 0, "Mismatched number of on/off dispatch buffering requests");

	return true;
}

// Drains the shared delivery queue: each queued message is dequeued and
// handed, via MsgReceive(), to every one of its receivers that resolves to a
// plReceiver. Returns immediately, without delivering anything, if
// fMsgDispatchLock cannot be taken or if fMsgActive is already set
// (delivery in progress or buffering requested).
//
// fMsgCurrentMutex is held while the queue and fMsgCurrent are manipulated
// and released while receivers run, so messages can be enqueued during
// delivery; they are picked up by the same loop.
void plDispatch::IMsgDispatch()
{
	// Non-blocking: if the dispatch lock is unavailable, just bail out.
	if( !fMsgDispatchLock.TryLock() )
		return;

	if( fMsgActive )
	{
		fMsgDispatchLock.Unlock();
		return;
	}

	fMsgActive = true;
	// responseLevel and origTail are only referenced by the commented-out
	// logging code further down.
	int responseLevel=0;

	fMsgCurrentMutex.Lock();

	plMsgWrap* origTail = fMsgTail;
	// Intentional assignment: take the current head as the message to deliver
	// and stop once the queue is empty.
	while( fMsgCurrent = fMsgHead )
	{
		IDequeue(&fMsgHead, &fMsgTail);
		fMsgCurrentMutex.Unlock();

		plMessage* msg = fMsgCurrent->fMsg;
		hsBool nonLocalMsg = msg && msg->HasBCastFlag(plMessage::kNetNonLocal);

#ifdef HS_DEBUGGING
		// Messages that were flagged kMsgWatch when enqueued trigger a
		// breakpoint (Win32 only) as they are about to be delivered.
		int watchIdx = fMsgWatch.Find(msg);
		if( fMsgWatch.kMissingIndex != watchIdx )
		{
			fMsgWatch.Remove(watchIdx);
#if HS_BUILD_FOR_WIN32
			__asm { int 3 }
#endif // HS_BUILD_FOR_WIN32
		}
#endif // HS_DEBUGGING

		// startTicks is only consumed by the commented-out logging below.
		static UInt64 startTicks = 0;
		if (plDispatchLogBase::IsLogging())
			startTicks = hsTimer::GetFullTickCount();

		int i, numReceivers=0;
		for( i = 0; fMsgCurrent && i < fMsgCurrent->GetNumReceivers(); i++ )
		{
			const plKey& rcvKey = fMsgCurrent->GetReceiver(i);
			// Receivers with a nil key, or whose ObjectIsLoaded() result does
			// not convert to a plReceiver, are skipped.
			plReceiver* rcv = rcvKey ? plReceiver::ConvertNoRef(rcvKey->ObjectIsLoaded()) : nil;
			if( rcv )
			{
				if (nonLocalMsg)
				{
					// localOnly objects should not get remote messages
					plSynchedObject* synchedObj = plSynchedObject::ConvertNoRef(rcv);
					if (synchedObj && !synchedObj->IsNetSynched() )
					{
						continue;
					}

					if (plNetObjectDebuggerBase::GetInstance())
					{	// log net msg if this is a debug object
						hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
						if (plNetObjectDebuggerBase::GetInstance()->IsDebugObject(ko))
						{
							hsLogEntry(plNetObjectDebuggerBase::GetInstance()->LogMsg(
								xtl::format("<RCV> object:%s, GameMessage %s st=%.3f rt=%.3f", 
								ko->GetKeyName(), msg->ClassName(), hsTimer::GetSysSeconds(), hsTimer::GetSeconds()).c_str()));
						}
					}
				}

#ifndef PLASMA_EXTERNAL_RELEASE
				UInt32 rcvTicks = hsTimer::GetPrecTickCount();

				// Object could be deleted by this message, so we need to log this stuff now
				const char* keyname = "(unknown)";
				const char* className = "(unknown)";
				UInt32 clonePlayerID = 0;
				if (plDispatchLogBase::IsLoggingLong())
				{
					hsKeyedObject* ko = hsKeyedObject::ConvertNoRef(rcv);
					if (ko)
					{
						keyname = ko->GetKeyName();
						clonePlayerID = ko->GetKey()->GetUoid().GetClonePlayerID();
						className = ko->ClassName();
					}
				}
#endif // PLASMA_EXTERNAL_RELEASE

				#ifdef HS_DEBUGGING
				if (msg->GetBreakBeforeDispatch())
					DebugBreakIfDebuggerPresent();
				#endif
					
				// The actual delivery, bracketed by the MsgReceive profile timer.
				plProfile_BeginTiming(MsgReceive);
				rcv->MsgReceive(msg);
				plProfile_EndTiming(MsgReceive);

#ifndef PLASMA_EXTERNAL_RELEASE
				if (plDispatchLogBase::IsLoggingLong())
				{
					rcvTicks = hsTimer::GetPrecTickCount() - rcvTicks;

					float rcvTime = (float)(hsTimer::PrecTicksToSecs(rcvTicks) * 1000.f);
					// If the receiver takes more than 5 ms to process its message, log it
					if (rcvTime > 5.f)
						plDispatchLogBase::GetInstance()->LogLongReceive(keyname, className, clonePlayerID, msg, rcvTime);
				}
#endif // PLASMA_EXTERNAL_RELEASE

				numReceivers++;

				// Optional hook invoked after every successful delivery.
				if (fMsgRecieveCallback != nil)
					fMsgRecieveCallback();
			}
		}

		// for message logging
//		if (plDispatchLogBase::IsLogging())
//		{
//			float sendTime = hsTimer::FullTicksToMs(hsTimer::GetFullTickCount() - startTicks);
//
//			plDispatchLogBase::GetInstance()->DumpMsg(msg, numReceivers, (int)sendTime, responseLevel*2 /* indent */);
//			if (origTail==fMsgCurrent)
//			{	// if we deliver more msgs after this, they must be response msgs
//				responseLevel++;
//				origTail = fMsgTail;
//			}
//		}

		// Re-take the mutex before touching fMsgCurrent and the queue again;
		// it stays held through the loop condition at the top.
		fMsgCurrentMutex.Lock();

		delete fMsgCurrent;
		// TEMP
		// Sentinel value; it is overwritten by the loop condition before
		// fMsgCurrentMutex is released again.
		fMsgCurrent = (class plMsgWrap *)0xdeadc0de;
	}
	fMsgCurrentMutex.Unlock();

	fMsgActive = false;
	fMsgDispatchLock.Unlock();
}

//
// Applies the network propagation rules to msg before it is dispatched
// locally: inherits net flags from the message currently being delivered,
// sends msg over the network when its flags call for it, and decides
// whether it should still be delivered locally.
//
// Returns true if msg is not to be propagated locally, in which case the
// reference on msg has been released here; returns false if the caller
// should go on to dispatch it.
//
hsBool plDispatch::IMsgNetPropagate(plMessage* msg)
{
	// Make sure cascaded messages all have the same net flags.
	// fMsgCurrent is shared with IMsgDispatch, so it is read under its mutex.
	fMsgCurrentMutex.Lock();
	plMessage* parentMsg = fMsgCurrent ? fMsgCurrent->fMsg : nil;
	plNetClientApp::InheritNetMsgFlags(parentMsg, msg, false);
	fMsgCurrentMutex.Unlock();

	// Decide if msg should go out over the network.
	// If kNetForce is used, this message should always go out over the network, even if it's
	// already part of a net cascade. We still want to inherit net status flags (but ignore them)
	// so that response messages obey cascading rules. In other words, we are not
	// halting the cascade, just overriding the send rule for this message.
	if( msg->HasBCastFlag(plMessage::kNetPropagate) )
	{
		const bool shouldSend = !msg->HasBCastFlag(plMessage::kNetSent)
								|| msg->HasBCastFlag(plMessage::kNetForce)
								|| msg->HasBCastFlag(plMessage::kNetNonDeterministic)
								|| msg->HasBCastFlag(plMessage::kCCRSendToAllPlayers);
		if( shouldSend )
		{
			// send it off...
			hsAssert(!msg->HasBCastFlag(plMessage::kNetStartCascade), "initial net cascade msg getting sent over the net again?");
			if (plNetClientApp::GetInstance() && plNetClientApp::GetInstance()->ISendGameMessage(msg)>=0)
				msg->SetBCastFlag(plMessage::kNetSent);
		}
	}

	// Decide if msg should get sent locally
	if( msg->HasBCastFlag(plMessage::kLocalPropagate) )
	{
		// since we've already checked this property, and the msg will be dispatched locally,
		// it should not start any more net cascades.
		msg->SetBCastFlag(plMessage::kNetStartCascade, false);
		return false;
	}

	// Not for local delivery: drop our reference and report it consumed.
	hsRefCnt_SafeUnRef(msg);
	return true;
}

// Entry point for sending a message through the dispatcher.
//   msg   - message to send
//   async - when true the message is only queued; otherwise the dispatch
//           loop is attempted right after queuing
//
// Steps, in order:
//   1. Net propagation rules are applied; if they consume the message,
//      nothing further happens.
//   2. A message time-stamped after the current system time is placed on the
//      deferred queue instead of being delivered now.
//   3. A plTimeMsg first releases any deferred messages that have come due.
//   4. Receivers are gathered (registered listeners for broadcast messages,
//      otherwise the message's own receiver list) and the message is queued.
//
// Returns true when the message was consumed by net propagation or queued
// for delivery; for a deferred message it returns ISortToDeferred()'s result.
hsBool plDispatch::MsgSend(plMessage* msg, hsBool async)
{
	if( IMsgNetPropagate(msg) )
		return true;

	if( msg->GetTimeStamp() > hsTimer::GetSysSeconds() )
		return ISortToDeferred(msg);

	plTimeMsg* timeMsg = plTimeMsg::ConvertNoRef(msg);
	if( timeMsg )
		ICheckDeferred(timeMsg->DSeconds());

	// The wrapper takes its own reference, so the one handed to this call can
	// be released; msg remains valid below through the wrapper's reference.
	plMsgWrap* msgWrap = TRACKED_NEW plMsgWrap(msg);
	hsRefCnt_SafeUnRef(msg);

	// broadcast
	if( msg->HasBCastFlag(plMessage::kBCastByExactType) || msg->HasBCastFlag(plMessage::kBCastByType) )
	{
		int idx = msg->ClassIndex();
		if( idx < fRegisteredExactTypes.GetCount() )
		{
			plTypeFilter* filt = fRegisteredExactTypes[idx];
			if( filt )
			{
				int j;
				for( j = 0; j < filt->fReceivers.GetCount(); j++ )
				{
					msgWrap->AddReceiver(filt->fReceivers[j]);
				}
				// One-shot broadcast: forget all listeners for this class
				// once they have been copied onto the message.
				if( msg->HasBCastFlag(plMessage::kClearAfterBCast) )
				{
					delete filt;
					fRegisteredExactTypes[idx] = nil;
				}
			}
		}
	}
	// Direct communique
	else
	if( msg->GetNumReceivers() )
	{
		msgWrap->fReceivers = msg->fReceivers;
	}
	IMsgEnqueue(msgWrap, async);

	return true;
}
// Switches MsgQueue() between queuing messages (sw true) and sending them
// immediately (sw false).
void plDispatch::MsgQueueOnOff(hsBool sw)
{
	this->fQueuedMsgOn = sw;
}
// Holds msg on the queued-message list until MsgQueueProcess() is run, or,
// when queuing has been switched off, sends it straight away.
void plDispatch::MsgQueue(plMessage* msg)
{
	if( !fQueuedMsgOn )
	{
		MsgSend(msg, false);
		return;
	}

	// The list is shared between threads, so only touch it under its mutex.
	fQueuedMsgListMutex.Lock();
	hsAssert(msg,"Message missing");
	fQueuedMsgList.push_back(msg);
	fQueuedMsgListMutex.Unlock();
}

void plDispatch::MsgQueueProcess()
{
		// Process all messages on Queue, unlock while sending them
		// this would allow other threads to put new messages on the list while we send()
	while (1)
	{	
		plMessage * pMsg = nil;
		fQueuedMsgListMutex.Lock();
		int size = fQueuedMsgList.size();
		if (size)
		{	pMsg = fQueuedMsgList.front();
			fQueuedMsgList.pop_front();
		}
		fQueuedMsgListMutex.Unlock();
		if (pMsg)
		{	MsgSend(pMsg, false);
		}
		if (!size)
			break;
	}
}

// Registers receiver for every factory class index for which
// plFactory::DerivesFrom(hClass, index) holds.
void plDispatch::RegisterForType(UInt16 hClass, const plKey& receiver)
{
	int classIdx = 0;
	while( classIdx < plFactory::GetNumClasses() )
	{
		if( plFactory::DerivesFrom(hClass, classIdx) )
		{
			RegisterForExactType(classIdx, receiver);
		}
		++classIdx;
	}
}

// Adds receiver to the listener list for exactly the class index hClass,
// creating that class's filter on first use. Registering the same receiver
// twice has no further effect.
void plDispatch::RegisterForExactType(UInt16 hClass, const plKey& receiver)
{
	const int slot = hClass;

	// Grow the table, if needed, so that slot is a valid index.
	fRegisteredExactTypes.ExpandAndZero(slot + 1);

	if( !fRegisteredExactTypes[slot] )
	{
		plTypeFilter* created = TRACKED_NEW plTypeFilter;
		created->fHClass = hClass;
		fRegisteredExactTypes[slot] = created;
	}

	plTypeFilter* filter = fRegisteredExactTypes[slot];
	const bool alreadyListed = (filter->fReceivers.kMissingIndex != filter->fReceivers.Find(receiver));
	if( !alreadyListed )
		filter->fReceivers.Append(receiver);
}

// Removes receiver from the listener list of every registered class index
// for which plFactory::DerivesFrom(hClass, index) holds.
void plDispatch::UnRegisterForType(UInt16 hClass, const plKey& receiver)
{
	int classIdx = 0;
	while( classIdx < fRegisteredExactTypes.GetCount() )
	{
		if( plFactory::DerivesFrom(hClass, classIdx) )
		{
			IUnRegisterForExactType(classIdx, receiver);
		}
		++classIdx;
	}
}

// Removes the first occurrence of receiver from the filter stored at idx.
// The caller must pass an in-range idx (asserted). Removing the last
// remaining receiver deletes the filter and clears its table slot.
// Note: the return value is always false, whether or not anything was removed.
hsBool plDispatch::IUnRegisterForExactType(int idx, const plKey& receiver)
{
	hsAssert(idx < fRegisteredExactTypes.GetCount(), "Out of range should be filtered before call to internal");

	plTypeFilter* filter = fRegisteredExactTypes[idx];
	if( filter )
	{
		const int count = filter->fReceivers.GetCount();

		// Locate the first matching entry, if any.
		int found = 0;
		while( (found < count) && !(receiver == filter->fReceivers[found]) )
			++found;

		if( found < count )
		{
			if( count > 1 )
			{
				// Order is not preserved: the last entry is moved into the
				// vacated slot and the list is shortened by one.
				const int last = count - 1;
				if( found < last )
					filter->fReceivers[found] = filter->fReceivers[last];
				filter->fReceivers[last] = nil;
				filter->fReceivers.SetCount(last);
			}
			else
			{
				delete filter;
				fRegisteredExactTypes[idx] = nil;
			}
		}
	}

	return false;
}

void plDispatch::UnRegisterAll(const plKey& receiver)
{
	int i;
	for( i = 0; i < fRegisteredExactTypes.GetCount(); i++ )
	{
		plTypeFilter* filt = fRegisteredExactTypes[i];
		if( filt )
		{
			int idx = filt->fReceivers.Find(receiver);
			if( idx != filt->fReceivers.kMissingIndex )
			{
				if( filt->fReceivers.GetCount() > 1 )
				{
					if( idx < filt->fReceivers.GetCount() - 1 )
						filt->fReceivers[idx] = filt->fReceivers[filt->fReceivers.GetCount() - 1];
					filt->fReceivers[filt->fReceivers.GetCount()-1] = nil;
					filt->fReceivers.SetCount(filt->fReceivers.GetCount()-1);
				}
				else
				{
					delete filt;
					fRegisteredExactTypes[i] = nil;
				}
			}
		}
	}
}

// Removes receiver from the listener list for exactly the class index
// hClass. Does nothing when hClass is beyond the registration table or has
// no filter.
void plDispatch::UnRegisterForExactType(UInt16 hClass, const plKey& receiver)
{
	const int slot = hClass;

	const bool inRange = (slot < fRegisteredExactTypes.GetCount());
	if( inRange && fRegisteredExactTypes[slot] )
	{
		IUnRegisterForExactType(slot, receiver);
	}
}