死磕java同步系列之Semaphore源碼解析

問題

(1)Semaphore是什么?

十年的赤峰林西網站建設經驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。全網營銷推廣的優(yōu)勢是能夠根據用戶設備顯示端的尺寸不同,自動調整赤峰林西建站的顯示方式,使網站能夠適用不同顯示終端,在瀏覽器中調整網站的寬度,無論在任何一種瀏覽器上瀏覽網站,都能展現優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。創(chuàng)新互聯從事“赤峰林西網站設計”,“赤峰林西網站推廣”以來,每個客戶項目都認真落實執(zhí)行。

(2)Semaphore具有哪些特性?

(3)Semaphore通常使用在什么場景中?

(4)Semaphore的許可次數是否可以動態(tài)增減?

(5)Semaphore如何實現限流?

簡介

Semaphore,信號量,它保存了一系列的許可(permits),每次調用acquire()都將消耗一個許可,每次調用release()都將歸還一個許可。

特性

Semaphore通常用于限制同一時間對共享資源的訪問次數上,也就是常說的限流。

下面我們一起來學習Java中Semaphore是如何實現的。

類結構

Semaphore中包含了一個實現了AQS的同步器Sync,以及它的兩個子類FairSync和NonFairSync,這說明Semaphore也是區(qū)分公平模式和非公平模式的。

源碼分析

基于之前對于ReentrantLock和ReentrantReadWriteLock的分析,這篇文章相對來說比較簡單,之前講過的一些方法將直接略過,有興趣的可以拉到文章底部查看之前的文章。

內部類Sync

// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // 構造方法,傳入許可次數,放入state中
    Sync(int permits) {
        setState(permits);
    }
    // 獲取許可次數
    final int getPermits() {
        return getState();
    }
    // 非公平模式嘗試獲取許可
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 看看還有幾個許可
            int available = getState();
            // 減去這次需要獲取的許可還剩下幾個許可
            int remaining = available - acquires;
            // 如果剩余許可小于0了則直接返回
            // 如果剩余許可不小于0,則嘗試原子更新state的值,成功了返回剩余許可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 釋放許可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 加上這次釋放的許可
            int next = current + releases;
            // 檢測溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 如果原子更新state的值成功,就說明釋放許可成功,則返回true
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 減少許可
    final void reducePermits(int reductions) {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 減去將要減少的許可
            int next = current - reductions;
            // 檢測舉出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子更新state的值,成功了返回true
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 銷毀許可
    final int drainPermits() {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 如果為0,直接返回
            // 如果不為0,把state原子更新為0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

通過Sync的幾個實現方法,我們獲取到以下幾點信息:

(1)許可是在構造方法時傳入的;

(2)許可存放在狀態(tài)變量state中;

(3)嘗試獲取一個許可的時候,則state的值減1;

(4)當state的值為0的時候,則無法再獲取許可;

(5)釋放一個許可的時候,則state的值加1;

(6)許可的個數可以動態(tài)改變;

內部類NonfairSync

// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    // 構造方法,調用父類的構造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可,調用父類的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

非公平模式下,直接調用父類的nonfairTryAcquireShared()嘗試獲取許可。

內部類FairSync

// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    // 構造方法,調用父類的構造方法
    FairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 公平模式需要檢測是否前面有排隊的
            // 如果有排隊的直接返回失敗
            if (hasQueuedPredecessors())
                return -1;
            // 沒有排隊的再嘗試更新state的值
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

公平模式下,先檢測前面是否有排隊的,如果有排隊的則獲取許可失敗,進入隊列排隊,否則嘗試原子更新state的值。

構造方法

// 構造方法,創(chuàng)建時要傳入許可次數,默認使用非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 構造方法,需要傳入許可次數,及是否公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

創(chuàng)建Semaphore時需要傳入許可次數。

Semaphore默認也是非公平模式,但是你可以調用第二個構造方法聲明其為公平模式。

下面的方法在學習過前面的內容看來都比較簡單,彤哥這里只列舉Semaphore支持的一些功能了。

以下的方法都是針對非公平模式來描述。

acquire()方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

獲取一個許可,默認使用的是可中斷方式,如果嘗試獲取許可失敗,會進入AQS的隊列中排隊。

acquireUninterruptibly()方法

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

獲取一個許可,非中斷方式,如果嘗試獲取許可失敗,會進入AQS的隊列中排隊。

tryAcquire()方法

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

嘗試獲取一個許可,使用Sync的非公平模式嘗試獲取許可方法,不論是否獲取到許可都返回,只嘗試一次,不會進入隊列排隊。

tryAcquire(long timeout, TimeUnit unit)方法

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

嘗試獲取一個許可,先嘗試一次獲取許可,如果失敗則會等待timeout時間,這段時間內都沒有獲取到許可,則返回false,否則返回true;

release()方法

public void release() {
    sync.releaseShared(1);
}

釋放一個許可,釋放一個許可時state的值會加1,并且會喚醒下一個等待獲取許可的線程。

acquire(int permits)方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

一次獲取多個許可,可中斷方式。

acquireUninterruptibly(int permits)方法

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

一次獲取多個許可,非中斷方式。

tryAcquire(int permits)方法

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

一次嘗試獲取多個許可,只嘗試一次。

tryAcquire(int permits, long timeout, TimeUnit unit)方法

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

嘗試獲取多個許可,并會等待timeout時間,這段時間沒獲取到許可則返回false,否則返回true。

release(int permits)方法

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

一次釋放多個許可,state的值會相應增加permits的數量。

availablePermits()方法

public int availablePermits() {
    return sync.getPermits();
}

獲取可用的許可次數。

drainPermits()方法

public int drainPermits() {
    return sync.drainPermits();
}

銷毀當前可用的許可次數,對于已經獲取的許可沒有影響,會把當前剩余的許可全部銷毀。

reducePermits(int reduction)方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

減少許可的次數。

總結

(1)Semaphore,也叫信號量,通常用于控制同一時刻對共享資源的訪問上,也就是限流場景;

(2)Semaphore的內部實現是基于AQS的共享鎖來實現的;

(3)Semaphore初始化的時候需要指定許可的次數,許可的次數是存儲在state中;

(4)獲取一個許可時,則state值減1;

(5)釋放一個許可時,則state值加1;

(6)可以動態(tài)減少n個許可;

(7)可以動態(tài)增加n個許可嗎?

彩蛋

(1)如何動態(tài)增加n個許可?

答:調用release(int permits)即可。我們知道釋放許可的時候state的值會相應增加,再回頭看看釋放許可的源碼,發(fā)現與ReentrantLock的釋放鎖還是有點區(qū)別的,Semaphore釋放許可的時候并不會檢查當前線程有沒有獲取過許可,所以可以調用釋放許可的方法動態(tài)增加一些許可。

(2)如何實現限流?

答:限流,即在流量突然增大的時候,上層要能夠限制住突然的大流量對下游服務的沖擊,在分布式系統中限流一般做在網關層,當然在個別功能中也可以自己簡單地來限流,比如秒殺場景,假如只有10個商品需要秒殺,那么,服務本身可以限制同時只進來100個請求,其它請求全部作廢,這樣服務的壓力也不會太大。

使用Semaphore就可以直接針對這個功能來限流,以下是代碼實現:

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->seckill()).start();
        }
    }

    public static boolean seckill() {
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count="+failCount.incrementAndGet());
            return false;
        }

        try {
            // 處理業(yè)務邏輯
            Thread.sleep(2000);
            System.out.println("seckill success, count="+successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // todo 處理異常
            e.printStackTrace();
        } finally {
            SEMAPHORE.release();
        }
        return true;
    }
}

推薦閱讀

1、 死磕 java同步系列之開篇

2、 死磕 java魔法類之Unsafe解析

3、 死磕 java同步系列之JMM(Java Memory Model)

4、 死磕 java同步系列之volatile解析

5、 死磕 java同步系列之synchronized解析

6、 死磕 java同步系列之自己動手寫一個鎖Lock

7、 死磕 java同步系列之AQS起篇

8、 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、 死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、 死磕 java同步系列之ReentrantLock VS synchronized

11、 死磕 java同步系列之ReentrantReadWriteLock源碼解析

歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

分享名稱:死磕java同步系列之Semaphore源碼解析
文章來源:http://bm7419.com/article24/ijpoje.html

成都網站建設公司_創(chuàng)新互聯,為您提供網站制作App開發(fā)、網站設計公司、定制網站網站維護、網站建設

廣告

聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯

成都定制網站網頁設計