@@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT;
 static uv_cond_t cond;
 static uv_mutex_t mutex;
 static unsigned int idle_threads;
+static unsigned int slow_io_work_running;
 static unsigned int nthreads;
 static uv_thread_t* threads;
 static uv_thread_t default_threads[4];
 static QUEUE exit_message;
 static QUEUE wq;
+static QUEUE run_slow_work_message;
+static QUEUE slow_io_pending_wq;
 
+static unsigned int slow_work_thread_threshold(void) {
+  return (nthreads + 1) / 2;
+}
 
 static void uv__cancelled(struct uv__work* w) {
   abort();
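A note on the arithmetic, for context rather than as part of the change: slow_work_thread_threshold() caps concurrently running slow I/O requests at half the pool, rounded up. With the default pool of 4 threads it returns (4 + 1) / 2 = 2, so at most two workers can be tied up in slow I/O at any time while the rest stay available for other work.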
@@ -51,38 +57,73 @@ static void uv__cancelled(struct uv__work* w) {
 static void worker(void* arg) {
   struct uv__work* w;
   QUEUE* q;
+  int is_slow_work;
 
   uv_sem_post((uv_sem_t*) arg);
   arg = NULL;
 
   for (;;) {
     uv_mutex_lock(&mutex);
 
-    while (QUEUE_EMPTY(&wq)) {
+  wait_for_work:
+    /* Keep waiting while either no work is present or only slow I/O
+       and we're at the threshold for that. */
+    while (QUEUE_EMPTY(&wq) ||
+           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
+            QUEUE_NEXT(&run_slow_work_message) == &wq &&
+            slow_io_work_running >= slow_work_thread_threshold())) {
       idle_threads += 1;
       uv_cond_wait(&cond, &mutex);
       idle_threads -= 1;
     }
 
     q = QUEUE_HEAD(&wq);
-
-    if (q == &exit_message)
+    if (q == &exit_message) {
       uv_cond_signal(&cond);
-    else {
+      uv_mutex_unlock(&mutex);
+      break;
+    }
+
+    QUEUE_REMOVE(q);
+    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */
+
+    is_slow_work = 0;
+    if (q == &run_slow_work_message) {
+      /* If we're at the slow I/O threshold, re-schedule until after all
+         other work in the queue is done. */
+      if (slow_io_work_running >= slow_work_thread_threshold()) {
+        QUEUE_INSERT_TAIL(&wq, q);
+        goto wait_for_work;
+      }
+
+      /* If we encountered a request to run slow I/O work but there is none
+         to run, that means it's cancelled => Start over. */
+      if (QUEUE_EMPTY(&slow_io_pending_wq))
+        goto wait_for_work;
+
+      is_slow_work = 1;
+      slow_io_work_running++;
+
+      q = QUEUE_HEAD(&slow_io_pending_wq);
       QUEUE_REMOVE(q);
-      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
-                         executing. */
+      QUEUE_INIT(q);
+
+      /* If there is more slow I/O work, schedule it to be run as well. */
+      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
+        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
+        if (idle_threads > 0)
+          uv_cond_signal(&cond);
+      }
     }
 
     uv_mutex_unlock(&mutex);
 
-    if (q == &exit_message)
-      break;
-
     w = QUEUE_DATA(q, struct uv__work, wq);
     w->work(w);
 
     uv_mutex_lock(&w->loop->wq_mutex);
+    if (is_slow_work)
+      slow_io_work_running--;
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
@@ -92,8 +133,20 @@ static void worker(void* arg) {
 }
 
 
-static void post(QUEUE* q) {
+static void post(QUEUE* q, enum uv__work_kind kind) {
   uv_mutex_lock(&mutex);
+  if (kind == UV__WORK_SLOW_IO) {
+    /* Insert into a separate queue. */
+    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
+    if (!QUEUE_EMPTY(&run_slow_work_message)) {
+      /* Running slow I/O tasks is already scheduled => Nothing to do here.
+         The worker that runs said other task will schedule this one as well. */
+      uv_mutex_unlock(&mutex);
+      return;
+    }
+    q = &run_slow_work_message;
+  }
+
   QUEUE_INSERT_TAIL(&wq, q);
   if (idle_threads > 0)
     uv_cond_signal(&cond);
@@ -108,7 +161,7 @@ UV_DESTRUCTOR(static void cleanup(void)) {
   if (nthreads == 0)
     return;
 
-  post(&exit_message);
+  post(&exit_message, UV__WORK_CPU);
 
   for (i = 0; i < nthreads; i++)
     if (uv_thread_join(threads + i))
@@ -156,6 +209,8 @@ static void init_threads(void) {
     abort();
 
   QUEUE_INIT(&wq);
+  QUEUE_INIT(&slow_io_pending_wq);
+  QUEUE_INIT(&run_slow_work_message);
 
   if (uv_sem_init(&sem, 0))
     abort();
@@ -194,13 +249,14 @@ static void init_once(void) {
 
 void uv__work_submit(uv_loop_t* loop,
                      struct uv__work* w,
+                     enum uv__work_kind kind,
                      void (*work)(struct uv__work* w),
                      void (*done)(struct uv__work* w, int status)) {
   uv_once(&once, init_once);
   w->loop = loop;
   w->work = work;
   w->done = done;
-  post(&w->wq);
+  post(&w->wq, kind);
 }
 
 
@@ -284,7 +340,11 @@ int uv_queue_work(uv_loop_t* loop,
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
-  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
+  uv__work_submit(loop,
+                  &req->work_req,
+                  UV__WORK_CPU,
+                  uv__queue_work,
+                  uv__queue_done);
   return 0;
 }
 
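To make the new kind parameter concrete, here is a minimal caller-side sketch. It is an illustration, not part of the change: it assumes libuv's internal declarations (struct uv__work, enum uv__work_kind, uv__work_submit()) are in scope, and the callback names are made up. A caller doing blocking work that should fall under the slow I/O cap would pass UV__WORK_SLOW_IO, while uv_queue_work() above keeps submitting UV__WORK_CPU.

/* Illustration only -- hypothetical callback names, not part of this diff. */
static void example_slow_work(struct uv__work* w) {
  /* Runs on a threadpool thread; at most (nthreads + 1) / 2 of these
     execute concurrently because of the slow I/O cap above. */
}

static void example_done(struct uv__work* w, int status) {
  /* Runs on the loop thread once the work item has finished. */
}

static void example_submit(uv_loop_t* loop, struct uv__work* w) {
  /* Tagging the request as slow I/O queues it on slow_io_pending_wq and
     schedules it through run_slow_work_message. */
  uv__work_submit(loop, w, UV__WORK_SLOW_IO, example_slow_work, example_done);
}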