Redis高级

Redis高级

数据结构使用场景

随着 Redis 版本的更新,后面又支持了四种数据类型: BitMap(2.2 版新增)、HyperLogLog(2.8 版新增)、GEO(3.2 版新增)、Stream(5.0 版新增)。 Redis 五种数据类型的应用场景:

  • String 类型的应用场景:缓存对象、常规计数、分布式锁、共享 session 信息等。
  • List 类型的应用场景:消息队列(但是有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)等。
  • Hash 类型:缓存对象、购物车等。
  • Set 类型:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等。
  • Zset 类型:排序场景,比如排行榜、电话和姓名排序等。

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数【独立访客(unique visitor)】等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息ID,支持以消费组形式消费数据。

String

缓存保存的主要数据结构

原理:SDS(Simple Dynamic String)

1
2
3
4
5
6
struct sdshdr {
uint32_t len; //字符串长度
uint32_t alloc; //字符串空间大小
unsigned char flags; //表示sds的类型(8位)
char buf[]; //用于存储字符串数据
};
  • 避免缓冲区溢出::维护了其最大容量信息,并且所有写入操作都会检查是否超出此限制,所以使用SDS可以有效防止因越界写入导致的安全问题
  • 查询长度时间复杂度为O(1)
  • 二进制安全,以长度标志来判断字符串是否结束,不受\0干扰

可以用于全局自增id统计文章浏览量

image-20230609161022653

1
2
3
4
5
6
7
8
9
10
11
12
13
public static final long BEGIN_TIMESTAMP=1672531200L;

public long nextId(String keyPrefix){
//1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
String date = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
//2. redis自增
Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3. 拼接返回
return timeStamp << 32 | increment;
}

List

可以用于做队列,栈使用,可以用作于等待队列

应用场景:发布订阅【消息队列】,慢查询

队列组合:(先进先出)

1
2
RPUSH key 1 2 3 4 5
LPOP key

栈组合:

1
2
LPUSH key 1 2 3 4 5
LPOP key

image-20230609172326658

Hash

可以用于存储对象数据,相比于String保存的json对象,字段容易更改!

image-20230609163650354

Zset

原理

Zset原理

  • **ziplist**:(数量小于128)压缩列表节点,本质是双向链表,按score从小到大排序

v2-6e584cd141c4b0dbbec37fc3ade80fa1_1440w

  • **skiplist**:跳表,通过空间换时间,时间复杂度为O(logN)

25e6411ae7b3b2f0154da760cbc86343

使用

点赞排名用户信息展示!

image-20230609195218583

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Result likeBlog(Long id) {
//更改使用scoreSet,用分数进行排行,然后按顺序返回点赞用户头像
Long userId = UserHolder.getUser().getId();
//判断是否已经点赞
Double score = stringRedisTemplate.opsForZSet().score(RedisConstants.BLOG_LIKED_KEY + id, userId.toString());
if (score ==null){
//1.如果没点赞,点赞数+1,增加集合
boolean like = update().setSql("liked = liked+1").eq("id", id).update();
if (BooleanUtil.isTrue(like)){
stringRedisTemplate.opsForZSet().add(RedisConstants.BLOG_LIKED_KEY+id,userId.toString(),System.currentTimeMillis());
}
}else {
//2.如果已经点赞,则取消点赞-1,并删除集合
boolean ifLike = update().setSql("liked = liked-1").eq("id", id).update();
if (BooleanUtil.isTrue(ifLike)){
stringRedisTemplate.opsForZSet().remove(RedisConstants.BLOG_LIKED_KEY+id,userId.toString());
}
}
return Result.ok();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Result queryBlogLikes(Long id) {
List<UserDTO> userDTOS;
//将top5的用户返回
//1.在Redis中利用文章id,查询top5点赞用户id
Set<String> top5 = stringRedisTemplate.opsForZSet().range(RedisConstants.BLOG_LIKED_KEY + id, 0, 4);
if (top5 ==null ||top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.根据用户id查询user,这个方法这绝,一行代码实现String向Long的转换
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
//3.将user封装成userDto
//注意:这里的listByIds底层是使用in,查询出来的结果默认按照id递增排序,这里我们需要按照时间升序排序,需要注入sql实现!
if (ids.size() >1){
userDTOS = userService
.query().in("id",ids).last("ORDER BY FIELD("+idStr+")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
}else {
userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
}
return Result.ok(userDTOS);
}

HyperLogLog

主要应用于统计浏览量总数

可以按天,月分批统计;并可以用pfmerge合并计算年总数

PFMERGE用来合并两个key计算总共的统计量

image-20230924170913733

1
2
3
4
5
6
7
8
@Test
void testHyperLogLog(){
userService.list().forEach(user -> {
stringRedisTemplate.opsForHyperLogLog().add("UV", String.valueOf(user.getId()));
});
//PFCOUNT
System.out.println(stringRedisTemplate.opsForHyperLogLog().size("UV"));
}

BitMap

最高位32位,并且下标从0开始计数,这刚好可以用于统计某个人的签到情况

image-20230924131126304

  1. 将签到数据保存到Redis
1
2
3
4
5
6
7
8
9
10
11
12
13
public Result sign() {
//1.获取用户id
Long userId = UserHolder.getUser().getId();
//2.获取当前年月日,组装成key
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
int dayOfMonth = now.getDayOfMonth();
//拼装成key
String key = SystemConstants.USER_SIGN + userId + keySuffix;
//3.对Redis操作
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}
  1. 获取连续签到天数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package redis;

public class Main {
public static void main(String[] args) {
int num = 1000; // 十进制数
String binaryNumber = Integer.toBinaryString(num);
System.out.println("Binary representation of " + num + " is: " + binaryNumber);
int count = 0;
while (num!=0) {
if ((num&1)==0) {
count =0;
}else {
count++;
}
num>>=1;
}
System.err.println(count);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Result signCount() {
//1.获取用户和年月日
Long userId = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
int dayOfMonth = now.getDayOfMonth();
//2.拼接key,并且查询0到当天的签到量 BITFIELD sign:1010:202309 GET u24 0
String key = SystemConstants.USER_SIGN + userId + keySuffix;
List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
if (result == null || result.isEmpty()){
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0){
return Result.ok(0);
}
int count = 0;
//3.右移运算和与运算获取当前位,是1就加一,是0就返回
while (true){
if(num==0){
break;
}
//相与获取当前位的数 1&1=1 1&0=0
if ((num & 1) == 0) {
count=0;
}else {
count++;
}
//执行右移操作,使其循环向右读取
num >>= 1;
}
return Result.ok(count);

}

Stream

redis的MQ

1
2
3
4
5
6
7
8
# 创建消息队列
XGROUP create stream.mqtt group1 0 MKSTREAM

# 加入消息(*代表id自动生成)
XADD stream.mqtt * deviceId 5

# 消费消息(>代表从最早的去取,先进先出)
xreadgroup group group1 customer count 1 streams stream.mqtt >

使用方式

注册监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class RedisMQConfig {

@Autowired
private RedisMQListener redisMQListener;

private RedisTemplate redisTemplate;

private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);

@Autowired
public RedisMQConfig(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}

@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {

StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(1)).build();
StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from("group1", "customer"), StreamOffset.create("stream.mqtt", ReadOffset.lastConsumed()), redisMQListener);
streamMessageListenerContainer.start();
return subscription;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

// 创建一个线程池
/*private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());*/
private static final ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();

@Override
public void onMessage(MapRecord message) {
// 异步处理消息
/*threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));
});*/
threadPoolExecutor.execute(new MqttMSgHandle(message));

}

@AllArgsConstructor
public class MqttMSgHandle implements Runnable{

//这边实现读取消息队列创建订单的操作
private MapRecord message;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));
try {
//处理MQTT的数据
}catch (Exception e){
log.error("出现订单异常",e);
handlePendingList();
}
}

private void handlePendingList() {
while (true){
try {
//处理未消费成功的数据
}catch (Exception e){
log.error("出现pending-list订单异常",e);
try {
Thread.sleep(200);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
}
创建独立线程不断轮询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());
}

String queueName = "stream.orders";

public class VoucherOrderHandle implements Runnable{
//这边实现读取消息队列创建订单的操作
@Override
public void run() {
while (true){
try {
//1.获取消息队列中的订单消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//1.1判断是否获取成功
if (list==null || list.isEmpty()) {
//1.2不成功,说明无消息,重新执行
continue;
}
//解析订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//1.3成功,创建订单
createVoucherOrder(voucherOrder);
//2.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
log.error("出现订单异常",e);
handlePendingList();
}
}
}

private void handlePendingList() {
while (true){
try {
//1.获取pending—list队列中的订单消息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//1.1判断是否获取成功
if (list==null || list.isEmpty()) {
//1.2不成功,说明无消息,重新执行
break;
}
//解析订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//1.3成功,创建订单
createVoucherOrder(voucherOrder);
//2.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
log.error("出现pending-list订单异常",e);
try {
Thread.sleep(200);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}

}

持久化

美团二面:Redis持久化选RDB还是AOF?面试这么回答就过了_哔哩哔哩_bilibili

RDB(默认):采取的是定时把数据刷到磁盘中,以二进制文件存储,文件紧凑,优点是redis宕机数据恢复快,但是数据可靠性不强

AOF:将修改操作记录在AOF文件日志中,数据可靠性强,且也会做操作日志压缩,但是恢复数据依然很慢

v2-25a35f6536ebe44fb3af8e5bb60eb775_1440w

RDB

image-20240526215143671

  1. 主线程fork子线程进行RDB存储

  2. 子进程通过共享内存获取主进程数据

    当主进程发生更改时才对子进程的数据进行拷贝,如果没有更改则直接通过共享内存刷入RDB

  3. 将快照写入临时的RDB文件

  4. 完成临时RDB写入替换原有的RDB文件

COW:写时拷贝的主要作用就是将拷贝推迟到写操作真正发生时,这也就避免了大量无意义的拷贝操作

父/子进程修改某个页时,该页的共享才结束,同时子进程分配该页大小的物理空间复制父进程对应页的内容。

这样,如果当子进程运行期间,父子进程都没有修改数据,那么操作系统就节省了大量的内存复制时间和占用空间

  • 如果没有修改,直接通过共享内存做RDB存储。减少了大量内存拷贝的时间复制时间和空间占用
  • 修改了则结束共享内存,再进行拷贝

image-20241119144901377

为什么fork子线程?

  • 避免了主线程阻塞,备份时也能执行主业务
  • 共享内存(减少拷贝的前提) + COW机制(写时复制):减少拷贝空间和时间占用

image-20240526215054163

AOF

最常用

image-20240526215548789

  1. 写入缓冲区
  2. 每隔1s写入AOF日志文件
  3. 优化冗余操作日志

image-20240526215445927

为什么AOF比RDB更加可靠?

因为AOF每秒都会执行,而RDB是通过配置文件定义修改了多少才进行备份(非实时性)

save 900 1 表示如果在900秒内至少有1个键被修改,则进行一次RDB快照

  • AOF通过记录服务器接收到的每个写操作来实现持久化,这样即使在最坏的情况下(如突然断电),也只会丢失最近几秒钟的数据,因为AOF文件会在每秒或者每次执行命令后进行同步。
  • RDB则是在指定的时间间隔内生成一个快照文件,如果在这期间发生故障,则可能丢失自上次快照以来的所有数据。

事务机制

内存淘汰

当 Redis 达到最大内存限制时,Redis会确切地使用配置好的最大内存策略指令来执行。相关策略如下:

  1. noeviction:写满就不能写,默认淘汰策略

  2. random

  • allkeys-random:在主键空间中,包含所有数据,随机移除某个key。
  • volatile-random:在设置了过期时间的键空间中,随机移除某个key。
  1. volatile-ttl:越早过期的数据先被淘汰

  2. lru

    • allkeys-lru:在主键空间中,优先移除最近未使用的key。(推荐)
    • volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key

    这就有个问题,同时有1s前访问5次的数据和5s前访问1000次的数据时,会先删除5s前访问1000次的数据

  3. lfu:lru的基础上增加请求次数统计,更精准代表热点数据

修改内存策略:

redis.conf文件加入

1
2
maxmemory 300mb
maxmemory-policy allkeys-lru

Redisson

Redisson原理

Redisson的watchDog机制

主从同步

主节点执行写操作,从节点执行读操作。任务分工,减少读写阻塞,但是这就带来了主从节点数据不一致的问题

主从同步操作:当主节点执行持久化时将保存的RDB文件同步给从节点,在传输这期间可能也会执行其他的命令,这时候将命令写在txt文件里发送给从节点

哨兵机制(Sentinel)

image-20241103222111633

当主节点挂掉了怎么办,这个时候需要选举新的主节点,那如何监测节点是否存活呢?又该选举谁当新的主节点呢?

方法是单独分出来一些节点充当管理员,管理员也被成为哨兵,哨兵会时不时监测主从节点的存活状态

  • 10s访问主节点,询问主从情况

image-20241103223431773

  • 1s询问所有节点,对节点发送ping:当一个哨兵发现R1服务没有反应,会被定义为主观下线,这个时候需要征求其他哨兵访问R1的状态,如果都没有反应则可以认为是客观下线

image-20241103223448348

回到开始的问题,如果主节点下线了,该选择谁为主节点?

  • 根据优先级,配置越高优先级越高
  • 根据最近节点,保障数据最新
  • 根据复制偏移量,越大则越全

复制偏移量:是指令的缓存区最后的数据,越大证明数据越新

image-20241103223954118

image-20241103223547626

面试题

Redis的大Key问题如何解决?

产生问题:

  1. 内存占用
  2. 网络传输延迟
  3. 持久化备份降速

解决方案:

  1. 大key转化为多个小key

比如把一个大的Hash结构分别转化为多个Hash结构

image-20240518131051988

  1. 搭建Redis集群,把可以分配到不同的Hash slot槽所在的分片上

  2. 压缩算法,将数据压缩到Redis在使用时对其解压缩

String类型底层实现

Redis篇——String类型在Redis中底层存储数据结构,全方位分析底层存储原理!_redis string 底层数据结构-CSDN博客

底层实现是SDS(simple dynamic string),中文翻译为简单动态字符串。它是一个动态字符串结构,由长度、空闲空间和字节数组三部分组成

SDS有三种编码类型:

1、embstr:占用64Bytes的空间,存储44Bytes的数据

embstr不会重新再次开辟空间,而是会嵌入在redisObject里面
(1)前19个字节用于存储embstr结构
(2)中间44个字节存储数据
(3)最后都为\0符号

2、raw:存储大于44Bytes的数据

此时动态字符串SDS的内存与其依赖的redisOject的内存不在连续了

image-20240801171711555

3、int:存储整数类型

注意:如果之前这个存储的字符串的长度并未达到44,更新后的长度也没有达到44字节,它的底层存储结构还是会变成RAW

image-20240801171837213

image-20240801172133951

image-20240801172151492

存储器的速度比较

寄存器 > Cache > RAM > ROM/硬盘等其他持久存储设备。

Redis为什么这么快?

  1. 内存存储数据
  2. 单线程

多线程主要提高了吞吐量,而其中多线程竞争需要加锁也会造成时间消耗。并且redis数据是保存在内存中的读写速度快,可以避免切换上下文的开销。再者,redis使用了IO复用,单线程也能高效的处理大量并发连接!

Redis是单线程的原因主要在于其设计和使用的内存数据结构。以下是几个关键因素:

  1. 内存操作的高效性:Redis的数据全部存储在内存中,针对内存的数据进行读写操作都非常快。在这种情况下,单线程能够高效地处理这些操作,因为避免了多线程之间的同步和上下文切换的开销。
  2. 避免多线程竞争:在单线程模型中,所有请求都是串行执行的,没有多线程的困扰,因此不需要考虑锁的复杂度和线程之间的竞争问题。这简化了系统设计,并提高了操作的确定性。
  3. I/O多路复用技术:Redis使用了I/O多路复用技术(如Linux中的epoll、kqueue或select/poll等),这使得单个线程能够同时监控多个文件描述符(客户端连接)的状态变化。当有新的客户端连接请求或已有连接上有数据可读写时,I/O多路复用函数会通知Redis主线程进行处理。这种技术使得单线程能够高效地处理大量并发连接。
  1. IO多路复用

  2. 优秀的数据结构

如跳表提升了查询速率!