海尔洗衣机洗完不脱水:经典线程同步问题(生产者&消费者)--Java实现

来源:百度文库 编辑:中财网 时间:2024/05/10 09:56:03

原创作品,转载请注明出自xelz's blog

博客地址:http://mingcn.cnblogs.com/

本文地址:http://mingcn.cnblogs.com/archive/2010/10/26/JavaThreadSync.html

 

生产者-消费者(producer-consumer)问题是一个著名的线程同步问题。它描述的是:有一群生产者线程在生产产品,并将这些产品提供给消费者线程去消费。

为使生产者与消费者之间能够并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者将它所生产的产品放入一个缓冲区中;消费者可以从一个缓冲区中取走产品产生消费。

尽管所有的生产者线程和消费者线程都是以异步方式运行的,但他们之间必须保持同步,即不允许消费者到一个空缓冲区去消费,也不允许生产者向一个已经被占用的缓冲区投放产品。

 

我把这个问题复杂化,设立m个缓冲池,每个缓冲池都有各自固定的容量,每个生产者或消费者在进行生产消费活动之前,先选择一个缓冲池。由此会引发一个线程死锁的问题:所有的生产者都在满的缓冲池等待,直到某个消费者取走一个产品,释放出一块缓冲区;同时所有的消费者都在空的缓冲池等待,直到某个生产者放进一个产品。

解决方法:记录每一个线程的等待状态,如果当前线程会产生等待,则检测是否会产生死锁(所有线程都在等待),如果会产生死锁,拒绝此次生产消费活动(换一个缓冲池)

 

Java Code:

 

001public class ProducerAndConsumer {//作者:xelz,博客http://www.cnblogs.com/mingcn002     003    private static final int PRODUCTION_LINE_COUNT = 20;//生产线(缓冲池)条数004    private static final int PRODUCTION_LINE_SIZE = 100;//生产线最大容量005    private static final int PRODUCER_COUNT = 50;//生产者个数006    private static final int CONSUMER_COUNT = 50;//消费者个数007     008    public static void main(String[] args) {009        ProductionLine[] productionLine = new ProductionLine[PRODUCTION_LINE_COUNT];010        Producer[] producer = new Producer[PRODUCER_COUNT];011        Consumer[] consumer = new Consumer[CONSUMER_COUNT];012        SyncLock.initLock(PRODUCER_COUNT, CONSUMER_COUNT);//初始化每个线程的等待状态013        for(int i = 0; i < PRODUCTION_LINE_COUNT; i++) {//初始化每条生产线,随机容量014            productionLine[i] = new ProductionLine((int) (Math.random() * PRODUCTION_LINE_SIZE) + 1, i);015        }016        for(int i = 0; i < PRODUCER_COUNT; i++) {//初始化生产者线程017            producer[i] = new Producer(i, productionLine);018            new Thread(producer[i]).start();019        }020        for(int i = 0; i < CONSUMER_COUNT; i++) {//初始化消费者线程021            consumer[i] = new Consumer(i, productionLine);022            new Thread(consumer[i]).start();023        }024    }025     026}027 028class Product {//产品(缓冲区)类029     030    int productID;//制造者编号031    int productionLineID;//生产线编号032    int producerID;//产品编号(产品在该生产线上的位置)033     034    Product(int producerID) {035        this.producerID = producerID;036    }037     038}039 040class ProductionLine {//生产线(缓冲池)类041     042    Product[] product;//缓冲池采用循环队列模式043    int size;//缓冲池大小044    int count;//缓冲池当前产品计数045    int productionLineID;//生产线编号046    int produceID;//队头指针,下一个被放入的产品的编号047    int consumeID;//队尾指针,下一个被取走的产品的编号048     049    ProductionLine(int size, int productionLineID) {//初始化生产线(缓冲池)050        this.size = size;051        this.productionLineID = productionLineID;052        product = new Product[size];053        for(int i = 0; i < size / 2; i++) {//为防止刚开始产销不平衡,预先放置一半产品054            putProduct(new Product(-1));//产品生产者编号-1,系统制造055        }056    }057     058    boolean isFull() {//判断缓冲池是否满059        return count == size;060    }061     062    boolean isEmpty() {//判断缓冲池是否空063        return 0 == count;064    }065     066    void putProduct(Product product) {//放入一个产品,并将队头指针前移067        this.product[produceID] = product;068        product.productID = produceID;//给产品贴上标签,产品编号,生产线编号069        product.productionLineID = productionLineID;070        produceID = ++produceID % size;//下一个产品放置位置071        count++;072    }073     074    Product getProduct() {//取出一个产品,并将队尾指针前移075        Product product =  this.product[consumeID];076        this.product[consumeID] = null;077        consumeID = ++consumeID % size;//下一个产品取出位置078        count--;079        return product;080    }081     082}083 084class Producer implements Runnable {//生产者线程085     086    int producerID;//自己的生产者编号087    ProductionLine[] productionLine;//共享的生产线088    ProductionLine currentProductionLine;089     090    Producer(int producerID, ProductionLine[] productionLine) {091        this.producerID = producerID;092        this.productionLine = productionLine;093    }094     095    private void produce() {//生产活动096        Product product;097        int productionLineID;098        while(true) {099            do {//选择一条生产线,若会产生死锁,则重选一条生产线100                productionLineID = (int) (Math.random() * productionLine.length);101                currentProductionLine = productionLine[productionLineID];102            } while((SyncLock.waitOnFull[producerID] = currentProductionLine.isFull()) && SyncLock.deadLock());103            //synchronized(SyncLock.printSyncLock) {104            //  System.out.print("Producer " + producerID +" wants to ");105            //  System.out.println("produce at ProductionLine  " + productionLineID);106            //}107            synchronized(currentProductionLine) {//同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线108                while(currentProductionLine.isFull()) {//缓冲池满,无法生产,等待109                    try {110                        currentProductionLine.wait();111                    } catch(InterruptedException e) {112                        e.printStackTrace();113                    }114                }115                product = new Product(producerID);//生产新的产品116                currentProductionLine.putProduct(product);//加入缓冲池117                synchronized(SyncLock.printSyncLock) {//打印生产线、生产者、产品ID及缓冲池是否变满118                    System.out.print("ProductionLine: " + productionLineID);119                    System.out.print("\tProducer: " + producerID);120                    System.out.print("\tProduct: " + product.productID);121                    System.out.println(currentProductionLine.isFull() ? "\t(" + productionLineID + ")Full!" : "");122                }//释放当前生产线同步锁123                currentProductionLine.notifyAll();//生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程124            }125            try {126                Thread.sleep((int) (Math.random() * 1000));127            } catch(InterruptedException e) {128                e.printStackTrace();129            }130        }131    }132     133    public void run() {134        produce();135    }136     137}138 139class Consumer implements Runnable {140     141    int consumerID;//自己的消费者编号142    ProductionLine[] productionLine;//共享的生产线143    ProductionLine currentProductionLine;144     145    Consumer(int consumerID, ProductionLine[] productionLine) {146        this.consumerID = consumerID;147        this.productionLine = productionLine;148    }149     150    private void consume() {//消费活动151        Product product;152        int productionLineID;153        while(true) {154            do {//选择一条生产线,若会产生死锁,则重选一条生产线155                productionLineID = (int) (Math.random() * productionLine.length);156                currentProductionLine = productionLine[productionLineID];157            } while((SyncLock.waitOnEmpty[consumerID] = currentProductionLine.isEmpty()) && SyncLock.deadLock());158            //synchronized(SyncLock.printSyncLock) {159            //  System.out.print("Consumer " + consumerID +" wants to ");160            //  System.out.println("consume at ProductionLine  " + productionLineID);161            //}162            synchronized(currentProductionLine) {//同步对象:当前生产线,不能同时有两个及以上线程操作同一生产线163                while(currentProductionLine.isEmpty()) {//缓冲池空,无法消费,等待164                    try {165                        currentProductionLine.wait();166                    } catch(InterruptedException e) {167                        e.printStackTrace();168                    }169                }170                product = currentProductionLine.getProduct();//消费产品171                synchronized(SyncLock.printSyncLock) {//打印生产线、消费者、产品及其制造者ID,以及生产线是否变空172                    System.out.print("ProductionLine: " + productionLineID);173                    System.out.print("\tConsumer: " + consumerID);174                    System.out.print("\tProduct: " + product.productID);175                    System.out.print("(" + product.producerID + ")");176                    System.out.println(currentProductionLine.isEmpty() ? "\t(" + productionLineID + ")Empty!" : "");177                }178            currentProductionLine.notifyAll();//生产活动结束,唤醒当前生产线上等待的其他生产者/消费者线程179            }180            try {181                Thread.sleep((int) (Math.random() * 1000));182            } catch(InterruptedException e) {183                e.printStackTrace();184            }185        }186    }187     188    public void run() {189        consume();190    }191     192}193 194class SyncLock {//提供死锁检测的静态方法,及打印同步195     196    static final Object printSyncLock = new Object();//用于同步输出,防止不同线程输出交叉197    static boolean[] waitOnEmpty;//指示某一消费者是否在空缓冲池等待198    static boolean[] waitOnFull;//指示某一生产者是否在满缓冲池等待199     200    static void initLock(int ProducerCount,int ConsumerCount) {//初始化等待指示器201        waitOnEmpty = new boolean[ConsumerCount];202        waitOnFull = new boolean[ProducerCount];203    }204     205    static boolean deadLock() {//判断是否产生死锁206        for(boolean b : waitOnEmpty) {207            if(!b) return false;208        }209        for(boolean b : waitOnFull) {210            if(!b) return false;211        }212        return true;//若所有线程都在等待状态,则产生死锁213    }214     215}