在前面的文章中,已經對 ArrayBlockingQueue 進行了一次源碼分析,對它的核心源碼做了分析,今天來解析一波同為 BlockingQueue 家族中的一員的 LinkedBlockingQueue。它的底層基于單向鏈表實現。
先看一看它的 Node 內部類和主要屬性、構造函數。
Node
static class Node< E > {
E item;
Node< E > next;
Node(E x) { item = x; }
}
Node 是 LinkedBlockingQueue 的基石。 它如第一張圖所示的一個單向鏈表形式的內部類,item 是當前節點的內容,next 指向的是下一個 Node 節點。
屬性
//容量
private final int capacity;
//隊列中元素的數量
private final AtomicInteger count = new AtomicInteger();
//鏈表的頭節點
transient Node< E > head;
//鏈表的尾節點
private transient Node< E > last;
//出隊鎖
private final ReentrantLock takeLock = new ReentrantLock();
//當隊列沒有元素了,出隊鎖的線程會加入 notEmpty 條件隊列中被阻塞,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
//入隊鎖
private final ReentrantLock putLock = new ReentrantLock();
//當隊列滿了,入隊鎖的線程會加入 notFull 條件隊列中被阻塞,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();
- 從屬性中就可以看出 LinkedBlockingQueue 和 ArrayBlockingQueue 的差異點: ArrayBlockingQueue 只有一把 ReentrantLock 鎖,入隊和出隊相互互斥。 LinkedBlockingQueue 分了出隊鎖 takeLock 和入隊鎖 putLock,兩把鎖相互不阻塞。
- capacity 是 LinkedBlockingQueue 的容量,表示 LinkedBlockingQueue 是一個有界隊列。
構造函數
LinkedBlockingQueue 提供了三個構造函數。
LinkedBlockingQueue()
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
LinkedBlockingQueue() 構造函數直接調用帶參數的構造函數,參數被設置為 2 的 31 次方 - 1
LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) {
//檢查容量大小
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//構造一個 null 節點
last = head = new Node< E >(null);
}
LinkedBlockingQueue(int capacity) 構造函數做了三件事:
- 先檢查參數是否大于 0,不大于 0 就拋出異常。
- 設置 capacity 容量為參數大小。
- 構造了一個 null 節點,賦值給 last 和 head。
- head 的 item 元素永遠是一個 null。
LinkedBlockingQueue(Collection c)
public LinkedBlockingQueue(Collection< ? extends E > c) {
//容量是最大值
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//加鎖
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//真實的入隊操作
enqueue(new Node< E >(e));
++n;
}
//設置元素的數量
count.set(n);
} finally {
//解鎖
putLock.unlock();
}
}
- LinkedBlockingQueue(Collection c) 調用了 LinkedBlockingQueue(int capacity) 構造函數并將參數設置成了最大值。
- putLock 加鎖后,將集合中的元素一個個遍歷并且入隊。
- 設置元素的數量就是集合中元素的數量。
- 在遍歷元素時,會將 null 元素拋出異常并且檢查隊列是否滿了。
入隊
LinkedBlockingQueue 有多種入隊方法,當隊列滿了之后它們的處理方法各不相同。在入隊之前和入隊之后的操作都是相同的。
offer
//LinkedBlockingQueue.offer()
public boolean offer(E e) {
//檢查是否為 null
if (e == null) throw new NullPointerException();
//檢查隊列是否滿了,隊列滿了返回 fasle
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//初始化一個 size
int c = -1;
//創建一個 Node 節點
Node< E > node = new Node< E >(e);
//putLock 加鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如果沒有滿隊,則入隊
if (count.get() < capacity) {
//入隊
enqueue(node);
//返回元素的數量并且元素的數量 + 1
c = count.getAndIncrement();
//元素的數量 + 1 還沒有滿隊,還有剩余的容量空間,喚醒 notFull 隊列中因為隊列滿了而等待入隊的線程。
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//解鎖
putLock.unlock();
}
//當 c == 0 表示元素入隊之前是一個空的隊列
//現在隊列不是空的了,喚醒阻塞在 notEmpty 條件隊列中因為空隊列而等待元素出隊的線程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//LinkedBlockingQueue.enqueue()
private void enqueue(Node< E > node) {
//直接加到 last 的 next 屬性中,并替換 last 屬性
last = last.next = node;
}
//LinkedBlockingQueue.signalNotEmpty()
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer() 方法做了以下幾件事情:
- 檢查需要入隊的元素是否為 null。
- 判斷隊列是否滿了,滿了就返回 false。
- 隊列沒有滿,創建一個新的 Node 節點。
- putLock 鎖加鎖,不讓其他線程操作隊列。
- 當隊列沒有滿隊的時候,將元素入隊并且將局部變量 c 設置為入隊之前元素的數量,元素數量 + 1。
- 再次判斷隊列是否滿了,如果隊列中還有空位,則喚醒正在阻塞的入隊線程。這些阻塞的線程來自 put()、offer(E e, long timeout, TimeUnit unit) 方法。
- 釋放 putLock 鎖
- 當入隊之前是一個空隊列的時候,調用 signalNotEmpty() 方法開啟 takeLock 出隊鎖,將阻塞在 notEmpty 條件隊列中的線程喚醒。
enqueue() 方法的源碼比較簡單,就是將 last 節點的 next 指向需要入隊的元素,如下圖所示。
add()
//LinkedBlockingQueue.add()
public boolean add(E e) {
//調用 offer()
if (offer(e))
return true;
else
//隊列滿了,就拋出異常
throw new IllegalStateException("Queue full");
}
add() 方法調用的是 offer() 方法,它在隊列滿了的時候不是返回 false 而是拋出一個 Queue full
異常。
put()
//LinkedBlockingQueue.put()
public void put(E e) throws InterruptedException {
//檢查是否為 null
if (e == null) throw new NullPointerException();
int c = -1;
Node< E > node = new Node< E >(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//putLock 加鎖
putLock.lockInterruptibly();
try {
//如果隊列滿了,就把線程加入 notFull 隊列,阻塞線程
while (count.get() == capacity) {
notFull.await();
}
//入隊
enqueue(node);
//返回元素的數量并且元素的數量 + 1
c = count.getAndIncrement();
//元素的數量 + 1 還沒有滿隊,還有剩余的容量空間,喚醒 notFull 隊列中因為隊列滿了而等待入隊的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
put() 方法和 offer()、and() 的方法大致相同,不同的是對隊列滿了之后的操作,offer() 是直接返回 false,and() 是拋出異常,put() 則是將線程加入到 notFull 條件隊列中阻塞入隊線程。
offer(E e, long timeout, TimeUnit unit)
這是一個帶超時時間的 offer() 方法。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//檢查是否為 null
if (e == null) throw new NullPointerException();
//將 timeout 超時轉換為毫秒數
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加鎖
putLock.lockInterruptibly();
try {
//當隊列滿了之后,等待超時時間,如果超時時間到了,還沒入隊則返回 false
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//入隊
enqueue(new Node< E >(e));
//返回元素的數量并且元素的數量 + 1
c = count.getAndIncrement();
//元素的數量 + 1 還沒有滿隊,還有剩余的容量空間,喚醒 notFull 隊列中因為隊列滿了而等待入隊的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
這個方法是在一定時間內元素等待入隊,就是在 timeout 時間內隊列中有空余位置就將元素加入單向鏈表的隊尾,如果在 timeout 時間內元素還沒有入隊,就返回 false。
入隊總結
LinkedBlockingQueue 的入隊一共有 offer()、add()、put()、offer(E e, long timeout, TimeUnit unit) 四種方法。這四種方法在隊列滿了之后的處理是不同的:
- offer() 方法在隊列滿了之后就返回 false。
- add() 方法在內部調用的是 offer() 方法,當隊列滿了就拋出
Queue full
異常。 - put() 方法在隊列滿了之后會將線程加入 notFull 中,等待被喚醒后加入隊列。
- offer(E e, long timeout, TimeUnit unit) 方法在隊列滿了之后會等待一段 timeout 的時間,在這時間內入隊就返回 true,在這段時間內未能入隊就返回 false。
- 每個方法在入隊完后都會喚醒在 notEmpty 隊列中等待出隊的線程。
出隊
LinkedBlockingQueue 的出隊也有幾種不同的方法,它們對于空隊列的處理方式各不相同。
poll()
//LinkedBlockingQueue.poll()
public E poll() {
//元素的數量
final AtomicInteger count = this.count;
//隊列中的非空檢查
if (count.get() == 0)
return null;
//初始化一個 null 對象
E x = null;
//初始化元素的個數
int c = -1;
//takeLock 出隊加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//當隊列中有元素
if (count.get() > 0) {
//出隊
x = dequeue();
//取出出隊之前的元素數量并且元素的數量 - 1
c = count.getAndDecrement();
//當隊列中還有元素,喚醒 notEmpty 條件隊列中的線程
if (c > 1)
notEmpty.signal();
}
} finally {
//解鎖
takeLock.unlock();
}
//如果出隊之前隊列是滿的,就喚醒 putLock 鎖中的 notFull 條件隊列中等待的線程
if (c == capacity)
signalNotFull();
return x;
}
//LinkedBlockingQueue.dequeue()
private E dequeue() {
//head 節點沒有存儲元素,head.next 是第一個元素
Node< E > h = head;
//取出第一個元素
Node< E > first = h.next;
//將 head 節點的 next 指向自己
h.next = h;
//將第一個元素變成新的 head
head = first;
//取出內容
E x = first.item;
first.item = null;
return x;
}
poll() 方法在出隊的時候做了以下幾件事情:
- 先取出隊列中元素的數量
- 隊列的非空檢查,當隊列是空的,則返回 false。
- 初始化一個局部的元素變量。
- takeLock 出隊鎖加鎖,不讓其他線程操作隊列的出隊。
- 當隊列中有元素的時候,將隊列中的第一個元素彈出。
- 判斷隊列中是否還有元素,喚醒阻塞在 notEmpty 條件隊列上的線程。
- takeLock 出隊鎖解鎖
- 在原隊列是滿隊的情況下,喚醒阻塞在 notFull 條件隊列上的線程。
dequeue() 方法會將 LinkedBlockingQueue 中第一個元素取出。取的并不是 head 中的item,而是 head.next 中的 item。
poll(long timeout, TimeUnit unit)
//LinkedBlockingQueue.poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
//將超時時間轉換為毫秒數
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//takeLock 出隊加鎖
takeLock.lockInterruptibly();
try {
//隊列中是空的,沒有元素
while (count.get() == 0) {
//等待超時時間, 超時后返回 null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出隊
x = dequeue();
//取出出隊之前的元素數量并且元素的數量 - 1
c = count.getAndDecrement();
//當隊列中還有元素,喚醒 notEmpty 條件隊列中的線程
if (c > 1)
notEmpty.signal();
} finally {
//解鎖
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
poll(long timeout, TimeUnit unit) 方法是在一定時間內出隊還未取到元素就阻塞線程,時間到了還沒取到元素就返回 null,并不會一直阻塞線程。
take()
//LinkedBlockingQueue.take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加鎖
takeLock.lockInterruptibly();
try {
//隊列是空的,將線程加入到 notEmpty 隊列中等待并且阻塞線程
while (count.get() == 0) {
notEmpty.await();
}
//出隊
x = dequeue();
//取出出隊之前的元素數量并且元素的數量 - 1
c = count.getAndDecrement();
//當隊列中還有元素,喚醒 notEmpty 條件隊列中的線程
if (c > 1)
notEmpty.signal();
} finally {
//解鎖
takeLock.unlock();
}
//如果出隊之前隊列是滿的,就喚醒 putLock 鎖中的 notFull 條件隊列中等待的線程
if (c == capacity)
signalNotFull();
return x;
}
take() 方法和 poll() 最大的不同就是在空隊列的時候會一直阻塞線程,poll() 則返回 null,poll(long timeout, TimeUnit unit) 則在一定時間內阻塞線程,超時后返回的 null。
remove()
//LinkedBlockingQueue.remove()
public boolean remove(Object o) {
//非空檢查
if (o == null) return false;
//將入隊鎖和出隊鎖都加鎖
fullyLock();
try {
//遍歷所有元素
for (Node< E > trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//刪除節點
unlink(p, trail);
return true;
}
}
return false;
} finally {
//將入隊鎖和出隊鎖都解鎖
fullyUnlock();
}
}
//LinkedBlockingQueue.unlink()
void unlink(Node< E > p, Node< E > trail) {
//p 是需要刪除的節點,trail 是 p 的上一個節點
p.item = null;
//將 trail.next 指向 p 的下一個節點
trail.next = p.next;
//要刪除的就是最后一個節點
if (last == p)
//將 trail 設置為最后一個節點
last = trail;
//原隊列是滿的隊列,喚醒 notFull 條件隊列中的線程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
remove() 并不會彈出元素,它是刪除一個元素。遍歷整個單向鏈表,找到需要刪除的元素后,將元素前一個節點的next 指向刪除元素的 next。將需要刪除的元素設置為 null。
peek()
//LinkedBlockingQueue.peek()
public E peek() {
//非空檢查
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
//加鎖
takeLock.lock();
try {
//取出第一個元素,為 null 則返回 null
Node< E > first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
//解鎖
takeLock.unlock();
}
}
peek() 方法僅僅是取出第一個元素,沒有修改節點的任何一個 next 屬性,所以并不會將元素從隊列中移除。
出隊總結
LinkedBlockingQueue 的出隊一共有 poll()、take()、poll(long timeout, TimeUnit unit) 三種方法,移除元素用 remove() 方法,取出第一個元素用 peek() 方法。
出隊方法在遇到空隊列的時候操作不同:
- poll() 方法遇到空隊列就返回 null。
- take() 方法遇到空隊列就將線程加入到 notEmpty 條件隊列中并且阻塞線程。
- poll(long timeout, TimeUnit unit) 方法在遇到空隊列就阻塞一段時間,這期間沒獲取到元素就返回 null。
總結
- LinkedBlockingQueue 是基于單向鏈表的,線程安全的。
- 是一個有界的隊列,最大的容量是最大的 int 值。
- 出隊入隊基于兩把鎖。互不阻塞。
LinkedBlockingQueue 被用在線程池中,也可以用在生產者-消費者模式中。
-
源碼
+關注
關注
8文章
633瀏覽量
29140 -
函數
+關注
關注
3文章
4306瀏覽量
62431 -
線程
+關注
關注
0文章
504瀏覽量
19651 -
node
+關注
關注
0文章
23瀏覽量
5933
發布評論請先 登錄
相關推薦
評論