libuv漫談之線程
node.js給使用者的一個很大印象就是單線程,在某種意義上它確實是只有一條線程,這種反應到libuv里就是一個loop線程,也是node.js大多數工作的線程。
不過node.js里還是有少部分地方使用了新線程處理的邏輯,比如說我們在node常用的vm模塊,runInContext方法有一個timeout的配置項,可以用來配置腳本的執行時間,當配置此項的時候,超時會中斷執行並觸發一個錯誤。實現這個邏輯,靠的就是線程,node為此功能而實現了一個WatchDog類,在這個類里,它開了一個新線程,並且在新線程里開了一個新loop,在這個新loop里註冊uv_timer負責超時機制,當時間到達以後觸發回調,WatchDog的使命也就完成了。
除了這類超時類的工作用新線程來實現,node.js依賴的libuv內部大量使用了線程機制,特別是線程池,在libuv里,有如下幾種工作是依賴線程池的:
- 文件的非同步操作
- DNS工具的getaddrinfo和getnameinfo
- 暴露給用戶使用的uv_queue_work,node也有使用,比如說Node.js Documentation-stream_transform_flush_callback 這裡的_flush是非同步的,它最終會調用uv_queue_work在線程池裡執行。
libuv封裝的線程和線程相關的api,基本上模仿的是POSIX threads的API類似,事實上在unix類的系統里,大多數api都是對pthread的簡單封裝,比如說uv_once封裝的是pthread_once,uv_thread_self就是pthread_self,uv_thread_join也就是pthread_join,當然由於posix系裡各家系統對pthread的支持力度也不同,所以libuv也有api為此做了兼容,這個在後面說。
API示例
簡單的把libuv的線程api分類,有這麼幾類:
線程類api
分別是uv_thread_create,uv_thread_self,uv_thread_equal,uv_thread_join這四個。
可以看到這裡的線程類api和pthread相比並不完全,比如說相比pthread,它沒有提供uv_thread_detach的api,而且uv_thread_create還無法支持設定線程屬性,比如說pthread_attr_t這類,其原因非常簡單,libuv並不想做成一個強大的線程庫,它在滿足自己需求下更追求的是跨平台。
後面我會說libuv在跨平台這個大前提下所做的很多努力和犧牲。
一個簡單的線程類api使用示例:
void worker(void *arg) { uv_thread_t nthread = *((uv_thread_t *) arg); uv_thread_t mthread = uv_thread_self(); assert(!uv_thread_equal(&mthread,&nthread));}int main(int argc, char **argv) { uv_thread_t mthread = uv_thread_self(); uv_thread_t nthread; uv_thread_create(&nthread, worker, &mthread); uv_thread_join(&nthread);}
多線程初始化api
這個api只有一個,uv_once。
使用過pthread的同學都知道有一個pthread_once,它的用途很簡單,在多個線程可能執行的情況下保證初始化只執行一次,一般情況下都是在static方法里使用,這裡的第一個參數是uv_once_t類型,一般用UV_ONCE_INIT宏來初始化。
一個簡單的多線程初始化api示例:
static uv_once_t once = UV_ONCE_INIT;static void init_once(void) { printf("init once
");}void worker(void *arg) { uv_once(&once, init_once); printf("thread id:%lu
",uv_thread_self());}int main(int argc, char **argv) { uv_thread_t nthread1,nthread2; uv_thread_create(&nthread1, worker, NULL); uv_thread_create(&nthread2, worker, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2);}
互斥鎖api
分別是uv_mutex_init、uv_mutex_destroy、uv_mutex_lock、uv_mutex_trylock、uv_mutex_unlock這五個。
多線程環境里,由於存在資源競爭的問題,必須需要鎖的機制,libuv封裝了常用的幾類鎖,包括互斥量、讀寫鎖、信號量、條件變數這四種,其中互斥鎖是最常見的線程鎖。
互斥鎖的api里包含了初始化、銷毀以及lock、unlock、trylock這三種方法,使用起來如下:
static uv_mutex_t lock;void worker(void *arg) { // if(uv_mutex_trylock(&lock)<0){ // printf("mutex locked
"); // return; // }; uv_mutex_lock(&lock); sleep(2); printf("thread id:%lu
",uv_thread_self()); uv_mutex_unlock(&lock);}int main(int argc, char **argv) { uv_mutex_init(&lock); uv_thread_t nthread1,nthread2; uv_thread_create(&nthread1, worker, NULL); uv_thread_create(&nthread2, worker, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2);}
上面這個示例可以看出來如果使用lock操作,那麼另外一個沒有獲得鎖的線程將會阻塞,直到使用鎖的線程unlock,如果使用了trylock操作,沒獲得鎖它也不會阻塞,但是你可以通過返回值讓它重試。
讀寫鎖API
分別是uv_rwlock_init、uv_rwlock_destroy、uv_rwlock_rdlock、uv_rwlock_tryrdlock、uv_rwlock_rdunlock、uv_rwlock_wrlock、uv_rwlock_trywrlock、uv_rwlock_wrunlock這八個。
讀寫鎖比起互斥鎖更細緻一點,因為它分為寫鎖定和讀鎖定兩類操作,這也是上面api有八個的原因,其中init、destory是初始化和銷毀操作,rdlock、tryrdlock、rdunlock都是讀鎖定相關的,wrlock、trywrlock、wrunlock是寫鎖定相關的。
uv_rwlock_t lock;int num = 0;void writer(void *arg) { uv_rwlock_wrlock(&lock); sleep(2); num++; printf("writer,num is %d
",num); uv_rwlock_wrunlock(&lock);}void reader(void *arg) { uv_rwlock_rdlock(&lock); sleep(2); printf("reader,num is %d
",num); uv_rwlock_rdunlock(&lock);}int main(int argc, char **argv) { uv_rwlock_init(&lock); uv_thread_t nthread1,nthread2,nthread3,nthread4; uv_thread_create(&nthread1, writer, NULL); uv_thread_create(&nthread2, writer, NULL); uv_thread_create(&nthread3, reader, NULL); uv_thread_create(&nthread4, reader, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2); uv_thread_join(&nthread3); uv_thread_join(&nthread4);}
這裡開了四個線程,其中兩個線程加了讀鎖,兩個線程加了寫鎖,可以從列印順序里看到,兩個寫鎖的函數先執行,而且表現和mutex類似,第二個獲得寫鎖的會阻塞,等待寫鎖解除。而在有讀鎖的線程里,全部在阻塞等待寫鎖解除,當寫鎖解除以後,讀鎖解除阻塞,此時同時列印了reader,兩個讀鎖之間不阻塞。
上面還有uv_rwlock_tryrdlock和uv_rwlock_trywrlock這兩個api,都是非阻塞版本,如果沒有獲得鎖,它們不阻塞,可以通過返回值判斷重試。
線程條件量API
分別是uv_cond_init、uv_cond_destroy、uv_cond_signal、uv_cond_broadcast、uv_cond_wait、uv_cond_timedwait這六個。
條件變數和mutex的區別是,mutex在同一時刻只允許一個線程進入臨界區,其他線程只能阻塞,而條件量可以讓多個線程進入,我們可以在一定條件下阻塞線程,也可以在一定條件下喚醒指定線程,或者喚醒全部等待的線程。
條件變數這裡的api,除了init、destory這種用來初始化、銷毀的,wait和timewait用來等待條件成立,也就是上鎖,其中timewait是wait的帶時間版本,而signal和broadcast的作用是喚醒線程。
需要注意的是wait操作需要一個mutex作配合,
uv_mutex_t mutex;uv_cond_t cond;int num = 0;void worker1(void *arg) { while(1){ uv_mutex_lock(&mutex); sleep(2); num++; printf("waiting for num>0
"); uv_cond_wait(&cond, &mutex); printf("num is %d
",num); num--; uv_mutex_unlock(&mutex); }}void worker2(void *arg) { while(1){ uv_mutex_lock(&mutex); sleep(2); if(num>0){ uv_cond_signal(&cond); } uv_mutex_unlock(&mutex); }}int main(int argc, char **argv) { uv_mutex_init(&mutex); uv_cond_init(&cond); uv_thread_t nthread1,nthread2; uv_thread_create(&nthread1, worker1, NULL); uv_thread_create(&nthread2, worker2, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2);}
上面的代碼里,mutex是必須要加上的,我們可以看到一件交替列印的情形,這裡的執行流程是worker1先執行到`uv_cond_wait`處,釋放了自己的mutex,同時阻塞自身,這時候worker2已經拿到鎖的控制權了,所以可以執行代碼了。
注意到當worker2執行到`uv_cond_signal`時,實際上worker1就已經被激活了,然而它還需要mutex鎖的所有權,所有權不在手裡就相當於調用了uv_mutex_lock,等待mutex,此時worker2把自己的mutex釋放,worker1繼續執行,直到釋放鎖開始循環。
條件量的一大特點就是可以設置條件來控制uv_cond_signal的執行,進而我們可以利用它來阻塞代碼執行,所以在線程池的實現中,條件變數是必不可少的,所有的工作塞進隊列里,當隊列為空的時候我們可以利用條件變數來阻塞代碼。
uv_cond_broadcast和uv_cond_signal的區別在於,前者激活所有等待cond的線程,後者一般只激活一條,雖然有人說uv_cond_signal會根據線程優先度或者等待時間來做優先選擇,但事實上沒有保障的。
信號量API
分別是uv_sem_init、uv_sem_destroy、uv_sem_post、uv_sem_wait、uv_sem_trywait這五個。
信號量作為一個比互斥鎖更寬泛的資源競爭解決方案,在操作系統上也是存在的,它即可用在多線程里對於內存的競爭訪問,也可以用在多進程對於同一資源的競爭訪問,libuv封裝的信號量API是基於內存信號量的,用於解決線程間的同步問題。
還有一種信號量叫做有名信號量,可以用在多進程同步上,libuv沒有做封裝。
信號量有兩個操作,分別是P操作和V操作,uv_sem_wait對應P操作,uv_sem_post對應V操作。
信號量用一個數值S表示,在uv_sem_init的時候可以設置。
當執行P操作時,判斷S是否大於0,如果不大於0,那麼無法進入臨界區,代碼會阻塞,如果大於0,那麼S -= 1,進入臨界區。
當執行V操作時,S+=1,如果S大於0,那麼就會喚醒其他進程被阻塞的代碼進入臨界區。
信號量的兩個操作P和V都是原子性的,不可能被打斷,所以可以保證S的變化是可控的。
可以看出來信號量非常強大,可以模擬上面的幾種鎖機制,比如說把S設置為1,那麼一個簡單的互斥鎖就可以這麼寫:
uv_sem_t sem;void worker(void *arg) { uv_sem_wait(&sem); sleep(2); printf("thread id is:%lu
",uv_thread_self()); uv_sem_post(&sem);}int main(int argc, char **argv) { uv_sem_init(&sem,1); uv_thread_t nthread1,nthread2; uv_thread_create(&nthread1, worker, NULL); uv_thread_create(&nthread2, worker, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2);}
這裡面S為1,第一個進程在sem_wait的時候檢查到了S>0,進入臨界區後把S變成了0,這時候第二個進程檢查到了S並不大於0就阻塞住了,當第一個進程執行完畢之後,sem_post把S加1變成了1,此時S大於0,那麼就喚醒了其他進程重新進入臨界區,同時把S減1。
可以看出S的值在整個進程里都始終保持著0或者1這種狀態,這種信號量叫做二進位信號量,也就是互斥鎖的表現。
uv_sem_t sem;int num = 0;void reader(void *arg) { uv_sem_wait(&sem); uv_sem_post(&sem); sleep(2); printf("reader,num is%d
",num);}void writer(void *arg) { printf("writer,num is%d
",num); num++; uv_sem_post(&sem);}int main(int argc, char **argv) { uv_sem_init(&sem,0); uv_thread_t nthread1,nthread2,nthread3; uv_thread_create(&nthread1, reader, NULL); uv_thread_create(&nthread2, reader, NULL); uv_thread_create(&nthread3, writer, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2); uv_thread_join(&nthread3);}
這裡使用信號量實現了一個簡單的寫鎖,一寫多讀,首先執行的必然是writer,隨後兩個reader的代碼就會立即執行。
屏障API
分別是uv_barrier_init、uv_barrier_destroy、uv_barrier_wait這三個。
在多進程的時候,我們總會碰到一個需求,就是需要等待一組進程全部執行完畢後再執行某些事,由於多線程是亂序的,無法預估線程都執行到哪裡了,這就要求我們有一個屏障作為同步點,在所有有屏障的地方都會阻塞等待,直到所有的線程都的代碼都執行到同步點,再繼續執行後續代碼。
barrier除了init和destroy的操作外,只有一個wait的API,在uv_barrier_wait執行的地方就是一個同步點,直到指定數量的進程到達了同步點。
如果沒有指定數量的進程運行到同步點上,那麼所有進程都會阻塞住,所以在設定數量的時候需要注意。
可以在進程里設置多個同步點,但要注意第一次都到達同步點被放行以後,繼續執行的代碼在第二個同步點也要有那麼多數量,不然的話會阻塞。
uv_barrier_t barrier;void worker1(void *arg) { printf("worker1,thread id is:%lu
",uv_thread_self()); sleep(2); uv_barrier_wait(&barrier);}void worker2(void *arg) { printf("worker2,thread id is:%lu
",uv_thread_self()); sleep(1); uv_barrier_wait(&barrier);}void callback(void *arg){ printf("waiting for worker
"); uv_barrier_wait(&barrier); printf("all thread done
");}int main(int argc, char **argv) { uv_barrier_init(&barrier,3); uv_thread_t nthread1,nthread2,nthread3; uv_thread_create(&nthread1, worker1, NULL); uv_thread_create(&nthread2, worker2, NULL); uv_thread_create(&nthread3, callback, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2); uv_thread_join(&nthread3);}
線程本地存儲API
分別是uv_key_create、uv_key_delete、uv_key_get、uv_key_set這四個。
線程本地存儲TLS是一個很有用的特性,我們在多線程里一直都是共享變數,TLS允許我們存儲共享變數的副本,這個副本只在這個線程里存在,每個線程里都互相不影響。
uv_key_t key;uv_barrier_t barrier;int num = 0;void worker(void *arg) { num++; uv_key_set(&key,&num); int my_num = *((int*)uv_key_get(&key)); uv_barrier_wait(&barrier); printf("num is %d,my num is%d
",num,my_num);}int main(int argc, char **argv) { uv_key_create(&key); uv_barrier_init(&barrier,3); uv_thread_t nthread1,nthread2,nthread3; uv_thread_create(&nthread1, worker, NULL); uv_thread_create(&nthread2, worker, NULL); uv_thread_create(&nthread3, worker, NULL); uv_thread_join(&nthread1); uv_thread_join(&nthread2); uv_thread_join(&nthread3); uv_key_delete(&key);}
實現
POSIX下實現
前面說過,在unix或者類unix下,有一個POSIX thread的標準實現,uv_thread的大部分API都是對pthread的簡單封裝,其中uv_thread_t、uv_mutex_t這類結構體,在posix就是等價於pthread的pthread_t、pthread_mutex_t。
typedef pthread_once_t uv_once_t;typedef pthread_t uv_thread_t;typedef pthread_mutex_t uv_mutex_t;typedef pthread_rwlock_t uv_rwlock_t;typedef UV_PLATFORM_SEM_T uv_sem_t;typedef pthread_cond_t uv_cond_t;typedef pthread_key_t uv_key_t;typedef pthread_barrier_t uv_barrier_t;
這裡面只有uv_sem_t在不同平台上的是不同的定義,其他的都是直接使用POSIX原本的結構體。
之前說過unix系下thread和相關鎖的實現大多數都是封裝pthread的,libuv的封裝一般遵循這個原則:
1. libuv的錯誤都是用返回值為負值表示,所以在系統調用得到非零的錯誤代碼時,它要保證這是一個負值,所以在調用諸如pthread_create之類的函數時,它會得到的錯誤代碼取負,典型示例:
int uv_thread_join(uv_thread_t *tid) { return -pthread_join(*tid, NULL);}
2. 在libuv提供的幾種同步鎖機制里,對於錯誤基本上採用abort()處理,因為這些錯誤都是libuv無法處理的,典型示例:
void uv_mutex_destroy(uv_mutex_t* mutex) { if (pthread_mutex_destroy(mutex)) abort();}
對於使用tryxxx之類API來嘗試獲得鎖,libuv只會對於EBUSY或者EAGAIN(一個意思)有響應,返回的錯誤是UV_EBUSY(就是-EBUSY),典型例子:
int uv_rwlock_trywrlock(uv_rwlock_t* rwlock) { int err; err = pthread_rwlock_trywrlock(rwlock); if (err) { if (err != EBUSY && err != EAGAIN) abort(); return -EBUSY; } return 0;}
為兼容做的努力:
線程類api里,uv_thread_create這個API雖然是說封裝的是pthread_create,但它還做了一些額外工作:
1. uv_thread由於簡化API,並沒有像pthread_create里有個pthread_attr_t參數,所以在調用pthread_create時,一般會把這個參數設置為NULL,但在mac OS下有個例外,它創建的第一個thread默認的stack size是8M,但是後續的默認就只有512K,這個數字是不夠的,可以參見蘋果官方文檔Customizing Process Stack Size ,所以libuv特意為這個問題做了兼容性工作,使用getrlimit獲取到系統的RLIMIT_STACK,在使用pthread_attr_setstacksize設置新線程的stacksize為RLIMIT_STACK。
2. pthread_create里的worker函數的定義形式是void *worker(void *arg),libuv的worker的定義形式是void worker(void *arg),兩者可以轉換。
uv_thread_self,uv_thread_equal,uv_thread_join這三個API都是簡單的對pthread_self,pthread_equal和pthread_join的封裝。
互斥鎖API、讀寫鎖API、多線程初始化api、線程本地存儲API都只是對pthread的簡單封裝,封裝的原則遵循我上面提到的那兩點。
值得注意的是uv_mutex_init這個API,在debug模式下,它會通過pthread_mutexattr_settype來為mutex設置錯誤檢查,它的作用可以在 pthread_mutexattr_settype(3) - Linux man page上看到 :
This type of mutex provides error checking. A thread attempting to relock this mutex without first unlocking it shall return with an error. A thread attempting to unlock a mutex which another thread has locked shall return with an error. A thread attempting to unlock an unlocked mutex shall return with an error.
條件變數API本來也類似於互斥鎖、讀寫鎖之類的封裝,但它為了uv_cond_timedwait這個API在時間兼容性做了一些處理。
通常情況下,pthread_cond_timewait默認的時鐘類型是CLOCK_REALTIME,用的是軟體時間,當時間修改時會被影響到,所以一般我們在pthread_cond_init時會設置一個condattr,用於設置時鐘類型為CLOCK_MONOTONIC,它是不受時間更改影響的,詳見pthread_condattr_setclock。
蘋果系統和安卓5.0之前不支持CLOCK_MONOTONIC,所以libuv對此做了兼容,在uv_cond_timewait里,對於蘋果系統調用pthread_cond_timedwait_relative_np這個API,而對於安卓5.0以前的系統調用pthread_cond_timedwait_monotonic_np,對於其他系統則只是封裝pthread_cond_timedwait。
值得注意的是蘋果系統調用的pthread_cond_timedwait_relative_np這個採用的是和當前時間相對的時間。
在pthread_cond_destory時,libuv對蘋果系統的一個bug做了hack,這個bug是這樣的:
mac: Add a hack to http://condition_variable_posix.cc to avoid a crash.
The Darwin pthreads subsystem has a bug that can cause the program to crash if aconditional variable is destroyed after it has been signalled, but before it hasbeen waited on. This hack is a solution proposed by Apple in its own developerforums.Committed:https://crrev.com/9c98c005f4ea59dd9425a4ee5d4e278f1c00ad8c
hack的方法就是調用一次pthread_cond_timedwait_relative_np。
屏障API在android NDK、mac os、ios以及IBM z/OS下沒有對應的實現,所以libuv為這些平台做了兼容性實現,這種類似於前端polyfill的作法在libuv里有不少,我們以後會看到。
libuv做的屏障API,基本上就是照著POSIX原生的表現實現的,簡易的實現就是利用條件量來模擬,它提供的三個API里,重要的是pthread_barrier_wait,它的邏輯是先攔截,等到指定數量的線程到了同步點後再全部放行,我們僅僅需要在一個結構體保存三個變數,分別是count、cond和mutex(cond必須要一個mutex配合),簡易實現如下:
pthread_mutex_lock(&barrier->mutex);if (--barrier->count == 0) { pthread_cond_broadcast(&barrier->cond); pthread_mutex_unlock(&barrier->mutex); return 0;}do { pthread_cond_wait(&barrier->cond, &barrier->mutex);} while (barrier->count > 0);pthread_mutex_unlock(&barrier->mutex);
可以看出這裡的邏輯非常簡單,每次進來的時候都會把count減1,並且cond_wait,當count減的只剩下0的時候,cond_broadcast激活所有等待的線程,進入後續的邏輯。
這個實現當然還是可以用信號量來實現,我們之前說過PV信號量能做很多事情,但是寫法比這個稍微複雜一些。
但是這個簡易實現在實際應用是有問題的,就是在於我們實現pthread_barrier_destroy的時候,我們只能簡單的把鎖給銷毀了,但是問題是我們在使用pthread_barrier_destroy的時候並不能保證它是在合適的時機調用的,因為此時可能其他線程還在使用這個count、mutex或者cond,安全銷毀存在問題。
所以我們需要有一個返回值表示所有的線程都已經脫離了barrier_wait狀態了,這個返回值就是一個約定值PTHREAD_BARRIER_SERIAL_THREAD,並且在destroy的時候,如果線程還在使用結構體的話,就返回EBUSY,類似於這麼個做法:
int rc = pthread_barrier_wait(&b);if (rc == PTHREAD_BARRIER_SERIAL_THREAD){ pthread_barrier_destroy(&b); // 可能返回EBUSY}
所以libuv的實現也是修復了這個問題,他們參考了openbsd實現 openbsd/rthread_barrier.c ,在結構體里增加了兩個新的數值變數,分別是in和out,初始化的時候都是0,在每次進入pthread_barrier_wait的時候,in都會加1,並且使用pthread_cond_wait阻塞,直到in達到原本指定數量時,開始執行初次喚醒邏輯,這時候也是返回PTHREAD_BARRIER_SERIAL_THREAD的時機,這次沒有使用cond_broadcast喚醒所有線程,而只是喚醒一條線程,並且在那之前會把in重新置為0(用不到了),把out初始化為count-1(表示還需要喚醒多少條線程),而被喚醒後首先執行的還是減out,再喚醒其他線程,釋放mutex鎖,直到每個線程都被喚醒,邏輯也就結束了。
這裡面in變數表示有多少線程進入沉睡期,而out變數表示有多少線程完成了喚醒和釋放鎖,他們在最開始和結束的時候都會變成0,表示未開始或者已完全結束,所以我們可以通過判斷in和out是否大於0來知道現在釋放鎖會不會有問題,如果還在使用的時候就返回EBUSY。
libuv在封裝的uv_barrier_wait時候,考慮到了PTHREAD_BARRIER_SERIAL_THREAD的問題,所以它的返回值為1的時候表示所有線程到達同步點,我們可以在這個時候調用uv_barrire_destory。
再來說說無名信號量API的封裝,在libuv里實際上利用了三套信號量的API做封裝,分別是蘋果系統內核的semaphore_xx系列,POSIX的sem_xx系列,以及System V 信號量的semget/semop/semctl系列,當然API形式模仿的是POSIX信號量,所以對其他兩個系列做了封裝。
前面曾經提到過結構體的定義,只有一個uv_sem_t被定義成UV_PLATFORM_SEM_T,在POSIX信號量用的是sem_t,在蘋果系統里是semaphore_t,而在z/OS就是個int,因為semget返回的semid就是用整數表示的。
這裡面semaphore_xx系列只用於蘋果系統,而System V 信號量只用於IBM z/OS,其他系統全部採用POSIX信號量,原因是MAC OS X10.6之前並不支持POSIX信號量,而支持POSIX信號量後又不支持無名信號量,所以libuv對蘋果直接採用它自己的semaphore_xx系列封裝,而IBM z/OS 確實不支持POSIX信號量,它只有一套System V信號量的API。
System V 信號量API實際上也存在於unix/linux系統,也可以用來創建無名信號量它可以用來完成一些定製化的工作,比如說PV信號量里對於信號數值加1減1的操作,System V 可以允許不是1的數值,當然這也帶來了很多複雜度。
先看看對於蘋果semaphore_xx系列的封裝,這套內核API的具體說明在蘋果的文檔里有描述,參見Synchronization Primitives。
可以看到semaphore_xx在創建和銷毀時需要指定一個taskid,它表示是這個lock的擁有者,一般使用mach_task_self()可以獲取到當前的mach task,第3個參數表示的是等待隊列的同步策略,一般選擇FIFO,在調用semaphore_create時,它會返回成功或者失敗的代碼,libuv對其中的一些做了響應。
由於semaphore_xx系列沒有trywait,所以它用了一個semaphore_timedwait,但是等待時間設置為0,通過返回值是否為KERN_OPERATION_TIMED_OUT來判斷是不是鎖被佔用。
對於如何把System V 信號量API封裝成POSIX信號量API,這裡有一篇非常好的文章可以參考Implement POSIX Semaphore APIs using System V Semaphores APIs
Windows下的實現
先提個醒,雖然windows系統封裝的都是windows API,但還是有很多兼容性工作的,特別libuv是支持windows XP的,為了支持windows XP,libuv就沒辦法使用windows vista帶來的condition-variable APIs,所以在面對uv_cond系列的API封裝時,它做了一系列fallback實現。
簡單介紹幾個概念:
HANDLE: 在windows系統編程里,一個很重要的概念是句柄(HANDLE),它是一種特殊的指針,可以把它想像成類似unix文件描述符一類的東西,它是系統和用戶空間的通道,可以用來操作系統里的資源和對象,HANDLE統一使用CloseHandle來關閉。
Event:Event對象裡面有兩種狀態,表示信號的開和關,它可以設置成自動擋或者手動擋,可以使用SetEvent把狀態置為開(signaled),手動擋模式下,可以用ResetEvent把狀態置為關(nonsignaled),自動擋模式下,如果某個線程里有正在等待信號而阻塞,釋放阻塞時,狀態自動被置為關(nonsignaled)。
WaitForSingleObject:這個函數有兩個參數,一個是handle,還有一個是時間,這條語句會一直阻塞,除非等待HANDLE所指的對象狀態變化為signaled,或者時間到期。它能等待的對象類型有Change notification、Console input、Event、Memory resource notification、Mutex、Process、Semaphore、Thread、Waitable timer這幾種,也就是SYNCHRONIZE訪問許可權的對象。
WaitForSingleObject的時間設置為0的話,就相當於詢問信號狀態是否為開,如果不是則返回WAIT_TIMEOUT。如果時間設置為INFINITE,那麼它只能等待對象狀態變化,等待期間不能釋放HANDLE,不然行為是未定義的。
WaitForMultipleObjects:和上面函數類似,只不過它允許等待一組對象,當這組對象的狀態全部設置為signaled或者任意一個設置為signaled的時候返回。
先來說說uv_once的實現,它的目的是多線程初始化,所以有兩個要求:
- 多個線程同時執行時,僅有一個線程執行對應的初始化。
- 在對應初始化函數未執行完畢的情況下,其他線程必須等待。
為了實現這個功能,我們可以先PK選出一個指定線程作為初始化工作線程,其他線程阻塞等待。
PK選出單個線程,我們可以利用InterlockedCompareExchangePointer函數,它是一個原子操作,所以是線程安全的,它的作用是對比第一個參數指針所指的值和第三個參數,如果結果為true,則把指針指向第二個參數,返回值永遠是第一個指針的值,基於這樣的特性,我們可以利用InterlockedCompareExchangePointer(nullpointer,value,NULL)這樣的形式,來保證多線程下它的返回值為NULL只有一次,進而選定了初始化工作線程。
其他線程阻塞等待,其實只要利用Event和WaitForSingleObject即可,所有線程剛進入的時候都創建一個手動擋Event,PK時拿自己的Event作為第二個參數,如果PK失敗,那麼返回的結果就是初始化工作線程內部的Event,自己持有的Event可以直接關閉,並使用WaitForSingleObject等待初始化工作線程的Event發送信號,當對應的初始化結束後,阻塞也就結束了。
libuv還在uv_once_t結構體里加了個ran,初始化結束後就置為1,相當於緩存,這樣以後再次有這條語句的時候可以直接忽略了。
線程本地存儲TLS的API在windows上有一系列對應的api,分別是TlsAlloc(對應key_create)、TlsFree(對應key_delete)、TlsGetValue(對應key_get)、TlsSetValue(對應key_set),所以libuv對於這套API只是做點簡單的封裝,有這麼幾個小細節:
- TlsAlloc拿到的tls_index其實是個數字,由於libuv的TLS API參數里都是指針,所以uv_key_t在windows下就是只有一個DWORD(數字類型)元素的結構體。
- TlsAlloc失敗後會返回TLS_OUT_OF_INDEXES這個錯誤,在利用TlsFree刪除key的時候,libuv把tls_index設置為TLS_OUT_OF_INDEXES,防止再被使用。
- TlsGetValue拿到結果為NULL的時候,可能是因為本來存的就是0,也可能是獲取失敗,所以需要通過GetLastError的結果是否ERROR_SUCCESS來判斷成功與否。
線程uv_thread_t這個結構體在windows里就是個HANDLE,我們需要實現的API不多,但有些地方還是需要曲線救國的,比如說uv_thread_self。
Win32的API提供創建線程的標準函數是CreateThread,但事實上libuv用了另外一個函數_beginthreadex來創建線程,其原因在MSDNCreateThread function里說的很清楚:
A thread in an executable that calls the C run-time library (CRT) should use the _beginthreadex and _endthreadex functions for thread management rather than CreateThread and ExitThread; this requires the use of the multithreaded version of the CRT. If a thread created using CreateThread calls the CRT, the CRT may terminate the process in low-memory conditions.
它們兩者的區別就在於對C運行時庫的處理上,C運行時庫有大量的全局變數,_beginthreadex對這個做了處理,使得每個線程都有自己的errno之類的變數。
_beginthreadex的參數可以是6個,第一個參數決定是否被子進程繼承;第二個參數是新線程的棧空間大小,設置為0可以使用系統默認值;第三個參數是新線程創建後的工作函數的指針;第四個參數是要傳給新線程的參數列表;第五個參數是設置初始運行狀態的標誌,可以設置為0讓它創建後立即執行工作,也可以設置為CREATE_SUSPENDED後自己手動執行工作;最後一個參數可以用來直接指定接收新線程的指針地址,一般設置為NULL。
libuv為了實現uv_thread_self這個API,它定義了一個結構體ctx,把uv_thread_create里傳入的回調函數和參數指針給存放起來了,它為_beginthreadex的第三個參數定義了一個函數UINT __stdcall uv__thread_start(void* arg),並把ctx作為第四個參數傳給了它;如果線程成功創建,ctx里再保存一個handle,就是線程本身,最後使用ResumeThread讓線程進入運行狀態。
在uv__thread_start里,libuv做了一些初始化工作,首先它利用uv_once來創建一個全局的uv__current_thread_key,隨後它利用線程本地存儲TLS在這個key里存儲一個void*,也就是當前線程本身的HANDLE,雖然每個線程都利用相同的key來存儲,但由於TLS在每個線程下都是獨立的,所以不會有任何衝突。
之所以做上面的這份工作,就是為了在調用uv_thread_self的時候,可以通過TLS取出對應的HANDLE並返回。
設置完uv__current_thread_key後,libuv把ctx里保存的工作函數和對應參數取出來,再直接使用ctx.entry(ctx.arg);執行它。
_beginthreadex當然也會報錯,在出錯的情況下,首先要把之前的ctx給釋放掉,隨後根據錯誤代碼返回對應的錯誤代碼,uv_thread_create響應EACCES、EAGAIN和EINVAL三種錯誤。
uv_thread_join這個API的實現只是使用WaitForSingleObject等待對應的Thread對象,在線程結束後再利用CloseHandle來關閉釋放。
在windows下,uv_thread_equal的實現就是簡單的比較一下兩個thread之間的HANDLE是否相等。
接下來說說互斥鎖的實現,在windows下有一套Mutex的API,比如CreateMutex、OpenMutex、ReleaseMutex這些,由於Mutex擁有SYNCHRONIZE訪問許可權,可以通過WaitForSingleObject來配合使用,起到了類似uv_mutex_lock的作用。
不過libuv的互斥鎖並不是封裝的Mutex,而是利用了另外一套叫做Critical Section Objects的API。顧名思義,臨界區的作用類似於Mutex,但是相比Mutex,它更加輕量,因為Mutex是可以在多線程也可以在多進程下同步使用,而Critical Section只能用在多線程里。
A critical section object provides synchronization similar to that provided by a mutex object, except that a critical section can be used only by the threads of a single process. Event, mutex, and semaphore objects can also be used in a single-process application, but critical section objects provide a slightly faster, more efficient mechanism for mutual-exclusion synchronization (a processor-specific test and set instruction)
-- Critical Section Objects
libuv里的mutex API在臨界區API上都有完全對應,所有的封裝只是簡單的調用函數。
mutex_init對應InitializeCriticalSection,mutex_destory對應DeleteCriticalSection,mutex_lock對應EnterCriticalSection,mutex_trylock對應TryEnterCriticalSection,mutex_unlock對應LeaveCriticalSection。
再來說說信號量API,Win32的API也提供了一套信號量API,Semaphore Objects ,它和Mutex的API類似,都是一個擁有SYNCHRONIZE訪問許可權的HANDLE,可以使用WaitForSingleObject來配合,它主要有CreateSemaphore和ReleaseSemaphore兩個API組成。
在WaitForSingleObject步驟的時候,信號量值會自動減1,而ReleaseSemaphore可以給信號量值增加數值,而且數值也是可以指定的,所以它比sem_post的自由度更大一點。
在libuv中,uv_sem_t就是一個HANDLE,CreateSemaphore有四個參數,第一個參數是設置Semaphore Attributes,如果設置為NULL就表示不能被子進程繼承;第二個參數是信號量初始值;第三個參數是信號量允許的最大值,因為我們的ReleaseSemaphore可以一直加數值,在這裡可以設個上限,但libuv不需要高靈活度,所以一般設置為INT_MAX;最後一個參數是信號量名字,CreateSemaphore可以創建有名或者無名的信號量,因為libuv封裝的是無名信號量,所以直接設為NULL。
uv_sem_init的封裝就是保存CreateSemaphore的返回HANDLE,uv_sem_destroy只需要調用CloseHandle即可,uv_sem_post就是調用ReleaseSemaphore函數,當然了Release Count需要設置為1,而uv_sem_wait和uv_sem_trywait也就是利用WaitForSingleObject,通過返回值來判斷是否有錯。
對於讀寫鎖的封裝,實際上windows是有讀寫鎖API的 Slim Reader/Writer (SRW) Locks ,這個讀寫鎖API不支持windows XP,libuv曾經做了兼容性的寫法,在XP上自己實現了一份讀寫鎖,而在vista極其以上的系統上使用WIN32提供的API做封裝,但是libuv後來放棄了這種兼容性寫法,刪除了SRW Lock的封裝,在所有windows系統下都使用自實現的讀寫鎖。
之所以不是用SRW Lock,原因在於它和pthread_rwlock的表現不同,比如說在pthread_rwlock里,當我們線程上了讀鎖以後,此時無法在裡面使用rwlock_trywrlock嘗試獲取寫鎖,但是SRW Lock里的TryAcquireSRWLockExclusive卻可以返回正常,所以SRW Lock和pthread_rwlock在語義上有很大差別,生搬硬套的話對於寫業務邏輯的用戶來說會埋下bug。
libuv自己實現的這套rwlock,利用了信號量,我們在之前演示信號量的時候也說過它可以實現讀寫鎖,不過由於一個線程允許多個讀鎖,所以需要記錄一下讀鎖的數量,而這個數量又需要保證它在多線程下不出錯,所以又得引入一個互斥鎖。uv_rwlock_t結構體里保存了一個信號量HANDLE,一個用於記錄讀鎖數量的int的num_readers,以及保護num_readers_的互斥鎖CRITICAL_SECTION。
在uv_rwlock_init里,首先使用CreateSemaphoreW創建了一個信號量值為1的HANDLE,然後用InitializeCriticalSection初始化了互斥鎖,再把num_readers_初始化為0。
在uv_rwlock_destroy里,也只是調用DeleteCriticalSection和CloseHandle來釋放資源。
在寫鎖上面,libuv的實現不需要其他輔助,它使用的是個二值信號量來保護寫操作,uv_rwlock_wrlock的封裝只是等待信號量鎖。
void uv_rwlock_wrlock(uv_rwlock_t*rwlock) { DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, INFINITE); if (r != WAIT_OBJECT_0) uv_fatal_error(GetLastError(), "WaitForSingleObject");}
uv_rwlock_trywrlock的和上面的步驟類似,只不過是把INFINITE改成了0,根據返回值是不是WAIT_TIMEOUT來判斷是否能進入寫的臨界區。
uv_rwlock_wrunlock只需要把信號量的值加1即可。
在讀鎖上面,是需要和寫鎖相結合的,所以稍微麻煩一些。
uv_rwlock_rdlock封裝的邏輯是這樣的,多個讀鎖等待寫鎖,沒有必要全部都WaitForSingleObject,讀鎖這邊為了保護num_readers_加上了互斥鎖,所以只需要第一個進入臨界區的等待寫鎖釋放即可,其他讀鎖會一直等待這個互斥鎖的釋放,一旦寫鎖釋放後,所有讀鎖都會依次的被放行,並且num_readers_會記錄下讀鎖的數量。
uv_rwlock_tryrdlock的封裝更麻煩一些,因為上面的可以利用阻塞的特點來等待,這裡libuv先嘗試進入num_readers_的臨界區,如果是第一個進入的則嘗試等待寫鎖,成功的話就讓num_readers_增加,其他線程進來後不用嘗試等待寫鎖(因為已經有一個試了),直接增加num_readers_即可。
uv_rwlock_rdunlock的封裝邏輯是讓num_readers_以此減1,當減少到0的時候,使用ReleaseSemaphore來給信號量加回去,這確保了信號量是在所有的讀鎖阻塞之後再恢復原樣。
libuv引入了num_readers_,目的就是為了保證在和信號量配合的時候,能夠只能讓一個讀鎖進行等待和釋放。
之前說過,libuv里對於windows xp兼容,做了兩套的條件變數API,其中一套是利用比較新的 Condition Variables API,另外一套是給XP做的兼容函數。
在Condition Variable API下,封裝uv_cond的話比較簡單,uv_cond_t 就保存一個CONDITION_VARIABLE OBJECT,uv_cond_init對應的是InitializeConditionVariable,uv_cond_signal對應的是WakeConditionVariable,uv_cond_broadcast對應的是WakeAllConditionVariable,uv_cond_wait對應的是SleepConditionVariableCS,這裡傳入的互斥鎖剛好也是Critical Section,uv_cond_timedwait依舊用的是SleepConditionVariableCS,它的第三個參數可以設置等待時間,至於uv_cond_destory不用做任何事。
而在兼容API的實現上,libuv就需要做很多事情了,uv_cond_t的結構體用到了很多東西。
struct { unsigned int waiters_count; CRITICAL_SECTION waiters_count_lock; HANDLE signal_event; HANDLE broadcast_event;} fallback;
這裡面的waiters_count用來保存有多少線程正在等待,waiters_count_lock是為了保護waiters_count,signal_event和broadcast_event是為了cond_signal和cond_broadcast服務的,前者是個自動擋Event,後者是手動擋的Event。
在uv_cond_init里,只需要把上面的四個初始化或者創建出來,而在uv_cond_destory里,也是把兩個Event用CloseHandle關閉,並且用DeleteCriticalSection刪除waiters_count_lock。
uv_cond_wait和uv_cond_timedwait在這裡用了同一個函數uv_cond_wait_helper實現,每次進入wait操作的時候都會給waiters_count加一,再使用WaitForMultipleObjects等待兩個Event的任意一個返回,一旦返回則waiters_count減一,同時還記錄下當前線程是否是最後一個waiters,如果是的話,就手動ResetEvent複位broadcast_event,因為broadcast_event是個手動擋。
因為上面的等待兩個Event,所以不管是signal_event被置為signaled,還是broadcast_event被置為signaled,都會有返回,這樣就可以保證cond_signal和cond_broadcast能夠喚醒。
接下來uv_cond_signal和uv_cond_broadcast的實現就非常簡單了,僅僅是需要使用SetEvent讓對應的event狀態置為開(signaled)即可,不過libuv還判斷了waiters_count是否大於0,如果沒有waiters,就不用發送信號了。
這裡的waiters_count主要還是為了判斷是不是last_waiter的,所以才會加加減減,它也是為了服務於接受uv_cond_broadcast發來的喚醒。
最後還剩下一個柵欄API,windows當然也沒有提供對應的API實現,和之前的蘋果系統、安卓或者IBM z/OS類似,libuv自己實現了一套柵欄API。
遺憾的是,這是一套歷史遺留的API,採用了一個互斥鎖和兩個信號量實現的,曾經在unix兼容實現的時候也用的這麼一套,後來unix那邊更改成統一的mutex+cond實現,這邊沒有跟著修改。
可以看一下它的uv_barriers_wait的實現
int uv_barrier_wait(uv_barrier_t* barrier) { int serial_thread; uv_mutex_lock(&barrier->mutex); if (++barrier->count == barrier->n) { uv_sem_wait(&barrier->turnstile2); uv_sem_post(&barrier->turnstile1); } uv_mutex_unlock(&barrier->mutex); uv_sem_wait(&barrier->turnstile1); uv_sem_post(&barrier->turnstile1); uv_mutex_lock(&barrier->mutex); serial_thread = (--barrier->count == 0); if (serial_thread) { uv_sem_wait(&barrier->turnstile1); uv_sem_post(&barrier->turnstile2); } uv_mutex_unlock(&barrier->mutex); uv_sem_wait(&barrier->turnstile2); uv_sem_post(&barrier->turnstile2); return serial_thread;}
這裡用到的兩個信號量,turnstile1是用來攔截邏輯直到所有的線程都到位(barrier->count == barrier->n),隨後大家一同等待turnstile2,直到(--barrier->count == 0)時,再互相喚醒,從而保證了最後大家都是阻塞到同一個同步點。
總結
從上面可以看出來,libuv對於線程的封裝是做了很多努力的,為了提供平台一致的API,libuv做了很多兼容性實現,同時也放棄了很多線程API。
首先pthread一系列的pthread_attr以及各種鎖的attr全部被捨棄了,所以想要針對線程做一些特別設置是沒有任何辦法的。
對線程本身的操作API只有個self操作,沒有辦法判斷自身是否已被join,沒辦法cancel或者exit,沒辦法做靈活性高的操作。
線程無法被detach,這一點不太好,因為實際上也可以在各種平台實現對應的兼容API,不過libuv的需求完全不需要detach,強依賴它的node.js更不需要,所以libuv不考慮這個。
線程同步鎖的種類,比如說自旋鎖,libuv可以做封裝但它用不到。
C++11的std::thread也是對平台線程做了封裝,它的API封裝老實說是高於libuv的,但是libuv的線程並不是為了專門為了多線程編程服務的,它最大的作用是為了封裝線程池,而這個線程池所用到的API很有限,所以對於線程的API它做了有限度的實現。
我們從線程這一套體系可以看出來,libuv有實用性至上、保持跨平台、保持輕量級等等設計原則,正是因為這些原則讓它做出了API的取捨。
windows下的uv_barriers可以看出libuv代碼有歷史包袱,我們以後說到定時器(timer)的時候會看到,libuv在unix下和windows下使用了兩套代碼,僅僅是因為它們是兩套人馬寫出來的,但是實現的都很好,也有計划過要共用,不過不知為何沒有推進。
推薦閱讀:
※Egg.js 中 GraphQL 小試牛刀
※Node.js 實現 Hot Reload
※Node應用內存泄漏分析方法論與實戰
※狼叔的2017年總結
※如何分析 Node.js 中的內存泄漏
TAG:Nodejs |
