只使用单个条件变量、并用 if 判断等待条件时，在多消费者场景下存在两个问题：

  1. 线程被唤醒之后、重新拿到锁之前，条件可能已经被其他消费者修改；if 不会重新检查条件，继续执行时条件可能已不成立（因此应使用 while 重新判断）。
  2. 无法正确唤醒：生产者把缓冲区生产满之后睡眠，并唤醒一个消费者；该消费者消费完之后发出唤醒信号并继续睡眠，但由于只有一个条件变量，被唤醒的可能是另外一个消费者而不是生产者；这个消费者发现条件不满足又进入睡眠，最终所有线程都在睡眠，陷入无人唤醒的“死睡眠”。
package com.example.demo.interview;

import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bounded-buffer producer/consumer demo.
 *
 * <p>One producer thread and two consumer threads share {@link #queue}, which holds at most
 * {@link #MAX} integers. They are coordinated by a single {@link ReentrantLock} with two
 * conditions: the producer waits on {@code full} while the buffer is full, the consumers wait on
 * {@code empty} while it is empty.
 *
 * <p>{@link #put()} and {@link #get()} do no locking themselves; the worker loops call them only
 * while holding the lock.
 */
public class HighCpuUsage {
    /** Number of items currently stored in {@link #queue}. */
    static int count = 0;
    /** Not referenced anywhere in this class. */
    static int val = 0;
    /** Error marker: set to 1 by {@link #put()} on a full buffer, 2 by {@link #get()} on an empty one. */
    volatile static int flag = 0;
    /** Capacity of the bounded buffer. */
    static int MAX = 10;
    static Queue<Integer> queue = new ArrayDeque<>(MAX);
    static Random random = new Random();

    /**
     * Appends a random integer to the buffer.
     *
     * @return the value that was added, or 0 (with {@link #flag} set to 1) if the buffer already
     *     holds {@link #MAX} items
     */
    static int put() {
        if (count == MAX) {
            // Report the real capacity instead of a hard-coded "1".
            System.out.println("Error count = " + MAX + ", but put");
            flag = 1;
            return 0;
        }
        int x = random.nextInt();
        queue.add(x);
        count++;
        return x;
    }

    /**
     * Removes the oldest integer from the buffer.
     *
     * @return the removed value, or -1 (with {@link #flag} set to 2) if {@link #count} is 0
     */
    static int get() {
        if (count == 0) {
            System.out.println("Error count = 0, but get");
            flag = 2;
            return -1;
        }
        count--;
        return queue.poll();
    }

    /**
     * Starts one producer and two consumers, then reads integers from standard input; entering
     * {@code 1} interrupts all workers and exits the JVM. Non-integer tokens are ignored. If
     * standard input ends, this method returns and the workers keep running.
     */
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition full = lock.newCondition();
        Condition empty = lock.newCondition();
        List<Thread> threads = new ArrayList<>();

        Thread producer = new Thread(() -> produce(lock, full, empty));
        producer.start();
        threads.add(producer);

        for (int i = 0; i < 2; i++) {
            Thread consumer = new Thread(() -> consume(lock, full, empty));
            consumer.start();
            threads.add(consumer);
        }

        // One Scanner for the whole session: a fresh Scanner per iteration can discard input
        // that the previous instance had already buffered.
        Scanner in = new Scanner(System.in);
        while (in.hasNext()) {
            if (!in.hasNextInt()) {
                in.next(); // skip a token that is not an integer instead of crashing
                continue;
            }
            if (in.nextInt() == 1) {
                for (Thread thread : threads) {
                    thread.interrupt();
                }
                System.exit(0);
            }
        }
    }

    /** Producer loop: waits while the buffer is full, adds one item, then signals a consumer. */
    private static void produce(ReentrantLock lock, Condition full, Condition empty) {
        while (!Thread.currentThread().isInterrupted()) {
            lock.lock();
            try {
                // while, not if: the condition must be re-checked after every wake-up.
                while (count == MAX) {
                    full.await();
                }
                int x = put();
                System.out.println("Put " + x);
                empty.signal();
            } catch (InterruptedException e) {
                // Keep the interrupt status and stop; the finally block releases the lock.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Consumer loop: waits while the buffer is empty, removes one item, then signals the producer. */
    private static void consume(ReentrantLock lock, Condition full, Condition empty) {
        while (!Thread.currentThread().isInterrupted()) {
            lock.lock();
            try {
                // while, not if: another consumer may have emptied the buffer before we re-acquired the lock.
                while (count == 0) {
                    empty.await();
                }
                int x = get();
                System.out.println("Get " + x);
                full.signal();
            } catch (InterruptedException e) {
                // Keep the interrupt status and stop; the finally block releases the lock.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }
}

自研信号量：基于 ReentrantLock 与 Condition 实现的计数信号量

/**
 * Minimal counting semaphore built on a {@link ReentrantLock} and one {@link Condition}.
 *
 * <p>{@code val} is the number of available permits and is only read or written while
 * {@code lock} is held.
 */
class MySemaphore {
    /** Number of currently available permits. */
    int val;
    final ReentrantLock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    /**
     * @param val initial number of permits
     */
    MySemaphore(int val) {
        this.val = val;
    }

    /**
     * Takes one permit, blocking while none is available.
     *
     * <p>If the calling thread is interrupted while waiting, the {@link InterruptedException}
     * propagates to the caller undeclared, and the lock is released first.
     */
    void acquire() {
        lock.lock();
        try {
            // <= 0 rather than == 0 so that a negative permit count also blocks.
            while (val <= 0) {
                condition.await();
            }
            val--;
        } catch (InterruptedException e) {
            MySemaphore.<RuntimeException>sneakyThrow(e);
        } finally {
            // Always release, even when await() is interrupted.
            lock.unlock();
        }
    }

    /** Returns one permit and wakes a single waiting thread, if any. */
    void release() {
        lock.lock();
        try {
            val++;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Rethrows {@code t} without wrapping it and without a {@code throws} clause, so that
     * {@link #acquire()} keeps its undeclared-exception signature using only the standard library.
     */
    @SuppressWarnings("unchecked") // the cast is erased at runtime; t is thrown unchanged
    private static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }
}