// ========================================================================== // Class Implementation : COXThreadEngine // ========================================================================== // Source file : OXThreadEngine.cpp // This software along with its related components, documentation and files ("The Libraries") // is © 1994-2007 The Code Project (1612916 Ontario Limited) and use of The Libraries is // governed by a software license agreement ("Agreement"). Copies of the Agreement are // available at The Code Project (www.codeproject.com), as part of the package you downloaded // to obtain this file, or directly from our office. For a copy of the license governing // this software, you may contact us at legalaffairs@codeproject.com, or by calling 416-849-8900. // ////////////////////////////////////////////////////////////////////////// #include "stdafx.h" #include "OXThreadEngine.h" #ifdef _DEBUG #define new DEBUG_NEW #undef THIS_FILE static char THIS_FILE[] = __FILE__; #endif // timeout value (in msec) for the thread termination (see 'COXThreadEngine::Terminate()') #define OXTE_TERMINATION_TIMEOUT 10000 ///////////////////////////////////////////////////////////////////////////// // COXEngineCmd // // Definition of static members --------------------------------------------- DWORD COXEngineCmd::m_dwGlobalIndex = 0; // Data members ------------------------------------------------------------- // protected: // COXDoneNotifier* m_pDoneNotifier; // --- For optional notification after execution // HANDLE m_hFinishedEvent; // --- Used for synchronisation of synchrone commands // LONG m_nRef; // --- Used for reference counting // private: // static DWORD m_dwGlobalIndex; // --- Internal cyclic counter // Implementation of macros ------------------------------------------------- IMPLEMENT_DYNAMIC(COXEngineCmd, CObject) // Member functions --------------------------------------------------------- // public: COXEngineCmd::COXEngineCmd(BOOL bSynchrone /*= TRUE*/, COXDoneNotifier* pDoneNotifier /*= NULL*/) : m_hFinishedEvent(NULL), m_dwIndex(++m_dwGlobalIndex), m_pDoneNotifier(pDoneNotifier), m_nRef(2+(pDoneNotifier!=NULL?1:0)) // One release has to be done after PostCommand(); // Another one will be done by the engine after the // execution // In case of notification a release has { // to be done after notification if (bSynchrone) // m_hFinishedEvent = CreateEvent(NULL, FALSE, FALSE, NULL); } COXEngineCmd::~COXEngineCmd() { if (m_hFinishedEvent) CloseHandle(m_hFinishedEvent); } void COXEngineCmd::Release() { if (InterlockedDecrement(&m_nRef) == 0) delete this; } BOOL COXEngineCmd::IsSynchrone() const { return (m_hFinishedEvent!=NULL); } ///////////////////////////////////////////////////////////////////////////// // COXThreadEngine::COXEngineCmdList // Data members ------------------------------------------------------------- // protected: // HANDLE m_hMutex; // --- mutex for lock functionality // Member functions --------------------------------------------------------- // public: COXThreadEngine::COXEngineCmdList::COXEngineCmdList() : m_hMutex(NULL) // --- In : none // --- Out : none // --- Returns : none // --- Effect : constructs the command list and creates the internal mutex { m_hMutex = CreateMutex(NULL, FALSE, NULL); } COXThreadEngine::COXEngineCmdList::~COXEngineCmdList() // --- In : none // --- Out : none // --- Returns : none // --- Effect : destructs the command list and closes the mutex handle { if (m_hMutex) CloseHandle(m_hMutex); } void COXThreadEngine::COXEngineCmdList::Lock() // --- In : none // --- Out : none // --- Returns : none // --- Effect : locks the list exclusively { WaitForSingleObject(m_hMutex, INFINITE); } void COXThreadEngine::COXEngineCmdList::Unlock() // --- In : none // --- Out : none // --- Returns : none // --- Effect : unlocks the list { VERIFY(ReleaseMutex(m_hMutex)); } ///////////////////////////////////////////////////////////////////////////// // COXThreadEngineBase // Data members ------------------------------------------------------------- // protected: // BOOL m_bEndThread; // --- Thread shall terminate during next thread event // BOOL m_bInitialized; // --- If successfully initialised // CWinThread* m_pThread; // --- Thread pointer of the engines thread // HANDLE m_hCreatedEvent; // --- Event for initialisation synchronisation // HANDLE m_hEndEvent; // --- Event for termination synchronisation // HANDLE m_hThreadEvent; // --- event when a new command is posted // COXEngineCmdList m_cmdList; // --- the command queue of the engine // Member functions --------------------------------------------------------- // public: COXThreadEngine::COXThreadEngine() : m_hCreatedEvent(NULL), m_bInitialized(FALSE), m_bEndThread(FALSE), m_hEndEvent(NULL), m_hThreadEvent(NULL), m_pThread(NULL), m_nTerinationTimeout(OXTE_TERMINATION_TIMEOUT) { m_hCreatedEvent = CreateEvent(NULL, FALSE, FALSE, NULL); m_hEndEvent = CreateEvent(NULL, FALSE, FALSE, NULL); m_hThreadEvent = CreateEvent(NULL, FALSE, FALSE, NULL); } COXThreadEngine::~COXThreadEngine() { if (m_hEndEvent) CloseHandle(m_hEndEvent); if (m_hCreatedEvent) CloseHandle(m_hCreatedEvent); if (m_hThreadEvent) CloseHandle(m_hThreadEvent); } BOOL COXThreadEngine::Initialize() { DWORD dwWaitResult; if (IsInitialized()) { TRACE0("COXThreadEngine::Initialize() -> Already initialized !!"); ASSERT(FALSE); return FALSE; } m_pThread = AfxBeginThread((AFX_THREADPROC)StartThread, this); if (m_pThread) { while (TRUE) { dwWaitResult = ::WaitForSingleObject(&m_hCreatedEvent,INFINITE); //dwWaitResult = MsgWaitForMultipleObjects(1, &m_hCreatedEvent, FALSE, INFINITE, QS_ALLINPUT); if (dwWaitResult == (WAIT_OBJECT_0 + 1)) // You must avoid a WM_QUIT to be send during initialization VERIFY(AfxGetThread()->PumpMessage()); else return TRUE; } } return FALSE; } BOOL COXThreadEngine::IsInitialized() { return m_bInitialized; } void COXThreadEngine::Terminate() { DWORD dwWaitResult; BOOL bEnd = FALSE; BOOL bPostQuit = FALSE; if (m_pThread==NULL) return; m_bEndThread = TRUE; SetEvent(m_hThreadEvent); while (!bEnd) { dwWaitResult = MsgWaitForMultipleObjects(1, &m_hEndEvent, FALSE, m_nTerinationTimeout, QS_ALLINPUT); if (dwWaitResult == (WAIT_OBJECT_0 + 1)) { MSG msg; // making sure there is a msg in the queue // we don't want PumpMessage() to hang if (PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE)) bPostQuit = (!AfxGetThread()->PumpMessage()) || bPostQuit; } else { if (m_pThread && (dwWaitResult != WAIT_OBJECT_0)) TerminateThread(m_pThread->m_hThread, 0); bEnd = TRUE; } } if (bPostQuit) { AfxPostQuitMessage(0); Sleep(200); // Give the terminating thread the time to terminate } m_bEndThread = FALSE; m_bInitialized = FALSE; } void COXThreadEngine::PostCommand(COXEngineCmd* pCmd, BOOL bASAP /*= FALSE*/) { m_cmdList.Lock(); if (bASAP) m_cmdList.AddHead(pCmd); else m_cmdList.AddTail(pCmd); m_cmdList.Unlock(); SetEvent(m_hThreadEvent); if (pCmd->m_hFinishedEvent) WaitForSingleObject(pCmd->m_hFinishedEvent, INFINITE); } // protected: UINT COXThreadEngine::StartThread( LPVOID pParam ) // --- In : pParam: the engine itself (it needs to be past on as a parameter // because this is a static functions) // --- Out : none // --- Returns : always zero (indicates that the thread terminated normally) // --- Effect : calls the Run() functions which implements a message loop { ((COXThreadEngine*) pParam)->Run(); return 0; } void COXThreadEngine::Run() // --- In : none // --- Out : none // --- Returns : none // --- Effect : implements a loop which ends if Terminate() is called; // this loop reacts on every command that is added to the engines queue { MSG msg; HRESULT hr; hr = CoInitialize(NULL); if (hr==S_OK) m_bInitialized = OnThreadCreation(); SetEvent(m_hCreatedEvent); if (m_bInitialized) { DWORD dwWaitResult; while (!m_bEndThread) { dwWaitResult = MsgWaitForMultipleObjects(1, &m_hThreadEvent, FALSE, INFINITE, QS_ALLINPUT); if (dwWaitResult == (WAIT_OBJECT_0 + 1)) { while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) DispatchMessage(&msg); } else if (!m_bEndThread) OnThreadEvent(); } OnThreadDestruction(); m_pThread = NULL; CoUninitialize(); SetEvent(m_hEndEvent); } m_pThread = NULL; } BOOL COXThreadEngine::OnThreadCreation() // --- In : none // --- Out : none // --- Returns : if initialisation of the engine will be successful or not // --- Effect : is called by the engine when the thread is started // overload this function if you need to do some specific // initialisation { return TRUE; } void COXThreadEngine::OnThreadDestruction() // --- In : none // --- Out : none // --- Returns : none // --- Effect : is called by the engine when the thread is terminated; // overload this function if you need to do some specific // uninitialisation; // dont forget to call this base function on the // end of your overloaded version { COXEngineCmd* pCmd; while (m_cmdList.GetCount()) { pCmd = m_cmdList.RemoveHead(); pCmd->Release(); // the one that will not be done by OnThreadEvent() if (pCmd->m_pDoneNotifier) pCmd->Release(); // the one that will not be done by the notifier } } void COXThreadEngine::OnThreadEvent() // --- In : none // --- Out : none // --- Returns : none // --- Effect : if a thread event occurs, this function is called to execute all // the commands in the engine queue (from head to tail) { COXEngineCmd* pCmd; COXDoneNotifier* pDoneNotifier; HANDLE hFinishedEvent; m_cmdList.Lock(); while (!m_bEndThread && m_cmdList.GetCount()) { pCmd = m_cmdList.RemoveHead(); m_cmdList.Unlock(); hFinishedEvent = pCmd->m_hFinishedEvent; pDoneNotifier = pCmd->m_pDoneNotifier; OnExecuteCmd(pCmd); if (pDoneNotifier) pDoneNotifier->DoneCommand(pCmd); if (hFinishedEvent) SetEvent(hFinishedEvent); pCmd->Release(); m_cmdList.Lock(); } m_cmdList.Unlock(); }