事件循環(有時稱為消息循環)是等待并調度傳入事件的線程。線程阻塞等待請求的到達,然后將事件分派給事件處理程序函數。循環通常使用消息隊列來保存傳入消息。依次對每個消息進行出隊,解碼,然后執行操作。事件循環是實現進程間通信的一種方法。
所有操作系統都支持多線程應用程序。每個操作系統都有用于創建線程,消息隊列和計時器的唯一函數調用。隨著C ++ 11線程支持庫的出現,現在可以創建可移植的代碼并避免特定于OS的函數調用。本文提供了一個簡單的示例,說明如何僅依靠C ++標準庫來創建線程事件循環,消息隊列和計時器服務。任何支持線程庫的C ++ 11編譯器都應該能夠編譯附加的源代碼。
通常,我需要一個線程來充當事件循環。線程將入站消息出隊,并根據唯一的消息標識符將數據調度到適當的函數處理程序。能夠調用功能的計時器支持對于低速輪詢很方便,如果在預期的時間內沒有發生任何事情,則可以生成超時。很多時候,輔助線程是在啟動時創建的,直到應用程序終止后才被銷毀。
該實現的關鍵要求是,傳入消息必須在同一線程實例上執行。盡管std::async
可以使用池中的臨時線程,但是此類確保所有傳入消息使用同一線程。例如,可以使用不是線程安全的代碼來實現子系統。單個WorkerThread
實例用于安全地將函數調用分派到子系統中。
乍一看,C ++線程支持似乎缺少一些關鍵功能。是的,std::thread
可以拆分一個線程,但是沒有線程安全隊列,也沒有計時器-大多數OS都提供的服務。我將展示如何使用C ++標準庫創建這些“缺失”功能,并提供許多程序員熟悉的事件處理循環。
本WorkerThread
類封裝了所有必要的事件循環機制。一個簡單的類接口允許線程創建,將消息發布到事件循環以及最終的線程終止。界面如下圖所示:
class WorkerThread
{
public:
/// Constructor
WorkerThread(const char* threadName);
/// Destructor
~WorkerThread();
/// Called once to create the worker thread
/// @return True if thread is created. False otherwise.
bool CreateThread();
/// Called once a program exit to exit the worker thread
void ExitThread();
/// Get the ID of this thread instance
/// @return The worker thread ID
std::thread::id GetThreadId();
/// Get the ID of the currently executing thread
/// @return The current thread ID
static std::thread::id GetCurrentThreadId();
/// Add a message to the thread queue
/// @param[in] data - thread specific message information
void PostMsg(std::shared_ptr<UserData> msg);
private:
WorkerThread(const WorkerThread&) = delete;
WorkerThread& operator=(const WorkerThread&) = delete;
/// Entry point for the worker thread
void Process();
/// Entry point for timer thread
void TimerThread();
std::unique_ptr<std::thread> m_thread;
std::queue<std::shared_ptr<ThreadMsg>> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
std::atomic<bool> m_timerExit;
const char* THREAD_NAME;
};
首先要注意的是,std::thread
它用于創建主工作線程。主要工作線程功能是Process()
。
bool WorkerThread::CreateThread()
{
if (!m_thread)
m_thread = new thread(&WorkerThread::Process, this);
return true;
}
該Process()
事件循環如下所示。該線程依賴于a std::queue<ThreadMsg*>
消息隊列。std::queue
不是線程安全的,因此對隊列的所有訪問都必須由互斥保護。A std::condition_variable
用于暫停線程,直到收到新消息已添加到隊列的通知。
void WorkerThread::Process()
{
m_timerExit = false;
std::thread timerThread(&WorkerThread::TimerThread, this);
while (1)
{
std::shared_ptr<ThreadMsg> msg;
{
// Wait for a message to be added to the queue
std::unique_lock<std::mutex> lk(m_mutex);
while (m_queue.empty())
m_cv.wait(lk);
if (m_queue.empty())
continue;
msg = m_queue.front();
m_queue.pop();
}
switch (msg->id)
{
case MSG_POST_USER_DATA:
{
ASSERT_TRUE(msg->msg != NULL);
auto userData = std::static_pointer_cast<UserData>(msg->msg);
cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
break;
}
case MSG_TIMER:
cout << "Timer expired on " << THREAD_NAME << endl;
break;
case MSG_EXIT_THREAD:
{
m_timerExit = true;
timerThread.join();
return;
}
default:
ASSERT();
}
}
}
PostMsg()
ThreadMsg
在堆上創建一個新對象,將該消息添加到隊列中,然后使用條件變量通知工作線程。
void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
ASSERT_TRUE(m_thread);
// Create a new ThreadMsg
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));
// Add user data msg to queue and notify worker thread
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
循環將繼續處理消息,直到MSG_EXIT_THREAD
收到并退出線程為止。
void WorkerThread::ExitThread()
{
if (!m_thread)
return;
// Create a new ThreadMsg
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));
// Put exit thread message into the queue
{
lock_guard<mutex> lock(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
m_thread->join();
m_thread = nullptr;
}
下面的代碼片段將std::thread
上面的事件循環與使用Windows API的類似Win32版本進行了對比。注意GetMessage()
API用于代替std::queue
。使用將消息發布到OS消息隊列PostThreadMessage()
。最后,timerSetEvent()
用于將WM_USER_TIMER
消息放入隊列。所有這些服務均由OS提供。std::thread WorkerThread
此處介紹的實現避免了原始OS調用,但實現功能與Win32版本相同,而僅依賴于C ++標準庫。
unsigned long WorkerThread::Process(void* parameter)
{
MSG msg;
BOOL bRet;
// Start periodic timer
MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired,
reinterpret_cast<DWORD>(this), TIME_PERIODIC);
while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
{
switch (msg.message)
{
case WM_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg.wParam != NULL);
// Convert the ThreadMsg void* data back to a UserData*
const UserData* userData = static_cast<const UserData*>(msg.wParam);
cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
// Delete dynamic data passed through message queue
delete userData;
break;
}
case WM_USER_TIMER:
cout << "Timer expired on " << THREAD_NAME << endl;
break;
case WM_EXIT_THREAD:
timeKillEvent(timerId);
return 0;
default:
ASSERT();
}
}
return 0;
}
使用輔助專用線程將低分辨率定期計時器消息插入隊列。計時器線程在內部創建Process()
。
void WorkerThread::Process()
{
m_timerExit = false;
std::thread timerThread(&WorkerThread::TimerThread, this);
...
計時器線程的唯一責任是MSG_TIMER
每250ms 插入一條消息。在此實現中,無法防止計時器線程將多個計時器消息注入到隊列中。如果工作線程落后并且無法足夠快地服務于消息隊列,則可能發生這種情況。根據工作線程,處理負載以及計時器消息的插入速度,可以采用其他邏輯來防止泛濫隊列。
void WorkerThread::TimerThread()
{
while (!m_timerExit)
{
// Sleep for 250mS then put a MSG_TIMER into the message queue
std::this_thread::sleep_for(250ms);
std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));
// Add timer msg to queue and notify worker thread
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
}
main()
下面的函數顯示了如何使用WorkerThread
該類。創建兩個工作線程,并將消息發布到每個工作線程。短暫延遲后,兩個線程均退出。
// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");
int main(void)
{
// Create worker threads
workerThread1.CreateThread();
workerThread2.CreateThread();
// Create message to send to worker thread 1
std::shared_ptr<UserData> userData1(new UserData());
userData1->msg = "Hello world";
userData1->year = 2017;
// Post the message to worker thread 1
workerThread1.PostMsg(userData1);
// Create message to send to worker thread 2
std::shared_ptr<UserData> userData2(new UserData());
userData2->msg = "Goodbye world";
userData2->year = 2017;
// Post the message to worker thread 2
workerThread2.PostMsg(userData2);
// Give time for messages processing on worker threads
this_thread::sleep_for(1s);
workerThread1.ExitThread();
workerThread2.ExitThread();
return 0;
}
C ++線程支持庫提供了獨立于平臺的方式來編寫多線程應用程序代碼,而無需依賴于特定于操作系統的API。WorkerThread
這里介紹的類是事件循環的基本實現,但所有基礎知識都已準備就緒,可以進行擴展。
熱門源碼