JUC 常用 4 大併發工具類
什麼是 JUC?
JUC 就是 java.util.concurrent 包, 這個包俗稱 JUC, 裏面都是解決併發問題的一些東西
該包的位置位於 java 下面的 rt.jar 包下面
4 大常用併發工具類:
CountDownLatch
CyclicBarrier
Semaphore
ExChanger
CountDownLatch:
CountDownLatch, 俗稱閉鎖, 作用是類似加強版的 Join, 是讓一組線程等待其他的線程完成工作以後才執行
就比如在啓動框架服務的時候, 我們主線程需要在環境線程初始化完成之後才能啓動, 這時候我們就可以實現使用 CountDownLatch 來完成
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
在源碼中可以看到, 創建 CountDownLatch 時, 需要傳入一個 int 類型的參數, 將決定在執行次扣減之後, 等待的線程被喚醒
通過這個類圖就可以知道其實 CountDownLatch 並沒有多少東西
方法介紹:
CountDownLatch: 初始化方法
await: 等待方法, 同時帶參數的是超時重載方法
countDown: 每執行一次, 計數器減一, 就是初始化傳入的數字, 也代表着一個線程完成了任務
getCount: 獲取當前值
toString: 這個就不用說了
裏面的 Sync 是一個內部類, 外面的方法其實都是操作這個內部類的, 這個內部類繼承了 AQS, 實現的標準方法, AQS 將在後面的章節寫
主線程中創建 CountDownLatch(3), 然後主線程 await 阻塞, 然後線程 A,B,C 各自完成了任務, 調用了 countDown, 之後, 每個線程調用一次計數器就會減一, 初始是 3, 然後 A 線程調用後變成 2,B 線程調用後變成 1,C 線程調用後, 變成 0, 這時就會喚醒正在 await 的主線程, 然後主線程繼續執行
說一千道一萬, 不如代碼寫幾行, 上代碼:
休眠工具類, 之後的代碼都會用到
package org.dance.tools;
import java.util.concurrent.TimeUnit;
/**
* 類說明:線程休眠輔助工具類
*/
public class SleepTools {
/**
* 按秒休眠
* @param seconds 秒數
*/
public static final void second(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
/**
* 按毫秒數休眠
* @param seconds 毫秒數
*/
public static final void ms(int seconds) {
try {
TimeUnit.MILLISECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
}
package org.dance.day2.util;
import org.dance.tools.SleepTools;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch的使用,有五個線程,6個扣除點
* 扣除完成後主線程和業務線程,才能執行工作
* 扣除點一般都是大於等於需要初始化的線程的
* @author ZYGisComputer
*/
public class UseCountDownLatch {
/**
* 設置爲6個扣除點
*/
static CountDownLatch countDownLatch = new CountDownLatch(6);
/**
* 初始化線程
*/
private static class InitThread implements Runnable {
@Override
public void run() {
System.out.println("thread_" + Thread.currentThread().getId() + " ready init work .....");
// 執行扣減 扣減不代表結束
countDownLatch.countDown();
for (int i = 0; i < 2; i++) {
System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");
}
}
}
/**
* 業務線程
*/
private static class BusiThread implements Runnable {
@Override
public void run() {
// 業務線程需要在等初始化完畢後才能執行
try {
countDownLatch.await();
for (int i = 0; i < 3; i++) {
System.out.println("BusiThread " + Thread.currentThread().getId() + " do business-----");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 創建單獨的初始化線程
new Thread(){
@Override
public void run() {
SleepTools.ms(1);
System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 1st.....");
// 扣減一次
countDownLatch.countDown();
System.out.println("begin stop 2nd.....");
SleepTools.ms(1);
System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 2nd.....");
// 扣減一次
countDownLatch.countDown();
}
}.start();
// 啓動業務線程
new Thread(new BusiThread()).start();
// 啓動初始化線程
for (int i = 0; i <= 3; i++) {
new Thread(new InitThread()).start();
}
// 主線程進入等待
try {
countDownLatch.await();
System.out.println("Main do ites work.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
返回結果:
thread_13 ready init work .....
thread_13.....continue do its work
thread_13.....continue do its work
thread_14 ready init work .....
thread_14.....continue do its work
thread_14.....continue do its work
thread_15 ready init work .....
thread_15.....continue do its work
thread_11 ready init work step 1st.....
begin stop 2nd.....
thread_16 ready init work .....
thread_16.....continue do its work
thread_16.....continue do its work
thread_15.....continue do its work
thread_11 ready init work step 2nd.....
Main do ites work.....
BusiThread 12 do business-----
BusiThread 12 do business-----
BusiThread 12 do business-----
通過返回結果就可以很直接的看到業務線程是在初始化線程完全跑完之後, 纔開始執行的
CyclicBarrier:
CyclicBarrier, 俗稱柵欄鎖, 作用是讓一組線程到達某個屏障, 被阻塞, 一直到組內的最後一個線程到達, 然後屏障開放, 接着, 所有的線程繼續運行
這個感覺和 CountDownLatch 有點相似, 但是其實是不一樣的, 所謂的差別, 將在下面詳解
CyclicBarrier 的構造參數有兩個
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
很明顯能感覺出來, 上面的構造參數調用了下面的構造參數, 是一個構造方法重載
首先這個第一個參數也樹 Int 類型的, 傳入的是執行線程的個數, 這個數量和 CountDownLatch 不一樣, 這個數量是需要和線程數量吻合的, CountDownLatch 則不一樣, CountDownLatch 可以大於等於, 而 CyclicBarrier 只能等於, 然後是第二個參數, 第二個參數是 barrierAction, 這個參數是當屏障開放後, 執行的任務線程, 如果當屏障開放後需要執行什麼任務, 可以寫在這個線程中
主線程創建 CyclicBarrier(3,barrierAction), 然後由線程開始執行, 線程 A,B 執行完成後都調用了 await, 然後他們都在一個屏障前阻塞者, 需要等待線程 C 也, 執行完成, 調用 await 之後, 然後三個線程都達到屏障後, 屏障開放, 然後線程繼續執行, 並且 barrierAction 在屏障開放的一瞬間也開始執行
上代碼:
package org.dance.day2.util;
import org.dance.tools.SleepTools;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier的使用
*
* @author ZYGisComputer
*/
public class UseCyclicBarrier {
/**
* 存放子線程工作結果的安全容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());
/**
* 結果打印線程
* 用來演示CyclicBarrier的第二個參數,barrierAction
*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuffer result = new StringBuffer();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println("the result = " + result);
System.out.println("do other business.....");
}
}
/**
* 工作子線程
* 用於CyclicBarrier的一組線程
*/
private static class SubThread implements Runnable {
@Override
public void run() {
// 獲取當前線程的ID
long id = Thread.currentThread().getId();
// 放入統計容器中
resultMap.put(String.valueOf(id), id);
Random random = new Random();
try {
if (random.nextBoolean()) {
Thread.sleep(1000 + id);
System.out.println("Thread_"+id+"..... do something");
}
System.out.println(id+" is await");
cyclicBarrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+".....do its business");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i <= 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
}
返回結果:
11 is await
14 is await
15 is await
Thread_12..... do something
12 is await
Thread_13..... do something
13 is await
the result = [11][12][13][14][15]
do other business.....
Thread_11.....do its business
Thread_12.....do its business
Thread_13.....do its business
Thread_14.....do its business
Thread_15.....do its business
通過返回結果可以看出前面的 11 14 15 三個線程沒有進入 if 語句塊, 在執行到 await 的時候進入了等待, 而另外 12 13 兩個線程進入到了 if 語句塊當中, 多休眠了 1 秒多, 然後當 5 個線程同時到達 await 的時候, 屏障開放, 執行了 barrierAction 線程, 然後線程組繼續執行
解釋一下 CountDownLatch 和 CyclicBarrier 的卻別吧!
首先就是 CountDownLatch 的構造參數傳入的數量一般都是大於等於線程, 數量的, 因爲他是有第三方控制的, 可以扣減多次, 然後就是 CyclicBarrier 的構造參數第一個參數傳入的數量一定是等於線程的個數的, 因爲他是由一組線程自身控制的
區別
CountDownLatch CyclicBarrier
控制 第三方控制 自身控制
傳入數量 大於等於線程數量 等於線程數量
Semaphore:
Semaphore, 俗稱信號量, 作用於控制同時訪問某個特定資源的線程數量, 用在流量控制
一說特定資源控制, 那麼第一時間就想到了數據庫連接..
之前用等待超時模式寫了一個數據庫連接池, 打算用這個 Semaphone 也寫一個
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
在源碼中可以看到在構建 Semaphore 信號量的時候, 需要傳入許可證的數量, 這個數量就是資源的最大允許的訪問的線程數
接下里用信號量實現一個數據庫連接池
連接對象
package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* 數據庫連接
* @author ZYGisComputer
*/
public class SqlConnection implements Connection {
/**
* 獲取數據庫連接
* @return
*/
public static final Connection fetchConnection(){
return new SqlConnection();
}
@Override
public void commit() throws SQLException {
SleepTools.ms(70);
}
@Override
public Statement createStatement() throws SQLException {
SleepTools.ms(1);
return null;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
}
@Override
public void rollback() throws SQLException {
}
@Override
public void close() throws SQLException {
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
return null;
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public void setCatalog(String catalog) throws SQLException {
}
@Override
public String getCatalog() throws SQLException {
return null;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
}
@Override
public int getTransactionIsolation() throws SQLException {
return 0;
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
}
@Override
public void setHoldability(int holdability) throws SQLException {
}
@Override
public int getHoldability() throws SQLException {
return 0;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
}
@Override
public Clob createClob() throws SQLException {
return null;
}
@Override
public Blob createBlob() throws SQLException {
return null;
}
@Override
public NClob createNClob() throws SQLException {
return null;
}
@Override
public SQLXML createSQLXML() throws SQLException {
return null;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return null;
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return null;
}
@Override
public void setSchema(String schema) throws SQLException {
}
@Override
public String getSchema() throws SQLException {
return null;
}
@Override
public void abort(Executor executor) throws SQLException {
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
}
@Override
public int getNetworkTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}
連接池對象
package org.dance.day2.util.pool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
* 使用信號量控制數據庫的鏈接和釋放
*
* @author ZYGisComputer
*/
public class DBPoolSemaphore {
/**
* 池容量
*/
private final static int POOL_SIZE = 10;
/**
* useful 代表可用連接
* useless 代表已用連接
* 爲什麼要使用兩個Semaphore呢?是因爲,在連接池中不只有連接本身是資源,空位也是資源,也需要記錄
*/
private final Semaphore useful, useless;
/**
* 連接池
*/
private final static LinkedList<Connection> POOL = new LinkedList<>();
/**
* 使用靜態塊初始化池
*/
static {
for (int i = 0; i < POOL_SIZE; i++) {
POOL.addLast(SqlConnection.fetchConnection());
}
}
public DBPoolSemaphore() {
// 初始可用的許可證等於池容量
useful = new Semaphore(POOL_SIZE);
// 初始不可用的許可證容量爲0
useless = new Semaphore(0);
}
/**
* 獲取數據庫連接
*
* @return 連接對象
*/
public Connection takeConnection() throws InterruptedException {
// 可用許可證減一
useful.acquire();
Connection connection;
synchronized (POOL) {
connection = POOL.removeFirst();
}
// 不可用許可證數量加一
useless.release();
return connection;
}
/**
* 釋放鏈接
*
* @param connection 連接對象
*/
public void returnConnection(Connection connection) throws InterruptedException {
if(null!=connection){
// 打印日誌
System.out.println("當前有"+useful.getQueueLength()+"個線程等待獲取連接,,"
+"可用連接有"+useful.availablePermits()+"個");
// 不可用許可證減一
useless.acquire();
synchronized (POOL){
POOL.addLast(connection);
}
// 可用許可證加一
useful.release();
}
}
}
測試類:
package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
* 測試Semaphore
* @author ZYGisComputer
*/
public class UseSemaphore {
/**
* 連接池
*/
public static final DBPoolSemaphore pool = new DBPoolSemaphore();
private static class BusiThread extends Thread{
@Override
public void run() {
// 隨機數工具類 爲了讓每個線程持有連接的時間不一樣
Random random = new Random();
long start = System.currentTimeMillis();
try {
Connection connection = pool.takeConnection();
System.out.println("Thread_"+Thread.currentThread().getId()+
"_獲取數據庫連接耗時["+(System.currentTimeMillis()-start)+"]ms.");
// 模擬使用連接查詢數據
SleepTools.ms(100+random.nextInt(100));
System.out.println("查詢數據完成歸還連接");
pool.returnConnection(connection);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
BusiThread busiThread = new BusiThread();
busiThread.start();
}
}
}
測試返回結果:
Thread_11_獲取數據庫連接耗時[0]ms.
Thread_12_獲取數據庫連接耗時[0]ms.
Thread_13_獲取數據庫連接耗時[0]ms.
Thread_14_獲取數據庫連接耗時[0]ms.
Thread_15_獲取數據庫連接耗時[0]ms.
Thread_16_獲取數據庫連接耗時[0]ms.
Thread_17_獲取數據庫連接耗時[0]ms.
Thread_18_獲取數據庫連接耗時[0]ms.
Thread_19_獲取數據庫連接耗時[0]ms.
Thread_20_獲取數據庫連接耗時[0]ms.
查詢數據完成歸還連接
當前有40個線程等待獲取連接,,可用連接有0個
Thread_21_獲取數據庫連接耗時[112]ms.
查詢數據完成歸還連接
...................
查詢數據完成歸還連接
當前有2個線程等待獲取連接,,可用連接有0個
Thread_59_獲取數據庫連接耗時[637]ms.
查詢數據完成歸還連接
當前有1個線程等待獲取連接,,可用連接有0個
Thread_60_獲取數據庫連接耗時[660]ms.
查詢數據完成歸還連接
當前有0個線程等待獲取連接,,可用連接有0個
查詢數據完成歸還連接
...................
當前有0個線程等待獲取連接,,可用連接有8個
查詢數據完成歸還連接
當前有0個線程等待獲取連接,,可用連接有9個
通過執行結果可以很明確的看到, 一上來就有 10 個線程獲取到了連接,, 然後後面的 40 個線程進入阻塞, 然後只有釋放鏈接之後, 等待的線程就會有一個拿到, 然後越後面的線程等待的時間就越長, 然後一直到所有的線程執行完畢
最後打印的可用連接有九個不是因爲少了一個是因爲在釋放之前打印的, 不是錯誤
從結果中可以看到, 我們對連接池中的資源的到了控制, 這就是信號量的流量控制
Exchanger:
Exchanger, 俗稱交換器, 用於在線程之間交換數據, 但是比較受限, 因爲只能兩個線程之間交換數據
/**
* Creates a new Exchanger.
*/
public Exchanger() {
participant = new Participant();
}
這個構造函數沒有什麼好說的, 也沒有入參, 只有在創建的時候指定一下需要交換的數據的泛型即可, 下面看代碼
package org.dance.day2.util;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
* 線程之間交換數據
* @author ZYGisComputer
*/
public class UseExchange {
private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(){
@Override
public void run() {
Set<String> aSet = new HashSet<>();
aSet.add("A");
aSet.add("B");
aSet.add("C");
try {
Set<String> exchange = exchanger.exchange(aSet);
for (String s : exchange) {
System.out.println("aSet"+s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
Set<String> bSet = new HashSet<>();
bSet.add("1");
bSet.add("2");
bSet.add("3");
try {
Set<String> exchange = exchanger.exchange(bSet);
for (String s : exchange) {
System.out.println("bSet"+s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
執行結果:
bSetA
bSetB
bSetC
aSet1
aSet2
aSet3
通過執行結果可以清晰的看到, 兩個線程中的數據發生了交換, 這就是 Exchanger 的線程數據交換了
以上就是 JUC 的 4 大常用併發工具類了
轉自:彼岸舞
鏈接:www.cnblogs.com/flower-dance/p/13714006.html
推薦關注「Linux 愛好者」,提升 Linux 技能
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/KZw6t0h3GSm328XQrY6uMg