這篇文章主要為大家展示了“Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型的示例分析”這篇文章吧。
創(chuàng)新互聯(lián)網(wǎng)站建設(shè)提供從項(xiàng)目策劃、軟件開(kāi)發(fā),軟件安全維護(hù)、網(wǎng)站優(yōu)化(SEO)、網(wǎng)站分析、效果評(píng)估等整套的建站服務(wù),主營(yíng)業(yè)務(wù)為做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),重慶APP軟件開(kāi)發(fā)以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。創(chuàng)新互聯(lián)深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
生產(chǎn)消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型具體來(lái)講,就是在一個(gè)系統(tǒng)中,存在生產(chǎn)者和消費(fèi)者兩種角色,他們通過(guò)內(nèi)存緩沖區(qū)進(jìn)行通信,生產(chǎn)者生產(chǎn)消費(fèi)者需要的資料,消費(fèi)者把資料做成產(chǎn)品。生產(chǎn)消費(fèi)者模式如下圖。
定義商品類
package demo; /*定義商品*/ public class Goods { public final String name; public final int price; public final int id; public Goods(String name, int price, int id){ this.name = name; /*類型*/ this.price = price; /*價(jià)格*/ this.id = id; /*商品序列號(hào)*/ } @Override public String toString(){ return "name: " + name + ", price:"+ price + ", id: " + id; } }
基本要求:
1)生產(chǎn)者不能重復(fù)生產(chǎn)一個(gè)商品,也就是說(shuō)不能有兩個(gè)id相同的商品
2)生產(chǎn)者不能覆蓋一個(gè)商品(當(dāng)前商品還未被消費(fèi),就被下一個(gè)新商品覆蓋)。也就是說(shuō)消費(fèi)商品時(shí),商品的id屬性可以不連續(xù),但不能出現(xiàn)缺號(hào)的情況
3)消費(fèi)者不能重復(fù)消費(fèi)一個(gè)商品
1.生產(chǎn)者線程無(wú)線生產(chǎn),消費(fèi)者線程無(wú)限消費(fèi)的模式
1.1使用線程對(duì)象,一個(gè)生產(chǎn)者線程,一個(gè)消費(fèi)者線程,一個(gè)商品存儲(chǔ)位置
package demo; import java.util.Random; /*使用線程對(duì)象,一個(gè)緩存位置,一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,無(wú)限生產(chǎn)商品消費(fèi)商品*/ public class ProducterComsumerDemo1 { /*定義一個(gè)商品緩存位置*/ private volatile Goods goods; /*定義一個(gè)對(duì)象作為鎖,不使用goods作為鎖是因?yàn)樯a(chǎn)者每次會(huì)產(chǎn)生一個(gè)新的對(duì)象*/ private Object obj = new Object(); /*isFull == true 生產(chǎn)者線程休息,消費(fèi)者線程消費(fèi) *isFull == false 消費(fèi)者線程休息,生產(chǎn)者線程生產(chǎn)*/ private volatile Boolean isFull = false; /*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/ private int id = 1; /*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/ private Random rnd = new Random(); /*=================定義消費(fèi)者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ try{ while(true){ /*獲取obj對(duì)象的鎖, id 和 isFull 的操作都在同步代碼塊中*/ synchronized(obj){ if(!isFull){ /*wait方法使當(dāng)前線程阻塞,并釋放鎖*/ obj.wait(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); /*模擬消費(fèi)商品*/ System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); isFull = false; /*喚醒阻塞obj上的生產(chǎn)者線程*/ obj.notify(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); } } catch (InterruptedException e){ /*什么都不做*/ } } } /*=================定義生產(chǎn)者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ try { while(true){ synchronized(obj){ if(isFull){ obj.wait(); } Thread.sleep(rnd.nextint(500)); /*如果id為偶數(shù),生產(chǎn)價(jià)格為2的產(chǎn)品A *如果id為奇數(shù),生產(chǎn)價(jià)格為1的產(chǎn)品B*/ if(id % 2 == 0){ goods = new Goods("A", 2, id); } else{ goods = new Goods("B", 1, id); } Thread.sleep(rnd.nextint(250)); id++; isFull = true; /*喚醒阻塞的消費(fèi)者線程*/ obj.notify(); } } } catch (InterruptedException e) { /*什么都不做*/ } } } public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(p).start(); new Thread(c).start(); } }
運(yùn)行結(jié)果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: B, price:1, id: 3 name: A, price:2, id: 4 name: B, price:1, id: 5 name: A, price:2, id: 6 name: B, price:1, id: 7 name: A, price:2, id: 8 name: B, price:1, id: 9 name: A, price:2, id: 10 name: B, price:1, id: 11 name: A, price:2, id: 12 name: B, price:1, id: 13 ……
從結(jié)果看出,商品類型交替生產(chǎn),每個(gè)商品的id都不相同,且不會(huì)漏過(guò)任何一個(gè)id,生產(chǎn)者沒(méi)有重復(fù)生產(chǎn),消費(fèi)者沒(méi)有重復(fù)消費(fèi),結(jié)果完全正確。
1.2.使用線程對(duì)象,多個(gè)生產(chǎn)者線程,多個(gè)消費(fèi)者線程,1個(gè)緩存位置
1.2.1一個(gè)經(jīng)典的bug
對(duì)于多生產(chǎn)者,多消費(fèi)者這個(gè)問(wèn)題,看起來(lái)我們似乎不用修改代碼,只需在main方法中多添加幾個(gè)線程就好。假設(shè)我們需要三個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,那么我們只需要在main方法中再添加兩個(gè)消費(fèi)者線程。
public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(c).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); }
運(yùn)行結(jié)果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: A, price:2, id: 2 name: B, price:1, id: 3 name: B, price:1, id: 3 name: A, price:2, id: 4 name: A, price:2, id: 4 name: B, price:1, id: 5 name: B, price:1, id: 5 name: A, price:2, id: 6 ……
從結(jié)果中,我們發(fā)現(xiàn)消費(fèi)者重復(fù)消費(fèi)了商品,所以這樣做顯然是錯(cuò)誤的。這里我們定義多個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,所以遇到了重復(fù)消費(fèi)的問(wèn)題,如果定義成一個(gè)消費(fèi)者,多個(gè)生產(chǎn)者就會(huì)遇到id覆蓋的問(wèn)題。如果我們定義多個(gè)消費(fèi)者,多個(gè)生產(chǎn)者,那么即會(huì)遇到重復(fù)消費(fèi),也會(huì)遇到id覆蓋的問(wèn)題。注意,上面的代碼使用的notifyAll喚醒方法,如果使用notify方法喚醒bug仍然可能發(fā)生。
現(xiàn)在我們來(lái)分析一下原因。當(dāng)生產(chǎn)者生產(chǎn)好了商品,會(huì)喚醒因沒(méi)有商品而阻塞消費(fèi)者線程,假設(shè)喚醒的消費(fèi)者線程超過(guò)兩個(gè),這兩個(gè)線程會(huì)競(jìng)爭(zhēng)獲取鎖,獲取到鎖的線程就會(huì)從obj.wait()方法中返回,然后消費(fèi)商品,并把isFull置為false,然后釋放鎖。當(dāng)被喚醒的另一個(gè)線程競(jìng)爭(zhēng)獲取到鎖了以后也會(huì)從obj.wait()方法中返回。會(huì)再次消費(fèi)同一個(gè)商品。顯然,每一個(gè)被喚醒的線程應(yīng)該再次檢查isFull這個(gè)條件。所以無(wú)論是消費(fèi)者,還是生產(chǎn)者,isFull的判斷必須改成while循環(huán),這樣才能得到正確的結(jié)果而不受生產(chǎn)者的線程數(shù)和消費(fèi)者的線程數(shù)的影響。
而對(duì)于只有一個(gè)生產(chǎn)者線程,一個(gè)消費(fèi)者線程,用if判斷是沒(méi)有問(wèn)題的,但是仍然強(qiáng)烈建議改成while語(yǔ)句進(jìn)行判斷。
1.2.2正確的姿勢(shì)
package demo; import java.util.Random; /*使用線程對(duì)象,一個(gè)緩存位置,一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,無(wú)限生產(chǎn)商品消費(fèi)商品*/ public class ProducterComsumerDemo1 { /*定義一個(gè)商品緩存位置*/ private volatile Goods goods; /*定義一個(gè)對(duì)象作為鎖,不使用goods作為鎖是因?yàn)樯a(chǎn)者每次會(huì)產(chǎn)生一個(gè)新的對(duì)象*/ private Object obj = new Object(); /*isFull == true 生產(chǎn)者線程休息,消費(fèi)者線程消費(fèi) *isFull == false 消費(fèi)者線程消費(fèi),生產(chǎn)者線程生產(chǎn)*/ private volatile Boolean isFull = false; /*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/ private int id = 1; /*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/ private Random rnd = new Random(); /*=================定義消費(fèi)者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ try{ while(true){ /*獲取obj對(duì)象的鎖, id 和 isFull 的操作都在同步代碼塊中*/ synchronized(obj){ while(!isFull){ /*wait方法使當(dāng)前線程阻塞,并釋放鎖*/ obj.wait(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); /*模擬消費(fèi)商品*/ System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); isFull = false; /*喚醒阻塞obj上的生產(chǎn)者線程*/ obj.notifyAll(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); } } catch (InterruptedException e){ /*我就是任性,這里什么都不做*/ } } } /*=================定義生產(chǎn)者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ try { while(true){ synchronized(obj){ while(isFull){ obj.wait(); } Thread.sleep(rnd.nextint(500)); /*如果id為偶數(shù),生產(chǎn)價(jià)格為2的產(chǎn)品A 如果id為奇數(shù),生產(chǎn)價(jià)格為1的產(chǎn)品B*/ if(id % 2 == 0){ goods = new Goods("A", 2, id); } else{ goods = new Goods("B", 1, id); } Thread.sleep(rnd.nextint(250)); id++; isFull = true; /*喚醒阻塞的消費(fèi)者線程*/ obj.notifyAll(); } } } catch (InterruptedException e) { /*我就是任性,這里什么都不做*/ } } } public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); new Thread(c).start(); } }
1.3使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者
1)當(dāng)緩存位置滿時(shí),我們應(yīng)該阻塞生產(chǎn)者線程
2)當(dāng)緩存位置空時(shí),我們應(yīng)該阻塞消費(fèi)者線程
下面的代碼我沒(méi)有用java對(duì)象內(nèi)置的鎖,而是用了ReentrantLock對(duì)象。是因?yàn)槠胀▽?duì)象的鎖只有一個(gè)阻塞隊(duì)列,如果使用notify方式,無(wú)法保證喚醒的就是特定類型的線程(消費(fèi)者線程或生產(chǎn)者線程),而notifyAll方法會(huì)喚醒所有的線程,當(dāng)剩余的緩存商品的數(shù)量小于生產(chǎn)者線程數(shù)量或已緩存商品的數(shù)量小于消費(fèi)者線程時(shí)效率就比較低。所以這里我們通過(guò)ReentrantLock對(duì)象構(gòu)造兩個(gè)阻塞隊(duì)列提高效率。
1.3.1普通方式
package demo; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者,無(wú)限循環(huán)模式*/ public class ProducterComsumerDemo2 { /*最大緩存商品數(shù)*/ private final int MAX_SLOT = 2; /*定義緩存商品的容器*/ private LinkedList<Goods> queue = new LinkedList<Goods>(); /*定義線程鎖和鎖對(duì)應(yīng)的阻塞隊(duì)列*/ private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); /*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/ private int id = 1; /*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/ private Random rnd = new Random(); /*=================定義消費(fèi)者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖,queue的出列操作都在同步代碼塊中*/ lock.lock(); try { while(queue.isEmpty()){ System.out.println("queue is empty"); empty.await(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(200)); /*模擬消費(fèi)商品*/ Goods goods = queue.remove(); System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(200)); /*喚醒阻塞的生產(chǎn)者線程*/ full.signal(); } catch (InterruptedException e) { /*什么都不做*/ } finally{ lock.unlock(); } /*釋放鎖后隨機(jī)延時(shí)一段時(shí)間*/ try { Thread.sleep(rnd.nextint(200)); } catch (InterruptedException e) { /*什么都不做*/ } } } } /*=================定義生產(chǎn)者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖,queue的入列操作,id操作都在同步代碼塊中*/ lock.lock(); try{ while(queue.size() == MAX_SLOT){ System.out.println("queue is full"); full.await(); } Thread.sleep(rnd.nextint(200)); Goods goods = null; /*根據(jù)序號(hào)產(chǎn)生不同的商品*/ switch(id%3){ case 0 : goods = new Goods("A", 1, id); break; case 1 : goods = new Goods("B", 2, id); break; case 2 : goods = new Goods("C", 3, id); break; } Thread.sleep(rnd.nextint(200)); queue.add(goods); id++; /*喚醒阻塞的消費(fèi)者線程*/ empty.signal(); } catch(InterruptedException e){ /*什么都不做*/ } finally{ lock.unlock(); } /*釋放鎖后隨機(jī)延時(shí)一段時(shí)間*/ try { Thread.sleep(rnd.nextint(100)); } catch (InterruptedException e) { /*什么都不做*/ } } } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); /*兩個(gè)生產(chǎn)者線程,兩個(gè)消費(fèi)者線程*/ new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); } }
運(yùn)行結(jié)果
queue is empty queue is empty name: B, price:2, id: 1 name: C, price:3, id: 2 name: A, price:1, id: 3 queue is full name: B, price:2, id: 4 name: C, price:3, id: 5 queue is full name: A, price:1, id: 6 name: B, price:2, id: 7 name: C, price:3, id: 8 name: A, price:1, id: 9 name: B, price:2, id: 10 name: C, price:3, id: 11 name: A, price:1, id: 12 name: B, price:2, id: 13 name: C, price:3, id: 14 ……
1.3.2 更優(yōu)雅的實(shí)現(xiàn)方式
下面使用線程池(ThreadPool)和阻塞隊(duì)列(LinkedBlockingQueue)原子類(AtomicInteger)以更加優(yōu)雅的方式實(shí)現(xiàn)上述功能。LinkedBlockingQueue阻塞隊(duì)列僅在take和put方法上鎖,所以id必須定義為原子類。
package demo; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者,無(wú)限循環(huán)模式*/ public class ProducterComsumerDemo4 { /*最大緩存商品數(shù)*/ private final int MAX_SLOT = 3; /*定義緩存商品的容器*/ private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT); /*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/ private AtomicInteger id = new AtomicInteger(1); /*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/ private Random rnd = new Random(); /*=================定義消費(fèi)者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ try { /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(200)); /*模擬消費(fèi)商品*/ Goods goods = queue.take(); System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(200)); } catch (InterruptedException e) { /*什么都不做*/ } } } } /*=================定義生產(chǎn)者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ try{ int x = id.getAndIncrement(); Goods goods = null; Thread.sleep(rnd.nextint(200)); /*根據(jù)序號(hào)產(chǎn)生不同的商品*/ switch(x%3){ case 0 : goods = new Goods("A", 1, x); break; case 1 : goods = new Goods("B", 2, x); break; case 2 : goods = new Goods("C", 3, x); break; } Thread.sleep(rnd.nextint(200)); queue.put(goods); Thread.sleep(rnd.nextint(100)); } catch(InterruptedException e){ /*什么都不做*/ } } } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4(); Runnable c = pcd.new ComsumeThread(); Runnable p = pcd.new ProductThread(); /*定義線程池*/ ExecutorService es = Executors.newCachedThreadPool(); /*三個(gè)生產(chǎn)者線程,兩個(gè)消費(fèi)者線程*/ es.execute(p); es.execute(p); es.execute(p); es.execute(c); es.execute(c); es.shutdown(); } }
2.有限商品個(gè)數(shù)
這個(gè)問(wèn)題顯然比上面的問(wèn)題要復(fù)雜不少,原因在于要保證緩存區(qū)的商品要全部消費(fèi)掉,沒(méi)有重復(fù)消費(fèi)商品,沒(méi)有覆蓋商品,同時(shí)還要保證所有線程能夠正常結(jié)束,防止存在一直阻塞的線程。
2.1使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者
思路定義一下三個(gè)變量
/*需要生產(chǎn)的總商品數(shù)*/ private final int TOTAL_NUM = 30; /*已產(chǎn)生的數(shù)量*/ private volatile int productNum = 0; /*已消耗的商品數(shù)*/ private volatile int comsumedNum = 0;
每生產(chǎn)一個(gè)商品 productNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 productNum < TOTAL_NUM 則結(jié)束進(jìn)程,自增操作必須在full.await()方法調(diào)用之前,防止生產(chǎn)者線程無(wú)法喚醒。
同理,每消費(fèi)一個(gè)商品 comsumedNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 comsumedNum < TOTAL_NUM 則結(jié)束進(jìn)程,自增操作必須在empty.await()方法調(diào)用之前,防止消費(fèi)者線程無(wú)法喚醒。
comsumedNum和productNum相當(dāng)于計(jì)劃經(jīng)濟(jì)時(shí)代的糧票一樣,有了它能夠保證生產(chǎn)者線程在喚醒后一定需要生產(chǎn)一個(gè)商品,消費(fèi)者線程在喚醒以后一定能夠消費(fèi)一個(gè)商品
package demo; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者, 有限商品個(gè)數(shù)*/ public class ProducterComsumerDemo3 { /*需要生產(chǎn)的總商品數(shù)*/ private final int TOTAL_NUM = 30; /*已產(chǎn)生的數(shù)量*/ private volatile int productNum = 0; /*已消耗的商品數(shù)*/ private volatile int comsumedNum = 0; /*最大緩存商品數(shù)*/ private final int MAX_SLOT = 2; /*定義線程公用的鎖和條件*/ private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); /*定義緩存商品的容器*/ private LinkedList<Goods> queue = new LinkedList<Goods>(); /*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/ private int id = 1; /*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/ private Random rnd = new Random(); /*=================定義消費(fèi)者線程==================*/ public class ComsumeThread implements Runnable{ @Override public void run(){ while(true){ /*加鎖, id、comsumedNum 操作都在同步代碼塊中*/ lock.lock(); try { /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); if(comsumedNum < TOTAL_NUM){ comsumedNum++; } else{ /*這里會(huì)自動(dòng)執(zhí)行finally的語(yǔ)句,釋放鎖*/ break; } while(queue.isEmpty()){ System.out.println("queue is empty"); empty.await(); } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); /*模擬消費(fèi)商品*/ Goods goods = queue.remove(); System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); /*喚醒阻塞的生產(chǎn)者線程*/ full.signal(); } catch (InterruptedException e) { } finally{ lock.unlock(); } /*釋放鎖后,隨機(jī)延時(shí)一段時(shí)間*/ try { Thread.sleep(rnd.nextint(250)); } catch (InterruptedException e) { } } System.out.println( "customer " + Thread.currentThread().getName() + " is over"); } } /*=================定義生產(chǎn)者線程==================*/ public class ProductThread implements Runnable{ @Override public void run(){ while(true){ lock.lock(); try{ /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(250)); if(productNum < TOTAL_NUM){ productNum++; } else{ /*這里會(huì)自動(dòng)執(zhí)行finally的語(yǔ)句,釋放鎖*/ break; } Thread.sleep(rnd.nextint(250)); while(queue.size() == MAX_SLOT){ System.out.println("queue is full"); full.await(); } Thread.sleep(rnd.nextint(250)); Goods goods = null; /*根據(jù)序號(hào)產(chǎn)生不同的商品*/ switch(id%3){ case 0 : goods = new Goods("A", 1, id); break; case 1 : goods = new Goods("B", 2, id); break; case 2 : goods = new Goods("C", 3, id); break; } queue.add(goods); id++; /*喚醒阻塞的消費(fèi)者線程*/ empty.signal(); } catch(InterruptedException e){ } finally{ lock.unlock(); } /*釋放鎖后,隨機(jī)延時(shí)一段時(shí)間*/ try { Thread.sleep(rnd.nextint(250)); } catch (InterruptedException e) { /*什么都不做*/ } } System.out.println( "producter " + Thread.currentThread().getName() + " is over"); } } /*=================main==================*/ public static void main(String[] args) throws InterruptedException{ ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3(); ComsumeThread c = pcd.new ComsumeThread(); ProductThread p = pcd.new ProductThread(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); new Thread(c).start(); System.out.println("main Thread is over"); } }
2.2利用線程池,原子類,阻塞隊(duì)列,以更優(yōu)雅的方式實(shí)現(xiàn)
LinkedBlockingQueue阻塞隊(duì)列僅在take和put方法上鎖,所以productNum和comsumedNum必須定義為原子類。
package demo; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /*使用線程池,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者, 有限商品個(gè)數(shù)*/ public class LinkedBlockingQueueDemo { /*需要生產(chǎn)的總商品數(shù)*/ private final int TOTAL_NUM = 20; /*已產(chǎn)生商品的數(shù)量*/ volatile AtomicInteger productNum = new AtomicInteger(0); /*已消耗的商品數(shù)*/ volatile AtomicInteger comsumedNum = new AtomicInteger(0); /*最大緩存商品數(shù)*/ private final int MAX_SLOT = 5; /*同步阻塞隊(duì)列,隊(duì)列容量為MAX_SLOT*/ private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT); /*隨機(jī)數(shù)*/ private Random rnd = new Random(); /*pn表示產(chǎn)品的編號(hào),產(chǎn)品編號(hào)從1開(kāi)始*/ private volatile AtomicInteger pn = new AtomicInteger(1); /*=================定義消費(fèi)者線程==================*/ public class CustomerThread implements Runnable{ @Override public void run(){ while(comsumedNum.getAndIncrement() < TOTAL_NUM){ try{ /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(500)); /*從隊(duì)列中取出商品,隊(duì)列空時(shí)發(fā)生阻塞*/ Goods goods = lbq.take(); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(500)); /*模擬消耗商品*/ System.out.println(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(500)); } catch(InterruptedException e){ } } System.out.println( "customer " + Thread.currentThread().getName() + " is over"); } } /*=================定義生產(chǎn)者線程==================*/ public class ProducerThread implements Runnable{ @Override public void run(){ while(productNum.getAndIncrement() < TOTAL_NUM){ try { int x = pn.getAndIncrement(); Goods goods = null; /*根據(jù)序號(hào)產(chǎn)生不同的商品*/ switch(x%3){ case 0 : goods = new Goods("A", 1, x); break; case 1 : goods = new Goods("B", 2, x); break; case 2 : goods = new Goods("C", 3, x); break; } /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(500)); /*產(chǎn)生的新產(chǎn)品入列,隊(duì)列滿時(shí)發(fā)生阻塞*/ lbq.put(goods); /*隨機(jī)延時(shí)一段時(shí)間*/ Thread.sleep(rnd.nextint(500)); } catch (InterruptedException e1) { /*什么都不做*/ } } System.out.println( "producter " + Thread.currentThread().getName() + " is over "); } } /*=================main==================*/ public static void main(String[] args){ LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo(); Runnable c = lbqd.new CustomerThread(); Runnable p = lbqd.new ProducerThread(); ExecutorService es = Executors.newCachedThreadPool(); es.execute(c); es.execute(c); es.execute(c); es.execute(p); es.execute(p); es.execute(p); es.shutdown(); System.out.println("main Thread is over"); } }
以上是“Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當(dāng)前文章:Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型的示例分析
鏈接分享:http://bm7419.com/article10/jdscdo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、虛擬主機(jī)、微信小程序、關(guān)鍵詞優(yōu)化、品牌網(wǎng)站制作、網(wǎng)站收錄
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)