跳至主要內容

通用类解读

wangdx大约 34 分钟

生成随机数

1、
package com.yootk;

import java.util.Random;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Random random = new Random(); // 定义一个随机数的处理类
        for (int x = 0; x < 3; x++) {
            new Thread(()->{
                System.out.printf("【%s】生成随机数:%d%n", Thread.currentThread().getName(), random.nextInt(100));
            },"随机数生成线程 - " + x).start();
        }
    }
}


2、

package com.yootk;

import java.util.Random;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Random random = new Random(); // 定义一个随机数的处理类
        for (int x = 0; x < 3; x++) {
            new Thread(() -> {
                for (int y = 0; y < 3; y++) {
                    System.out.printf("【%s】〖%d〗生成随机数:%d%n", Thread.currentThread().getName(), y, random.nextInt(100));
                }
            }, "随机数生成线程 - " + x).start();
        }
    }
}

3、
package com.yootk;

import java.util.concurrent.ThreadLocalRandom;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        for (int x = 0; x < 3; x++) {
            new Thread(() -> {
                for (int y = 0; y < 3; y++) {
                    System.out.printf("【%s】〖%d〗生成随机数:%d%n", Thread.currentThread().getName(), y,
                            ThreadLocalRandom.current().nextInt(100));
                }
            }, "随机数生成线程 - " + x).start();
        }
    }
}

互斥锁

1、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Ticket {  // 实现抢票的处理机制
    private int count = 3; // 总票数
    private Lock lock = new ReentrantLock(); // 定义互斥锁
    public void sale() {    // 售票处理
        this.lock.lock(); // 对当前的线程锁定
        try {
            if (this.count > 0) {   // 【业务处理1】判断今年是否有剩余的票数
                // 在进行操作延迟的过程之中,独占锁会始终锁定当前的资源
                TimeUnit.SECONDS.sleep(1); // 模拟一下操作的延迟
                System.out.printf("【%s】卖票,剩余票数:%d%n", Thread.currentThread().getName(), this.count--);
            } else {
                System.out.printf("【%s】票已经卖完了,明天请早!%n", Thread.currentThread().getName());
            }
        } catch (Exception e) {}
        finally {
            this.lock.unlock(); // 进行解锁配置
        }
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Ticket ticket = new Ticket(); // 共享一个售票应用
        for (int x = 0; x < 5; x++) {
            new Thread(()->{
                ticket.sale();
            }, "售票员 - " + x).start();
        }
    }
}

ReentrantReadWriteLock

1、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Account { // 模拟银行账户
    private String name; // 账户的名称
    // 包括X东、X宝、X刀刀,都不会使用double描述金额,因为会有一个浮点的进位的bug,出现金额漏洞
    private int asset; // 账户的资金,资金使用整型
    // 因为读不需要考虑到线程同步处理,但是写需要考虑到线程同步处理
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁

    public Account(String name, int asset) {
        this.name = name;
        this.asset = asset;
    }

    public void save(int asset) { // 向银行账户之中进行存款操作
        this.readWriteLock.writeLock().lock(); // 写入锁定
        try {
            this.asset += asset; // 存款处理
            TimeUnit.SECONDS.sleep(1); // 模拟业务的处理延迟
            System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
        } catch (Exception e) {
        } finally {
            this.readWriteLock.writeLock().unlock(); // 解除锁定
        }
    }

    @Override
    public String toString() {
        this.readWriteLock.readLock().lock(); // 读锁属于共享锁
        try {
            TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
            return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
        } catch (Exception e) {
            return null;
        } finally {
            this.readWriteLock.readLock().unlock(); // 解除锁定
        }
    }
}

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Account account = new Account("李兴华", 0); // 定义银行账户信息
        int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
        for (int x = 0; x < 10; x++) {  // 通过循环模拟操作线程
            if (x % 2 == 0) { // 存款线程
                new Thread(() -> {
                    for (int y = 0; y < money.length; y++) {
                        account.save(money[y]); // 存款操作
                    }
                }, "存款线程 - " + x).start();
            } else {    // 取款线程
                new Thread(() -> {
                    while (true) {
                        System.out.print(account);
                    }
                }, "取款线程 - " + x).start();
            }
        }
    }
}


标记戳锁

1、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Account { // 模拟银行账户
    private String name; // 账户的名称
    private int asset; // 账户的资金,资金使用整型
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁

    public Account(String name, int asset) {
        this.name = name;
        this.asset = asset;
    }

    public void save(int asset) { // 向银行账户之中进行存款操作
        this.readWriteLock.writeLock().lock(); // 写入锁定
        try {
            this.asset += asset; // 存款处理
            TimeUnit.SECONDS.sleep(1); // 模拟业务的处理延迟
            System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
        } catch (Exception e) {
        } finally {
            this.readWriteLock.writeLock().unlock(); // 解除锁定
        }
    }

    @Override
    public String toString() {
        this.readWriteLock.readLock().lock(); // 读锁属于共享锁
        try {
            TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
            return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
        } catch (Exception e) {
            return null;
        } finally {
            this.readWriteLock.readLock().unlock(); // 解除锁定
        }
    }
}

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Account account = new Account("李兴华", 0); // 定义银行账户信息
        int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
        for (int x = 0; x < 10; x++) { // 10个读锁
            new Thread(() -> {
                while (true) {
                    System.out.print(account);
                }
            }, "取款线程 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {  // 2个写锁
            new Thread(() -> {
                for (int y = 0; y < money.length; y++) {
                    account.save(money[y]); // 存款操作
                }
            }, "存款线程 - " + x).start();
        }
    }
}

2、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

class Account { // 模拟银行账户
    private String name; // 账户的名称
    private int asset; // 账户的资金,资金使用整型
    private StampedLock stampedLock = new StampedLock(); // 标记戳锁

    public Account(String name, int asset) {
        this.name = name;
        this.asset = asset;
    }

    public void save(int asset) { // 向银行账户之中进行存款操作
        long stamp = this.stampedLock.writeLock(); // 获取写锁,这个时间戳用于解锁
        try {
            this.asset += asset; // 存款处理
            TimeUnit.SECONDS.sleep(2); // 模拟业务的处理延迟
            System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
        } catch (Exception e) {
        } finally {
            this.stampedLock.unlockWrite(stamp); // 解除写锁
        }
    }

    @Override
    public String toString() {
        long stamp = this.stampedLock.readLock(); // 获取读锁
        try {
            TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
            return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
        } catch (Exception e) {
            return null;
        } finally {
            this.stampedLock.unlockRead(stamp); // 释放读锁
        }
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Account account = new Account("李兴华", 0); // 定义银行账户信息
        int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
        for (int x = 0; x < 100; x++) { // 10个读锁
            new Thread(() -> {
                while (true) {
                    System.out.print(account);
                }
            }, "取款线程 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {  // 2个写锁
            new Thread(() -> {
                for (int y = 0; y < money.length; y++) {
                    account.save(money[y]); // 存款操作
                }
            }, "存款线程 - " + x).start();
        }
    }
}


Condition

1、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static String msg = null; // 保存信息

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Lock lock = new ReentrantLock(); // 获取一个互斥锁
        Condition condition = lock.newCondition(); // 获取Condition接口实例
        try {
            lock.lock(); // 锁定主线程
            new Thread(() -> {    // 创建子线程,修改msg的属性内容
                lock.lock(); // 锁定子线程
                try {
                    System.out.printf("【%s】准备进行数据的处理操作%n", Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(2); // 模拟数据修改的操作延迟
                    msg = "沐言科技课程下载:www.yootk.com/resources"; // 修改数据内容
                    condition.signal(); // 唤醒第一个等待的线程
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "子线程").start();
            condition.await(); // 主线程等待
            System.out.println("【主线程】得到最终处理的结果:" + msg);
        } catch (Exception e) {
            lock.unlock();
        }
    }
}

LockSupport

1、
package com.yootk;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static String msg = null; // 保存信息

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Thread mainThread = Thread.currentThread(); // 获取当前的主线程
        new Thread(() -> {    // 创建子线程,修改msg的属性内容
            try {
                System.out.printf("【%s】准备进行数据的处理操作%n", Thread.currentThread().getName());
                TimeUnit.SECONDS.sleep(2); // 模拟数据修改的操作延迟
                msg = "沐言科技课程下载:www.yootk.com/resources"; // 修改数据内容
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                LockSupport.unpark(mainThread); // 恢复主线程
            }
        }, "子线程").start();
        LockSupport.park(mainThread); // 挂起主线程
        System.out.println("【主线程】得到最终处理的结果:" + msg);
    }
}

Semaphore

1、
package com.yootk;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Semaphore semaphore = new Semaphore(2); // 此时只有两个资源
        for (int x = 0; x < 5; x++) {   // 给出5个线程资源
            new Thread(()->{    // 此时5个用户需要抢占2个资源
                try {
                    semaphore.acquire(); // 尝试获取到一个资源
                    if (semaphore.availablePermits() >= 0) {    // 当前有了空余资源
                        System.out.printf("【业务办理 - 开始】当前办理业务人员的信息为:%s %n", Thread.currentThread().getName());
                        TimeUnit.SECONDS.sleep(2); // 模拟业务办理的时间
                        System.out.printf("〖业务办理 - 结束〗当前办理业务人员的信息为:%s %n", Thread.currentThread().getName());
                        semaphore.release(); // 释放资源
                    }
                } catch (Exception e) {}
            }, "用户 - " + x).start();
        }
    }
}

CountDownLatch

1、
package com.yootk;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CountDownLatch latch = new CountDownLatch(3); // 总共的等待的数量为3
        for (int x = 0; x < 3; x++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.printf("【%s】到达,并且已上车%n", Thread.currentThread().getName());
                latch.countDown(); // 减1
            },"游客 - " + x).start();
        }
        latch.await(); // 等待计数为0后再解除阻塞
        System.out.println("【主线程】所有的旅客都到齐了,开车走人,去下一个景点购物消费。");
    }
}

CyclicBarrier

1、
package com.yootk;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CyclicBarrier barrier = new CyclicBarrier(2); // 现在设置的栅栏的数量为2
        for (int x = 0; x < 4; x++) {   // 创建4个线程
            new Thread(() -> {
                try {
                    System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
                    barrier.await(); // 等待,凑够了2个等待的线程
                    System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "执行者 - " + x).start();
        }
    }
}


2、
package com.yootk;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CyclicBarrier barrier = new CyclicBarrier(2); // 现在设置的栅栏的数量为2
        for (int x = 0; x < 3; x++) {   // 创建3个线程
            final int temp = x; // 留给内部类使用
            new Thread(() -> {
                try {
                    System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
                    if (temp == 2) {    // 设置一个判断条件
                        barrier.reset(); // 重置
                    } else {
                        barrier.await(); // 等待,凑够了2个等待的线程
                    }
                    System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "执行者 - " + x).start();
        }
    }
}


3、

package com.yootk;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CyclicBarrier barrier = new CyclicBarrier(2, ()->{
            System.out.println("【屏障业务处理】两个子线程已就绪,可以开始执行屏障线程控制。");
        }); // 现在设置的栅栏的数量为2
        for (int x = 0; x < 4; x++) {   // 创建3个线程
            final int temp = x; // 留给内部类使用
            new Thread(() -> {
                try {
                    System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
                    barrier.await(); // 等待,凑够了2个等待的线程
                    System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "执行者 - " + x).start();
        }
    }
}

Exchanger

1、
package com.yootk;

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        int repeat = 20; // 定义生产和消费的次数
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread(()->{
            for (int x = 0; x < repeat; x++) {  // 生产的次数
                String info = null; // 保存最终的数据
                if (x % 2 == 0) {   // 奇偶数判断
                    info = "李兴华高薪就业编程训练营:edu.yootk.com";
                } else {
                    info = "沐言科技:www.yootk.com";
                }
                try {
                    TimeUnit.SECONDS.sleep(1); // 模拟延迟
                    exchanger.exchange(info); // 数据存储
                    System.out.printf("【%s】%s%n", Thread.currentThread().getName(), info);
                } catch (InterruptedException e) {}
            }
        }, "生产者").start();
        new Thread(()->{
            for (int x = 0; x < repeat; x++) {  // 消费的次数
                try {
                    TimeUnit.SECONDS.sleep(2); // 模拟延迟
                    String info = exchanger.exchange(null); // 数据获取
                    System.err.printf("〖%s〗%s%n", Thread.currentThread().getName(), info);
                } catch (InterruptedException e) {}
            }
        }, "消费者").start();
    }
}

CompletableFuture

1、
package com.yootk;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CompletableFuture<String> future = new CompletableFuture<>();
        // 下面利用多线程的概念模拟一个炮兵打跑的形式,因为所有的操作都需要有统一的口令
        for (int x = 0; x < 2; x++) {
            new Thread(()->{
                System.out.printf("【START】%s,炮兵就位,等待开炮命令发出。%n", Thread.currentThread().getName());
                try { // get()方法未收到数据则持续阻塞,收到数据则解除阻塞
                    System.err.printf("〖END〗%s,解除阻塞,收到执行命令:%s%n", Thread.currentThread().getName(), future.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }, "炮兵 - " + x).start();
        }
        TimeUnit.SECONDS.sleep(2); // 准备的模拟延迟
        future.complete("开炮");
    }
}



2、
package com.yootk;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.err.println("【*** 异步线程 ***】突然接到了上将军的紧急联系电话。");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } // 当此异步线程执行完成之后,才会让所有的子线程恢复执行
            System.err.println("【*** 异步线程 ***】接到了新的作战任务,更改炮兵的作战目标。");
        }); // 异步线程不用考虑返回值了
        // 下面利用多线程的概念模拟一个炮兵打跑的形式,因为所有的操作都需要有统一的口令
        for (int x = 0; x < 2; x++) {
            new Thread(()->{
                System.out.printf("【START】%s,炮兵就位,等待开炮命令发出。%n", Thread.currentThread().getName());
                try { // get()方法未收到数据则持续阻塞,收到数据则解除阻塞
                    future.get(); // 异步的操作是没有返回结果的
                    System.err.printf("〖END〗%s,收到开火命令,万炮齐鸣。%n", Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }, "炮兵 - " + x).start();
        }
        System.out.println("【★主线程★】所有的炮兵线程进入到了就绪状态,等待后续命令的发送。");
    }
}

并发合集

1、
package com.yootk;

import java.util.ArrayList;
import java.util.List;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // ArrayList是一个基于数组的形式实现的集合存储,那么既然属于数组的形式,最终就需要考虑到索引的操作问题
        List<String> all = new ArrayList<>(); // 创建一个List集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
                    System.out.println(all); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

2、
package com.yootk;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // ArrayList是一个基于数组的形式实现的集合存储,那么既然属于数组的形式,最终就需要考虑到索引的操作问题
        List<String> originAll = new ArrayList<>(); // 创建一个List集合
        List<String> all = Collections.synchronizedList(originAll); // 集合转换
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
                    System.out.println(all); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

并发单集

1、
package com.yootk;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        List<String> all = new CopyOnWriteArrayList<>(); // 创建一个List集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
                    System.out.println(all); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

2、
package com.yootk;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Set<String> all = new CopyOnWriteArraySet<>(); // 创建一个Set集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
                    System.out.println(all); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

ConcurrentHashMap

1、
package com.yootk;

import java.util.HashMap;
import java.util.Map;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Map<String, String> map = new HashMap<>(); // 创建HashMap集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    map.put("【"+Thread.currentThread().getName()+"】x = " + x, "www.yootk.com");
                    System.out.println(map); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

2、
package com.yootk;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Map<String, String> map = new ConcurrentHashMap<>(); // 创建HashMap集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    map.put("【"+Thread.currentThread().getName()+"】x = " + x, "www.yootk.com");
                    System.out.println(map); // 输出全部的数据内容
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

ConcurrentSkipListSet

1、
package com.yootk;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Set<String> all = new ConcurrentSkipListSet<>(); // 创建HashMap集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com");
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

2、
package com.yootk;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Set<String> all = new ConcurrentSkipListSet<>(); // 创建HashMap集合
        for (int num = 0; num < 10; num++) {    // 创建10个线程
            new Thread(()->{
                for (int x = 0; x < 10; x++) {  // 进行数据的循环更新操作
                    all.add("【"+Thread.currentThread().getName()+"】www.yootk.com");
                    System.out.println(all.contains("【"+Thread.currentThread().getName()+"】www.yootk.com"));
                }
            }, "集合操作线程 - " + num).start(); // 线程启动
        }
    }
}

BlockingQueue

1、
package com.yootk;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 100; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.put(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                while(true) {   // 持续性的通过队列抓取数据
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

2、
package com.yootk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 100; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.put(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                while(true) {   // 持续性的通过队列抓取数据
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

3、
package com.yootk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        BlockingQueue<String> queue = new PriorityBlockingQueue<>(5); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 100; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.put(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                while(true) {   // 持续性的通过队列抓取数据
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

4、
package com.yootk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        BlockingQueue<String> queue = new SynchronousQueue<>(); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 10; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.put(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                while(true) {   // 持续性的通过队列抓取数据
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

TransferQueue

1、
package com.yootk;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        TransferQueue<String> queue = new LinkedTransferQueue<>(); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 10; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.put(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                while(true) {   // 持续性的通过队列抓取数据
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
                    } catch (InterruptedException e) {
                    }
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

BlockingDeque

1、
package com.yootk;

import java.util.concurrent.*;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        // 每一个队列在启用的时候必须明确的设置其保存的数据的个数
        BlockingDeque<String> queue = new LinkedBlockingDeque<>(); // 数组阻塞队列
        // 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 10; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.putFirst(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "FIRST - YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 10; x++) {  // 创建循环
            final int temp = x;
            new Thread(() -> {
                for (int y = 0; y < 10; y++) { // 持续生产
                    try {
                        TimeUnit.SECONDS.sleep(2); // 生产慢
                        String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
                        queue.putLast(msg); // 向队列之中添加数据
                        System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
                    } catch (InterruptedException e) {
                    }
                }
            }, "LAST - YOOTK生产者 - " + x).start();
        }
        for (int x = 0; x < 2; x++) {
            new Thread(() -> {
                int count = 0;
                while (true) {   // 持续性的通过队列抓取数据
                    if (count % 2 == 0) {
                        try {
                            TimeUnit.SECONDS.sleep(2); // 生产慢
                            System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.takeFirst());
                        } catch (InterruptedException e) {
                        }
                    } else {
                        try {
                            TimeUnit.SECONDS.sleep(2); // 生产慢
                            System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.takeLast());
                        } catch (InterruptedException e) {
                        }
                    }
                    count++;
                }
            }, "YOOTK消费者 - " + x).start();
        }
    }
}

RecursiveTask

1、

package com.yootk;

import java.util.concurrent.*;
class SumTask extends RecursiveTask<Integer> {  // 实现数据累加的计算
    private static final int THRESHOLD = 25; // 分支阈值
    private int start; // 开始计算数值
    private int end; // 结束计算数值
    public SumTask(int start, int end) { // 数据的累加配置
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() { // 完成计算的处理
        // 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum = 0; // 保存最终的计算结果
        boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
        if (isFork) {   // 计算子分支
            for (int i = start; i <= end; i ++) {
                sum += i; // 分支处理
            }
            System.out.printf("【%s】start = %d、end = %d、sum = %d%n",
                    Thread.currentThread().getName(), this.start, this.end, sum);
        } else {    // 需要开启分支
            int middel = (start + end) / 2;
            SumTask leftTask = new SumTask(this.start, middel);
            SumTask rightTask = new SumTask(middel + 1, this.end);
            leftTask.fork(); // 开启左分支
            rightTask.fork(); // 开启右分支
            sum = leftTask.join() + rightTask.join(); // 等待分支处理的执行结果返回
        }
        return sum;
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        SumTask task = new SumTask(1, 100); // 外部的计算操作
        ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
        Future<Integer> future = pool.submit(task); // 执行分支任务
        System.out.println("分支任务计算结果:" + future.get()); // 异步返回
    }
}

RecursiveAction

1、
package com.yootk;

import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CountSave {   // 如果不想使用这个类使用原子的整型也是可以的
    private Lock lock = new ReentrantLock(); // 采用一个互斥锁
    private int sum = 0; // 保存累加结果
    public void add(int num) {
        this.lock.lock(); // 同步锁定
        try {
            this.sum += num; // 进行数据的累加
        } finally {
            this.lock.unlock(); // 解锁处理
        }
    }
    public int getSum() {
        return sum;
    }
}


class SumTask extends RecursiveAction {  // 实现数据累加的计算
    private static final int THRESHOLD = 25; // 分支阈值
    private int start; // 开始计算数值
    private int end; // 结束计算数值
    private CountSave save; // 结果的存储
    public SumTask(int start, int end, CountSave save) { // 数据的累加配置
        this.start = start;
        this.end = end;
        this.save = save; // 保存累加结果使用
    }
    @Override
    protected void compute() { // 完成计算的处理
        // 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum = 0; // 保存最终的计算结果
        boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
        if (isFork) {   // 计算子分支
            for (int i = start; i <= end; i ++) {
                sum += i; // 分支处理
            }
            this.save.add(sum); // 保存累加结果
            System.out.printf("【%s】start = %d、end = %d、sum = %d%n",
                    Thread.currentThread().getName(), this.start, this.end, sum);
        } else {    // 需要开启分支
            int middel = (start + end) / 2;
            SumTask leftTask = new SumTask(this.start, middel, this.save);
            SumTask rightTask = new SumTask(middel + 1, this.end, this.save);
            leftTask.fork(); // 开启左分支
            rightTask.fork(); // 开启右分支
        }
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CountSave save = new CountSave(); // 保存累加结果
        SumTask task = new SumTask(1, 100, save); // 外部的计算操作
        ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
        pool.submit(task); // 执行分支任务
        while (!task.isDone()) { // 任务没有结束
            TimeUnit.MILLISECONDS.sleep(100); // 延迟一下
        }
        if (task.isCompletedNormally()) { // 任务执行完毕
            System.out.println("分支任务计算结果:" + save.getSum()); // 异步返回
        }
    }
}


CountedCompleter

1、
package com.yootk;

import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


class SumTask extends CountedCompleter<AtomicInteger> {  // 实现数据累加的计算
    private static final int THRESHOLD = 25; // 分支阈值
    private int start; // 开始计算数值
    private int end; // 结束计算数值
    private AtomicInteger result; // 保存最终的存储结果
    public SumTask(int start, int end, AtomicInteger result) { // 数据的累加配置
        this.start = start;
        this.end = end;
        this.result = result; // 保存累加结果使用
    }
    @Override
    public void compute() { // 完成计算的处理
        // 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum = 0; // 保存最终的计算结果
        boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
        if (isFork) {   // 计算子分支
            for (int i = start; i <= end; i ++) {
                sum += i; // 分支处理
            }
            this.result.addAndGet(sum); // 数据的累加
            // 在每一个分支执行完成之后,可以手工的进行回调操作的触发
            super.tryComplete(); // 钩子触发
        } else {    // 需要开启分支
            int middle = (start + end) / 2;
            SumTask leftTask = new SumTask(this.start, middle, this.result);
            SumTask rightTask = new SumTask(middle + 1, this.end, this.result);
            leftTask.fork(); // 开启左分支
            rightTask.fork(); // 开启右分支
        }
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) { // 钩子的触发
        System.out.printf("【%s】start = %d、end = %d%n",
                Thread.currentThread().getName(), this.start, this.end);
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        AtomicInteger result = new AtomicInteger(); // 保存最终的计算结果
        SumTask task = new SumTask(1, 100, result); // 外部的计算操作
        task.addToPendingCount(1); // 设置准备执行的任务量
        ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
        pool.submit(task); // 执行分支任务
        while (task.getPendingCount() != 0) {   // 有任务未执行完毕
            TimeUnit.MILLISECONDS.sleep(100); // 延迟一下
            if (result.get() != 0) {    // 有了计算结果
                System.out.println("分支任务计算结果:" + result); // 异步返回
                break;
            }
        }
    }
}

ForkJoinPool

1、
package com.yootk;

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SumTask extends RecursiveTask<Integer> {  // 实现数据累加的计算
    private static final int THRESHOLD = 5; // 分支阈值
    private int start; // 开始计算数值
    private int end; // 结束计算数值
    private Lock lock = new ReentrantLock(); // 互斥锁
    public SumTask(int start, int end) { // 数据的累加配置
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() { // 完成计算的处理
        // 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
        int sum = 0; // 保存最终的计算结果
        boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
        if (isFork) {   // 计算子分支
            SumHandleManagedBlocker blocker = new SumHandleManagedBlocker(
                    this.start, this.end, this.lock);
            try {
                ForkJoinPool.managedBlock(blocker); // 加入阻塞管理
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return blocker.result; // 返回计算的结果
        } else {    // 需要开启分支
            int middel = (start + end) / 2;
            SumTask leftTask = new SumTask(this.start, middel);
            SumTask rightTask = new SumTask(middel + 1, this.end);
            leftTask.fork(); // 开启左分支
            rightTask.fork(); // 开启右分支
            sum = leftTask.join() + rightTask.join(); // 等待分支处理的执行结果返回
        }
        return sum;
    }
    static class SumHandleManagedBlocker implements ForkJoinPool.ManagedBlocker {// 自定义线程管理
        private Integer result;
        private int start;
        private int end;
        private Lock lock; // 获取一个互斥锁
        public SumHandleManagedBlocker(int start, int end, Lock lock) {
            this.start = start;
            this.end = end;
            this.lock = lock;
        }
        @Override
        public boolean block() throws InterruptedException { // 处理延迟任务
            int sum = 0;
            this.lock.lock();
            try {
                for (int x = start; x <= end; x ++) {   // 数学计算
                    TimeUnit.MILLISECONDS.sleep(100); // 延迟
                    sum += 1; // 执行数据的累加
                }
            } finally {
                this.result = sum; // 返回处理结果
                this.lock.unlock(); // 解锁
            }
            System.out.printf("【%s】处理数据累加业务,start = %d、end = %d、sum = %d%n",
                    Thread.currentThread().getName(), this.start, this.end, sum);
            return result != null; // 结束标记
        }

        @Override
        public boolean isReleasable() { // 补偿的判断,返回false会创建补偿线程
            return this.result != null; // 阻塞解除判断
        }

    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        SumTask task = new SumTask(1, 100); // 外部的计算操作
        ForkJoinPool pool = new ForkJoinPool(2); // 开启分支任务池
        Future<Integer> future = pool.submit(task); // 执行分支任务
        System.out.println("分支任务计算结果:" + future.get()); // 异步返回
    }
}

Phaser

1、
package com.yootk;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Phaser phaser = new Phaser(2); // 定义2个任务
        System.out.println("【Phaser阶段1】" + phaser.getPhase()); // 初始阶段
        for (int x = 0; x < 2; x++) {   // 循环创建线程
            TimeUnit.SECONDS.sleep(1);
            new Thread(()->{
                System.out.printf("【%s】我已就位,等待下一步的执行命令。%n",
                        Thread.currentThread().getName());
                phaser.arriveAndAwaitAdvance(); // Phaser就位,拥有线程等待
                System.out.printf("【%s】人员齐备,准备执行新的任务。%n",
                        Thread.currentThread().getName());
            }, "士兵 - " + x).start();
        }
        TimeUnit.SECONDS.sleep(3); // 等待一下再执行
        System.out.println("【Phaser阶段2】" + phaser.getPhase()); // 第二个阶段
    }
}

2、
package com.yootk;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        Phaser phaser = new Phaser(2); // 定义2个任务
        for (int x = 0; x < 2; x++) {   // 循环创建线程
            TimeUnit.SECONDS.sleep(2); // 延迟
            new Thread(()->{
                System.out.printf("【%s】达到已经上车。%n", Thread.currentThread().getName());
                phaser.arrive(); // 等价于countDown()方法
            }, "游客 - " + x).start();
        }
        phaser.awaitAdvance(phaser.getPhase()); // 等待达到阶段
        System.out.println("【主线程】人齐了,开车走人,下一个景点购物消费。");
    }
}

3、
package com.yootk;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        int repeat = 2; // 执行的轮数配置
        Phaser phaser = new Phaser() {  // 进行一些处理方法的覆写
            @Override
            protected boolean onAdvance(int phase, int registeredParties) { // 回调处理
                System.out.printf("【onAdvance()处理】进阶处理操作,phase = %s、registeredParties = %s%n",
                        phase, registeredParties);
                return phase + 1 >= repeat || registeredParties == 0; // 终止处理
            }
        };
        for (int x = 0; x < 2; x++) {   // 循环创建2个线程
            phaser.register(); // 注册参与者的线程
            new Thread(()->{ // 每一个线程都在持续的执行之中
                while (!phaser.isTerminated()) { // 现在没有终止Phaser执行
                    phaser.arriveAndAwaitAdvance(); // 等待其他的线程就位
                    try {
                        TimeUnit.SECONDS.sleep(1); // 增加操作延迟
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("【%s】YOOTK业务处理。%n", Thread.currentThread().getName());
                }
            }, "子线程 - " + x).start();
        }
    }
}

4、
package com.yootk;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

class Tasker implements Runnable {// 定义任务的处理线程
    private Phaser phaser;

    public Tasker(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register(); // 将当前线程进行注册
    }

    @Override
    public void run() {
        while (!phaser.isTerminated()) { // 任务还在执行
            this.phaser.arriveAndAwaitAdvance(); // 等待其他参与者的线程
            System.out.printf("【%s】Yootk业务处理。%n", Thread.currentThread().getName());
        }
    }
}

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static final int THRESHOLD = 2; // 定义每一个Phaser对应的任务数量

    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        int repeat = 2; // 定义任务的重复的周期
        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.printf("【onAdvance()处理】进阶处理操作,phase = %s、registeredParties = %s%n",
                        phase, registeredParties);
                return phase + 1 >= repeat || registeredParties == 0; // 终止处理
            }
        };
        Tasker[] taskers = new Tasker[5]; // 创建5个子任务
        build(taskers, 0, taskers.length, phaser); // 创建任务层级
        for (int x = 0; x < taskers.length; x++) { // 启动线程
            new Thread(taskers[x], "子线程 - " + x).start();
        }
    }

    private static void build(Tasker[] taskers, int low, int high, Phaser parent) {
        if ((high - low) > THRESHOLD) { // 层级的判断
            for (int x = low; x < high; x += THRESHOLD) {   // 每次进行2个任务的配置
                int limit = Math.min(x + THRESHOLD, high); // 获取最小值
                build(taskers, x, limit, new Phaser(parent)); // 定义层级
            }
        } else {
            for (int x = low; x < high; ++x) {  // 循环创建任务
                taskers[x] = new Tasker(parent); // 定义任务层级
            }
        }
    }
}

CountDownLatch

1、
package com.yootk;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        int a = 10; // 定义整型变量
        int b = 20; // 定义整型变量
        System.out.printf("数学加法计算:" + (a + b));
    }
}

2、
package com.yootk;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static int a = 10;// 定义整型变量
    public static int b = 20; // 定义整型变量
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        CountDownLatch latch = new CountDownLatch(1); // 倒计数的内容为1
        System.out.printf("【第一次计算】数学加法计算:%d%n", (a + b));
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1); // 延迟处理
                a = 50; // 修改b的内容
                latch.countDown(); // 减1的操作
            } catch (InterruptedException e) {}
        }, "数据修改子线程").start();
        latch.await(); // 等待计数为0
        System.out.printf("【第二次计算】数学加法计算:%d%n", (a + b));
    }
}

SubmissionPublisher

1、
package com.yootk;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        List<String> datas = List.of("www.yootk.com", "edu.yootk.com"); // 操作数据
        SubmissionPublisher publisher = new SubmissionPublisher<>(); // 数据发布
        // 此时进行数据的消费处理,同时返回有一个异步的处理任务
        CompletableFuture<Void> task = publisher.consume(System.out::println); // 直接传入消费型的函数引用
        // 以上只是定义了消息生产者和消费者之间的基本关联模型,随后进行具体的数据处理
        datas.forEach((tmp) -> { // 集合的迭代操作
            publisher.submit(tmp); // 发布数据
            try {
                TimeUnit.SECONDS.sleep(1); // 每次延迟1秒的时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        publisher.close(); // 数据发送完毕
        if (task != null) {
            task.get(); // 放行
        }
    }
}

Flow

1、
package com.yootk;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class Book {    // 自定义程序类
    private long id;
    private String title;
    private String content;
    // Setter、Getter、无参构造、toString()等方法就直接略了
    public Book(long id, String title, String content) {
        this.id = id;
        this.title = title;
        this.content = content;
    }
    // 因为这种响应式编程操作一般都需要采用数据流的形式进行处理,所以为了简化定义一个List创建器
    static class BookDataCreator {  // 创造者模式
        public static List<Book> getBooks() {   // 获取数据项
            List<Book> bookList = new ArrayList<>(); // 实例化List集合
            bookList.add(new Book(1, "Java面向对象编程", "yootk.com"));
            bookList.add(new Book(2, "Java就业编程实战", "yootk.com"));
            bookList.add(new Book(3, "JavaWeb就业编程实战", "yootk.com"));
            bookList.add(new Book(4, "SpringBoot就业编程实战", "yootk.com"));
            bookList.add(new Book(5, "SpringCloud就业编程实战", "yootk.com"));
            bookList.add(new Book(6, "Redis就业编程实战", "yootk.com"));
            bookList.add(new Book(7, "Spring就业编程实战", "yootk.com"));
            bookList.add(new Book(8, "SSM就业编程实战", "yootk.com"));
            bookList.add(new Book(9, "Netty就业编程实战", "yootk.com"));
            return bookList;
        }
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
class BookSubscriber implements Flow.Subscriber<Book> { // 图书数据的订阅者
    private Flow.Subscription subscription; // 整个的订阅控制
    private int counter = 0; // 计数器
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription; // 保存订阅控制操作
        // 由于不确定后面会同时返回多少个数据,此时的目的是触发数据的接收操作
        this.subscription.request(1); // 从发布者之中获取1个数据项
        System.out.println("【BookSubscriber】数据订阅者开启...");
    }

    @Override
    public void onNext(Book item) { // 数据接收
        System.out.printf("【BookSubscriber】图书ID = %s、图书名称 = %s、图书的内容 = %s%n",
                item.getId(), item.getTitle(), item.getContent());
        this.counter ++; // 计数累加
        this.subscription.request(1); // 再次接收,继续触发onNext()
    }

    @Override
    public void onError(Throwable throwable) { // 订阅出错的时候处理
        System.err.println("【BookSubscriber】" + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.printf("【BookSubscriber】订阅者数据处理完成,一共处理的数据量为:%s%n", this.counter);
    }

    public int getCounter() {
        return counter;
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        SubmissionPublisher<Book> publisher = new SubmissionPublisher<>(); // 数据发布
        BookSubscriber subscriber = new BookSubscriber(); // 数据订阅者
        publisher.subscribe(subscriber); // 发布者与订阅者建立连接
        // 实际的集合可能来自于数据库,或者是来自于其他的文本数据(是一些固定结构的,例如:XML、JSON)
        List<Book> books = Book.BookDataCreator.getBooks(); // 获取创建的集合
        books.stream().forEach(book -> publisher.submit(book)); // 数据的发布
        // 应该实现数据的等待处理,所以增加一个停滞的判断
        while (books.size() != subscriber.getCounter()) {   // 数据量没有消费完成
            TimeUnit.SECONDS.sleep(1); // 象征性的延迟
        }
        publisher.close(); // 关闭发送者
    }
}

demo

1、
package com.yootk;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class Message { // 目标的转换类型
    private String content;
    private String author;
    public Message(String content, String author) {
        this.content = content;
        this.author = author;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    public String getAuthor() {
        return author;
    }
    public void setAuthor(String author) {
        this.author = author;
    }
}
class MessageSubscriber implements Flow.Subscriber<Message> {
    private Flow.Subscription subscription;
    private int counter = 0;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // 如果此时你需要接收的数量很多,例如设置为了5,如果不足5个数据流也接收
        this.subscription.request(1);
    }
    @Override
    public void onNext(Message item) {
        System.out.printf("【MessageSubscriber】消息内容 = %s、消息作者 = %s%n",
                item.getContent(), item.getAuthor());
        this.counter ++;
        this.subscription.request(1); // 下次的执行触发
    }
    @Override
    public void onError(Throwable throwable) {
        System.err.println("【MessageSubscriber】消息订阅者出现错误:" + throwable.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("【MessageSubscriber】消息订阅处理完成,处理的数据量为:" + this.counter);
    }
    public int getCounter() {
        return counter;
    }
}
// 此时的转换器处理类拥有接收和发布的支持能力,那么建议多继承一个父类
class MessageProcessor extends SubmissionPublisher<Message> implements Flow.Processor<Book, Message> {
    private Flow.Subscription subscription;
    private Function<Book, Message> function; // 实现转换功能定义
    public MessageProcessor(Function<Book, Message> function) {
        this.function = function;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }
    @Override
    public void onNext(Book item) {
        super.submit(this.function.apply(item)); // 将转换后的数据再次发布
        this.subscription.request(1); // 重新抓取数据
    }
    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace(); // 偷懒
    }
    @Override
    public void onComplete() {
    }
}
class Book {    // 自定义程序类
    private long id;
    private String title;
    private String content;
    // Setter、Getter、无参构造、toString()等方法就直接略了
    public Book(long id, String title, String content) {
        this.id = id;
        this.title = title;
        this.content = content;
    }
    // 因为这种响应式编程操作一般都需要采用数据流的形式进行处理,所以为了简化定义一个List创建器
    static class BookDataCreator {  // 创造者模式
        public static List<Book> getBooks() {   // 获取数据项
            List<Book> bookList = new ArrayList<>(); // 实例化List集合
            bookList.add(new Book(1, "Java面向对象编程", "yootk.com"));
            bookList.add(new Book(2, "Java就业编程实战", "yootk.com"));
            bookList.add(new Book(3, "JavaWeb就业编程实战", "yootk.com"));
            bookList.add(new Book(4, "SpringBoot就业编程实战", "yootk.com"));
            bookList.add(new Book(5, "SpringCloud就业编程实战", "yootk.com"));
            bookList.add(new Book(6, "Redis就业编程实战", "yootk.com"));
            bookList.add(new Book(7, "Spring就业编程实战", "yootk.com"));
            bookList.add(new Book(8, "SSM就业编程实战", "yootk.com"));
            bookList.add(new Book(9, "Netty就业编程实战", "yootk.com"));
            return bookList;
        }
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
public class YootkDemo { // 李兴华高薪就业编程训练营
    public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
        MessageProcessor processor = new MessageProcessor((item) -> {
            String book = String.format("图书编号:%s、图书名称:%s、图书内容:%s",
                    item.getId(), item.getTitle(), item.getContent());
            return new Message(book, "李兴华");
        });
        MessageSubscriber messageSubscriber = new MessageSubscriber(); // 消息订阅者
        // BookSubscriber bookSubscriber = new BookSubscriber(); // 图书订阅者
        SubmissionPublisher<Book> publisher = new SubmissionPublisher<>(); // 数据发布
        publisher.subscribe(processor); // 发布者与订阅者建立连接
        processor.subscribe(messageSubscriber);
        // 实际的集合可能来自于数据库,或者是来自于其他的文本数据(是一些固定结构的,例如:XML、JSON)
        List<Book> books = Book.BookDataCreator.getBooks(); // 获取创建的集合
        books.stream().forEach(book -> publisher.submit(book)); // 数据的发布
        // 应该实现数据的等待处理,所以增加一个停滞的判断
        while (books.size() != messageSubscriber.getCounter()) {   // 数据量没有消费完成
            TimeUnit.SECONDS.sleep(1); // 象征性的延迟
        }
        publisher.close(); // 关闭发送者
    }
}

上次编辑于: