NoSQLRedis高级
FANSEARedis高级
数据结构使用场景
随着 Redis 版本的更新,后面又支持了四种数据类型: BitMap(2.2 版新增)、HyperLogLog(2.8 版新增)、GEO(3.2 版新增)、Stream(5.0 版新增)。 Redis 五种数据类型的应用场景:
- String 类型的应用场景:缓存对象、常规计数、分布式锁、共享 session 信息等。
- List 类型的应用场景:消息队列(但是有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)等。
- Hash 类型:缓存对象、购物车等。
- Set 类型:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等。
- Zset 类型:排序场景,比如排行榜、电话和姓名排序等。
Redis 后续版本又支持四种数据类型,它们的应用场景如下:
- BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
- HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数【独立访客(unique visitor)】等;
- GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
- Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息ID,支持以消费组形式消费数据。
String
缓存保存的主要数据结构
原理:SDS
(Simple Dynamic String)
1 2 3 4 5 6
| struct sdshdr { uint32_t len; uint32_t alloc; unsigned char flags; char buf[]; };
|
- 避免缓冲区溢出::维护了其最大容量信息,并且所有写入操作都会检查是否超出此限制,所以使用SDS可以有效防止因越界写入导致的安全问题
- 查询长度时间复杂度为O(1)
- 二进制安全,以长度标志来判断字符串是否结束,不受
\0
干扰
可以用于全局自增id,统计文章浏览量

1 2 3 4 5 6 7 8 9 10 11 12 13
| public static final long BEGIN_TIMESTAMP=1672531200L;
public long nextId(String keyPrefix){ LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timeStamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyyMM")); Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timeStamp << 32 | increment; }
|
List
可以用于做队列,栈使用,可以用作于等待队列
应用场景:发布订阅【消息队列】,慢查询
队列组合:(先进先出)
1 2
| RPUSH key 1 2 3 4 5 LPOP key
|
栈组合:
1 2
| LPUSH key 1 2 3 4 5 LPOP key
|

Hash
可以用于存储对象数据,相比于String保存的json对象,字段容易更改!

Zset
原理
Zset原理
- **
ziplist
**:(数量小于128)压缩列表节点,本质是双向链表,按score从小到大排序

- **
skiplist
**:跳表,通过空间换时间,时间复杂度为O(logN)

使用
点赞排名用户信息展示!

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public Result likeBlog(Long id) { Long userId = UserHolder.getUser().getId(); Double score = stringRedisTemplate.opsForZSet().score(RedisConstants.BLOG_LIKED_KEY + id, userId.toString()); if (score ==null){ boolean like = update().setSql("liked = liked+1").eq("id", id).update(); if (BooleanUtil.isTrue(like)){ stringRedisTemplate.opsForZSet().add(RedisConstants.BLOG_LIKED_KEY+id,userId.toString(),System.currentTimeMillis()); } }else { boolean ifLike = update().setSql("liked = liked-1").eq("id", id).update(); if (BooleanUtil.isTrue(ifLike)){ stringRedisTemplate.opsForZSet().remove(RedisConstants.BLOG_LIKED_KEY+id,userId.toString()); } } return Result.ok(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public Result queryBlogLikes(Long id) { List<UserDTO> userDTOS; Set<String> top5 = stringRedisTemplate.opsForZSet().range(RedisConstants.BLOG_LIKED_KEY + id, 0, 4); if (top5 ==null ||top5.isEmpty()){ return Result.ok(Collections.emptyList()); } List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList()); String idStr = StrUtil.join(",", ids); if (ids.size() >1){ userDTOS = userService .query().in("id",ids).last("ORDER BY FIELD("+idStr+")").list() .stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); }else { userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); } return Result.ok(userDTOS); }
|
HyperLogLog
主要应用于统计浏览量总数
可以按天,月分批统计;并可以用pfmerge合并计算年总数
PFMERGE用来合并两个key计算总共的统计量

1 2 3 4 5 6 7 8
| @Test void testHyperLogLog(){ userService.list().forEach(user -> { stringRedisTemplate.opsForHyperLogLog().add("UV", String.valueOf(user.getId())); }); System.out.println(stringRedisTemplate.opsForHyperLogLog().size("UV")); }
|
BitMap
最高位32位,并且下标从0开始计数,这刚好可以用于统计某个人的签到情况

- 将签到数据保存到Redis
1 2 3 4 5 6 7 8 9 10 11 12 13
| public Result sign() { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM")); int dayOfMonth = now.getDayOfMonth(); String key = SystemConstants.USER_SIGN + userId + keySuffix; stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true); return Result.ok(); }
|
- 获取连续签到天数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package redis;
public class Main { public static void main(String[] args) { int num = 1000; String binaryNumber = Integer.toBinaryString(num); System.out.println("Binary representation of " + num + " is: " + binaryNumber); int count = 0; while (num!=0) { if ((num&1)==0) { count =0; }else { count++; } num>>=1; } System.err.println(count); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public Result signCount() { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM")); int dayOfMonth = now.getDayOfMonth(); String key = SystemConstants.USER_SIGN + userId + keySuffix; List<Long> result = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)); if (result == null || result.isEmpty()){ return Result.ok(0); } Long num = result.get(0); if (num == null || num == 0){ return Result.ok(0); } int count = 0; while (true){ if(num==0){ break; } if ((num & 1) == 0) { count=0; }else { count++; } num >>= 1; } return Result.ok(count);
}
|
Stream
redis的MQ
1 2 3 4 5 6 7 8
| # 创建消息队列 XGROUP create stream.mqtt group1 0 MKSTREAM
# 加入消息(*代表id自动生成) XADD stream.mqtt * deviceId 5
# 消费消息(>代表从最早的去取,先进先出) xreadgroup group group1 customer count 1 streams stream.mqtt >
|
使用方式
注册监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Configuration public class RedisMQConfig { @Autowired private RedisMQListener redisMQListener; private RedisTemplate redisTemplate; private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class); @Autowired public RedisMQConfig(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } @Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .pollTimeout(Duration.ofSeconds(1)).build(); StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from("group1", "customer"), StreamOffset.create("stream.mqtt", ReadOffset.lastConsumed()), redisMQListener); streamMessageListenerContainer.start(); return subscription; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Component public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> { private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);
private static final ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
@Override public void onMessage(MapRecord message) {
threadPoolExecutor.execute(new MqttMSgHandle(message)); }
@AllArgsConstructor public class MqttMSgHandle implements Runnable{
private MapRecord message; @Override public void run() { System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue())); try { }catch (Exception e){ log.error("出现订单异常",e); handlePendingList(); } }
private void handlePendingList() { while (true){ try { }catch (Exception e){ log.error("出现pending-list订单异常",e); try { Thread.sleep(200); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } } }
|
创建独立线程不断轮询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle()); } String queueName = "stream.orders"; public class VoucherOrderHandle implements Runnable{ @Override public void run() { while (true){ try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if (list==null || list.isEmpty()) { continue; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); }catch (Exception e){ log.error("出现订单异常",e); handlePendingList(); } } }
private void handlePendingList() { while (true){ try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); if (list==null || list.isEmpty()) { break; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); }catch (Exception e){ log.error("出现pending-list订单异常",e); try { Thread.sleep(200); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } }
}
|
持久化
美团二面:Redis持久化选RDB还是AOF?面试这么回答就过了_哔哩哔哩_bilibili
RDB(默认):采取的是定时把数据刷到磁盘中,以二进制文件存储,文件紧凑,优点是redis宕机数据恢复快,但是数据可靠性不强
AOF:将修改操作记录在AOF文件日志中,数据可靠性强,且也会做操作日志压缩,但是恢复数据依然很慢


主线程fork子线程进行RDB存储
子进程通过共享内存获取主进程数据
当主进程发生更改时才对子进程的数据进行拷贝,如果没有更改则直接通过共享内存刷入RDB
将快照写入临时的RDB文件
完成临时RDB写入替换原有的RDB文件
COW:写时拷贝的主要作用就是将拷贝推迟到写操作真正发生时,这也就避免了大量无意义的拷贝操作
父/子进程修改某个页时,该页的共享才结束,同时子进程分配该页大小的物理空间复制父进程对应页的内容。
这样,如果当子进程运行期间,父子进程都没有修改数据,那么操作系统就节省了大量的内存复制时间和占用空间
- 如果没有修改,直接通过共享内存做RDB存储。减少了大量内存拷贝的时间复制时间和空间占用
- 修改了则结束共享内存,再进行拷贝

为什么fork子线程?
- 避免了主线程阻塞,备份时也能执行主业务
- 共享内存(减少拷贝的前提) + COW机制(写时复制):减少拷贝空间和时间占用

AOF
最常用

- 写入缓冲区
- 每隔1s写入AOF日志文件
- 优化冗余操作日志

为什么AOF比RDB更加可靠?
因为AOF每秒都会执行,而RDB是通过配置文件定义修改了多少才进行备份(非实时性)
save 900 1
表示如果在900秒内至少有1个键被修改,则进行一次RDB快照
- AOF通过记录服务器接收到的每个写操作来实现持久化,这样即使在最坏的情况下(如突然断电),也只会丢失最近几秒钟的数据,因为AOF文件会在每秒或者每次执行命令后进行同步。
- RDB则是在指定的时间间隔内生成一个快照文件,如果在这期间发生故障,则可能丢失自上次快照以来的所有数据。
事务机制
内存淘汰
当 Redis 达到最大内存限制时,Redis会确切地使用配置好的最大内存策略指令来执行。相关策略如下:
noeviction:写满就不能写,默认淘汰策略
random:
- allkeys-random:在主键空间中,包含所有数据,随机移除某个key。
- volatile-random:在设置了过期时间的键空间中,随机移除某个key。
volatile-ttl:越早过期的数据先被淘汰
lru:
- allkeys-lru:在主键空间中,优先移除最近未使用的key。(推荐)
- volatile-lru:在设置了过期时间的键空间中,优先移除最近未使用的key。
这就有个问题,同时有1s前访问5次的数据和5s前访问1000次的数据时,会先删除5s前访问1000次的数据
lfu:lru的基础上增加请求次数统计,更精准代表热点数据
修改内存策略:
在redis.conf
文件加入
1 2
| maxmemory 300mb maxmemory-policy allkeys-lru
|
Redisson
Redisson原理
Redisson的watchDog机制
主从同步
主节点执行写操作,从节点执行读操作。任务分工,减少读写阻塞,但是这就带来了主从节点数据不一致的问题
主从同步操作:当主节点执行持久化时将保存的RDB文件同步给从节点,在传输这期间可能也会执行其他的命令,这时候将命令写在txt文件里发送给从节点
哨兵机制(Sentinel)

当主节点挂掉了怎么办,这个时候需要选举新的主节点,那如何监测节点是否存活呢?又该选举谁当新的主节点呢?
方法是单独分出来一些节点充当管理员,管理员也被成为哨兵,哨兵会时不时监测主从节点的存活状态

- 每
1s
询问所有节点,对节点发送ping
:当一个哨兵发现R1服务没有反应,会被定义为主观下线,这个时候需要征求其他哨兵访问R1的状态,如果都没有反应则可以认为是客观下线

回到开始的问题,如果主节点下线了,该选择谁为主节点?
- 根据优先级,配置越高优先级越高
- 根据最近节点,保障数据最新
- 根据复制偏移量,越大则越全
复制偏移量:是指令的缓存区最后的数据,越大证明数据越新


面试题
Redis的大Key问题如何解决?
产生问题:
- 内存占用
- 网络传输延迟
- 持久化备份降速
解决方案:
- 大key转化为多个小key
比如把一个大的Hash结构分别转化为多个Hash结构:

搭建Redis集群,把可以分配到不同的Hash slot槽所在的分片上
压缩算法,将数据压缩到Redis在使用时对其解压缩
String类型底层实现
Redis篇——String类型在Redis中底层存储数据结构,全方位分析底层存储原理!_redis string 底层数据结构-CSDN博客
底层实现是SDS(simple dynamic string),中文翻译为简单动态字符串。它是一个动态字符串结构,由长度、空闲空间和字节数组三部分组成。
SDS有三种编码类型:
1、embstr:占用64Bytes的空间,存储44Bytes的数据
embstr不会重新再次开辟空间,而是会嵌入在redisObject里面
(1)前19个字节用于存储embstr结构
(2)中间44个字节存储数据
(3)最后都为\0符号
2、raw:存储大于44Bytes的数据
此时动态字符串SDS的内存与其依赖的redisOject的内存不在连续了

3、int:存储整数类型
注意:如果之前这个存储的字符串的长度并未达到44,更新后的长度也没有达到44字节,它的底层存储结构还是会变成RAW



存储器的速度比较
寄存器 > Cache > RAM > ROM/硬盘等其他持久存储设备。
Redis为什么这么快?
- 内存存储数据
- 单线程
多线程主要提高了吞吐量,而其中多线程竞争需要加锁也会造成时间消耗。并且redis数据是保存在内存中的读写速度快,可以避免切换上下文的开销。再者,redis使用了IO复用,单线程也能高效的处理大量并发连接!
Redis是单线程的原因主要在于其设计和使用的内存数据结构。以下是几个关键因素:
- 内存操作的高效性:Redis的数据全部存储在内存中,针对内存的数据进行读写操作都非常快。在这种情况下,单线程能够高效地处理这些操作,因为避免了多线程之间的同步和上下文切换的开销。
- 避免多线程竞争:在单线程模型中,所有请求都是串行执行的,没有多线程的困扰,因此不需要考虑锁的复杂度和线程之间的竞争问题。这简化了系统设计,并提高了操作的确定性。
- I/O多路复用技术:Redis使用了I/O多路复用技术(如Linux中的epoll、kqueue或select/poll等),这使得单个线程能够同时监控多个文件描述符(客户端连接)的状态变化。当有新的客户端连接请求或已有连接上有数据可读写时,I/O多路复用函数会通知Redis主线程进行处理。这种技术使得单线程能够高效地处理大量并发连接。
IO多路复用
优秀的数据结构
如跳表提升了查询速率!