通用类解读
大约 34 分钟
生成随机数
1、
package com.yootk;
import java.util.Random;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Random random = new Random(); // 定义一个随机数的处理类
for (int x = 0; x < 3; x++) {
new Thread(()->{
System.out.printf("【%s】生成随机数:%d%n", Thread.currentThread().getName(), random.nextInt(100));
},"随机数生成线程 - " + x).start();
}
}
}
2、
package com.yootk;
import java.util.Random;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Random random = new Random(); // 定义一个随机数的处理类
for (int x = 0; x < 3; x++) {
new Thread(() -> {
for (int y = 0; y < 3; y++) {
System.out.printf("【%s】〖%d〗生成随机数:%d%n", Thread.currentThread().getName(), y, random.nextInt(100));
}
}, "随机数生成线程 - " + x).start();
}
}
}
3、
package com.yootk;
import java.util.concurrent.ThreadLocalRandom;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
for (int x = 0; x < 3; x++) {
new Thread(() -> {
for (int y = 0; y < 3; y++) {
System.out.printf("【%s】〖%d〗生成随机数:%d%n", Thread.currentThread().getName(), y,
ThreadLocalRandom.current().nextInt(100));
}
}, "随机数生成线程 - " + x).start();
}
}
}
互斥锁
1、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Ticket { // 实现抢票的处理机制
private int count = 3; // 总票数
private Lock lock = new ReentrantLock(); // 定义互斥锁
public void sale() { // 售票处理
this.lock.lock(); // 对当前的线程锁定
try {
if (this.count > 0) { // 【业务处理1】判断今年是否有剩余的票数
// 在进行操作延迟的过程之中,独占锁会始终锁定当前的资源
TimeUnit.SECONDS.sleep(1); // 模拟一下操作的延迟
System.out.printf("【%s】卖票,剩余票数:%d%n", Thread.currentThread().getName(), this.count--);
} else {
System.out.printf("【%s】票已经卖完了,明天请早!%n", Thread.currentThread().getName());
}
} catch (Exception e) {}
finally {
this.lock.unlock(); // 进行解锁配置
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Ticket ticket = new Ticket(); // 共享一个售票应用
for (int x = 0; x < 5; x++) {
new Thread(()->{
ticket.sale();
}, "售票员 - " + x).start();
}
}
}
ReentrantReadWriteLock
1、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Account { // 模拟银行账户
private String name; // 账户的名称
// 包括X东、X宝、X刀刀,都不会使用double描述金额,因为会有一个浮点的进位的bug,出现金额漏洞
private int asset; // 账户的资金,资金使用整型
// 因为读不需要考虑到线程同步处理,但是写需要考虑到线程同步处理
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁
public Account(String name, int asset) {
this.name = name;
this.asset = asset;
}
public void save(int asset) { // 向银行账户之中进行存款操作
this.readWriteLock.writeLock().lock(); // 写入锁定
try {
this.asset += asset; // 存款处理
TimeUnit.SECONDS.sleep(1); // 模拟业务的处理延迟
System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
} catch (Exception e) {
} finally {
this.readWriteLock.writeLock().unlock(); // 解除锁定
}
}
@Override
public String toString() {
this.readWriteLock.readLock().lock(); // 读锁属于共享锁
try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
} catch (Exception e) {
return null;
} finally {
this.readWriteLock.readLock().unlock(); // 解除锁定
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Account account = new Account("李兴华", 0); // 定义银行账户信息
int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
for (int x = 0; x < 10; x++) { // 通过循环模拟操作线程
if (x % 2 == 0) { // 存款线程
new Thread(() -> {
for (int y = 0; y < money.length; y++) {
account.save(money[y]); // 存款操作
}
}, "存款线程 - " + x).start();
} else { // 取款线程
new Thread(() -> {
while (true) {
System.out.print(account);
}
}, "取款线程 - " + x).start();
}
}
}
}
标记戳锁
1、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Account { // 模拟银行账户
private String name; // 账户的名称
private int asset; // 账户的资金,资金使用整型
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁
public Account(String name, int asset) {
this.name = name;
this.asset = asset;
}
public void save(int asset) { // 向银行账户之中进行存款操作
this.readWriteLock.writeLock().lock(); // 写入锁定
try {
this.asset += asset; // 存款处理
TimeUnit.SECONDS.sleep(1); // 模拟业务的处理延迟
System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
} catch (Exception e) {
} finally {
this.readWriteLock.writeLock().unlock(); // 解除锁定
}
}
@Override
public String toString() {
this.readWriteLock.readLock().lock(); // 读锁属于共享锁
try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
} catch (Exception e) {
return null;
} finally {
this.readWriteLock.readLock().unlock(); // 解除锁定
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Account account = new Account("李兴华", 0); // 定义银行账户信息
int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
for (int x = 0; x < 10; x++) { // 10个读锁
new Thread(() -> {
while (true) {
System.out.print(account);
}
}, "取款线程 - " + x).start();
}
for (int x = 0; x < 2; x++) { // 2个写锁
new Thread(() -> {
for (int y = 0; y < money.length; y++) {
account.save(money[y]); // 存款操作
}
}, "存款线程 - " + x).start();
}
}
}
2、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
class Account { // 模拟银行账户
private String name; // 账户的名称
private int asset; // 账户的资金,资金使用整型
private StampedLock stampedLock = new StampedLock(); // 标记戳锁
public Account(String name, int asset) {
this.name = name;
this.asset = asset;
}
public void save(int asset) { // 向银行账户之中进行存款操作
long stamp = this.stampedLock.writeLock(); // 获取写锁,这个时间戳用于解锁
try {
this.asset += asset; // 存款处理
TimeUnit.SECONDS.sleep(2); // 模拟业务的处理延迟
System.err.printf("【%s】修改银行资产,当前的资产为:%10.2f%n", Thread.currentThread().getName(), this.asset / 100.0);
} catch (Exception e) {
} finally {
this.stampedLock.unlockWrite(stamp); // 解除写锁
}
}
@Override
public String toString() {
long stamp = this.stampedLock.readLock(); // 获取读锁
try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟业务延迟
return String.format("〖%s〗账户名称:%s、账户余额:%10.2f %n", Thread.currentThread().getName(), this.name, this.asset / 100.0);
} catch (Exception e) {
return null;
} finally {
this.stampedLock.unlockRead(stamp); // 释放读锁
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Account account = new Account("李兴华", 0); // 定义银行账户信息
int money[] = new int[]{110, 230, 10_000}; // 定义要存款的金额
for (int x = 0; x < 100; x++) { // 10个读锁
new Thread(() -> {
while (true) {
System.out.print(account);
}
}, "取款线程 - " + x).start();
}
for (int x = 0; x < 2; x++) { // 2个写锁
new Thread(() -> {
for (int y = 0; y < money.length; y++) {
account.save(money[y]); // 存款操作
}
}, "存款线程 - " + x).start();
}
}
}
Condition
1、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static String msg = null; // 保存信息
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Lock lock = new ReentrantLock(); // 获取一个互斥锁
Condition condition = lock.newCondition(); // 获取Condition接口实例
try {
lock.lock(); // 锁定主线程
new Thread(() -> { // 创建子线程,修改msg的属性内容
lock.lock(); // 锁定子线程
try {
System.out.printf("【%s】准备进行数据的处理操作%n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟数据修改的操作延迟
msg = "沐言科技课程下载:www.yootk.com/resources"; // 修改数据内容
condition.signal(); // 唤醒第一个等待的线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "子线程").start();
condition.await(); // 主线程等待
System.out.println("【主线程】得到最终处理的结果:" + msg);
} catch (Exception e) {
lock.unlock();
}
}
}
LockSupport
1、
package com.yootk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static String msg = null; // 保存信息
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Thread mainThread = Thread.currentThread(); // 获取当前的主线程
new Thread(() -> { // 创建子线程,修改msg的属性内容
try {
System.out.printf("【%s】准备进行数据的处理操作%n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟数据修改的操作延迟
msg = "沐言科技课程下载:www.yootk.com/resources"; // 修改数据内容
} catch (Exception e) {
e.printStackTrace();
} finally {
LockSupport.unpark(mainThread); // 恢复主线程
}
}, "子线程").start();
LockSupport.park(mainThread); // 挂起主线程
System.out.println("【主线程】得到最终处理的结果:" + msg);
}
}
Semaphore
1、
package com.yootk;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Semaphore semaphore = new Semaphore(2); // 此时只有两个资源
for (int x = 0; x < 5; x++) { // 给出5个线程资源
new Thread(()->{ // 此时5个用户需要抢占2个资源
try {
semaphore.acquire(); // 尝试获取到一个资源
if (semaphore.availablePermits() >= 0) { // 当前有了空余资源
System.out.printf("【业务办理 - 开始】当前办理业务人员的信息为:%s %n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟业务办理的时间
System.out.printf("〖业务办理 - 结束〗当前办理业务人员的信息为:%s %n", Thread.currentThread().getName());
semaphore.release(); // 释放资源
}
} catch (Exception e) {}
}, "用户 - " + x).start();
}
}
}
CountDownLatch
1、
package com.yootk;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CountDownLatch latch = new CountDownLatch(3); // 总共的等待的数量为3
for (int x = 0; x < 3; x++) {
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("【%s】到达,并且已上车%n", Thread.currentThread().getName());
latch.countDown(); // 减1
},"游客 - " + x).start();
}
latch.await(); // 等待计数为0后再解除阻塞
System.out.println("【主线程】所有的旅客都到齐了,开车走人,去下一个景点购物消费。");
}
}
CyclicBarrier
1、
package com.yootk;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CyclicBarrier barrier = new CyclicBarrier(2); // 现在设置的栅栏的数量为2
for (int x = 0; x < 4; x++) { // 创建4个线程
new Thread(() -> {
try {
System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
barrier.await(); // 等待,凑够了2个等待的线程
System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}, "执行者 - " + x).start();
}
}
}
2、
package com.yootk;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CyclicBarrier barrier = new CyclicBarrier(2); // 现在设置的栅栏的数量为2
for (int x = 0; x < 3; x++) { // 创建3个线程
final int temp = x; // 留给内部类使用
new Thread(() -> {
try {
System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
if (temp == 2) { // 设置一个判断条件
barrier.reset(); // 重置
} else {
barrier.await(); // 等待,凑够了2个等待的线程
}
System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}, "执行者 - " + x).start();
}
}
}
3、
package com.yootk;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CyclicBarrier barrier = new CyclicBarrier(2, ()->{
System.out.println("【屏障业务处理】两个子线程已就绪,可以开始执行屏障线程控制。");
}); // 现在设置的栅栏的数量为2
for (int x = 0; x < 4; x++) { // 创建3个线程
final int temp = x; // 留给内部类使用
new Thread(() -> {
try {
System.out.printf("【Barrier - 等待开始】当前的线程名称:%s%n", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 模拟业务延迟
barrier.await(); // 等待,凑够了2个等待的线程
System.err.printf("〖Barrier - 业务处理完毕〗当前的线程名称:%s%n", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}, "执行者 - " + x).start();
}
}
}
Exchanger
1、
package com.yootk;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
int repeat = 20; // 定义生产和消费的次数
Exchanger<String> exchanger = new Exchanger<>();
new Thread(()->{
for (int x = 0; x < repeat; x++) { // 生产的次数
String info = null; // 保存最终的数据
if (x % 2 == 0) { // 奇偶数判断
info = "李兴华高薪就业编程训练营:edu.yootk.com";
} else {
info = "沐言科技:www.yootk.com";
}
try {
TimeUnit.SECONDS.sleep(1); // 模拟延迟
exchanger.exchange(info); // 数据存储
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), info);
} catch (InterruptedException e) {}
}
}, "生产者").start();
new Thread(()->{
for (int x = 0; x < repeat; x++) { // 消费的次数
try {
TimeUnit.SECONDS.sleep(2); // 模拟延迟
String info = exchanger.exchange(null); // 数据获取
System.err.printf("〖%s〗%s%n", Thread.currentThread().getName(), info);
} catch (InterruptedException e) {}
}
}, "消费者").start();
}
}
CompletableFuture
1、
package com.yootk;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CompletableFuture<String> future = new CompletableFuture<>();
// 下面利用多线程的概念模拟一个炮兵打跑的形式,因为所有的操作都需要有统一的口令
for (int x = 0; x < 2; x++) {
new Thread(()->{
System.out.printf("【START】%s,炮兵就位,等待开炮命令发出。%n", Thread.currentThread().getName());
try { // get()方法未收到数据则持续阻塞,收到数据则解除阻塞
System.err.printf("〖END〗%s,解除阻塞,收到执行命令:%s%n", Thread.currentThread().getName(), future.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "炮兵 - " + x).start();
}
TimeUnit.SECONDS.sleep(2); // 准备的模拟延迟
future.complete("开炮");
}
}
2、
package com.yootk;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.err.println("【*** 异步线程 ***】突然接到了上将军的紧急联系电话。");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} // 当此异步线程执行完成之后,才会让所有的子线程恢复执行
System.err.println("【*** 异步线程 ***】接到了新的作战任务,更改炮兵的作战目标。");
}); // 异步线程不用考虑返回值了
// 下面利用多线程的概念模拟一个炮兵打跑的形式,因为所有的操作都需要有统一的口令
for (int x = 0; x < 2; x++) {
new Thread(()->{
System.out.printf("【START】%s,炮兵就位,等待开炮命令发出。%n", Thread.currentThread().getName());
try { // get()方法未收到数据则持续阻塞,收到数据则解除阻塞
future.get(); // 异步的操作是没有返回结果的
System.err.printf("〖END〗%s,收到开火命令,万炮齐鸣。%n", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}, "炮兵 - " + x).start();
}
System.out.println("【★主线程★】所有的炮兵线程进入到了就绪状态,等待后续命令的发送。");
}
}
并发合集
1、
package com.yootk;
import java.util.ArrayList;
import java.util.List;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// ArrayList是一个基于数组的形式实现的集合存储,那么既然属于数组的形式,最终就需要考虑到索引的操作问题
List<String> all = new ArrayList<>(); // 创建一个List集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
System.out.println(all); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
2、
package com.yootk;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// ArrayList是一个基于数组的形式实现的集合存储,那么既然属于数组的形式,最终就需要考虑到索引的操作问题
List<String> originAll = new ArrayList<>(); // 创建一个List集合
List<String> all = Collections.synchronizedList(originAll); // 集合转换
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
System.out.println(all); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
并发单集
1、
package com.yootk;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
List<String> all = new CopyOnWriteArrayList<>(); // 创建一个List集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
System.out.println(all); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
2、
package com.yootk;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Set<String> all = new CopyOnWriteArraySet<>(); // 创建一个Set集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com"); // 添加集合数据
System.out.println(all); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
ConcurrentHashMap
1、
package com.yootk;
import java.util.HashMap;
import java.util.Map;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Map<String, String> map = new HashMap<>(); // 创建HashMap集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
map.put("【"+Thread.currentThread().getName()+"】x = " + x, "www.yootk.com");
System.out.println(map); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
2、
package com.yootk;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Map<String, String> map = new ConcurrentHashMap<>(); // 创建HashMap集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
map.put("【"+Thread.currentThread().getName()+"】x = " + x, "www.yootk.com");
System.out.println(map); // 输出全部的数据内容
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
ConcurrentSkipListSet
1、
package com.yootk;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Set<String> all = new ConcurrentSkipListSet<>(); // 创建HashMap集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com");
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
2、
package com.yootk;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Set<String> all = new ConcurrentSkipListSet<>(); // 创建HashMap集合
for (int num = 0; num < 10; num++) { // 创建10个线程
new Thread(()->{
for (int x = 0; x < 10; x++) { // 进行数据的循环更新操作
all.add("【"+Thread.currentThread().getName()+"】www.yootk.com");
System.out.println(all.contains("【"+Thread.currentThread().getName()+"】www.yootk.com"));
}
}, "集合操作线程 - " + num).start(); // 线程启动
}
}
}
BlockingQueue
1、
package com.yootk;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 100; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.put(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
while(true) { // 持续性的通过队列抓取数据
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
} catch (InterruptedException e) {
}
}
}, "YOOTK消费者 - " + x).start();
}
}
}
2、
package com.yootk;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 100; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.put(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
while(true) { // 持续性的通过队列抓取数据
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
} catch (InterruptedException e) {
}
}
}, "YOOTK消费者 - " + x).start();
}
}
}
3、
package com.yootk;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
BlockingQueue<String> queue = new PriorityBlockingQueue<>(5); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 100; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.put(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
while(true) { // 持续性的通过队列抓取数据
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
} catch (InterruptedException e) {
}
}
}, "YOOTK消费者 - " + x).start();
}
}
}
4、
package com.yootk;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
BlockingQueue<String> queue = new SynchronousQueue<>(); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 10; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.put(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
while(true) { // 持续性的通过队列抓取数据
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
} catch (InterruptedException e) {
}
}
}, "YOOTK消费者 - " + x).start();
}
}
}
TransferQueue
1、
package com.yootk;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
TransferQueue<String> queue = new LinkedTransferQueue<>(); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 10; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.put(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
while(true) { // 持续性的通过队列抓取数据
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.take());
} catch (InterruptedException e) {
}
}
}, "YOOTK消费者 - " + x).start();
}
}
}
BlockingDeque
1、
package com.yootk;
import java.util.concurrent.*;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
// 每一个队列在启用的时候必须明确的设置其保存的数据的个数
BlockingDeque<String> queue = new LinkedBlockingDeque<>(); // 数组阻塞队列
// 如果要想观察到队列的使用,最佳的做法就是生产者与消费者的处理模型
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 10; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.putFirst(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "FIRST - YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 10; x++) { // 创建循环
final int temp = x;
new Thread(() -> {
for (int y = 0; y < 10; y++) { // 持续生产
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
String msg = "{ID = MUYAN - " + temp + " - " + y + "}沐言科技:www.yootk.com";
queue.putLast(msg); // 向队列之中添加数据
System.out.printf("【%s】%s%n", Thread.currentThread().getName(), msg);
} catch (InterruptedException e) {
}
}
}, "LAST - YOOTK生产者 - " + x).start();
}
for (int x = 0; x < 2; x++) {
new Thread(() -> {
int count = 0;
while (true) { // 持续性的通过队列抓取数据
if (count % 2 == 0) {
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.takeFirst());
} catch (InterruptedException e) {
}
} else {
try {
TimeUnit.SECONDS.sleep(2); // 生产慢
System.err.printf("【%s】%s", Thread.currentThread().getName(), queue.takeLast());
} catch (InterruptedException e) {
}
}
count++;
}
}, "YOOTK消费者 - " + x).start();
}
}
}
RecursiveTask
1、
package com.yootk;
import java.util.concurrent.*;
class SumTask extends RecursiveTask<Integer> { // 实现数据累加的计算
private static final int THRESHOLD = 25; // 分支阈值
private int start; // 开始计算数值
private int end; // 结束计算数值
public SumTask(int start, int end) { // 数据的累加配置
this.start = start;
this.end = end;
}
@Override
protected Integer compute() { // 完成计算的处理
// 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
int sum = 0; // 保存最终的计算结果
boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
if (isFork) { // 计算子分支
for (int i = start; i <= end; i ++) {
sum += i; // 分支处理
}
System.out.printf("【%s】start = %d、end = %d、sum = %d%n",
Thread.currentThread().getName(), this.start, this.end, sum);
} else { // 需要开启分支
int middel = (start + end) / 2;
SumTask leftTask = new SumTask(this.start, middel);
SumTask rightTask = new SumTask(middel + 1, this.end);
leftTask.fork(); // 开启左分支
rightTask.fork(); // 开启右分支
sum = leftTask.join() + rightTask.join(); // 等待分支处理的执行结果返回
}
return sum;
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
SumTask task = new SumTask(1, 100); // 外部的计算操作
ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
Future<Integer> future = pool.submit(task); // 执行分支任务
System.out.println("分支任务计算结果:" + future.get()); // 异步返回
}
}
RecursiveAction
1、
package com.yootk;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class CountSave { // 如果不想使用这个类使用原子的整型也是可以的
private Lock lock = new ReentrantLock(); // 采用一个互斥锁
private int sum = 0; // 保存累加结果
public void add(int num) {
this.lock.lock(); // 同步锁定
try {
this.sum += num; // 进行数据的累加
} finally {
this.lock.unlock(); // 解锁处理
}
}
public int getSum() {
return sum;
}
}
class SumTask extends RecursiveAction { // 实现数据累加的计算
private static final int THRESHOLD = 25; // 分支阈值
private int start; // 开始计算数值
private int end; // 结束计算数值
private CountSave save; // 结果的存储
public SumTask(int start, int end, CountSave save) { // 数据的累加配置
this.start = start;
this.end = end;
this.save = save; // 保存累加结果使用
}
@Override
protected void compute() { // 完成计算的处理
// 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
int sum = 0; // 保存最终的计算结果
boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
if (isFork) { // 计算子分支
for (int i = start; i <= end; i ++) {
sum += i; // 分支处理
}
this.save.add(sum); // 保存累加结果
System.out.printf("【%s】start = %d、end = %d、sum = %d%n",
Thread.currentThread().getName(), this.start, this.end, sum);
} else { // 需要开启分支
int middel = (start + end) / 2;
SumTask leftTask = new SumTask(this.start, middel, this.save);
SumTask rightTask = new SumTask(middel + 1, this.end, this.save);
leftTask.fork(); // 开启左分支
rightTask.fork(); // 开启右分支
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CountSave save = new CountSave(); // 保存累加结果
SumTask task = new SumTask(1, 100, save); // 外部的计算操作
ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
pool.submit(task); // 执行分支任务
while (!task.isDone()) { // 任务没有结束
TimeUnit.MILLISECONDS.sleep(100); // 延迟一下
}
if (task.isCompletedNormally()) { // 任务执行完毕
System.out.println("分支任务计算结果:" + save.getSum()); // 异步返回
}
}
}
CountedCompleter
1、
package com.yootk;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SumTask extends CountedCompleter<AtomicInteger> { // 实现数据累加的计算
private static final int THRESHOLD = 25; // 分支阈值
private int start; // 开始计算数值
private int end; // 结束计算数值
private AtomicInteger result; // 保存最终的存储结果
public SumTask(int start, int end, AtomicInteger result) { // 数据的累加配置
this.start = start;
this.end = end;
this.result = result; // 保存累加结果使用
}
@Override
public void compute() { // 完成计算的处理
// 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
int sum = 0; // 保存最终的计算结果
boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
if (isFork) { // 计算子分支
for (int i = start; i <= end; i ++) {
sum += i; // 分支处理
}
this.result.addAndGet(sum); // 数据的累加
// 在每一个分支执行完成之后,可以手工的进行回调操作的触发
super.tryComplete(); // 钩子触发
} else { // 需要开启分支
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(this.start, middle, this.result);
SumTask rightTask = new SumTask(middle + 1, this.end, this.result);
leftTask.fork(); // 开启左分支
rightTask.fork(); // 开启右分支
}
}
@Override
public void onCompletion(CountedCompleter<?> caller) { // 钩子的触发
System.out.printf("【%s】start = %d、end = %d%n",
Thread.currentThread().getName(), this.start, this.end);
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
AtomicInteger result = new AtomicInteger(); // 保存最终的计算结果
SumTask task = new SumTask(1, 100, result); // 外部的计算操作
task.addToPendingCount(1); // 设置准备执行的任务量
ForkJoinPool pool = new ForkJoinPool(); // 开启分支任务池
pool.submit(task); // 执行分支任务
while (task.getPendingCount() != 0) { // 有任务未执行完毕
TimeUnit.MILLISECONDS.sleep(100); // 延迟一下
if (result.get() != 0) { // 有了计算结果
System.out.println("分支任务计算结果:" + result); // 异步返回
break;
}
}
}
}
ForkJoinPool
1、
package com.yootk;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SumTask extends RecursiveTask<Integer> { // 实现数据累加的计算
private static final int THRESHOLD = 5; // 分支阈值
private int start; // 开始计算数值
private int end; // 结束计算数值
private Lock lock = new ReentrantLock(); // 互斥锁
public SumTask(int start, int end) { // 数据的累加配置
this.start = start;
this.end = end;
}
@Override
protected Integer compute() { // 完成计算的处理
// 所有的子分支的处理,以及所有相关分支的合并处理都在此方法之中完成
int sum = 0; // 保存最终的计算结果
boolean isFork = (end - start) <= THRESHOLD; // 是否需要进行分支
if (isFork) { // 计算子分支
SumHandleManagedBlocker blocker = new SumHandleManagedBlocker(
this.start, this.end, this.lock);
try {
ForkJoinPool.managedBlock(blocker); // 加入阻塞管理
} catch (InterruptedException e) {
e.printStackTrace();
}
return blocker.result; // 返回计算的结果
} else { // 需要开启分支
int middel = (start + end) / 2;
SumTask leftTask = new SumTask(this.start, middel);
SumTask rightTask = new SumTask(middel + 1, this.end);
leftTask.fork(); // 开启左分支
rightTask.fork(); // 开启右分支
sum = leftTask.join() + rightTask.join(); // 等待分支处理的执行结果返回
}
return sum;
}
static class SumHandleManagedBlocker implements ForkJoinPool.ManagedBlocker {// 自定义线程管理
private Integer result;
private int start;
private int end;
private Lock lock; // 获取一个互斥锁
public SumHandleManagedBlocker(int start, int end, Lock lock) {
this.start = start;
this.end = end;
this.lock = lock;
}
@Override
public boolean block() throws InterruptedException { // 处理延迟任务
int sum = 0;
this.lock.lock();
try {
for (int x = start; x <= end; x ++) { // 数学计算
TimeUnit.MILLISECONDS.sleep(100); // 延迟
sum += 1; // 执行数据的累加
}
} finally {
this.result = sum; // 返回处理结果
this.lock.unlock(); // 解锁
}
System.out.printf("【%s】处理数据累加业务,start = %d、end = %d、sum = %d%n",
Thread.currentThread().getName(), this.start, this.end, sum);
return result != null; // 结束标记
}
@Override
public boolean isReleasable() { // 补偿的判断,返回false会创建补偿线程
return this.result != null; // 阻塞解除判断
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
SumTask task = new SumTask(1, 100); // 外部的计算操作
ForkJoinPool pool = new ForkJoinPool(2); // 开启分支任务池
Future<Integer> future = pool.submit(task); // 执行分支任务
System.out.println("分支任务计算结果:" + future.get()); // 异步返回
}
}
Phaser
1、
package com.yootk;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Phaser phaser = new Phaser(2); // 定义2个任务
System.out.println("【Phaser阶段1】" + phaser.getPhase()); // 初始阶段
for (int x = 0; x < 2; x++) { // 循环创建线程
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
System.out.printf("【%s】我已就位,等待下一步的执行命令。%n",
Thread.currentThread().getName());
phaser.arriveAndAwaitAdvance(); // Phaser就位,拥有线程等待
System.out.printf("【%s】人员齐备,准备执行新的任务。%n",
Thread.currentThread().getName());
}, "士兵 - " + x).start();
}
TimeUnit.SECONDS.sleep(3); // 等待一下再执行
System.out.println("【Phaser阶段2】" + phaser.getPhase()); // 第二个阶段
}
}
2、
package com.yootk;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
Phaser phaser = new Phaser(2); // 定义2个任务
for (int x = 0; x < 2; x++) { // 循环创建线程
TimeUnit.SECONDS.sleep(2); // 延迟
new Thread(()->{
System.out.printf("【%s】达到已经上车。%n", Thread.currentThread().getName());
phaser.arrive(); // 等价于countDown()方法
}, "游客 - " + x).start();
}
phaser.awaitAdvance(phaser.getPhase()); // 等待达到阶段
System.out.println("【主线程】人齐了,开车走人,下一个景点购物消费。");
}
}
3、
package com.yootk;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
int repeat = 2; // 执行的轮数配置
Phaser phaser = new Phaser() { // 进行一些处理方法的覆写
@Override
protected boolean onAdvance(int phase, int registeredParties) { // 回调处理
System.out.printf("【onAdvance()处理】进阶处理操作,phase = %s、registeredParties = %s%n",
phase, registeredParties);
return phase + 1 >= repeat || registeredParties == 0; // 终止处理
}
};
for (int x = 0; x < 2; x++) { // 循环创建2个线程
phaser.register(); // 注册参与者的线程
new Thread(()->{ // 每一个线程都在持续的执行之中
while (!phaser.isTerminated()) { // 现在没有终止Phaser执行
phaser.arriveAndAwaitAdvance(); // 等待其他的线程就位
try {
TimeUnit.SECONDS.sleep(1); // 增加操作延迟
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("【%s】YOOTK业务处理。%n", Thread.currentThread().getName());
}
}, "子线程 - " + x).start();
}
}
}
4、
package com.yootk;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
class Tasker implements Runnable {// 定义任务的处理线程
private Phaser phaser;
public Tasker(Phaser phaser) {
this.phaser = phaser;
this.phaser.register(); // 将当前线程进行注册
}
@Override
public void run() {
while (!phaser.isTerminated()) { // 任务还在执行
this.phaser.arriveAndAwaitAdvance(); // 等待其他参与者的线程
System.out.printf("【%s】Yootk业务处理。%n", Thread.currentThread().getName());
}
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static final int THRESHOLD = 2; // 定义每一个Phaser对应的任务数量
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
int repeat = 2; // 定义任务的重复的周期
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("【onAdvance()处理】进阶处理操作,phase = %s、registeredParties = %s%n",
phase, registeredParties);
return phase + 1 >= repeat || registeredParties == 0; // 终止处理
}
};
Tasker[] taskers = new Tasker[5]; // 创建5个子任务
build(taskers, 0, taskers.length, phaser); // 创建任务层级
for (int x = 0; x < taskers.length; x++) { // 启动线程
new Thread(taskers[x], "子线程 - " + x).start();
}
}
private static void build(Tasker[] taskers, int low, int high, Phaser parent) {
if ((high - low) > THRESHOLD) { // 层级的判断
for (int x = low; x < high; x += THRESHOLD) { // 每次进行2个任务的配置
int limit = Math.min(x + THRESHOLD, high); // 获取最小值
build(taskers, x, limit, new Phaser(parent)); // 定义层级
}
} else {
for (int x = low; x < high; ++x) { // 循环创建任务
taskers[x] = new Tasker(parent); // 定义任务层级
}
}
}
}
CountDownLatch
1、
package com.yootk;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
int a = 10; // 定义整型变量
int b = 20; // 定义整型变量
System.out.printf("数学加法计算:" + (a + b));
}
}
2、
package com.yootk;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static int a = 10;// 定义整型变量
public static int b = 20; // 定义整型变量
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
CountDownLatch latch = new CountDownLatch(1); // 倒计数的内容为1
System.out.printf("【第一次计算】数学加法计算:%d%n", (a + b));
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1); // 延迟处理
a = 50; // 修改b的内容
latch.countDown(); // 减1的操作
} catch (InterruptedException e) {}
}, "数据修改子线程").start();
latch.await(); // 等待计数为0
System.out.printf("【第二次计算】数学加法计算:%d%n", (a + b));
}
}
SubmissionPublisher
1、
package com.yootk;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
List<String> datas = List.of("www.yootk.com", "edu.yootk.com"); // 操作数据
SubmissionPublisher publisher = new SubmissionPublisher<>(); // 数据发布
// 此时进行数据的消费处理,同时返回有一个异步的处理任务
CompletableFuture<Void> task = publisher.consume(System.out::println); // 直接传入消费型的函数引用
// 以上只是定义了消息生产者和消费者之间的基本关联模型,随后进行具体的数据处理
datas.forEach((tmp) -> { // 集合的迭代操作
publisher.submit(tmp); // 发布数据
try {
TimeUnit.SECONDS.sleep(1); // 每次延迟1秒的时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
publisher.close(); // 数据发送完毕
if (task != null) {
task.get(); // 放行
}
}
}
Flow
1、
package com.yootk;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
class Book { // 自定义程序类
private long id;
private String title;
private String content;
// Setter、Getter、无参构造、toString()等方法就直接略了
public Book(long id, String title, String content) {
this.id = id;
this.title = title;
this.content = content;
}
// 因为这种响应式编程操作一般都需要采用数据流的形式进行处理,所以为了简化定义一个List创建器
static class BookDataCreator { // 创造者模式
public static List<Book> getBooks() { // 获取数据项
List<Book> bookList = new ArrayList<>(); // 实例化List集合
bookList.add(new Book(1, "Java面向对象编程", "yootk.com"));
bookList.add(new Book(2, "Java就业编程实战", "yootk.com"));
bookList.add(new Book(3, "JavaWeb就业编程实战", "yootk.com"));
bookList.add(new Book(4, "SpringBoot就业编程实战", "yootk.com"));
bookList.add(new Book(5, "SpringCloud就业编程实战", "yootk.com"));
bookList.add(new Book(6, "Redis就业编程实战", "yootk.com"));
bookList.add(new Book(7, "Spring就业编程实战", "yootk.com"));
bookList.add(new Book(8, "SSM就业编程实战", "yootk.com"));
bookList.add(new Book(9, "Netty就业编程实战", "yootk.com"));
return bookList;
}
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
class BookSubscriber implements Flow.Subscriber<Book> { // 图书数据的订阅者
private Flow.Subscription subscription; // 整个的订阅控制
private int counter = 0; // 计数器
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription; // 保存订阅控制操作
// 由于不确定后面会同时返回多少个数据,此时的目的是触发数据的接收操作
this.subscription.request(1); // 从发布者之中获取1个数据项
System.out.println("【BookSubscriber】数据订阅者开启...");
}
@Override
public void onNext(Book item) { // 数据接收
System.out.printf("【BookSubscriber】图书ID = %s、图书名称 = %s、图书的内容 = %s%n",
item.getId(), item.getTitle(), item.getContent());
this.counter ++; // 计数累加
this.subscription.request(1); // 再次接收,继续触发onNext()
}
@Override
public void onError(Throwable throwable) { // 订阅出错的时候处理
System.err.println("【BookSubscriber】" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.printf("【BookSubscriber】订阅者数据处理完成,一共处理的数据量为:%s%n", this.counter);
}
public int getCounter() {
return counter;
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
SubmissionPublisher<Book> publisher = new SubmissionPublisher<>(); // 数据发布
BookSubscriber subscriber = new BookSubscriber(); // 数据订阅者
publisher.subscribe(subscriber); // 发布者与订阅者建立连接
// 实际的集合可能来自于数据库,或者是来自于其他的文本数据(是一些固定结构的,例如:XML、JSON)
List<Book> books = Book.BookDataCreator.getBooks(); // 获取创建的集合
books.stream().forEach(book -> publisher.submit(book)); // 数据的发布
// 应该实现数据的等待处理,所以增加一个停滞的判断
while (books.size() != subscriber.getCounter()) { // 数据量没有消费完成
TimeUnit.SECONDS.sleep(1); // 象征性的延迟
}
publisher.close(); // 关闭发送者
}
}
demo
1、
package com.yootk;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
class Message { // 目标的转换类型
private String content;
private String author;
public Message(String content, String author) {
this.content = content;
this.author = author;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
}
class MessageSubscriber implements Flow.Subscriber<Message> {
private Flow.Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// 如果此时你需要接收的数量很多,例如设置为了5,如果不足5个数据流也接收
this.subscription.request(1);
}
@Override
public void onNext(Message item) {
System.out.printf("【MessageSubscriber】消息内容 = %s、消息作者 = %s%n",
item.getContent(), item.getAuthor());
this.counter ++;
this.subscription.request(1); // 下次的执行触发
}
@Override
public void onError(Throwable throwable) {
System.err.println("【MessageSubscriber】消息订阅者出现错误:" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("【MessageSubscriber】消息订阅处理完成,处理的数据量为:" + this.counter);
}
public int getCounter() {
return counter;
}
}
// 此时的转换器处理类拥有接收和发布的支持能力,那么建议多继承一个父类
class MessageProcessor extends SubmissionPublisher<Message> implements Flow.Processor<Book, Message> {
private Flow.Subscription subscription;
private Function<Book, Message> function; // 实现转换功能定义
public MessageProcessor(Function<Book, Message> function) {
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Book item) {
super.submit(this.function.apply(item)); // 将转换后的数据再次发布
this.subscription.request(1); // 重新抓取数据
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace(); // 偷懒
}
@Override
public void onComplete() {
}
}
class Book { // 自定义程序类
private long id;
private String title;
private String content;
// Setter、Getter、无参构造、toString()等方法就直接略了
public Book(long id, String title, String content) {
this.id = id;
this.title = title;
this.content = content;
}
// 因为这种响应式编程操作一般都需要采用数据流的形式进行处理,所以为了简化定义一个List创建器
static class BookDataCreator { // 创造者模式
public static List<Book> getBooks() { // 获取数据项
List<Book> bookList = new ArrayList<>(); // 实例化List集合
bookList.add(new Book(1, "Java面向对象编程", "yootk.com"));
bookList.add(new Book(2, "Java就业编程实战", "yootk.com"));
bookList.add(new Book(3, "JavaWeb就业编程实战", "yootk.com"));
bookList.add(new Book(4, "SpringBoot就业编程实战", "yootk.com"));
bookList.add(new Book(5, "SpringCloud就业编程实战", "yootk.com"));
bookList.add(new Book(6, "Redis就业编程实战", "yootk.com"));
bookList.add(new Book(7, "Spring就业编程实战", "yootk.com"));
bookList.add(new Book(8, "SSM就业编程实战", "yootk.com"));
bookList.add(new Book(9, "Netty就业编程实战", "yootk.com"));
return bookList;
}
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
public class YootkDemo { // 李兴华高薪就业编程训练营
public static void main(String[] args) throws Exception { // 沐言科技:www.yootk.com
MessageProcessor processor = new MessageProcessor((item) -> {
String book = String.format("图书编号:%s、图书名称:%s、图书内容:%s",
item.getId(), item.getTitle(), item.getContent());
return new Message(book, "李兴华");
});
MessageSubscriber messageSubscriber = new MessageSubscriber(); // 消息订阅者
// BookSubscriber bookSubscriber = new BookSubscriber(); // 图书订阅者
SubmissionPublisher<Book> publisher = new SubmissionPublisher<>(); // 数据发布
publisher.subscribe(processor); // 发布者与订阅者建立连接
processor.subscribe(messageSubscriber);
// 实际的集合可能来自于数据库,或者是来自于其他的文本数据(是一些固定结构的,例如:XML、JSON)
List<Book> books = Book.BookDataCreator.getBooks(); // 获取创建的集合
books.stream().forEach(book -> publisher.submit(book)); // 数据的发布
// 应该实现数据的等待处理,所以增加一个停滞的判断
while (books.size() != messageSubscriber.getCounter()) { // 数据量没有消费完成
TimeUnit.SECONDS.sleep(1); // 象征性的延迟
}
publisher.close(); // 关闭发送者
}
}