10 問 10 答:你真的瞭解線程池嗎?
《Java 開發手冊》中強調,線程資源必須通過線程池提供,而創建線程池必須使用 ThreadPoolExecutor。手冊主要強調利用線程池避免兩個問題,一是線程過渡切換,二是避免請求過多時造成 OOM。但是如果參數配置錯誤,還是會引發上面的兩個問題。所以本節我們主要是討論 ThreadPoolExecutor 的一些技術細節,並且給出幾個常用的最佳實踐建議。
我在查找資料的過程中,發現有些問題存在爭議。後面發現,一部分原因是因爲不同 JDK 版本的現實是有差異的。因此,下面的分析是基於當下最常用的版本 JDK1.8,並且對於存在爭議的問題,我們分析源碼,源碼纔是最準確的。
1 corePoolSize=0 會怎麼樣
這是一個爭議點。我發現大部分博文,不論是國內的還是國外的,都是這樣回答這個問題的:
-
提交任務後,先判斷當前池中線程數是否小於 corePoolSize,如果小於,則創建新線程執行這個任務。
-
否則,判斷等待隊列是否已滿,如果沒有滿,則添加到等待隊列。
-
否則,判斷當前池中線程數是否大於 maximumPoolSize,如果大於則拒絕。
-
否則,創建一個新的線程執行這個任務。
按照上面的描述,如果 corePoolSize=0,則會判斷等待隊列的容量,如果還有容量,則排隊,並且不會創建新的線程。
—— 但其實,這是老版本的實現方式,從 1.6 之後,實現方式就變了。我們直接看 execute 的源碼(submit 也依賴它),我備註出了關鍵一行:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 注意這一行代碼,添加到等待隊列成功後,判斷當前池內線程數是否爲0,如果是則創建一個firstTask爲null的worker,這個worker會從等待隊列中獲取任務並執行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
-
線程池提交任務後,首先判斷當前池中線程數是否小於 corePoolSize。
-
如果小於則嘗試創建新的線程執行該任務;否則嘗試添加到等待隊列。
-
如果添加隊列成功,判斷當前池內線程數是否爲 0,如果是則創建一個 firstTask 爲 null 的 worker,這個 worker 會從等待隊列中獲取任務並執行。
-
如果添加到等待隊列失敗,一般是隊列已滿,纔會再嘗試創建新的線程。
-
但在創建之前需要與 maximumPoolSize 比較,如果小於則創建成功。
-
否則執行拒絕策略。
答
上述問題需區分 JDK 版本。在 1.6 版本之後,如果 corePoolSize=0,提交任務時如果線程池爲空,則會立即創建一個線程來執行任務(先排隊再獲取);如果提交任務的時候,線程池不爲空,則先在等待隊列中排隊,只有隊列滿了纔會創建新線程。
所以,優化在於,在隊列沒有滿的這段時間內,會有一個線程在消費提交的任務;1.6 之前的實現是,必須等隊列滿了之後,纔開始消費。
2 線程池創建之後,會立即創建核心線程麼
之前有人問過我這個問題,因爲他發現應用中有些 Bean 創建了線程池,但是這個 Bean 一般情況下用不到,所以諮詢我是否需要把這個線程池註釋掉,以減少應用運行時的線程數(該應用運行時線程過多。)
答
不會。從上面的源碼可以看出,在剛剛創建 ThreadPoolExecutor 的時候,線程並不會立即啓動,而是要等到有任務提交時纔會啓動,除非調用了 prestartCoreThread/prestartAllCoreThreads 事先啓動核心線程。
-
prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
-
prestartAllCoreThreads:Starts all core threads.
3 核心線程永遠不會銷燬麼
這個問題有點 tricky。首先我們要明確一下概念,雖然在 JavaDoc 中也使用了 “core/non-core threads” 這樣的描述,但其實這是一個動態的概念,JDK 並沒有給一部分線程打上 “core” 的標記,做什麼特殊化的處理。這個問題我認爲想要探討的是閒置線程終結策略的問題。
在 JDK1.6 之前,線程池會盡量保持 corePoolSize 個核心線程,即使這些線程閒置了很長時間。這一點曾被開發者詬病,所以從 JDK1.6 開始,提供了方法 allowsCoreThreadTimeOut,如果傳參爲 true,則允許閒置的核心線程被終止。
請注意這種策略和 corePoolSize=0 的區別。我總結的區別是:
-
corePoolSize=0:在一般情況下只使用一個線程消費任務,只有當併發請求特別多、等待隊列都滿了之後,纔開始用多線程。
-
allowsCoreThreadTimeOut=true && corePoolSize>1:在一般情況下就開始使用多線程(corePoolSize 個),當併發請求特別多,等待隊列都滿了之後,繼續加大線程數。但是當請求沒有的時候,允許核心線程也終止。
所以 corePoolSize=0 的效果,基本等同於 allowsCoreThreadTimeOut=true && corePoolSize=1,但實現細節其實不同。
答
在 JDK1.6 之後,如果 allowsCoreThreadTimeOut=true,核心線程也可以被終止。
4 如何保證線程不被銷燬
首先我們要明確一下線程池模型。線程池有個內部類 Worker,它實現了 Runnable 接口,首先,它自己要 run 起來。然後它會在合適的時候獲取我們提交的 Runnable 任務,然後調用任務的 run() 接口。一個 Worker 不終止的話可以不斷執行任務。
我們前面說的 “線程池中的線程”,其實就是 Worker;等待隊列中的元素,是我們提交的 Runnable 任務。
每一個 Worker 在創建出來的時候,會調用它本身的 run() 方法,實現是 runWorker(this),這個實現的核心是一個 while 循環,這個循環不結束,Worker 線程就不會終止,就是這個基本邏輯。
-
在這個 while 條件中,有個 getTask() 方法是核心中的核心,它所做的事情就是從等待隊列中取出任務來執行:
-
如果沒有達到 corePoolSize,則創建的 Worker 在執行完它承接的任務後,會用 workQueue.take() 取任務、注意,這個接口是阻塞接口,如果取不到任務,Worker 線程一直阻塞。
-
如果超過了 corePoolSize,或者 allowCoreThreadTimeOut,一個 Worker 在空閒了之後,會用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 取任務。注意,這個接口只阻塞等待 keepAliveTime 時間,超過這個時間返回 null,則 Worker 的 while 循環執行結束,則被終止了。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 看這裏,核心邏輯在這裏
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 注意,核心中的核心在這裏
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
答
實現方式非常巧妙,核心線程(Worker)即使一直空閒也不終止,是通過 workQueue.take() 實現的,它會一直阻塞到從等待隊列中取到新的任務。非核心線程空閒指定時間後終止是通過 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 實現的,一個空閒的 Worker 只等待 keepAliveTime,如果還沒有取到任務則循環終止,線程也就運行結束了。
引申思考
Worker 本身就是個線程,它再調用我們傳入的 Runnable.run(),會啓動一個子線程麼?如果你還沒有答案,再回想一下 Runnable 和 Thread 的關係。
5 空閒線程過多會有什麼問題
籠統地回答是會佔用內存,我們分析一下佔用了哪些內存。首先,比較普通的一部分,一個線程的內存模型:
-
虛擬機棧
-
本地方法棧
-
程序計數器
我想額外強調是下面這幾個內存佔用,需要小心:
-
ThreadLocal:業務代碼是否使用了 ThreadLocal?就算沒有,Spring 框架中也大量使用了 ThreadLocal,你所在公司的框架可能也是一樣。
-
局部變量:線程處於阻塞狀態,肯定還有棧幀沒有出棧,棧幀中有局部變量表,凡是被局部變量表引用的內存都不能回收。所以如果這個線程創建了比較大的局部變量,那麼這一部分內存無法 GC。
-
TLAB 機制:如果你的應用線程數處於高位,那麼新的線程初始化可能因爲 Eden 沒有足夠的空間分配 TLAB 而觸發 YoungGC。
答
-
線程池保持空閒的核心線程是它的默認配置,一般來講是沒有問題的,因爲它佔用的內存一般不大。怕的就是業務代碼中使用 ThreadLocal 緩存的數據過大又不清理。
-
如果你的應用線程數處於高位,那麼需要觀察一下 YoungGC 的情況,估算一下 Eden 大小是否足夠。如果不夠的話,可能要謹慎地創建新線程,並且讓空閒的線程終止;必要的時候,可能需要對 JVM 進行調參。
6 keepAliveTime=0 會怎麼樣
這也是個爭議點。有的博文說等於 0 表示空閒線程永遠不會終止,有的說表示執行完立刻終止。還有的說等於 - 1 表示空閒線程永遠不會終止。其實稍微看一下源碼知道了,這裏我直接拋出答案。
答
在 JDK1.8 中,keepAliveTime=0 表示非核心線程執行完立刻終止。
默認情況下,keepAliveTime 小於 0,初始化的時候纔會報錯;但如果 allowsCoreThreadTimeOut,keepAliveTime 必須大於 0,不然初始化報錯。
7 怎麼進行異常處理
很多代碼的寫法,我們都習慣按照常見範式去編寫,而沒有去思考爲什麼。比如:
-
如果我們使用 execute() 提交任務,我們一般要在 Runable 任務的代碼加上 try-catch 進行異常處理。
-
如果我們使用 submit() 提交任務,我們一般要在主線程中,對 Future.get() 進行 try-catch 進行異常處理。
—— 但是在上面,我提到過,submit() 底層實現依賴 execute(),兩者應該統一呀,爲什麼有差異呢?下面再扒一扒 submit() 的源碼,它的實現蠻有意思。
首先,ThreadPoolExecutor 中沒有 submit 的代碼,而是在它的父類 AbstractExecutorService 中,有三個 submit 的重載方法,代碼非常簡單,關鍵代碼就兩行:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
正是因爲這三個重載方法,都調用了 execute,所以我才說 submit 底層依賴 execute。通過查看這裏 execute 的實現,我們不難發現,它就是 ThreadPoolExecutor 中的實現,所以,造成 submit 和 execute 的差異化的代碼,不在這。那麼造成差異的一定在 newTaskFor 方法中。這個方法也就 new 了一個 FutureTask 而已,FutureTask 實現 RunnableFuture 接口,RunnableFuture 接口繼承 Runnable 接口和 Future 接口。而 Callable 只是 FutureTask 的一個成員變量。
所以講到這裏,就有另一個 Java 基礎知識點:Callable 和 Future 的關係。我們一般用 Callable 編寫任務代碼,Future 是異步返回對象,通過它的 get 方法,阻塞式地獲取結果。FutureTask 的核心代碼就是實現了 Future 接口,也就是 get 方法的實現:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 核心代碼
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 死循環
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 只有任務的狀態是’已完成‘,纔會跳出死循環
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
get 的核心實現是有個 awaitDone 方法,這是一個死循環,只有任務的狀態是 “已完成”,纔會跳出死循環;否則會依賴 UNSAFE 包下的 LockSupport.park 原語進行阻塞,等待 LockSupport.unpark 信號量。而這個信號量只有當運行結束獲得結果、或者出現異常的情況下,纔會發出來。分別對應方法 set 和 setException。這就是異步執行、阻塞獲取的原理,扯得有點遠了。
回到最初我們的疑問,爲什麼 submit 之後,通過 get 方法可以獲取到異常?原因是 FutureTask 有一個 Object 類型的 outcome 成員變量,用來記錄執行結果。這個結果可以是傳入的泛型,也可以是 Throwable 異常:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// get方法中依賴的,報告執行結果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
FutureTask 的另一個巧妙的地方就是借用 RunnableAdapter 內部類,將 submit 的 Runnable 封裝成 Callable。所以就算你 submit 的是 Runnable,一樣可以用 get 獲取到異常。
答
-
不論是用 execute 還是 submit,都可以自己在業務代碼上加 try-catch 進行異常處理。我一般喜歡使用這種方式,因爲我喜歡對不同業務場景的異常進行差異化處理,至少打不一樣的日誌吧。
-
如果是 execute,還可以自定義線程池,繼承 ThreadPoolExecutor 並複寫其 afterExecute(Runnable r, Throwable t) 方法。
-
或者實現 Thread.UncaughtExceptionHandler 接口,實現 void uncaughtException(Thread t, Throwable e); 方法,並將該 handler 傳遞給線程池的 ThreadFactory。
-
但是注意,afterExecute 和 UncaughtExceptionHandler 都不適用 submit。因爲通過上面的 FutureTask.run() 不難發現,它自己對 Throwable 進行了 try-catch,封裝到了 outcome 屬性,所以底層方法 execute 的 Worker 是拿不到異常信息的。
8 線程池需不需要關閉
答
一般來講,線程池的生命週期跟隨服務的生命週期。如果一個服務(Service)停止服務了,那麼需要調用 shutdown 方法進行關閉。所以 ExecutorService.shutdown 在 Java 以及一些中間件的源碼中,是封裝在 Service 的 shutdown 方法內的。
如果是 Server 端不重啓就不停止提供服務,我認爲是不需要特殊處理的。
9 shutdown 和 shutdownNow 的區別
答
-
shutdown => 平緩關閉,等待所有已添加到線程池中的任務執行完再關閉。
-
shutdownNow => 立刻關閉,停止正在執行的任務,並返回隊列中未執行的任務。
本來想分析一下兩者的源碼的,但是發現本文的篇幅已經過長了,源碼也貼了不少。感興趣的朋友自己看一下即可。
10 Spring 中有哪些和 ThreadPoolExecutor 類似的工具
答
這裏我想着重強調的就是 SimpleAsyncTaskExecutor,Spring 中使用的 @Async 註解,底層就是基於 SimpleAsyncTaskExecutor 去執行任務,只不過它不是線程池,而是每次都新開一個線程。
另外想要強調的是 Executor 接口。Java 初學者容易想當然的以爲 Executor 結尾的類就是一個線程池,而上面的都是反例。我們可以在 JDK 的 execute 方法上看到這個註釋:
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
所以,它的職責並不是提供一個線程池的接口,而是提供一個 “將來執行命令” 的接口。真正能代表線程池意義的,是 ThreadPoolExecutor 類,而不是 Executor 接口。
最佳實踐總結
-
【強制】使用 ThreadPoolExecutor 的構造函數聲明線程池,避免使用 Executors 類的 newFixedThreadPool 和 newCachedThreadPool。
-
【強制】 創建線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。即 threadFactory 參數要構造好。
-
【建議】建議不同類別的業務用不同的線程池。
-
【建議】CPU 密集型任務 (N+1):這種任務消耗的主要是 CPU 資源,可以將線程數設置爲 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是爲了防止線程偶發的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閒狀態,而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閒時間。
-
【建議】I/O 密集型任務 (2N):這種任務應用起來,系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會佔用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
-
【建議】workQueue 不要使用無界隊列,儘量使用有界隊列。避免大量任務等待,造成 OOM。
-
【建議】如果是資源緊張的應用,使用 allowsCoreThreadTimeOut 可以提高資源利用率。
-
【建議】雖然使用線程池有多種異常處理的方式,但在任務代碼中,使用 try-catch 最通用,也能給不同任務的異常處理做精細化。
-
【建議】對於資源緊張的應用,如果擔心線程池資源使用不當,可以利用 ThreadPoolExecutor 的 API 實現簡單的監控,然後進行分析和優化。
線程池初始化示例:
private static final ThreadPoolExecutor pool;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.allowCoreThreadTimeOut(true);
}
-
threadFactory:給出帶業務語義的線程命名。
-
corePoolSize:快速啓動 4 個線程處理該業務,是足夠的。
-
maximumPoolSize:IO 密集型業務,我的服務器是 4C8G 的,所以 4*2=8。
-
keepAliveTime:服務器資源緊張,讓空閒的線程快速釋放。
-
pool.allowCoreThreadTimeOut(true):也是爲了在可以的時候,讓線程釋放,釋放資源。
-
workQueue:一個任務的執行時長在 100~300ms,業務高峯期 8 個線程,按照 10s 超時(已經很高了)。10s 鍾,8 個線程,可以處理 10 * 1000ms / 200ms * 8 = 400 個任務左右,往上再取一點,512 已經很多了。
-
handler:極端情況下,一些任務只能丟棄,保護服務端。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/axWymUaYaARtvsYqvfyTtw