缓存管理
Caffeine 缓存概述
计算机数据加载
- 由于计算机体系结构的设计问题,所有的程序都会在 CPU 之中进行运算,然而考虑到计算数据的完整性,所有的数据不会通过磁盘加载,而是会通过内存进行数据的缓存,最终才会被加载到 CPU 之中,这样一来在整个项目的运行过程之中,如果磁盘 IO 的操作性能较差,那么最终就会导致程序变慢。
数据缓存
- 此时的应用只有单个或者有限的几个用户使用,这样的执行逻辑是没有任何问题的,但是如果要运行在高并发环境下,同时项目中又需要对数据库数据进行大量的读取,则这样的开发就一定会产生严重的性能问题,甚至出现因为应用过多竞争 I0 资源,而出现的宕机或服务崩溃等问题。此时最佳的解决方案是在内存开辟一块空间,通过该空间缓存一部分的数据内容
常见单机缓存组件
- 较为常见的 Java 单机缓存组件有如下三种
- EHCache 组件:一个随 Hibernate 框架同时推广的缓存组件,也是 Hibernate 之中默认的缓存实现其属于一个纯粹的 Java 缓存框架,具有快速、简单等操作特点,同时支持有更多的缓存处理功能;
- Google Guava:是一个非常方便易用的本地化缓存组件,基于 LRU 算法实现,支持多种缓存过期策略:
- Caffeine:是对 Guava 缓存组件的重写版本,虽然功能不如 EHCache 多,但是其提供了最优的缓存命中率。
Caffeine 组件特点
- 可以自动将数据加载到缓存之中,也可以采用异步的方式进行加载
- 当基于频率和最近访问的缓存达到最大容量时,该组件会自动切换到基于大小的模式
- 可以根据上一次缓存访问或上一次的数据写入来决定缓存的过期处理:
- 当某一条缓存数据出现了过期访问后可以自动进行异步刷新;
- 考虑到 JVM 内存的管理机制,所有的缓存 KEY 自动包含在弱引用之中,VALUE 包含在弱引用或软引用中:
- 当缓存数据被清理后,将会收到相应的通知信息;
- 缓存数据的写入可以传播到外部存储;
- 自动记录缓存数据被访问的次数。
// https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine
implementation group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.1.8'
手工缓存
Caffeine 缓存组件配置
- 在 Caffeine 缓存组件之中,为了便于所有缓存数据操作的标准化,提供有一个 Cache 的缓存公共操作接口,而要想获取到此接口的实例,则就需要通过 Caffeine 类中提供的方法来进行构建,在构建的时候可以对缓存空间的缓存对象个数以及失效时间等环境进行设置
Cache 接口方法
- 在进行缓存数据存储时,都需要存放一个二元偶对象,即要包含有数据项的 KEY 和 VALUE,而缓存数据的获取也要通过 KEY 来完成。Cache 作一个公共的标准,其内部的方法设计是基于 ConcurrentMap 设计实现的,该接口定义定义的方法如表所示。
Caffeine 数据存储结构
- 由于缓存的实现操作之中需要进行大量的异步处理,所以在 Caffeine 组件内部是基于 ConcurrentMap 接口形式实现的并发集合,定义了一个新的 LocalCache 缓存处理接口,继承结构如图所示。这样可以保证在并发写入下的数据安全,同时也可以保证数据的查询性能,而在通过 Caffeine.build()方法进行 Cache 构建时,也会依据当前存储长度失效时间配置、缓存权重的配置来决定返回是那一个 LocalCache 接口实例,由于本次已经设置了队列个数,所以当前用于实现缓存数据存储的类型为"BoundedLocalCache.BoundedLocalManualCache"
1、
https://github.com/ben-manes/caffeine
2、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 设置缓存项
cache.put("edu", "edu.yootk.com"); // 设置缓存项
cache.put("book", "Spring开发实战"); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
LOGGER.info("【已超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
}
}
3、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 设置缓存项
cache.put("edu", "edu.yootk.com"); // 设置缓存项
cache.put("book", "Spring开发实战"); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
LOGGER.info("【已超时获取缓存数据】yootk = {}", cache.get("yootk",
(key) -> {
LOGGER.info("【失效处理】没有发现 KEY = {} 的数据,要进行失效处理控制。", key);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "【EXPIRE】" + key; // 失效数据的返回
})); // 获取数据
}
}
缓存同步加载
缓存同步加载
- 传统的缓存操作过程之中,如果缓存的数据已经不存在了,则通过 KEY 查询时,就会返回 null 的内容。在 Caffeine 组件之中,提供有同步缓存加载支持,如果发现要加载的数据不存在时,可以通过 CacheLoader 接口实现同步数据加载操作,这样就可以实现失效缓存数据的维护
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
LoadingCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(new CacheLoader<String, String>() {
@Override
public @Nullable String load(String key) throws Exception {
TimeUnit.SECONDS.sleep(2); // 模拟数据的加载延迟
return "【LoadingCache】" + key; // 数据加载的返回结果
}
}); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
LOGGER.info("【已超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
}
}
2、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
LoadingCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(new CacheLoader<String, String>() {
@Override
public @Nullable String load(String key) throws Exception {
LOGGER.info("【CacheLoader】进行缓存数据的加载处理,当前的KEY = {}", key);
TimeUnit.SECONDS.sleep(2); // 模拟数据的加载延迟
return "【LoadingCache】" + key; // 数据加载的返回结果
}
}); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
cache.put("edu", "edu.yootk.com"); // 缓存失效之后继续追加新的数据项
for (Map.Entry<String, String> entry : cache.getAll(List.of("yootk", "edu", "book")).entrySet()) {
LOGGER.info("【数据加载】key = {}、value = {}", entry.getKey(), entry.getValue());
}
LOGGER.info("【已超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
}
}
3、
INFO com.yootk.test.TestCaffeine - 【未超时获取缓存数据】yootk = www.yootk.com
INFO com.yootk.test.TestCaffeine - 【CacheLoader】进行缓存数据的加载处理,当前的KEY = yootk
INFO com.yootk.test.TestCaffeine - 【CacheLoader】进行缓存数据的加载处理,当前的KEY = book
INFO com.yootk.test.TestCaffeine - 【数据加载】key = yootk、value = 【LoadingCache】yootk
INFO com.yootk.test.TestCaffeine - 【数据加载】key = edu、value = edu.yootk.com
INFO com.yootk.test.TestCaffeine - 【数据加载】key = book、value = 【LoadingCache】book
INFO com.yootk.test.TestCaffeine - 【已超时获取缓存数据】yootk = 【LoadingCache】yootk
4、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
LoadingCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build((key) -> {
LOGGER.info("【CacheLoader】进行缓存数据的加载处理,当前的KEY = {}", key);
TimeUnit.SECONDS.sleep(2); // 模拟数据的加载延迟
return "【LoadingCache】" + key; // 数据加载的返回结果
}); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
cache.put("edu", "edu.yootk.com"); // 缓存失效之后继续追加新的数据项
for (Map.Entry<String, String> entry : cache.getAll(List.of("yootk", "edu", "book")).entrySet()) {
LOGGER.info("【数据加载】key = {}、value = {}", entry.getKey(), entry.getValue());
}
LOGGER.info("【已超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
}
}
5、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
AsyncLoadingCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.buildAsync((key, executor) ->
CompletableFuture.supplyAsync(()->{
LOGGER.info("【AsyncCacheLoader】进行缓存数据的加载处理,当前的KEY = {}", key);
try {
TimeUnit.SECONDS.sleep(20); // 模拟数据的加载延迟
} catch (InterruptedException e) {}
return "【LoadingCache】" + key; // 数据加载的返回结果
})); // 构建Cache接口实例
// 此时是一个异步的缓存管理,所以在这个时候进行数据追加的处理前也要采用异步的方式进行定义
cache.put("yootk", CompletableFuture.completedFuture("www.yootk.com")); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk").get()); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
cache.put("edu", CompletableFuture.completedFuture("edu.yootk.com")); // 缓存失效之后继续追加新的数据项
for (Map.Entry<String, String> entry : cache.getAll(List.of("yootk", "edu", "book")).get().entrySet()) {
LOGGER.info("【数据加载】key = {}、value = {}", entry.getKey(), entry.getValue());
}
}
}
6、
[main] INFO com.yootk.test.TestCaffeine - 【未超时获取缓存数据】yootk = www.yootk.com
[ForkJoinPool.commonPool-worker-1] INFO com.yootk.test.TestCaffeine - 【AsyncCacheLoader】进行缓存数据的加载处理,当前的KEY = yootk
[ForkJoinPool.commonPool-worker-2] INFO com.yootk.test.TestCaffeine - 【AsyncCacheLoader】进行缓存数据的加载处理,当前的KEY = book
[main] INFO com.yootk.test.TestCaffeine - 【数据加载】key = yootk、value = 【LoadingCache】yootk
[main] INFO com.yootk.test.TestCaffeine - 【数据加载】key = edu、value = edu.yootk.com
[main] INFO com.yootk.test.TestCaffeine - 【数据加载】key = book、value = 【LoadingCache】book
异步缓存
异步缓存数据加载
- Caffeine 提供的缓存数据加载机制,可以直接在缓存处理的级别上实现失效数据的恢复,但是如果所有的数据加载操作都基于同步的方式来实现处理,则一定会引起非常严重的性能问题为了解决此类操作的出现,Caffeine 也提供有异步数据加载操作,可以直接通过 AsyncLoadingCache 接口以及 CompletableFuture 异步加载类来实现,实现结构如图所示。
Caffeine.buildAsync()操作类结构
- 如果要想创建异步加载,则需要通过 Caffeine.buildAsync0 方法来完成,在该方法中需 要传递 AsyncCacheLoader 接口实例,用于进行异步数据的加载,该方法会返回 AsyncLoadingCache 接口实例,该接口可以通过 CompletableFuture 对象实例实现异步返回数据的获取,这些操作类的关联结构如图所示。
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
AsyncLoadingCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.buildAsync((key, executor) ->
CompletableFuture.supplyAsync(()->{
LOGGER.info("【AsyncCacheLoader】进行缓存数据的加载处理,当前的KEY = {}", key);
try {
TimeUnit.SECONDS.sleep(20); // 模拟数据的加载延迟
} catch (InterruptedException e) {}
return "【LoadingCache】" + key; // 数据加载的返回结果
})); // 构建Cache接口实例
// 此时是一个异步的缓存管理,所以在这个时候进行数据追加的处理前也要采用异步的方式进行定义
cache.put("yootk", CompletableFuture.completedFuture("www.yootk.com")); // 设置缓存项
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk").get()); // 获取数据
TimeUnit.SECONDS.sleep(5); // 5秒后超时
cache.put("edu", CompletableFuture.completedFuture("edu.yootk.com")); // 缓存失效之后继续追加新的数据项
for (Map.Entry<String, String> entry : cache.getAll(List.of("yootk", "edu", "book")).get().entrySet()) {
LOGGER.info("【数据加载】key = {}、value = {}", entry.getKey(), entry.getValue());
}
}
}
缓存数据驱逐
缓存数据清除
- Java 中的缓存都需要在 JVM 堆内存空间之中进行内存的分配,开发者可以通过 Cache 相关接口实现缓存数据的存储与获取。但是缓存中所存储的数据,本质上都属于程序所需就需要为缓存定义一个数据的驱逐策略(或称要的临时数据,考虑到缓存的容量问题,以释放有限的缓存空间为清除策略),
容量驱逐策略
- 在进行缓存创建时可以通过 maximumSize()方法进行缓存保存数据的,如果当前存储的缓存数据已经达到了该容量,则自动清除之前的数据项,并保存新的数据。
权重驱逐策略
- 在创建缓存时可以通过 maximumWeight()方法设置一个权重阈值,而后在数据存储时利用 Weighter 函数式接口基于 KEY 和 VALUE 计算当前保存数据的权重,如果该权重大于权重阈值则进行淘汰
时间驱逐策略
- 在创建数据缓存时,需要考虑缓存失效时间的处理,而失效时间之中又分为写入时间计时与访问时间计时两种,当采用了写入超时驱逐策略(expireAfterWrite()方法调用)会以写入的时间作为起点进行计算,而使用访问超时驱逐策略(expireAfterAccess(方法调用),在每次访问后都会重新开始计时处理。
自定义过期失效驱逐策略
- 现实的应用开发之中,由于不同业务的需要,对于缓存数据的失效控制也会较为繁琐为了满足于一些多样化的过期处理要求,Caffeine 类中还提供有一个 expireAfter()过期处理方法,该方法可以接收一个 Expiry 接口实例实现数据过期控制
引用驱逐策略
- 缓存中的内存会占用 JVM 的堆内存空间,当 JVM 堆内存空间不足时,就需要进行及时的释放,所以在进行数据存储时就可以采用弱引用与软引用的形式进行存储。
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(1) // 设置缓存之中保存的最大数据量
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(); // 构建Cache接口实例
// 此时是一个异步的缓存管理,所以在这个时候进行数据追加的处理前也要采用异步的方式进行定义
cache.put("yootk", "www.yootk.com"); // 设置缓存项
cache.put("edu", "edu.yootk.com"); // 设置缓存项
// 思考:为什么现在缓存个数已经超过了,但是最早的缓存数据还在保留?没有及时清理
TimeUnit.MILLISECONDS.sleep(100); // 100毫秒
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
LOGGER.info("【未超时获取缓存数据】edu = {}", cache.getIfPresent("edu")); // 获取数据
}
}
2、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumWeight(100) // 设置缓存之中的最大权重
.weigher((key, value) -> {
LOGGER.info("【Weigher权重计算器】key = {}、value = {}", key, value);
// 实际开发之中的权重计算处理操作,可以通过KEY和VALUE的长度计算得来
return 51; // 现在就简单一些,直接让权重暴高
})
.expireAfterAccess(3L, TimeUnit.SECONDS) // 如无访问则3秒后失效
.build(); // 构建Cache接口实例
// 此时是一个异步的缓存管理,所以在这个时候进行数据追加的处理前也要采用异步的方式进行定义
cache.put("yootk", "www.yootk.com"); // 设置缓存项
cache.put("edu", "edu.yootk.com"); // 设置缓存项
cache.put("book", "Spring开发实战"); // 设置缓存项
cache.put("author", "小李老师 —— 李兴华"); // 设置缓存项
// 思考:为什么现在缓存个数已经超过了,但是最早的缓存数据还在保留?没有及时清理
TimeUnit.MILLISECONDS.sleep(100); // 100毫秒
LOGGER.info("【未超时获取缓存数据】yootk = {}", cache.getIfPresent("yootk")); // 获取数据
LOGGER.info("【未超时获取缓存数据】edu = {}", cache.getIfPresent("edu")); // 获取数据
LOGGER.info("【未超时获取缓存数据】book = {}", cache.getIfPresent("book")); // 获取数据
LOGGER.info("【未超时获取缓存数据】author = {}", cache.getIfPresent("author")); // 获取数据
}
}
3、
com.yootk.test.TestCaffeine - 【Weigher权重计算器】key = yootk、value = www.yootk.com
com.yootk.test.TestCaffeine - 【Weigher权重计算器】key = edu、value = edu.yootk.com
com.yootk.test.TestCaffeine - 【Weigher权重计算器】key = book、value = Spring开发实战
com.yootk.test.TestCaffeine - 【Weigher权重计算器】key = author、value = 小李老师 —— 李兴华
com.yootk.test.TestCaffeine - 【未超时获取缓存数据】yootk = null
com.yootk.test.TestCaffeine - 【未超时获取缓存数据】edu = null
com.yootk.test.TestCaffeine - 【未超时获取缓存数据】book = null
com.yootk.test.TestCaffeine - 【未超时获取缓存数据】author = 小李老师 —— 李兴华
4、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.expireAfterWrite(2L, TimeUnit.SECONDS) // 写入后2秒失效
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com");
for (int x = 0; x < 3; x++) { // 数据的读取
TimeUnit.MILLISECONDS.sleep(1500); // 每次休眠1.5秒
LOGGER.info("【数据访问 - {}】key = {}、value = {}", x, "yootk",
cache.getIfPresent("yootk")); // 日志的输出
}
}
}
5、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import org.checkerframework.checker.index.qual.NonNegative;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.expireAfter(new Expiry<String, String>() {
@Override
public long expireAfterCreate(String key, String value, long currentTime) {
LOGGER.info("【创建后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
}
@Override
public long expireAfterUpdate(String key, String value, long currentTime, @NonNegative long currentDuration) {
LOGGER.info("【更新后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);
}
@Override
public long expireAfterRead(String key, String value, long currentTime, @NonNegative long currentDuration) {
LOGGER.info("【读取后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);
}
})
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com");
for (int x = 0; x < 3; x++) { // 数据的读取
TimeUnit.MILLISECONDS.sleep(1500); // 每次休眠1.5秒
LOGGER.info("【数据访问 - {}】key = {}、value = {}", x, "yootk",
cache.getIfPresent("yootk")); // 日志的输出
}
}
}
6、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import org.checkerframework.checker.index.qual.NonNegative;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.expireAfter(new Expiry<String, String>() {
@Override
public long expireAfterCreate(String key, String value, long currentTime) {
LOGGER.info("【创建后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS);
}
@Override
public long expireAfterUpdate(String key, String value, long currentTime, @NonNegative long currentDuration) {
LOGGER.info("【更新后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);
}
@Override
public long expireAfterRead(String key, String value, long currentTime, @NonNegative long currentDuration) {
LOGGER.info("【读取后失效计算】key = {}、value = {}", key, value);
return TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);
}
})
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com");
TimeUnit.SECONDS.sleep(3); // 间隔3秒
LOGGER.info("【数据读取】yootk = {}", cache.getIfPresent("yootk"));
}
}
7、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.weakKeys() // 弱引用的KEY
.weakValues() // 弱引用的VALUE
.build(); // 构建Cache接口实例
String key = new String("yootk"); // 妥妥的BUG
String value = new String("www.yootk.com"); // 妥妥的BUG
cache.put(key, value); // 妥妥的弱
LOGGER.info("【GC调用前】yootk = {}", cache.getIfPresent(key));
value = null; // 清空引用
Runtime.getRuntime().gc(); // 强制性的FullGC
TimeUnit.MILLISECONDS.sleep(100); // 等待100毫秒
LOGGER.info("【GC调用后】yootk = {}", cache.getIfPresent(key));
}
}
8、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
AsyncCache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.weakKeys() // 弱引用的KEY
.weakValues() // 弱引用的VALUE
.buildAsync(); // 构建Cache接口实例
String key = new String("yootk"); // 妥妥的BUG
String value = new String("www.yootk.com"); // 妥妥的BUG
cache.put(key, CompletableFuture.completedFuture(value)); // 妥妥的弱
LOGGER.info("【GC调用前】yootk = {}", cache.getIfPresent(key));
value = null; // 清空引用
Runtime.getRuntime().gc(); // 强制性的FullGC
TimeUnit.MILLISECONDS.sleep(100); // 等待100毫秒
LOGGER.info("【GC调用后】yootk = {}", cache.getIfPresent(key));
}
}
缓存数据删除与监听
缓存数据删除监听
- 缓存数据的删除一般会存在有两种形式,一种是超时时间达到后的自动删除,另外一种就是采用手工的删除方式而在每次进行数据删除后,可以利用 RemovalListener 接实现被删除数据的监听,类实现结构如图所示,这样开发者就可以在缓存删除后利用该方法实现一些收尾的操作。
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 保存数据
LOGGER.info("【数据删除前】yootk = {}", cache.getIfPresent("yootk"));
cache.invalidate("yootk"); // 删除指定KEY的缓存
LOGGER.info("【数据删除后】yootk = {}", cache.getIfPresent("yootk"));
}
}
2、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.awt.image.LookupOp;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.maximumSize(100)
.expireAfterAccess(2L, TimeUnit.SECONDS)
.removalListener(new RemovalListener<String, String>() {
@Override
public void onRemoval(@Nullable String key, @Nullable String value, RemovalCause cause) {
LOGGER.info("【数据删除监听】key = {}、value = {}、cause = {}", key, value, cause);
}
})
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 保存数据
cache.put("edu", "edu.yootk.com"); // 保存数据
cache.invalidate("yootk"); // 删除指定KEY的缓存
TimeUnit.SECONDS.sleep(5); // 休眠5秒,定时清除
LOGGER.info("【已失效数据】edu = {}", cache.getIfPresent("edu"));
TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // 阻止程序结束
}
}
CacheStats
缓存统计记录
- Caffeine 缓存组件除了提供有强大的缓存处理性能之外,也额外提供了一些缓存数据的都可以对这些操作记录的结果进行记录,这统计功能,每当用户进行缓存数据操作时,看样就可以准确的知道缓存命中数、失效数、驱逐数等统计结果。
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.expireAfterAccess(1L, TimeUnit.SECONDS)
.maximumSize(100)
.recordStats()
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 保存数据
cache.put("edu", "edu.yootk.com"); // 保存数据
cache.put("book", "《Spring开发实战》"); // 保存数据
// 此时设置的候选的KEY数据是有些不存在的,通过这些不存在的数据进行最终的非命中统计操作
String keys[] = new String[] {"yootk", "edu", "book", "lee", "happy"}; // 候选KEY
Random random = new Random(); // 定义随机数
for (int x = 0; x < 100; x++) {
TimeUnit.MILLISECONDS.sleep(15); // 让查询线程适当运行一下
new Thread(()->{
String key = keys[random.nextInt(keys.length)]; // 获取查询的KEY
LOGGER.info("【{}】key = {}、value = {}", Thread.currentThread().getName(),
key, cache.getIfPresent(key));
}, "查询线程 - " + x).start(); // 开启多线程进行查询
}
TimeUnit.SECONDS.sleep(1); // 让查询线程适当运行一下
CacheStats stats = cache.stats(); // 获取统计数据
LOGGER.info("【CacheStats】缓存操作请求次数:{}", stats.requestCount());
LOGGER.info("【CacheStats】缓存命中次数:{}", stats.hitCount());
LOGGER.info("【CacheStats】缓存未命中次数:{}", stats.missCount());
// 所有的缓存组件里面,最为重要的一项性能指标就是命中率的处理问题了
LOGGER.info("【CacheStats】缓存命中率:{}", stats.hitRate());
LOGGER.info("【CacheStats】缓存驱逐次数:{}", stats.evictionCount());
}
}
缓存驱逐算法
缓存驱逐算法
- 缓存技术发展的最初目的就是为了解决数据的读取性能,但是在缓存之中如果保存了过多的数据项,则最终一定会产生内存溢出问题,所以就必须设计一种数据的缓存算法在空间不足时能够进行数据的驱逐,给新数据的存储增加可用的空间,而为了实现这样的机制,提供有三类缓存算法:FIFO、LRU、LFU(其内部又扩展了 TinyLFU 算法与 WTinyLFU 算法)
FIFO(FirstInput First Output、先进先出)缓存算法
这是一种早期使用的缓存算法,采用队列的形式实现缓存的存储,实现的核心依据在于:较早保存在缓存中的数据有可能不会再使用,所以一旦缓存中的容量不足时,会通过一个指针进行队首数据的删除,以置换出新的存储空间,保存新增的缓存项
FIFO 是一种实现简单的缓存算法,但是这种算法会存在有一个“缺页率”的问题,如果最早存储的缓存数据一直属于热点数据,但是由于队列长度的限制,有可能会将这个热点数据删除而造成缓存数据丢失的问题。当缓存队列中的很多热点数据被清除之后就会增加缺页率,这样的现象被称为(迟到)现象,而造成该现象的主要原因是在于该算法与缓存中的"Be ady"数据访问不相容,并且缓存命中率很低,现在已经很少使用了。
RU(The Least Recently Used、最近最久未使用)缓存算法
- 该算法的主要特点不再是依据保存时间进行数据项的清除,而是通过数据最后一次访问的时间戳来进行排查,当缓存空间已经满员时,但会将最久没有访问的数据进行清除。LRU 算法是一种常见的缓存算法,在 Redis 以及 Memcached 分布式缓存之中使用较多
LFU(Least Frequently Used、最近最少使用)缓存算法
- 在缓存中的数据在最近一段时间很少被访问到,那么其将来可能被访问到的可能性也很小,这样当缓存空间已满时,最小访问频率的缓存数据将被删除,如果此时缓存中保存的数据访问计数全部为 1,则不会删除缓存的数据,同时也不保存新的缓存数据。
TinyLFU 算法
- 使用 LFU 算法可以在固定的一段时间之内达到较高的命中率,但是在 LFU 算法中需要维持缓存会存在有额外的开销。由于在该算法中所有的数据都记录的频率信息(每次访问都要更新)依据统计进行保存,所以当面对突发性的稀疏流量(Sparse Bursts)访问时会因为记录频次的问题而无法在缓存中存储,而导致业务逻辑出现偏差,所以为了解决 LFU 所存在的问题,就需要提供一个优化算法,这样才有了 TinyLFU 算法
- TinyLFU 为了解决缓存频率信息记录空间的问题,采用了 Sketching 数据流技术,使用了一个 Count-Min Sketch 算法,在该算法中认为数据访问 15 次就可以作为一个热点数据存在,而后可以按照位的方式进行统计(-一个 long 数据类型可以保存 64 位的数据而后可以实现 16 个数据的统计)这样就避免了采用传统 Map 实现统计频次的操作,从而节约了数据的体积。而面对新的数据无法追加缓存的问题,在 TinyLFU 中采用了一种“保持新鲜”的机制,该机制的主要特点就是当整体的统计数据达到一个顶峰数值后,所有记录的频率统计除 2,这样已有高频次的数据就会降低频次。
W-TinyLFU 算法
- LRU 算法实现较为简单,同时也表现出了较好的命中率,面对突发性的稀疏流量表现很好,可以很好的适应热点数据的访问,但是如果有些冷数据(该数据已经被缓存淘汰了)突然访问量激增,则会重新加载该数据到缓存之中,由于会存在加载完后数据再度变冷的可能,所以该算法会造成缓存污染。但是这种稀疏流量的缓存操作确实 TinyLFU 算法所缺少的,因为新的缓存数据可能还没有积攒到足够的访问频导致命中率下降,所以针对于此类问题,在 Caffeine 中设计出了 W-TinyLFU(率就已经被剔除了 Window TinyLFU)算法
- 在 W-TinyLFU 算法之中,将整个的缓存区域分为两块,一块是 Window 缓存区(大小为当前缓存内存的 1%),另一块为主缓存区(大小为当前缓存内存的 99%),而后在主缓存区中由分为 Protectedx(大小为 80%)和 Probation 区(大小为 20%)。新增加的缓存数据全部保存在 Window 区域,这样就可以解决稀疏流量的缓存加载问题,当 Window 区域已经填满后,会将里面的候选缓存数据保存在主缓存区的 Probation 区域内,而当 Probation 区域也满员后,则会通过 TinyLFU 过滤器进行比对,保留有价值的候选数据,而无价值的数据则直接驱逐。
Caffeine 数据存储结构
Caffeine 核心存储类结构
- 在 Caffeine 之中由于需要考虑到并发数据的缓存写入,同时也需要考虑到数据读取的性能,本质上是基于并发 Map 集合的方式(ConcurrentMap 接口)实现了数据的存储,而除了基本的数据存储之外,还需要考虑到数据的驱逐策略(引用驱逐、失效时间驱逐等):为此专门设计了 Node 抽象类,该类为 Deque 接口的实现子类,并且在该类中定义了访问顺序与写入顺序的实现
Node 及相关子类
- Node 节点提供了缓存项的基本存储结构的配置,但是这个类在实际的使用之中又需要考虑到引用策略的问题,例如:在使用 Caffeine 类创建缓存时可以定义弱引用或软引用的存储结构。为了使缓存可以满足此种存储结构的设计,基于 Node 类创建了若干个不同的子类,包括有:FS 子类、FD 子类以及 FW 子类
缓存数据存储结构
- 在 BoundedLocalCache 类中定义的 ConcurrentHashMap 集合属性时,采用的 KEV 的类型为 Object,而该 Object 可能是一个普通的强引用的 KEY 类型,也有可能是一个引用队列,以 FW 子类为实现说明
1、
public static final int WINDOW = 0; // 【节点类型】WINDOW区节点
public static final int PROBATION = 1; // 【节点类型】试用区节点
public static final int PROTECTED = 2; // 【节点类型】保护区节点
2、
public Object newLookupKey(Object var1) {
return new References.LookupKeyReference(var1);
}
3、
public LookupKeyReference(K key) {
this.hashCode = System.identityHashCode(key); // 生成HashCode
this.key = requireNonNull(key);
}
public K get() {
return key;
}
缓存数据存储源码分析
手工缓存操作结构
- 每当用户创建了 Cache 实例之后,会根据不同的缓存类型选择不同的实例,本次以手工缓存(LocalManualCache 接口)的操作为例,进行操作的调用分析,其中最核心的方法就是 BoundedLocalCache 子类所提供的 put()方法。
put()方法操作步骤
- put()方法除了进行数据的保存之外,:还提供有数据更新的功能,所以每一次保存前都需要根据 KEY 进行 Map 集合的查找,:如果发现数据不存在则存储新的数据并返回 nu1,如果数据存在则根据 KEY 进行数据替换,并返回原始数据;
- 数据存储或更新时需要配置有不同的任务线程,即:该操作不占用主线程资源,,就防上出石驾德罗得旻家版带来的阻塞问题,并且每次存储完成后也都会调用相应的异步线程
- 在数据保存过程之中会持续进行节点失效时间的更新,根据当前的操作情况动态决定是采用读取更新还是写入更新,如果发现有节点失效,则会触发缓存驱逐操作;
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.expireAfterAccess(1L, TimeUnit.SECONDS)
.maximumSize(100)
.recordStats()
.build(); // 构建Cache接口实例
cache.put(null, null); // 保存数据
}
}
2、
@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {
requireNonNull(key); // 检查KEY是否为空
requireNonNull(value); // 检查VALUES是否为空
Node<K, V> node = null; // 声明一个节点
long now = expirationTicker().read();// 获取当前的时间
int newWeight = weigher.weigh(key, value); // 权重的计算
for (;;) {
Node<K, V> prior = data.get(nodeFactory.newLookupKey(key)); // 获取节点
if (prior == null) { // 节点不存在
if (node == null) { // 节点为空
node = nodeFactory.newNode(key, keyReferenceQueue(),
value, valueReferenceQueue(), newWeight, now); // 创建新节点
setVariableTime(node, expireAfterCreate(key, value, expiry, now)); // 失效配置
}
prior = data.putIfAbsent(node.getKeyReference(), node); // Map集合查询数据
if (prior == null) { // 未存储过
afterWrite(new AddTask(node, newWeight)); // 数据写入
return null; // 操作结束
} else if (onlyIfAbsent) { // 未关联过
// An optimistic fast path to avoid unnecessary locking
V currentValue = prior.getValue();// 获取保存的VALUE数据项
if ((currentValue != null) && !hasExpired(prior, now)) { // 是否失效
if (!isComputingAsync(prior)) { // 是否为异步模型
tryExpireAfterRead(prior, key, currentValue, expiry(), now); // 更新访问时间
setAccessTime(prior, now); // 设置访问时间
}
afterRead(prior, now, /* recordHit */ false); // 数据读取处理
return currentValue; // 返回结果
}
}
} else if (onlyIfAbsent) { // 数据未关联
// An optimistic fast path to avoid unnecessary locking
V currentValue = prior.getValue();// 获取当前的内容
if ((currentValue != null) && !hasExpired(prior, now)) { // 内容是否失效
if (!isComputingAsync(prior)) { // 是否为异步
tryExpireAfterRead(prior, key, currentValue, expiry(), now); // 失效读取操作
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
} else {
discardRefresh(prior.getKeyReference());// 失效处理
}
V oldValue; // 保存已经存储过的数据
long varTime; // 保存时间
int oldWeight; // 已有数据的权重
boolean expired = false; // 失效的状态
boolean mayUpdate = true; // 更新的状态
boolean exceedsTolerance = false; // 保存失效状态
synchronized (prior) { // 同步的更新处理
if (!prior.isAlive()) { // 是否还存在节点
continue;
}
oldValue = prior.getValue();// 获取已有的数据
oldWeight = prior.getWeight();// 获取已有的权重
if (oldValue == null) { // 数据为空
varTime = expireAfterCreate(key, value, expiry, now); // 失效处理
notifyEviction(key, null, RemovalCause.COLLECTED); // 通知回收
} else if (hasExpired(prior, now)) { // 判断是否失效
expired = true; // 设置失效的状态
varTime = expireAfterCreate(key, value, expiry, now); // 失效处理
notifyEviction(key, oldValue, RemovalCause.EXPIRED); // 通知回收
} else if (onlyIfAbsent) {
mayUpdate = false; // 更新的标记
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}
if (mayUpdate) { // 判断当前的更新标记
exceedsTolerance =
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable()
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);
setWriteTime(prior, now); // 写入时间
prior.setWeight(newWeight); // 写入的权重
prior.setValue(value, valueReferenceQueue());// 写入的数据
}
setVariableTime(prior, varTime);
setAccessTime(prior, now);
}
if (expired) { // 存在有失效数据
notifyRemoval(key, oldValue, RemovalCause.EXPIRED); // 删除处理
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate) {
notifyOnReplace(key, oldValue, value);
}
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0; // 权重差
if ((oldValue == null) || (weightedDifference != 0) || expired) {
afterWrite(new UpdateTask(prior, weightedDifference)); // 数据写入
} else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference)); // 数据写入
} else {
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
}
return expired ? null : oldValue;
}
}
频次记录源码分析
数据更新处理
- 在使用 put()方法进行数据保存时,有可能会触发两个操作任务,分别是 AddTask 与 UpdateTask,这两个类都属于缓存操作的内部类,并且全部实现了 Runnable 接口同时 BoundedLocalCache 的内部提供有一个 frequencySketch()方法,该方法可以获取到 FrequencySketch 类的对象实例,即,可以通过该类对象实现缓存数据项的频次统计
1、
package com.yootk.test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class TestCaffeine { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestCaffeine.class);
public static void main(String[] args) throws Exception {
Cache<String, String> cache = Caffeine.newBuilder() // 构建一个新的Caffeine实例
.expireAfterAccess(1L, TimeUnit.SECONDS)
.maximumSize(1)
.recordStats()
.build(); // 构建Cache接口实例
cache.put("yootk", "www.yootk.com"); // 保存数据
cache.put("edu", "edu.yootk.com"); // 保存数据
}
}
2、
@GuardedBy("evictionLock")
void onAccess(Node<K, V> node) {
if (evicts()) { // 数据是否被清除
K key = node.getKey();// 获取数据
if (key == null) { // key为空,直接返回null
return;
}
frequencySketch().increment(key); // 频次访问记录
if (node.inWindow()) {
reorder(accessOrderWindowDeque(), node); // Window队列的记录
} else if (node.inMainProbation()) {
reorderProbation(node); // 试用区记录
} else {
reorder(accessOrderProtectedDeque(), node); // 保护区记录
}
setHitsInSample(hitsInSample() + 1); // 更新命中数量
} else if (expiresAfterAccess()) {
reorder(accessOrderWindowDeque(), node);
}
if (expiresVariable()) {
timerWheel().reschedule(node); // 时间轮调度算法(Caffeine中最值钱的部分)
}
}
3、
int sampleSize; // 降频的样本量,最大值的10倍
int tableMask; // 获取table索引的掩码(一个long记录16个数据)
long[] table; // 保存频次的数据
int size; // 统计长度
public void increment(E e) { // 频次增长
if (isNotInitialized()) { // 初始化判断
return;
}
// 根据KEY来进行Hash数据的获取,但是考虑到Hash分配不均匀的问题,所以此时要再HASH处理
int hash = spread(e.hashCode());// 计算新的HashValue
int start = (hash & 3) << 2; // 计算tab中long数据位的起始定位
// 根据不同的种子内容,计算出不同统计数据的下标 - Loop unrolling improves throughput by 5m ops/s
int index0 = indexOf(hash, 0); // 获取table下表
int index1 = indexOf(hash, 1); // 获取table下表
int index2 = indexOf(hash, 2); // 获取table下表
int index3 = indexOf(hash, 3); // 获取table下表
boolean added = incrementAt(index0, start); // 计算“start + 0”位置的频次
added |= incrementAt(index1, start + 1); // 计算“start + 1”位置的频次
added |= incrementAt(index2, start + 2); // 计算“start + 2”位置的频次
added |= incrementAt(index3, start + 3); // 计算“start + 3”位置的频次
if (added && (++size == sampleSize)) { // 样本的统计
reset();// 数据降频处理
}
}
4、
boolean incrementAt(int i, int j) {
int offset = j << 2; // 计算偏移量
long mask = (0xfL << offset); // 计算掩码
if ((table[i] & mask) != mask) { // 判断当前掩码的结果(4位内容最多就是15)
table[i] += (1L << offset); // 不是15追加1
return true; // 频次增加完成
}
return false; // 超过15的频次增加
}
5、
void reset() {
int count = 0;
for (int i = 0; i < table.length; i++) {
count += Long.bitCount(table[i] & ONE_MASK);
table[i] = (table[i] >>> 1) & RESET_MASK;
}
size = (size - (count >>> 2)) >>> 1; // 移位的处理操作
}
6、
public int frequency(E e) {
if (isNotInitialized()) {
return 0;
}
int hash = spread(e.hashCode());// HASH处理
int start = (hash & 3) << 2;
int frequency = Integer.MAX_VALUE; // 最大的频次定义
for (int i = 0; i < 4; i++) { // 通过循环获取一个频次的内容
int index = indexOf(hash, i); // 根据当前的Hash内容获取到个索引
int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL); // 获取次数
frequency = Math.min(frequency, count); // 返回最小值
}
return frequency;
}
缓存驱逐源码分析
1、
afterWrite(new AddTask(node, newWeight));
2、
void afterWrite(Runnable task) {
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers(); // 实现缓存页面的替换操作
}
lock();
try {
maintenance(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
} finally {
evictionLock.unlock();
}
}
3、
void scheduleDrainBuffers() {
if (drainStatus() >= PROCESSING_TO_IDLE) {
return;
}
if (evictionLock.tryLock()) { // 在进行缓存数据排除的时候需要进行同步的锁定
try {
int drainStatus = drainStatus();
if (drainStatus >= PROCESSING_TO_IDLE) {
return;
}
setDrainStatusRelease(PROCESSING_TO_IDLE);
executor.execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
maintenance(/* ignored */ null);
} finally {
evictionLock.unlock();
}
}
}
4、
void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);
try {
drainReadBuffer();
drainWriteBuffer();
if (task != null) {
task.run();
}
drainKeyReferences();
drainValueReferences();
expireEntries(); // 让实体数据失效
evictEntries(); // 实体数据的清除
climb();
} finally {
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}
5、
void evictEntries() {
if (!evicts()) {
return;
}
int candidates = evictFromWindow();// 从Window区清除所需要的实体数据
evictFromMain(candidates); // 从Main区清除所需要的实体数据
}
6、
int evictFromWindow() {
int candidates = 0; // 清除候选缓存项的个数
Node<K, V> node = accessOrderWindowDeque().peek();// 通过Window队列获取节点
while (windowWeightedSize() > windowMaximum()) { // 数据过多
// The pending operations will adjust the size to reflect the correct weight
if (node == null) { // 没有节点
break; // 直接退出循环循环
}
Node<K, V> next = node.getNextInAccessOrder();// 获取下一个节点
if (node.getPolicyWeight() != 0) { // 权重判断
node.makeMainProbation();// 保存在了Probation空间
accessOrderWindowDeque().remove(node); // Window队列数据移除
accessOrderProbationDeque().add(node); // Probation队列头部增加数据
candidates++; // 候选的个数自增
setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());// 修改Window区权重
}
node = next;
}
return candidates; // 获取移动缓存项的数量
}
7、
void evictFromMain(int candidates) { // 通过主缓存区进行清除
int victimQueue = PROBATION; // 试用缓冲区的标记
Node<K, V> victim = accessOrderProbationDeque().peekFirst();// 头部获取淘汰节点
Node<K, V> candidate = accessOrderProbationDeque().peekLast();// 尾部获取候选节点
while (weightedSize() > maximum()) { // 缓存区数量过多
// Search the admission window for additional candidates
if (candidates == 0) { // 移动的候选数量为0
candidate = accessOrderWindowDeque().peekLast();// 通过头部获取一个候选节点
}
// Try evicting from the protected and window queues,尝试通过受保护队列和Window队列驱逐元素
if ((candidate == null) && (victim == null)) { // 移动节点为空
if (victimQueue == PROBATION) { // 队列为试用队列
victim = accessOrderProtectedDeque().peekFirst();// 获取受保护队列第一个元素
victimQueue = PROTECTED; // 切换队列
continue;
} else if (victimQueue == PROTECTED) { // 判断队列类型
victim = accessOrderWindowDeque().peekFirst();// 获取Window队列的第一个元素
victimQueue = WINDOW; // 切换队列
continue;
}
// The pending operations will adjust the size to reflect the correct weight
break;
}
// Skip over entries with zero weight
if ((victim != null) && (victim.getPolicyWeight() == 0)) { // 跨过零权重的实体数据
victim = victim.getNextInAccessOrder();
continue;
} else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
candidate = (candidates > 0)
? candidate.getPreviousInAccessOrder()// 获取权重为0的候选
: candidate.getNextInAccessOrder();
candidates--; // 候选的数量减1
continue;
}
// Evict immediately if only one of the entries is present
if (victim == null) { // 驱逐类型为空
@SuppressWarnings("NullAway")
Node<K, V> previous = candidate.getPreviousInAccessOrder();// 候选父元素
Node<K, V> evict = candidate; // 候选元素的交换
candidate = previous; // 候选节点的修改
candidates--; // 减少驱逐的个数
evictEntry(evict, RemovalCause.SIZE, 0L); // 元素驱逐处理
continue;
} else if (candidate == null) { // 候选元素为空
Node<K, V> evict = victim; // 获取驱逐的元素项
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L); // 元素驱逐处理
continue;
}
// Evict immediately if an entry was collected
K victimKey = victim.getKey();// 获取驱逐数据的KEY
K candidateKey = candidate.getKey();// 获取候选项的KEY
if (victimKey == null) { // 当前存在有驱逐KEY
Node<K, V> evict = victim; // 配置驱逐项
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
} else if (candidateKey == null) {
Node<K, V> evict = candidate;
candidate = (candidates > 0)
? candidate.getPreviousInAccessOrder()
: candidate.getNextInAccessOrder();
candidates--;
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
}
// Evict immediately if the candidate's weight exceeds the maximum
if (candidate.getPolicyWeight() > maximum()) { // 权重的驱逐处理
Node<K, V> evict = candidate;
candidate = (candidates > 0)
? candidate.getPreviousInAccessOrder()
: candidate.getNextInAccessOrder();
candidates--;
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
}
// Evict the entry with the lowest frequency
candidates--;
if (admit(candidateKey, victimKey)) { // 试用队列的驱逐操作
Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
candidate = candidate.getPreviousInAccessOrder();
} else { // 候选队列的处理
Node<K, V> evict = candidate;
candidate = (candidates > 0)
? candidate.getPreviousInAccessOrder()
: candidate.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
}
}
}
8、
/** The initial percent of the maximum weighted capacity dedicated to the main space. */
static final double PERCENT_MAIN = 0.99d; // 主缓存区所占比率
/** The percent of the maximum weighted capacity dedicated to the main's protected space. */
static final double PERCENT_MAIN_PROTECTED = 0.80d; // 保护队列所占比率
/** The difference in hit rates that restarts the climber. */
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d; // 命中差异率
/** The percent of the total size to adapt the window by. */
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d; // Window调整大小百分比
/** The rate to decrease the step size to adapt by. */
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d; // 空间减少比率
TimerWheel
超时队列驱逐
- 缓存中的数据都存在有时效性,当超过了指定的时效后就应该进行相关缓存数据的驱逐用户使用 Caffeine 类进行缓存创建时,提供有三个缓存过期的处理方法,分别是:“expireAfterAccess()”访问超时统计,以及“expireAfterWrite()”写入超时统计“expireAfter()”自定义超时处理。前两种超时策略只需要设置好超时的时间间隔即可而后所有的数据按照超时时间顺序的保存在 AccessOrderDeque 双端队列(如果是写入后超时则保存在 WriteOrderDeque 队列),队列的头部保存的是即将过期的缓存项当过期时间一到则自动弹出元素
时间轮算法
- 时间轮是一种环形的数据结构,可以将其内部划分为若干个不同的时间格子,每一个格,每一个格子都对应有一子代表一个时间(时间格子划分的越短,时间的精度就越高)个链表,并在该链表中保存全部的到期任务。所有的新任务依据一定的求算法,保存在合适的格子之中,在任务执行时会有一个指针随着时间转动到对应的格子之中,并执行相应格子中的到期任务,从而解决了自定义过期驱逐任务设计中的性能问题
时间轮结构
- 由于不同应用中时间轮定义的精度不同(时间格子划分不精细),所以时间轮失效的处理仅仅只能够使用一种非实时性的方式进行处理,虽然牺牲了精确度,但是却保证了性能,在 Caffeine 之中提供了 TimerWheel 类进行时间轮的功能实现
1、
boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
K key = node.getKey();// 数据的驱逐肯定要获取数据的KEY
@SuppressWarnings("unchecked")
V[] value = (V[]) new Object[1];
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
Object keyReference = node.getKeyReference();
RemovalCause[] actualCause = new RemovalCause[1]; // 驱逐原因
data.computeIfPresent(keyReference, (k, n) -> { // 所有的数据保存在Map集合之中
if (n != node) {
return n;
}
synchronized (n) { // 进行数据驱逐的同步处理
value[0] = n.getValue();
if ((key == null) || (value[0] == null)) {
actualCause[0] = RemovalCause.COLLECTED;
} else if (cause == RemovalCause.COLLECTED) {
resurrect[0] = true;
return n;
} else {
actualCause[0] = cause;
}
if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) { // 判断驱逐的操作
expired |= ((now - n.getAccessTime()) >= expiresAfterAccessNanos());
}
if (expiresAfterWrite()) { // 判断驱逐的操作
expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
}
if (expiresVariable()) { // 判断驱逐的操作
expired |= (n.getVariableTime() <= now);
}
if (!expired) {
resurrect[0] = true;
return n;
}
} else if (actualCause[0] == RemovalCause.SIZE) {
int weight = node.getWeight();
if (weight == 0) {
resurrect[0] = true;
return n;
}
}
notifyEviction(key, value[0], actualCause[0]); // 发出一个去追驱逐监听处理
makeDead(n);
}
discardRefresh(keyReference);
removed[0] = true;
return null;
});
// The entry is no longer eligible for eviction
if (resurrect[0]) {
return false;
}
// If the eviction fails due to a concurrent removal of the victim, that removal may cancel out
// the addition that triggered this eviction. The victim is eagerly unlinked before the removal
// task so that if an eviction is still required then a new victim will be chosen for removal.
if (node.inWindow() && (evicts() || expiresAfterAccess())) {
accessOrderWindowDeque().remove(node);
} else if (evicts()) {
if (node.inMainProbation()) {
accessOrderProbationDeque().remove(node);
} else {
accessOrderProtectedDeque().remove(node);
}
}
if (expiresAfterWrite()) {
writeOrderDeque().remove(node);
} else if (expiresVariable()) {
timerWheel().deschedule(node);
}
if (removed[0]) {
statsCounter().recordEviction(node.getWeight(), actualCause[0]);
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safeguard against the executor rejecting the notification task.
notifyRemoval(key, value[0], actualCause[0]);
} else {
// Eagerly decrement the size to potentially avoid an additional eviction, rather than wait
// for the removal task to do it on the next maintenance cycle.
makeDead(node);
}
return true;
}
2、
protected AccessOrderDeque<Node<K, V>> accessOrderWindowDeque() {
throw new UnsupportedOperationException();
}
protected AccessOrderDeque<Node<K, V>> accessOrderProbationDeque() {
throw new UnsupportedOperationException();
}
protected AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque() {
throw new UnsupportedOperationException();
}
3、
public void schedule(Node<K, V> node) {
Node<K, V> sentinel = findBucket(node.getVariableTime());
link(sentinel, node);
}
void link(Node<K, V> sentinel, Node<K, V> node) {
node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
node.setNextInVariableOrder(sentinel);
sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
sentinel.setPreviousInVariableOrder(node);
}
SpringCache 组件概述
SpringCache 实现缓存统一整合
- 项目应用中通过缓存组件可以实现应用的处理性能,但是在现实的开发环境中,会存在有不同的缓存组件,例如:常见单机版缓存组件包括 Caffeine、EHCache,常见分布式缓存组件包括 Memcached、Redis
1、
DROP DATABASE IF EXISTS yootk;
CREATE DATABASE yootk CHARACTER SET UTF8;
USE yootk;
CREATE TABLE emp(
eid VARCHAR(50) comment '雇员编号',
ename VARCHAR(50) comment '雇员姓名',
job VARCHAR(50) comment '雇员职位',
salary DOUBLE comment '基本工资',
CONSTRAINT pk_eid PRIMARY KEY(eid)
) engine=innodb;
INSERT INTO emp(eid, ename, job, salary) VALUES ('muyan', '张易言', '职员', 2500);
INSERT INTO emp(eid, ename, job, salary) VALUES ('yootk', '李沐文', '讲师', 5000);
INSERT INTO emp(eid, ename, job, salary) VALUES ('mylee', '李兴华', '讲师', 3000);
INSERT INTO emp(eid, ename, job, salary) VALUES ('wings', '王子琪', '经理', 3500);
2、
project(":cache") {
dependencies{
implementation(libraries.'caffeine')
implementation(libraries.'mysql-connector-java')
implementation(libraries.'jakarta.persistence-api')
implementation(libraries.'hibernate-core')
implementation(libraries.'hibernate-hikaricp')
implementation(libraries.'hibernate-core-jakarta')
implementation(libraries.'hibernate-jcache')
implementation(libraries.'spring-aop')
implementation(libraries.'spring-aspects')
implementation(libraries.'ehcache')
implementation(libraries.'HikariCP')
implementation(libraries.'spring-data-jpa')
}
}
3、
# 【JDBC】配置JDBC驱动程序
yootk.database.driverClassName=com.mysql.cj.jdbc.Driver
# 【JDBC】配置JDBC连接地址
yootk.database.jdbcUrl=jdbc:mysql://localhost:3306/yootk
# 【JDBC】数据库用户名
yootk.database.username=root
# 【JDBC】配置数据库密码
yootk.database.password=mysqladmin
# 【HikariCP】定义连接超时时间
yootk.database.connectionTimeOut=3000
# 【HikariCP】是否为只读操作
yootk.database.readOnly=false
# 【HikariCP】定义一个连接最小维持的时长
yootk.database.pool.idleTimeOut=3000
# 【HikariCP】定义一个连接最大的保存时间
yootk.database.pool.maxLifetime=6000
# 【HikariCP】定义数据库连接池最大长度
yootk.database.pool.maximumPoolSize=60
# 【HikariCP】定义数据库连接池最小的维持数量
yootk.database.pool.minimumIdle=20
4、
# 【JPA使用环境】开启DDL自动更换操作机制,如果发现数据表的结构有问题时,自动进行更新
hibernate.hbm2ddl.auto=update
# 【JPA使用环境】是否要显示出每次执行的SQL命令
hibernate.show_sql=true
# 【JPA使用环境】是否采用格式化的方式进行SQL显示
hibernate.format_sql=false
# 【JPA二级缓存】启用二级缓存配置
hibernate.cache.use_second_level_cache=true
# 【JPA二级缓存】二级缓存的工厂类,本次为jcache
hibernate.cache.region.factory_class=org.hibernate.cache.jcache.internal.JCacheRegionFactory
# 【JPA二级缓存】配置JSR-107缓存标准实现子类
hibernate.javax.cache.provider=org.ehcache.jsr107.EhcacheCachingProvider
5、
# 【JPA使用环境】开启DDL自动更换操作机制,如果发现数据表的结构有问题时,自动进行更新
hibernate.hbm2ddl.auto=update
# 【JPA使用环境】是否要显示出每次执行的SQL命令
hibernate.show_sql=true
# 【JPA使用环境】是否采用格式化的方式进行SQL显示
hibernate.format_sql=false
# 【JPA二级缓存】启用二级缓存配置
hibernate.cache.use_second_level_cache=true
# 【JPA二级缓存】二级缓存的工厂类,本次为jcache
hibernate.cache.region.factory_class=org.hibernate.cache.jcache.internal.JCacheRegionFactory
# 【JPA二级缓存】配置JSR-107缓存标准实现子类
hibernate.javax.cache.provider=org.ehcache.jsr107.EhcacheCachingProvider
6、
package com.yootk.config;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import javax.sql.DataSource;
@Configuration // 配置类
@PropertySource("classpath:config/database.properties") // 读取资源文件
public class DataSourceConfig {
@Value("${yootk.database.driverClassName}") // 绑定资源文件中的配置数据项
private String driverClassName; // JDBC驱动程序
@Value("${yootk.database.jdbcUrl}") // 绑定资源文件中的配置数据项
private String jdbcUrl; // JDBC连接地址
@Value("${yootk.database.username}") // 绑定资源文件中的配置数据项
private String username; // JDBC连接用户名
@Value("${yootk.database.password}") // 绑定资源文件中的配置数据项
private String password; // JDBC连接密码
@Value("${yootk.database.connectionTimeOut}") // 绑定资源文件中的配置数据项
private long connectionTimeout; // JDBC连接超时时间
@Value("${yootk.database.readOnly}") // 绑定资源文件中的配置数据项
private boolean readOnly; // 是否为只读数据库
@Value("${yootk.database.pool.idleTimeOut}") // 绑定资源文件中的配置数据项
private long idleTimeOut; // 连接池之中的,一个连接的存活最小时间
@Value("${yootk.database.pool.maxLifetime}") // 绑定资源文件中的配置数据项
private long maxLifetime; // 连接池之中的,一个连接的存活最长时间
@Value("${yootk.database.pool.maximumPoolSize}") // 绑定资源文件中的配置数据项
private int maximumPoolSize; // 连接池最大维持的长度
@Value("${yootk.database.pool.minimumIdle}") // 绑定资源文件中的配置数据项
private int minimumIdle; // 连接池最小维持的长度
@Bean("dataSource") // 配置Bean
public DataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(this.driverClassName);
dataSource.setJdbcUrl(this.jdbcUrl);
dataSource.setUsername(this.username);
dataSource.setPassword(this.password);
dataSource.setConnectionTimeout(this.connectionTimeout);
dataSource.setReadOnly(this.readOnly);
dataSource.setIdleTimeout(this.idleTimeOut);
dataSource.setMaxLifetime(this.maxLifetime);
dataSource.setMaximumPoolSize(this.maximumPoolSize);
dataSource.setMinimumIdle(this.minimumIdle);
return dataSource;
}
}
7、
package com.yootk.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration // 配置类
public class TransactionConfig {
@Bean // 事务的注册
public PlatformTransactionManager transactionManager(DataSource dataSource) {
// PlatformTransactionManager是一个事务的控制标准,而后不同的数据库组件需要实现该标准。
JpaTransactionManager transactionManager = new JpaTransactionManager(); // 事务实现子类
transactionManager.setDataSource(dataSource); // 设置数据源
return transactionManager;
}
}
8、
package com.yootk.config;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Aspect // AOP代理配置
public class TransactionAdviceConfig { // 事务切面配置
@Bean("txAdvice")
public TransactionInterceptor transactionInterceptor(
TransactionManager transactionManager) {
RuleBasedTransactionAttribute readAttribute = new RuleBasedTransactionAttribute();
readAttribute.setReadOnly(true); // 只读事务
readAttribute.setPropagationBehavior(
TransactionDefinition.PROPAGATION_NOT_SUPPORTED); // 非事务运行
RuleBasedTransactionAttribute requiredAttribute = new RuleBasedTransactionAttribute();
requiredAttribute.setPropagationBehavior(
TransactionDefinition.PROPAGATION_REQUIRED); // 开启事务
Map<String, TransactionAttribute> attributeMap = new HashMap<>(); // 保存方法映射
attributeMap.put("add*", requiredAttribute); // 方法映射配置
attributeMap.put("edit*", requiredAttribute); // 方法映射配置
attributeMap.put("delete*", requiredAttribute); // 方法映射配置
attributeMap.put("get*", readAttribute); // 方法映射配置
NameMatchTransactionAttributeSource source =
new NameMatchTransactionAttributeSource(); // 命名匹配事务
source.setNameMap(attributeMap); // 方法名称的匹配操作的事务控制生效
TransactionInterceptor interceptor =
new TransactionInterceptor(transactionManager, source); // 事务拦截器配置
return interceptor;
}
@Bean
public Advisor transactionAdviceAdvisor(TransactionInterceptor interceptor) {
String express = "execution (* com.yootk..service.*.*(..))"; // 切面表达式
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut(); // 切入表达式配置
pointcut.setExpression(express); // 定义切面
return new DefaultPointcutAdvisor(pointcut, interceptor);
}
}
9、
package com.yootk.config;
import jakarta.persistence.SharedCacheMode;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import javax.sql.DataSource;
@Configuration
@PropertySource("classpath:config/jpa.properties") // 资源文件加载
public class SpringDataJPAConfig { // SpringDataJPA配置类
@Value("${hibernate.hbm2ddl.auto}")
private String hbm2ddlAuto;
@Value("${hibernate.show_sql}")
private boolean showSql;
@Value("${hibernate.format_sql}")
private String formatSql;
@Value("${hibernate.cache.use_second_level_cache}")
private boolean secondLevelCache;
@Value("${hibernate.cache.region.factory_class}")
private String factoryClass;
@Value("${hibernate.javax.cache.provider}")
private String cacheProvider;
@Bean("entityManagerFactory") // 进行Bean注册
public LocalContainerEntityManagerFactoryBean entityManagerFactory(
DataSource dataSource,
HibernatePersistenceProvider provider,
HibernateJpaVendorAdapter adapter,
HibernateJpaDialect dialect) {
LocalContainerEntityManagerFactoryBean factoryBean =
new LocalContainerEntityManagerFactoryBean(); // 工厂配置
factoryBean.setDataSource(dataSource);
factoryBean.setPersistenceProvider(provider);
factoryBean.setJpaVendorAdapter(adapter);
factoryBean.setJpaDialect(dialect);
// 在当前应用之中是直接提供了二级缓存的配置,所以应该继续启用
factoryBean.setSharedCacheMode(SharedCacheMode.ENABLE_SELECTIVE); // 启用二缓存
factoryBean.setPackagesToScan("com.yootk.po"); // 实体类扫描包
factoryBean.setPersistenceUnitName("YootkJPA"); // 持久化单元名称
factoryBean.getJpaPropertyMap().put("hibernate.hbm2ddl.auto", this.hbm2ddlAuto);
factoryBean.getJpaPropertyMap().put("hibernate.format_sql", this.formatSql);
factoryBean.getJpaPropertyMap().put("hibernate.cache.use_second_level_cache", this.secondLevelCache);
factoryBean.getJpaPropertyMap().put("hibernate.cache.region.factory_class", this.factoryClass);
factoryBean.getJpaPropertyMap().put("hibernate.javax.cache.provider", this.cacheProvider);
return factoryBean;
}
@Bean
public HibernatePersistenceProvider provider() { // JPA持久化实现类
return new HibernatePersistenceProvider();
}
@Bean
public HibernateJpaVendorAdapter adapter() { // JPA适配器
HibernateJpaVendorAdapter adapter = new HibernateJpaVendorAdapter();
adapter.setShowSql(this.showSql); // 进行是否显示SQL的配置
return adapter;
}
@Bean
public HibernateJpaDialect dialect() { // JPA方言
return new HibernateJpaDialect();
}
}
10、
package com.yootk;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
@ComponentScan("com.yootk") // 扫描包
@EnableJpaRepositories("com.yootk.dao") // 数据层保存包
public class StartSpringCache { // 项目的启动类
}
11、
package com.yootk.po;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
@Entity
public class Emp {
@Id
private String eid;
private String ename;
private String job;
private Double salary;
public String getEid() {
return eid;
}
public void setEid(String eid) {
this.eid = eid;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public Double getSalary() {
return salary;
}
public void setSalary(Double salary) {
this.salary = salary;
}
}
12、
package com.yootk.dao;
import com.yootk.po.Emp;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
public interface IEmpDAO extends JpaRepository<Emp, String> {
public List<Emp> findByEname(String ename); // 根据名称查询
}
13、
package com.yootk.service;
import com.yootk.po.Emp;
public interface IEmpService {
public Emp edit(Emp emp); // 编辑雇员信息
public boolean delete(String eid); // 删除雇员信息
public Emp get(String eid); // 根据ID查询雇员信息
public Emp getEname(String ename); // 根据名称查询雇员信息
}
14、
package com.yootk.service.impl;
import com.yootk.dao.IEmpDAO;
import com.yootk.po.Emp;
import com.yootk.service.IEmpService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class EmpServiceImpl implements IEmpService {
@Autowired
private IEmpDAO empDAO;
@Override
public Emp edit(Emp emp) {
return this.empDAO.save(emp); // 增加或更新数据
}
@Override
public boolean delete(String eid) {
if (this.empDAO.existsById(eid)) { // 此时的数据存在
this.empDAO.deleteById(eid); // 数据删除
return true;
}
return false;
}
@Override
public Emp get(String eid) {
Optional<Emp> result = this.empDAO.findById(eid);
if (result.isPresent()) { // 数据存在
return result.get(); // 获取查询结果
}
return null;
}
@Override
public Emp getEname(String ename) {
return this.empDAO.findByEname(ename).get(0); // 返回数据
}
}
15、
package com.yootk.test;
import com.yootk.StartSpringCache;
import com.yootk.po.Emp;
import com.yootk.service.IEmpService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringCache.class) // 启动类
@ExtendWith({SpringExtension.class})
public class TestEmpService {
private static final Logger LOGGER = LoggerFactory.getLogger(TestEmpService.class);
@Autowired
private IEmpService empService;
@Test
public void testEdit() {
Emp emp = new Emp();
emp.setEid("muyan");
emp.setEname("爆可爱的小李老师");
emp.setJob("作者蒹讲师");
emp.setSalary(3600.0);
LOGGER.info("雇员数据修改:{}", this.empService.edit(emp)); // 更新业务
}
@Test
public void testGet() {
Emp emp = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
emp.getEid(), emp.getEname(), emp.getJob(), emp.getSalary());
}
@Test
public void testGetByName() {
Emp emp = this.empService.getEname("爆可爱的小李老师"); // 获取指定ID的数据
LOGGER.info("雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
emp.getEid(), emp.getEname(), emp.getJob(), emp.getSalary());
}
@Test
public void testDelete() {
LOGGER.info("雇员数据删除:{}", this.empService.delete("muyan"));
}
}
ConcurrentHashMap 缓存管理
Spring 缓存实现结构
- SprinqCache 之中为了便于缓存结构的管理,在“orq.springframework.cache”包中提供了两个核心的标准接口,分别为:Cache 实现接口、CacheManager 管理接口
缓存配置及使用
- 在进行缓存实现过程中,Spring 是基于 Cache 接口提供的方法进行缓存操作的,所以不同的缓存组件如果要接入到 Spring 之中,则需要提供 Cache 接口的具体实现子类。考虑到缓存的管理问题,在 Spring 中又提供了 CacheManager 接口,所有可以在应用中使用的 Cache 类型全部在该接口之中进行配置
1、
package com.yootk.config;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashSet;
import java.util.Set;
@Configuration // 配置类
@EnableCaching // 当前的应用要开启缓存
public class CacheConfig { // 缓存配置类
@Bean // 定义缓存管理类
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager(); // 获取缓存管理接口实例
Set<Cache> caches = new HashSet<>(); // 保存全部的缓存集合
caches.add(new ConcurrentMapCache("emp")); // 创建一个雇员缓存
caches.add(new ConcurrentMapCache("dept")); // 创建一个部门缓存
caches.add(new ConcurrentMapCache("sal")); // 创建一个工资缓存
cacheManager.setCaches(caches); // 将缓存放到缓存管理器中
return cacheManager;
}
}
2、
package com.yootk.service;
import com.yootk.po.Emp;
import org.springframework.cache.annotation.Cacheable;
public interface IEmpService {
public Emp edit(Emp emp); // 编辑雇员信息
public boolean delete(String eid); // 删除雇员信息
@Cacheable(cacheNames = "emp") // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
@Cacheable(cacheNames = "emp")
public Emp getEname(String ename); // 根据名称查询雇员信息
}
3、
package com.yootk.test;
import com.yootk.StartSpringCache;
import com.yootk.po.Emp;
import com.yootk.service.IEmpService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringCache.class) // 启动类
@ExtendWith({SpringExtension.class})
public class TestEmpService {
private static final Logger LOGGER = LoggerFactory.getLogger(TestEmpService.class);
@Autowired
private IEmpService empService;
@Test
public void testGet() {
Emp empA = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empA.getEid(), empA.getEname(), empA.getJob(), empA.getSalary());
Emp empB = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empB.getEid(), empB.getEname(), empB.getJob(), empB.getSalary());
}
@Test
public void testGetByName() {
Emp emp = this.empService.getEname("爆可爱的小李老师"); // 获取指定ID的数据
LOGGER.info("雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
emp.getEid(), emp.getEname(), emp.getJob(), emp.getSalary());
}
}
Cacheable
1、
@Cacheable(cacheNames = "emp", key = "#eid") // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
2、
@Cacheable(cacheNames = "emp", key = "#eid", condition = "#eid.contains('yootk')") // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
3、
@Cacheable(cacheNames = "emp", key = "#eid", unless = "#result.salary<5000") // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
4、
@Cacheable(cacheNames = "emp", key = "#eid", sync = true) // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
Caffeine 缓存管理
Caffeine 缓存配置
- SpringCache 使用了 ConcurrentHashMap 的结构实现了缓存的存储,这样可以保证更新的安全性,以及访问的高效性,同时也是 JDK 之中所给予的原生实现结构,但是考虑到高并发访问以及高容量存储环境的要求,则建议在项目之中引入 Caffeine 缓存组件,此时只需修改配置类之中的 CacheManager 子类即可
1、
package com.yootk.config;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.yootk.cache.CaffeineCacheManager;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@Configuration // 配置类
@EnableCaching // 当前的应用要开启缓存
public class CacheConfig { // 缓存配置类
@Bean // 定义缓存管理类
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
Caffeine<Object, Object> caffeine = Caffeine.newBuilder()
.maximumSize(100).expireAfterAccess(3L, TimeUnit.SECONDS);
cacheManager.setCaffeine(caffeine); // 设置缓存实现
cacheManager.setCacheNames(Arrays.asList("emp")); // 设置缓存名称
return cacheManager;
}
}
2、
@Bean
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager();
Set<Cache> caches = new HashSet<>();
caches.add(new CaffeineCache("emp", Caffeine.newBuilder().build()));
cacheManager.setCaches(caches); // 保存全部的缓存配置
return cacheManager;
}
缓存更新策略
更新缓存数据
- 缓存保存的是应用中的热点数据,所有的热点数据都是由用户进行维护的,当某一个用户对原始数据库中的数据发出修改操作后,缓存也应该进行及时的数据更新,而这一功能可以在业务方法的定义中,基于@CachePut 注解来实现更新处理
1、
@CachePut(cacheNames = "emp", key = "#emp.eid", unless = "#result == null")
public Emp edit(Emp emp); // 编辑雇员信息
@CachePut(cacheNames = "emp", key = "#p0.eid", unless = "#result == null")
public Emp edit(Emp emp); // 编辑雇员信息
public boolean delete(String eid); // 删除雇员信息
@Cacheable(cacheNames = "emp", key = "#p0", unless = "#result.salary>2000") // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
@Cacheable(cacheNames = "emp")
public Emp getEname(String ename); // 根据名称查询雇员信息
2、
@Test
public void testEditCache() {
Emp empA = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("【第一次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empA.getEid(), empA.getEname(), empA.getJob(), empA.getSalary());
Emp emp = new Emp(); // 定义新的雇员对象
emp.setEid("muyan"); // 修改数据的ID
emp.setEname("爆可爱的小李老师"); // 新的数据的内容
emp.setJob("作者蒹讲师"); // 新的数据的内容
emp.setSalary(3600.0); // 新的数据的内容
LOGGER.info("【更新】雇员数据修改:{}", this.empService.edit(emp)); // 更新业务
Emp empB = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("【第二次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empB.getEid(), empB.getEname(), empB.getJob(), empB.getSalary());
}
缓存清除策略
缓存不同步造成的数据读取错误
- 缓存的数据需要与实体数据完全对应,当实体数据由于业务需要被删除之后,如果不删除缓存中的数据,那么许多用户依然可以读取到该数据的内容
缓存清除注解
- 这样的操作在一些业务严谨的环境下是会存在有逻辑问题的,所以在 Spring 中提供了“@CacheEvict”注解以实现缓存数据的清除同步,该注解一般是结合业务数据的清除操作使用,基本结构如图所示,下面将通过具体的步骤进行该操作的实现。
1、
package com.yootk.service;
import com.yootk.po.Emp;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
@CacheConfig(cacheNames = "emp") // 配置公共的缓存信息
public interface IEmpService {
/**
* 进行数据的更新操作,而在数据更新的时候会返回一个新的Emp对象
* 如果要进行缓存的处理,那么主要依靠该对象进行数据的更新操作
* @param emp 表示要更新的雇员数据
* @return 已经更新完成的数据内容
*/
@CachePut(key = "#emp.eid", unless = "#result == null")
public Emp edit(Emp emp); // 编辑雇员信息
@CacheEvict(key = "#eid") // 根据雇员编号删除缓存数据
public boolean delete(String eid); // 删除雇员信息
@Cacheable(key = "#eid", sync = true) // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
@Cacheable
public Emp getEname(String ename); // 根据名称查询雇员信息
}
2、
@Test
public void testDeleteCache() {
Emp empA = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("【第一次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empA.getEid(), empA.getEname(), empA.getJob(), empA.getSalary());
LOGGER.info("【删除】删除雇员数据:{}", this.empService.delete("muyan"));
Emp empB = this.empService.get("muyan"); // 获取指定ID的数据
LOGGER.info("【第二次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empB.getEid(), empB.getEname(), empB.getJob(), empB.getSalary());
}
多级缓存策略
Caching 多级缓存策略
- 一个应用之中最为繁琐的就是数据查询处理,以当前的应用为例,在 IEmpService 接口中提供了 get()、getByEname()两个查询方法的处理,这样在进行数据更新的时候,除了要考虑到雇员 ID 查询的缓存更新,也需要考虑到雇员姓名的缓存更新,所以为了实现这样的多级缓存需要,在 Spring 中提供了@Caching 注解
1、
@CachePut(key = "#emp.eid", unless = "#result == null")
public Emp edit(Emp emp); // 编辑雇员信息
@Cacheable(key = "#eid", sync = true) // 要使用的缓存名称
public Emp get(String eid); // 根据ID查询雇员信息
@Cacheable(key = "#ename") // 根据姓名缓存雇员数据
public Emp getEname(String ename); // 根据名称查询雇员信息
2、
@Test
public void testEditCacheByEname() {
Emp empA = this.empService.getEname("李兴华"); // 获取姓名查询数据
LOGGER.info("【第一次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empA.getEid(), empA.getEname(), empA.getJob(), empA.getSalary());
Emp emp = new Emp(); // 定义新的雇员对象
emp.setEid("mylee"); // 修改数据的ID
emp.setEname("李兴华"); // 新的数据的内容
emp.setJob("一名受到商业机构抵制的公益教学者"); // 新的数据的内容
emp.setSalary(1600.0); // 新的数据的内容
LOGGER.info("【更新】雇员数据修改:{}", this.empService.edit(emp)); // 更新业务
Emp empB = this.empService.getEname("李兴华"); // 获取姓名查询数据
LOGGER.info("【第二次】雇员数据查询,雇员ID = {}、雇员姓名 = {}、雇员职位 = {}、基本工资 = {}",
empB.getEid(), empB.getEname(), empB.getJob(), empB.getSalary());
}
3、
@Caching(put = {
@CachePut(key = "#emp.eid", unless = "#result == null"), // 根据雇员编号更新缓存
@CachePut(key = "#emp.ename", unless = "#result == null") // 根据雇员姓名更新缓存
})
public Emp edit(Emp emp); // 编辑雇员信息
4、
USE yootk;
DELETE FROM emp;
INSERT INTO emp(eid, ename, job, salary) VALUES ('muyan', '张易言', '职员', 2500);
INSERT INTO emp(eid, ename, job, salary) VALUES ('yootk', '李沐文', '讲师', 5000);
INSERT INTO emp(eid, ename, job, salary) VALUES ('mylee', '李兴华', '讲师', 3000);
INSERT INTO emp(eid, ename, job, salary) VALUES ('wings', '王子琪', '经理', 3500);
Memcached 缓存概述
单机缓存管理局限
- 使用缓存解决了应用数据加载的 IO 性能问题,但是 Caffeine 仅仅只提供了一个高效的单机缓存应用,如图所示,而在实际的应用运行场景之中,除了缓存数据之外,还需要进行应用数据的管理,并且由于物理内存的局限性,不同的应用也需要进行内存的抢占,这些因素都可能影响到最终缓存处理的质量。
分布式缓存
- 如果现在开辟的缓存空间太小,那么就会造成缓存数据不足,从而导致数据库服务的访问压力激增。而如果缓存空间太大,那么又会影响到其他进程的内存分配,所以单机版的缓存只能够应用于缓存数量较小的场景,而在数据量较大的应用之中是无法使用的,那么此时就需要考虑到分布式缓存
Memcached
- Memcached 是以 LiveJournal 旗下 Danga Interactive 公司的 Brad Fitzpatric 为首开发的一款软件,现在已成为 mixi、hatena、Facebook、Vox、LiveJournal 等众多服务中提高 Web 应用扩展性的重要因素,其采用了“key=value”的方式进行存储,同时软件的安装也较为简单
Memcached 命令操作
- 在数据保存时,需要首先设置要存储的数据 KEY,数据的保存时间(单位:秒)以及数操作的格式如图 10-43 所示,据的长度,命令执行之后再输入其对应的 VALUEset 命令的完整语法如下:
第-行输入存储命令:set key flags exptime bytes [noreply]
第二行输入存储数据:value
配置参数:
“key”:设置要保存缓存数据的 key;
“flags”:是一个整型的标记信息,可以保存一些与数据存储有关的附加内容,'exptime”:数据的过期时间,单位是秒,如果设置为 0 则表示永不过期;
bytes”:在缓存中存储数据的长度:
“noreplay”:可选参数,表示服务操作完成后不返回数据;
保存数据项:
- 设置存储 KEY:set muyan 0 900 9
- 设置存储 VALUE:yootk.com
- 程序执行结果:STORED
数据获取:
- get muyan
1、
dnf -y install memcached
yum -y install memcached
2、
memcached -h
3、
memcached -p 6030 -m 128m -c 1024 -u root -d
4、
netstat -nptl
5、
firewall-cmd --zone=public --add-port=6030/tcp --permanent
firewall-cmd --reload
6、
C:\Windows\System32\drivers\etc\hosts
7、
telnet memcached-server 6030
8、
set muyan 0 900 9
yootk.com
9、
get muyan
Memcached 数据操作命令
1、 清空缓存数据
flush_all
2、存储数据
set muyan 10 0 5
3、指定key追加数据
append muyan 11 0 4
4、
prepend muyan 12 0 4
5、
gets muyan
6、
cas muyan 0 900 5 18
7、
stats items
8、
stats cachedump 1 0
9、
delete muyan
Spring 整合 Memcached
Memcached 客户端使用结构
- Memcached 作为缓存数据库,一般都需要与具体的业务有所关联,而业务的实现往往都是通过程序语言来完成的,为了便于操作,在 Memcached 中提供了 Java 相关的客户端依赖支持,在进行使用时只需要配置好相关的连接池信息,并通过 MemCachedClient 类即可实现数据操作
1、
// https://mvnrepository.com/artifact/com.whalin/Memcached-Java-Client
implementation group: 'com.whalin', name: 'Memcached-Java-Client', version: '3.0.2'
2、
# 【Memcached配置】设置服务组件的地址与端口
memcached.server=memcached-server:6030
# 【Memcached配置】设置服务器的权重
memcached.weights=1
# 【Memcached配置】设置每个缓存服务器的初始化连接数量
memcached.initConn=1
# 【Memcached配置】设置每个缓存服务器的最小维持连接数量
memcached.minConn=1
# 【Memcached配置】设置最大可用的连接数量
memcached.maxConn=50
# 【Memcached配置】配置Nagle算法,因为缓存的处理不需要确认(影响性能)
memcached.nagle=false
# 【Memcached配置】设置Socket读取等待的超时时间
memcached.socketTO=3000
# 【Memcached配置】设置连接池的服务更新时间间隔
memcached.maintSleep=5000
3、
package com.yootk.config;
import com.whalin.MemCached.MemCachedClient;
import com.whalin.MemCached.SockIOPool;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
@Configuration
@PropertySource("classpath:config/memcached.properties") // 加载资源文件
public class MemcachedConfig {
@Value("${memcached.server}")
private String server; // 服务器地址
@Value("${memcached.weights}")
private int weight; // 权重
@Value("${memcached.initConn}")
private int initConn; // 初始化连接数量
@Value("${memcached.minConn}")
private int minConn; // 最小维持的连接数量
@Value("${memcached.maxConn}")
private int maxConn; // 最大连接数量
@Value("${memcached.maintSleep}")
private int maintSleep; // 连接池维护周期
@Value("${memcached.nagle}")
private boolean nagle; // Nagle算法配置
@Value("${memcached.socketTO}")
private int socketTO; // 连接超时
@Bean("socketIOPool")
public SockIOPool initSocketIOPool() {
SockIOPool pool = SockIOPool.getInstance("memcachedPool"); // 获取连接池的实例
pool.setServers(new String[] {this.server}); // 配置服务器地址
pool.setWeights(new Integer[] {this.weight}); // 配置权重
pool.setInitConn(this.initConn); // 初始化连接数量
pool.setMinConn(this.minConn); // 最小连接池数量
pool.setMaxBusyTime(this.maxConn); // 最大的连接池数量
pool.setMaintSleep(this.maintSleep); // 维护间隔
pool.setNagle(this.nagle); // 禁用Nagle算法,以提升处理性能
pool.setSocketTO(this.socketTO); // 配置连接超时时间
pool.initialize(); // 初始化连接池
return pool;
}
@Bean
public MemCachedClient memCachedClient() {
MemCachedClient cachedClient = new MemCachedClient("memcachedPool");
return cachedClient;
}
}
4、
package com.yootk.test;
import com.whalin.MemCached.MemCachedClient;
import com.yootk.StartSpringCache;
import jakarta.persistence.EntityManagerFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.sql.DataSource;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartSpringCache.class) // 启动类
@ExtendWith({SpringExtension.class})
public class TestMemcached { // 测试Caffeine组件的基本操作
private static final Logger LOGGER = LoggerFactory.getLogger(TestMemcached.class);
@Autowired
private MemCachedClient memCachedClient;
@Test
public void testData() throws Exception {
long expire = System.currentTimeMillis() +
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
LOGGER.info("设置Memcached缓存数据:{}", this.memCachedClient
.set("muyan", "www.yootk.com", new Date(expire)));
LOGGER.info("获取Memcached缓存数据:muyan = {}", this.memCachedClient.get("muyan"));
}
}
SpringCache 整合 Memcached 缓存服务
Cache 接口方法
- 为便于缓存的统一操作,SpringCache 提供了一系列的配置注解,利用这些注解并结合业务处理方法可以方便的实现缓存数据的操作。使用 SpringCache 进行缓存操作时,具体的缓存处理如果现在要与 Memcached 缓存服务器进行整合,则就需要开发者自是由 Cache 接口完成的,定义 Cache 接口的实现子类,随后根据需要覆写 Cache 接口中的全部抽象方法
CacheManager 缓存管理类
- Cache 接口只是定义了缓存操作的具体实现,而 Memcached 缓存操作是由 MemCachedClient 工具类实现的,所以只需要在 Cache 子类中调用相关的方法即可实现。但是如果要想让 SpringCache 获取到指定的 Cache 接口实例,按照标准来讲需要提供有一个 CacheManager 接口实例,而对于 Memcached 缓存管理就需要开发者创建新的 CacheManager 接口实现类
1、
package com.yootk.cache.memcached;
import com.whalin.MemCached.MemCachedClient;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class MemcachedCache implements Cache {
private MemCachedClient client; // 所有的操作命令由此对象发出
private String name; // 缓存的名称
private long expire; // 失效时间
public MemcachedCache(String name, long expire, MemCachedClient client) {
this.client = client;
this.name = name;
this.expire = expire;
}
@Override
public String getName() {
return this.name;
}
@Override
public Object getNativeCache() { // 原生缓存提供者
return this.client; // 将Memcached工具对象实例返回
}
@Override
public ValueWrapper get(Object key) {
ValueWrapper wrapper = null;
Object value = this.client.get(key.toString()); // 根据key查询value
if (value != null) { // 有数据了
wrapper = new SimpleValueWrapper(value); // 将内容包装
}
return wrapper;
}
@Override
public <T> T get(Object key, Class<T> type) {
Object cacheValue = this.client.get(key.toString()); // 获取缓存数据
if (type == null && !type.isInstance(cacheValue)) {
throw new IllegalStateException("缓存数据不是 [" + type.getName() + "] 类实例:" + cacheValue);
}
return (T) cacheValue;
}
@Override
public <T> T get(Object key, Callable<T> valueLoader) {
T value = (T) this.get(key); // 获取数据
if (value == null) { // 数据为空
FutureTask<T> futureTask = new FutureTask<>(valueLoader); // 异步数据加载
new Thread(futureTask).start(); // 启动异步线程
try {
value = futureTask.get();
} catch (Exception e) {}
}
return value;
}
@Override
public void put(Object key, Object value) {
this.client.set(key.toString(), value, new Date(this.expire)); // 添加缓存
}
@Override
public void evict(Object key) {
this.client.delete(key.toString());
}
@Override
public void clear() {
this.client.flushAll(); // 清空缓存
}
}
2、
package com.yootk.cache.memcached;
import com.whalin.MemCached.MemCachedClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.support.AbstractCacheManager;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class MemcachedCacheManager extends AbstractCacheManager {
@Autowired
private MemCachedClient cachedClient;
private long expire = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
// CacheManager需要保存有多个不同的Cache项,那么应该通过集合存储
private ConcurrentHashMap<String, Cache> cacheMap = new ConcurrentHashMap<>();
@Override
protected Collection<? extends Cache> loadCaches() {
return this.cacheMap.values();
}
@Override
public Cache getCache(String name) { // 根据名称获取Cache
Cache cache = this.cacheMap.get(name); // 获取指定名称的缓存对象
if (cache == null) {
cache = new MemcachedCache(name, this.expire, this.cachedClient);
this.cacheMap.put(name, cache); // 下次获取缓存对象方便
}
return cache;
}
}
3、
package com.yootk.config;
import com.yootk.cache.memcached.MemcachedCacheManager;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration // 配置类
@EnableCaching // 当前的应用要开启缓存
public class CacheConfig { // 缓存配置类
@Bean
public CacheManager cacheManager() {
return new MemcachedCacheManager();
}
}
demo