并发容器J.U.C --组件FutureTask、ForkJoin、BlockingQueue

线程

记录一下

生产者线程

public class Producer implements Runnable { private volatile boolean isRuuning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRuuning) { System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者模式线程!"); } } /** * 终止线程 */ public void stop() { isRuuning = false; } }

消费者线程

public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS); if (data != null) { System.out.println("拿到数据:" + data); System.out.println("正在消费数据"); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过 2s 还没数据,认为所有生产者线程已经退出,自动退出消费者线程 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); // 停止当前线程 Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } }

测试类

public class Main { public static void main(String[] args) { // 声明一个容器为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助 Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行 10s try { Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(10 * 1000); service.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); service.shutdown(); System.out.println("Thread 10s bad"); } } }

参考文章