Redis

Redis

Nosql,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。key-value不仅仅是key-value。

官网

安装(Mac)

  1. 使用Homebrew下载redis
  2. brew search redis
  3. 找到最新版,现在稳定版为redis@4.0 brew install redis@4.0,等待下载完成即可
  4. 下载完后会有提示If you need to have redis@4.0 first in your PATH run: echo 'export PATH="/usr/local/opt/redis@4.0/bin:$PATH"' >> ~/.zshrc 就是很简单配置路径,复制粘贴回车即可。

启动

redis-server 启动服务端

redis-cli -p 6379 开启客户端

数据类型

基本数据类型

  1. String:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
> set k1 v1		# 存取
OK
> get k1
"v1"

> set counter 100
OK
> incr counter # 原子自增,仅整数
(integer) 101
> incrby counter 50 # 增加额定数量
(integer) 151
> decr counter # 原子自减
(integer) 150
> decrby counter 50 # 减去额定数量
(integer) 100

> mset a 10 b 20 c 30 # 批量操作
OK
> mget a b c
1) "10"
2) "20"
3) "30"

> exists k1 # 查找key是否存在,存在返回1
(integer) 1
> del k1 # 删除当前key-value,成功返回1
(integer) 1
> exists k1
(integer) 0

> set k2 v2
OK
> type k2 # 查看数据类型
string

> expire k2 5 # 设置数据超时时间,成功返回1
(integer) 1
> get k2 # 在有效时间内可获取
"some-value"
> get key # 5秒超时后
(nil)
> set k3 v3 ex 10 # 可以在存数据时设置超时时间
OK
> ttl k3
(integer) 9
  1. List: 双向添加特性,lpush从list左边添加一个数据,rpush从右边添加一个数据,lpop左边删除,rpop右边删除,lrange获取指定范围元素。消息队列,栈。
1
2
3
4
5
6
7
8
9
10
> rpush mylist A		
(integer) 1
> rpush mylist B
(integer) 2
> lpush mylist first
(integer) 3
> lrange mylist 0 -1
1) "first"
2) "A"
3) "B"
  1. **Set:**不能重复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
> sadd myset 1 2 3  # 增加set不排序
(integer) 3

> smembers myset # 查看成员
1) 3
2) 1
3) 2

> sismember myset 3 # 检测成员是否存在
(integer) 1
> sismember myset 30
(integer) 0
> scard myset # 获取set集合个数
(integer) 3

> srem myset 1 # 移除某个
(integer) 1

> srandmember myset # 随机取出一个值
"3"

> sadd myset2 "set2"
OK
> smove myset mysey2 "1" # 将一个指定的值,移 动到另一个set集合中
(integer) 1

#####
> sadd myset2 2
OK
> sdiff myset myset2 # 差集
1) "1"
2) "3"
3) "set2"

> sinter myset myset2 # 交集
1) "2"

> sunion myset myset2 # 并集
1) "1"
2) "2"
3) "3"
4)"set2"
  1. Hash:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> hmset user:1000 username antirez birthyear 1977 verified 1   # 一次存入多个键值对
OK
> hget user:1000 username # 查询单个key
"antirez"
> hget user:1000 birthyear
"1977"

> hgetall user:1000 #查询所有
1) "username"
2) "antirez"
3) "birthyear"
4) "1977"
5) "verified"
6) "1"

> hmget user:1000 username birthyear no-such-field # 查询多个key
1) "antirez"
2) "1977"
3) (nil)

> hincrby user:1000 birthyear 10 # 增减操作
(integer) 1987
> hincrby user:1000 birthyear 10
(integer) 1997
  1. **Zset:**排序的set
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
> zadd hackers 1940 "Alan Kay"		# 添加 1940相当于排序的权值
(integer) 1
> zadd hackers 1957 "Sophie Wilson"
(integer 1)
> zadd hackers 1953 "Richard Stallman"
(integer) 1
> zadd hackers 1949 "Anita Borg"
(integer) 1
> zadd hackers 1965 "Yukihiro Matsumoto"
(integer) 1
> zadd hackers 1914 "Hedy Lamarr"
(integer) 1
> zadd hackers 1916 "Claude Shannon"
(integer) 1
> zadd hackers 1969 "Linus Torvalds"
(integer) 1
> zadd hackers 1912 "Alan Turing"
(integer) 1

> zrange hackers 0 -1 # 获取所有数据正序排序
1) "Alan Turing"
2) "Hedy Lamarr"
3) "Claude Shannon"
4) "Alan Kay"
5) "Anita Borg"
6) "Richard Stallman"
7) "Sophie Wilson"
8) "Yukihiro Matsumoto"
9) "Linus Torvalds"

> zrevrange hackers 0 -1
1) "Linus Torvalds"
2) "Yukihiro Matsumoto"
3) "Sophie Wilson"
4) "Richard Stallman"
5) "Anita Borg"
6) "Alan Kay"
7) "Claude Shannon"
8) "Hedy Lamarr"
9) "Alan Turing"

> zrange hackers 0 -1 withscores # 获取所有包括权值
1) "Alan Turing"
2) "1912"
3) "Hedy Lamarr"
4) "1914"
5) "Claude Shannon"
6) "1916"
7) "Alan Kay"
8) "1940"
9) "Anita Borg"
10) "1949"
11) "Richard Stallman"
12) "1953"
13) "Sophie Wilson"
14) "1957"
15) "Yukihiro Matsumoto"
16) "1965"
17) "Linus Torvalds"
18) "1969"

> zrangebyscore hackers -inf 1950 # 范围操作,获取1950之前的人
1) "Alan Turing"
2) "Hedy Lamarr"
3) "Claude Shannon"
4) "Alan Kay"
5) "Anita Borg"

> zremrangebyscore hackers 1940 1960 # 范围删除
(integer) 4

> zrank hackers "Anita Borg" # 获取序列位置
(integer) 4

特殊数据类型

  1. Geospatial 地理位置

    附近的人,计算距离。

    geoadd 添加地理位置,经纬。

    两级无法添加,城市文件导入

    1
    2
    3
    4
    5
    > > geoadd china:city 116.40 39.90 beijing
    > (integer) 1
    > > geoadd china:city 121.47 31.23 shanghai 114.05 22.52 shenzhen
    > (integer) 1
    >

    geopos 获取指定城市位置

    1
    2
    3
    4
    5
    6
    > > geopos china:city beijing shanghai
    > !) 1)"116.39999896287918091"
    > 2)"39.90000009167092543"
    > 2) 1)"121.46899000983782923"
    > 2)"31.23228378291833803"
    >

    geodist 返回两个给定位置之间的距离

    1
    2
    3
    > > geodist china:city beijing shanghai km
    > "1067.3788"
    >

    georadius 以给定经纬度为中心,查找一定范围内的人/城市

    1
    2
    3
    > > georadius china:city 110.00 30.00 1000km  # georadius 查找的列表 中心经纬度 查找半径 withdist withcoord count 1 指定查询最大数量
    > 1)"shenzhen"
    >

    georadiusbymember 找出位于指定范围内的元素,中心点是由给定的位置元素决定=找出位于指定元素周围的其他元素

    1
    2
    3
    > georadiusbymember china:city beijing 1000 km
    > 1) "beijing"
    >
  2. Hyperloglog 基数统计

    什么是基数

    A {1,3,5,7,8,7}

    B {1,3,5,7,8}

    基数 (不重复的元素) = 5 ,可以接受误差!

    用作基数统计的算法,网页的UV(一个人访问一个网站多次,但还是算作一个人)

    传统方式,set保存用户id,然后就可以统计set中的元素数量作为标准判断。

    这个方式如果保存大量用户的ID就会比较麻烦,目的是为了计数,而不是保存用户id

    优点:占用内存是固定的,2^64不同的元素的基数,只需要12KB的内存,如果从内存角度来比较的话十分首选。

    0.81%的错误率

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    > pfadd mykey a b c d e f g h i j		# 添加数据
    (integer) 1
    > pfcount mykey # 统计不同的有多少
    (integer) 10
    > pfadd mykey2 i j z x c v b n m
    (integer) 1
    > pfmerge mykey3 mykey mekey2 # 把mykey和mykey2合并为一个集合
    OK
    > pfcount mykey3 # 统计两个的不同值
    (integer) 15
  3. Bitmap 位图场景

    位存储

    统计用户信息,只有两个状态的,都可以使用Bitmaps

    只有0和1两个状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    > setbit sign 0 1			# 存数据,表示0这个位置当天状态是1
    (integer) 0
    > setbit sign 1 0 # 表示1这个位置当天状态是0
    (integer) 0
    > getbit sign 0 # 获取某位置的状态
    (integer) 1
    > getbit sign 1
    (integer) 0

    > bitcount sign # 获取列表全部状态为1的总数
    (integer) 1

事务

multi(创建一个事务)、exec(执行这个事务)、discard(清空事务队列)和 watch(监控)是 Redis 事务相关的命令。事务可以一次执行多个命令, 并且带有以下两个重要的保证:

  • 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

multi命令用于开启一个事务,它总是返回 OK 。 multi执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当exec命令被调用时, 所有队列中的命令才会被执行。

通过调用 discard, 客户端可以清空事务队列, 并放弃执行事务。

watch 使得 exec 命令需要有条件地执行: 事务只能在所有被监视键都没有被修改的前提下执行, 如果这个前提不能满足的话,事务就不会被执行。被 watch的键会被监视,并会发觉这些键是否被改动过了。 如果有至少一个被监视的键在 exec 执行之前被修改了, 那么整个事务都会被取消, exec返回nil-reply来表示事务已经失败。简单来说,就是给该数据加了乐观锁,并且支持多个同时添加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> multi		# 开启事务
OK
> set k1 1 # 添加操作
QUEUED
> set k2 2
QUEUED
> exec # 执行事务
1) (integer) 1
2) (integer) 1

> MULTI
OK
> INCR k1
QUEUED
> DISCARD # 放弃事务
OK
> GET k1
"1"

> WATCH mykey # 监控mykey这个值
OK
val = GET mykey # 在其他客户端被修改
val = val + 1
> MULTI
OK
> SET mykey val # 尝试更改mykey到value
QUEUED
> EXEC # 执行失败
(nil)

Jedis

就是一个java整合了redis 的各种api工具,直接使用,由于和redis命令几乎相同,不做记录,放上别人的博客和官方API文档。

Jedis API常用方法

Jedis API 官方文档

Springboot整合

springData 操作数据的整合,jdbc,mongodb,redis,Jpa…

springboot2.x后不使用jedis变成lettuce

jedis:采用直连,多个线程操作是不安全的,如果想要避免不安全,使用jedis pool连接池,BIO。

lettuce: 采用netty,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据来,更像NIO模式

配置类源码分析

spingboot所有配置类,都有一个自动配置类 RedisAutoConfiguration

自动配置类都会绑定一个 properties 配置文件 RedisProperties

找到这个配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}

@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
) /* 我们可以自定义一个redisTemplate来替换掉这个默认的 */
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
/* 默认的 redisTemplate 没有过多的设置,在redis 对象保存需要序列化
* 两个泛型都是Object Object,使用时需要强转成 String Object
*/
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

@Bean
@ConditionalOnMissingBean /* String是由redis中最常使用的类型,单独提出一个bean */
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}

配置文件源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ConfigurationProperties(
prefix = "spring.redis"
)
public class RedisProperties {
private int database = 0;
private String url;
private String host = "localhost";
private String password;
private int port = 6379;
private boolean ssl;
private Duration timeout;
private String clientName;
private RedisProperties.Sentinel sentinel;
private RedisProperties.Cluster cluster;
private final RedisProperties.Jedis jedis = new RedisProperties.Jedis();
private final RedisProperties.Lettuce lettuce = new RedisProperties.Lettuce();
/* 省略了方法... */
}

整合测试

  1. 导入依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
  2. 配置连接

1
2
3
4
5
6
7
# application.yml
spring:
redis:
host: 127.0.0.1
port: 6379
...
...
  1. 测试

    ops里几乎包含了所有redis操作。

    1
    2
    //连接操作
    RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();

配置连接池时使用lettuce实现类,因为源码中jedis的类没有注入成功。

自定义Redis配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.redis.spring.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
//一般使用String,Object
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);

//JSON序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(),ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

//String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

//key采用String的序列化方式
template.setKeySerializer((stringRedisSerializer));
//hash也
template.setHashKeySerializer(stringRedisSerializer);
//value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
//hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();

return template;
}

}

工具类封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
package com.redis.spring.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}


/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}


// ============================String=============================

/**
* 普通缓存获取
* @param key 键
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}

/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/

public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/

public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}


/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}


// ================================Map=================================

/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}

/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}


/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}


/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}


/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}


// ============================set=============================

/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/

public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

// ===============================list=================================

/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/

public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/

public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}

}

}

redis.conf配置文件分析

启动的时候可以使用不同的配置文件启动redis

配置文件 unit单位 对大小写不敏感。

导入其他配置文件:

1
2
include /path/to/local.conf
include /path/to/other.conf

网络

1
2
3
bind 127.0.0.1		# 绑定ip
protected-mode yes # 保护模式
port 6379 # 端口设置

通用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
################################# GENERAL #####################################

daemonize yes # 以守护进程的方式运行,默认是no,需要自行开启

pidfile /var/run/redis_6379.pid # 如果以后台的方式运行,需要指定一个pid文件
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice
# Specify the log file name. Also the empty string can be used to force
# Redis to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile "" # 日志文件位置名

# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16 # 数据库的数量,默认16个

always-show-logo yes # 显示logo

快照

持久化,在规定时间内,执行了多少次操作,则会持久化到文件以 .rdb. .aof 后缀

redis内存数据库,持久化尤为重要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
################################ SNAPSHOTTING  ################################

# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""

# 例:如果900s内,至少有一个key进行了修改,则进行持久化操作
save 900 1
save 300 10
save 60 10000

stop-writes-on-bgsave-error yes # 如果持久化出错,是否继续工作

# Compress string objects using LZF when dump .rdb databases?
# For default that's set to 'yes' as it's almost always a win.
# If you want to save some CPU in the saving child set it to 'no' but
# the dataset will likely be bigger if you have compressible values or keys.
rdbcompression yes # 是否压缩 rdb 文件。需要消耗一些cpu

rdbchecksum yes # 保存rdb文件的时候,进行错误的检查校验

# Note that you must specify a directory here, not a file name.
dir /usr/local/var/db/redis/ # 保存路径

安全

可以设置redis密码,默认无

1
2
3
4
5
6
7
> config get requirepass		# 获取redis的密码
1)"requirepass"
2)""
> config set requirepass "123456" # 设置密码
OK
> auth 123456 # 设置完密码后需要重新使用密码登录redis
OK

限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
################################### CLIENTS ####################################
# Set the max number of connected clients at the same time. By default
# this limit is set to 10000 clients, however if the Redis server is not
# able to configure the process file limit to allow for the specified limit
# the max number of allowed clients is set to the current file limit
# minus 32 (as Redis reserves a few file descriptors for internal uses).
#
# Once the limit is reached Redis will close all the new connections sending
# an error 'max number of clients reached'.
#
# maxclients 10000 # 设置能连接上redis的最大客户端容量

############################## MEMORY MANAGEMENT ################################

maxmemory <bytes> # redis配置的最大内存容量

# maxmemory-policy noeviction # 内存达到上限之后的处理策略
# LRU means Least Recently Used
# LFU means Least Frequently Used
# 1.volatile-lru -> 加入key的时候如果过限,首先从设置了过期时间的key集合中逐出最久没有使用的key。
# 2.allkeys-lru -> 加入key的时候,如果过限,首先通过LRU算法逐出最久没有使用的key。
# 3.volatile-lfu -> 从所有配置了过期时间的key中逐出使用频率最少的key。
# 4.allkeys-lfu -> 从所有key中逐出使用频率最少的key。
# 5.volatile-random -> 加入key的时候如果过限,从过期key集合中随机逐出。
# 6.allkeys-random -> 加入key的时候如果过限,从所有key中随机删除。
# 7.volatile-ttl -> 从配置了过期时间的key中逐出马上就要过期的key。
# 8.noeviction -> 当内存使用超过配置的时候会返回错误,不会逐出任何key。

redis的key缓存淘汰策略LRU和LFU

官网说:当某个key被设置了过期时间之后,客户端每次对该key的访问(读写)都会事先检测该key是否过期,如果过期就直接删除;但有一些键只访问一次,因此需要主动删除,默认情况下redis每秒检测10次,检测的对象是所有设置了过期时间的键集合,每次从这个集合中随机检测20个键查看他们是否过期,如果过期就直接删除,如果删除后还有超过25%的集合中的键已经过期,那么继续检测过期集合中的20个随机键进行删除。这样可以保证过期键最大只占所有设置了过期时间键的25%。

LRU实现

Redis维护了一个24位时钟,可以简单理解为当前系统的时间戳,每隔一定时间会更新这个时钟。每个key对象内部同样维护了一个24位的时钟,当新增key对象的时候会把系统的时钟赋值到这个内部对象时钟。比如我现在要进行LRU,那么首先拿到当前的全局时钟,然后再找到内部时钟与全局时钟距离时间最久的(差最大)进行淘汰,这里值得注意的是全局时钟只有24位,按秒为单位来表示才能存储194天,所以可能会出现key的时钟大于全局时钟的情况,如果这种情况出现那么就两个相加而不是相减来求最久的key。

Redis中的LRU与常规的LRU实现并不相同,常规LRU会准确的淘汰掉队头的元素,但是Redis的LRU并不维护队列,只是根据配置的策略要么从所有的key中随机选择N个(N可以配置)要么从所有的设置了过期时间的key中选出N个键,然后再从这N个键中选出最久没有使用的一个key进行淘汰。

LFU实现

如果出现key1的时间比key2时间要久,但key1比key2使用更频繁,所以合理的淘汰策略应该删除key2而不是key1,LFU就是解决这种问题而生。

LFU把原来的key对象的内部时钟的24位分成两部分,前16位还代表时钟,后8位代表一个计数器。16位的情况下如果还按照秒为单位就会导致不够用,所以一般这里以时钟为单位。而后8位表示当前key对象的访问频率,8位只能代表255,但是redis并没有采用线性上升的方式,而是通过一个复杂的公式,通过配置两个参数来调整数据的递增速度。
下图从左到右表示key的命中次数,从上到下表示影响因子,在影响因子为100的条件下,经过10M次命中才能把后8位值加满到255.

1
2
3
4
5
6
7
8
9
10
11
# +--------+------------+------------+------------+------------+------------+
# | factor | 100 hits | 1000 hits | 100K hits | 1M hits | 10M hits |
# +--------+------------+------------+------------+------------+------------+
# | 0 | 104 | 255 | 255 | 255 | 255 |
# +--------+------------+------------+------------+------------+------------+
# | 1 | 18 | 49 | 255 | 255 | 255 |
# +--------+------------+------------+------------+------------+------------+
# | 10 | 10 | 18 | 142 | 255 | 255 |
# +--------+------------+------------+------------+------------+------------+
# | 100 | 8 | 11 | 49 | 143 | 255 |
# +--------+------------+------------+------------+------------+------------+

上面说的情况是key一直被命中的情况,如果一个key经过几分钟没有被命中,那么后8位的值是需要递减几分钟,具体递减几分钟根据衰减因子lfu-decay-time来控制。

上面递增和衰减都有对应参数配置,那么对于新分配的key呢?如果新分配的key计数器开始为0,那么很有可能在内存不足的时候直接就给淘汰掉了,所以默认情况下新分配的key的后8位计数器的值为5(应该可配资),防止因为访问频率过低而直接被删除。

低8位我们描述完了,那么高16位的时钟是用来干嘛的呢?目前我的理解是用来衰减低8位的计数器的,就是根据这个时钟与全局时钟进行比较,如果过了一定时间(做差)就会对计数器进行衰减。

aof配置

1
2
3
4
5
6
7
8
9
############################## APPEND ONLY MODE ###############################

appendonly no # 默认不开启aof,默认是rdb

appendfilename "appendonly.aof" # 持久化文件名字

# appendfsync always # 每次修改都会sync,消耗性能
appendfsync everysec # 每秒执行一次sync,可能会丢失1s的数据
# appendfsync no # 不执行sync 操作系统自己同步数据,速度最快

Redis持久化

RDB

RDB持久化是把当前进程数据生成快照保存到硬盘的过程,触发RDB持久化过程分为手动触发和自动触发

RDB的优点:

·RDB是一个紧凑压缩的二进制文件,代表Redis在某个时间点上的数据 快照。非常适用于备份,全量复制等场景。比如每6小时执行bgsave备份, 并把RDB文件拷贝到远程机器或者文件系统中(如hdfs),用于灾难恢复。

·Redis加载RDB恢复数据远远快于AOF的方式。

RDB的缺点:

·RDB方式数据没办法做到实时持久化/秒级持久化。因为bgsave每次运 行都要执行fork操作创建子进程,属于重量级操作,频繁执行成本过高。

·RDB文件使用特定二进制格式保存,Redis版本演进过程中有多个格式 的RDB版本,存在老版本Redis服务无法兼容新版RDB格式的问题。

针对RDB不适合实时持久化的问题,Redis提供了AOF持久化方式来解决。

AOF

AOF(append only file)持久化:以独立日志的方式记录每次写命令, 重启时再重新执行AOF文件中的命令达到恢复数据的目的。AOF的主要作用 是解决了数据持久化的实时性,目前已经是Redis持久化的主流方式

开启AOF功能需要设置配置:appendonly yes,默认不开启。AOF文件名 通过appendfilename配置设置,默认文件名是appendonly.aof。保存路径同 RDB持久化方式一致,通过dir配置指定。AOF的工作流程操作:命令写入 (append)、文件同步(sync)、文件重写(rewrite)、重启加载 (load)

  1. 所有的写入命令会追加到aof_buf(缓冲区)中。

  2. AOF缓冲区根据对应的策略向硬盘做同步操作。

AOF为什么把命令追加到aof_buf中?Redis使用单线程响应命令,如 果每次写AOF文件命令都直接追加到硬盘,那么性能完全取决于当前硬盘负 载。先写入缓冲区aof_buf中,还有另一个好处,Redis可以提供多种缓冲区同步硬盘的策略,在性能和安全性方面做出平衡

  1. 随着AOF文件越来越大,需要定期对AOF文件进行重写,达到压缩的目的。

Redis发布订阅

pub/sub时一种消息通信模式

三部分:消息发送者,频道,消息订阅者

订阅端

1
2
3
4
5
6
7
8
9
10
11
12
13
> subscribe weather		# 订阅天气频道
Reading messages...(press Ctrl-C to quit)
1)"subscribe"
2)"weather"
3)(integer) 1
# 等待读取推送的消息
1)"message" # 消息
2)"weather" # 那个频道的消息
3)"sunny" #消息的具体内容

1)"message"
2)"weather"
3)"rain"

发送端

1
2
3
4
> publish weather "sunny"			# 发布消息到频道
(integer)1
> publish weather "rain"
(integer)1

使用场景

  1. 实时消息(比如金十数据,微博…)
  2. 实时聊天室
  3. 订阅关注(微信,直播平台订阅功能…)

配合MQ使用

Redis主从复制

概念

主从复制,是将一台redis的服务器的数据复制到其他redis服务器,前者主节点master,后者从节点slave,数据复制是单向的。读写分离,主机以写为主,从机以读为主,可以减缓服务器压力,至少三台。

主要功能

  1. 数据冗余
  2. 故障恢复
  3. 负载均衡
  4. 高可用(集群)基石

一台redis是不能的(宕机)

  1. 结构上,单个会发生单点故障
  2. 容量上,单台redis最大使用内存不超过20G

环境配置

只配置从库,不配置主库,默认为主库。

配置多个不同端口的配置文件,分别启动。

一主二从:一主79,二从80,81

1
2
> info replication # 查看主从配置情况
> slaveof 127.0.0.1 6379 # 在从机上使用,确认主机地址

在.conf配置文件中可以永久配置从机跟从的主机ip/port

主机可以写,从机不能写只能读!主机中所有信息和数据都会被从机保存。主机断开,从机依旧连接到主机,但是没有写操作,这个时候主机恢复,还是可以读到数据,从机断开也一样。

复制原理

slave启动成功连接到master后会发送一个sync同步命令

  • 全量复制:每个slave只要连接到master就会进行一次全部数据复制。
  • 增量复制:新增的数据依次传给slave,完成同步

层层链路

中间的从机当连接的主机断开了,自己变成主机。

哨兵模式

自动监控主机是否故障,如果故障根据投票数自动将从库转换为主库。

哨兵是一个独立的进程,哨兵通过发送命令,等待redis服务器响应,从而监控运行多个redis实例。

多哨兵模式:哨兵之间互相监控

1
2
> vim sentinel.conf
sentinel monitor myredis 127.0.0.1 6379 1 # 被监控的名称

后面的数字表示主机挂了的时候通过从机选出新的主机来投票。failover故障转移,随机在从机中选一个为主机,如果上一个主机恢复了,会变成新主机的从机。

优点:

  1. 哨兵集群,基于主从复制。
  2. 主从可以切换,故障可以转移,系统可用性会更好
  3. 哨兵就是主从模式的升级,更加健壮

缺点:

  1. redis 不好在线扩容,集群一旦到达上限,在线扩容十分麻烦

  2. 实现哨兵模式的配置是很麻烦,有很多选择:

    多哨兵集群配置哨兵端口

    定义工作目录

    监控主机ip port

    主机密码

    默认延时操作

    并行同步时间

    故障转移时间

    .sh通知脚本

    主节点

Redis缓存穿透和雪崩

如果对数据一致性问题要求比较高就无法使用缓存。

缓存穿透:查不到

如果查询一个数据缓存中没有命中,于是向持久层数据库查询,发现没有,于是查询失败,当用户多的时候请求多次持久层数据库,会造成持久层数据库压力,造成缓存穿透。

解决方案

  1. 布隆过滤器

    首先是对所有可能查询的参数以hash形式存储,当用户想要查询的时候,使用布隆过滤器发现不在集合中,就直接丢弃,不再对持久层查询。

  2. 缓存空对象:当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

    造成问题:1. 造成空值较多 2.空值时间过期不一致


缓存击穿:量太大缓存过期

指一个key非常热点,在不停扛高并发,集中对这个点访问,在这个key失效的瞬间(过期),就有大量的请求并发访问持久层数据库,造成数据库压力,形成缓存击穿。

解决方案

  1. 设置热点不过期(影响效率不考虑)

  2. 使用互斥锁(mutex key):

    业界比较常用的做法。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去访问持久层数据库,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的setnx或者Memcache的add)去set一个mutex key,当操作返回成功时,再进行访问数据库的操作并回设缓存;否则,就重试整个get缓存的方法。

分布式锁:一个key只有一个线程允许访问数据库,其他线程等待。


缓存雪崩

缓存集中过期失效,redis缓存服务器宕机,访问量瞬间都会到数据库中,造成数据库压垮。

解决方案

  1. **redis高可用:**这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。

  2. **限流降级(HyStrix: 服务熔断,服务降级 – Dashboard流监控):**这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

  3. **数据预热:**数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

0%