Redis Pipelining 底层原理分析及实践
Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务。在遇到批处理命令执行时,Redis提供了Pipelining(管道)来提升批处理性能。本文结合实践分析了Spring
Boot框架下Redis的Lettuce客户端和Redisson客户端对Pipeline特性的支持原理,并针对实践过程中遇到的问题进行了分析,可以帮助开发者了解不同客户端对Pipeline支持原理及避免实际使用中出现问题。
一、前言
Redis
已经提供了像 mget 、mset 这种批量的命令,但是某些操作根本就不支持或没有批量的操作,从而与 Redis 高性能背道而驰。为此,
Redis基于管道机制,提供Redis Pipeline新特性。Redis
Pipeline是一种通过一次性发送多条命令并在执行完后一次性将结果返回,从而减少客户端与redis的通信次数来实现降低往返延时时间提升操作性能的技术。目前,Redis
Pipeline是被很多个版本的Redis 客户端所支持的。
二、Pipeline 底层原理分析
2.1 Redis单个命令执行基本步骤
Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务。一次Redis客户端发起的请求,经过服务端的响应后,大致会经历如下的步骤:
2.2 RTT 时间
Redis客户端和服务端之间通过网络连接进行数据传输,数据包从客户端到达服务器,并从服务器返回数据回复客户端的时间被称之为RTT(Round
Trip Time - 往返时间)。我们可以很容易就意识到,Redis在连续请求服务端时,如果RTT时间为250ms,
即使Redis每秒能处理100k请求,但也会因为网络传输花费大量时间,导致每秒最多也只能处理4个请求,导致整体性能的下降。
2.3 Redis Pipeline
为了提升效率,这时候Pipeline出现了。Pipelining不仅仅能够降低RRT,实际上它极大的提升了单次执行的操作数。这是因为如果不使用Pipelining,那么每次执行单个命令,从访问数据的结构和服务端产生应答的角度,它的成本是很低的。但是从执行网络IO的角度,它的成本其实是很高的。其中涉及到read()和write()的系统调用,这意味着需要从用户态切换到内核态,而这个上下文的切换成本是巨大的。
当使用Pipeline时,它允许多个命令的读通过一次read()操作,多个命令的应答使用一次write()操作,它允许客户端可以一次发送多条命令,而不等待上一条命令执行的结果。不仅减少了RTT,同时也减少了IO调用次数(IO调用涉及到用户态到内核态之间的切换),最终提升程序的执行效率与性能。如下图:
要支持Pipeline,其实既要服务端的支持,也要客户端支持。对于服务端来说,所需要的是能够处理一个客户端通过同一个TCP连接发来的多个命令,可以理解为,这里将多个命令切分,和处理单个命令一样,Redis就是这样处理的。而客户端,则是要将多个命令缓存起来,缓冲区满了就发送,然后再写缓冲,最后才处理Redis的应答。
三、Pipeline 基本使用及性能比较
下面我们以给10w个set结构分别插入一个整数值为例,分别使用jedis单个命令插入、jedis使用Pipeline模式进行插入和redisson使用Pipeline模式进行插入以及测试其耗时。
@Slf4j
public class RedisPipelineTestDemo {
public static void main(String[] args) {
//连接redis
Jedis jedis = new Jedis("10.101.17.180", 6379);
//jedis逐一给每个set新增一个value
String zSetKey = "Pipeline-test-set";
int size = 100000;
long begin = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
jedis.sadd(zSetKey + i, "aaa");
}
log.info("Jedis逐一给每个set新增一个value耗时:{}ms", (System.currentTimeMillis() - begin));
//Jedis使用Pipeline模式 Pipeline Pipeline = jedis.Pipelined();
begin = System.currentTimeMillis();
for (int i = 0; i < size; i++) { Pipeline.sadd(zSetKey + i, "bbb");
} Pipeline.sync();
log.info("Jedis Pipeline模式耗时:{}ms", (System.currentTimeMillis() - begin));
//Redisson使用Pipeline模式
Config config = new Config();
config.useSingleServer().setAddress("redis://10.101.17.180:6379");
RedissonClient redisson = Redisson.create(config);
RBatch redisBatch = redisson.createBatch();
begin = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
redisBatch.getSet(zSetKey + i).addAsync("ccc");
}
redisBatch.execute();
log.info("Redisson Pipeline模式耗时:{}ms", (System.currentTimeMillis() - begin));
//关闭 Pipeline.close();
jedis.close();
redisson.shutdown();
}
}
测试结果如下:
Jedis逐一给每个set新增一个value耗时:162655ms
Jedis Pipeline模式耗时:504ms
Redisson Pipeline模式耗时:1399ms
我们发现使用Pipeline模式对应的性能会明显好于单个命令执行的情况。
四、项目中实际应用
在实际使用过程中有这样一个场景,很多应用在节假日的时候需要更新应用图标样式,在运营进行后台配置的时候, 可以根据圈选的用户标签预先计算出单个用户需要下发的图标样式并存储在Redis里面,从而提升性能,这里就涉及Redis的批量操作问题,业务流程如下:
为了提升Redis操作性能,我们决定使用Redis Pipelining机制进行批量执行。
4.1 Redis 客户端对比
针对Java技术栈而言,目前Redis使用较多的客户端为Jedis、Lettuce和Redisson。
目前项目主要是基于SpringBoot开发,针对Redis,其默认的客户端为Lettuce,所以我们基于Lettuce客户端进行分析。
4.2 Spring环境下Lettuce客户端对Pipeline的实现
在Spring环境下,使用Redis的Pipeline也是很简单的。spring-data-redis提供了
StringRedisTemplate简化了对Redis的操作, 只需要调用StringRedisTemplate的executePipelined方法就可以了,但是在参数中提供了两种回调方式:SessionCallback和RedisCallback。
两种使用方式如下(这里以操作set结构为例):
RedisCallback的使用方式:
public void testRedisCallback() {
List ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Integer contentId = 1;
redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId));
}
@AllArgsConstructor
private static class InsertPipelineExecutionA implements RedisCallback {
private final List ids;
private final Integer contentId;
@Override
public Void doInRedis(RedisConnection connection) DataAccessException {
RedisSetCommands redisSetCommands = connection.setCommands();
ids.forEach(id-> {
String redisKey = "aaa:" + id;
String value = String.valueOf(contentId);
redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes());
});
return null;
}
}
SessionCallback的使用方式:
public void testSessionCallback() {
List ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Integer contentId = 1;
redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId));
}
@AllArgsConstructor
private static class InsertPipelineExecutionB implements SessionCallback {
private final List ids;
private final Integer contentId;
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
SetOperations setOperations = (SetOperations) operations.opsForSet();
ids.forEach(id-> {
String redisKey = "aaa:" + id;
String value = String.valueOf(contentId);
setOperations.add(redisKey, value);
});
return null;
}
}
4.3 RedisCallBack和SessionCallback之间的比较
1、RedisCallBack和SessionCallback都可以实现回调,通过它们可以在同一条连接中一次执行多个redis命令。
2、RedisCallback使用的是原生
RedisConnection,用起来比较麻烦,比如上面执行set的add操作,key和value需要进行转换,可读性差,但原生api提供的功能比较齐全。
3、SessionCalback提供了良好的封装,可以优先选择使用这种回调方式。
最终的代码实现如下:
public void executeB(List userIds, Integer iconId) {
redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId));
}
@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback {
private final List userIds;
private final Integer iconId;
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
SetOperations setOperations = (SetOperations) operations.opsForSet();
userIds.forEach(userId -> {
String redisKey = "aaa:" + userId;
String value = String.valueOf(iconId);
setOperations.add(redisKey, value);
});
return null;
}
}
4.4 源码分析
那么为什么使用Pipeline方式会对性能有较大提升呢,我们现在从源码入手着重分析一下:
4.4.1 Pipeline方式下获取连接相关原理分析:
@Override
public List executePipelined(SessionCallback session, @Nullable RedisSerializer resultSerializer) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(session, "Callback object must not be null");
//1. 获取对应的Redis连接工厂
RedisConnectionFactory factory = getRequiredConnectionFactory();
//2. 绑定连接过程
RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
try {
//3. 执行命令流程, 这里请求参数为RedisCallback, 里面有对应的回调操作
return execute((RedisCallback) connection -> {
//具体的回调逻辑
connection.openPipeline();
boolean PipelinedClosed = false;
try {
//执行命令
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the Pipeline");
}
List closePipeline = connection.closePipeline(); PipelinedClosed = true;
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!PipelinedClosed) {
connection.closePipeline();
}
}
});
} finally {
RedisConnectionUtils.unbindConnection(factory);
}
}
① 获取对应的Redis连接工厂,这里要使用Pipeline特性需要使用
LettuceConnectionFactory方式,这里获取的连接工厂就是LettuceConnectionFactory。
② 绑定连接过程,具体指的是将当前连接绑定到当前线程上面, 核心方法为:doGetConnection。
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
//核心类,有缓存作用,下次可以从这里获取已经存在的连接
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
//如果connHolder不为null, 则获取已经存在的连接, 提升性能
if (connHolder != null) {
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
......
//第一次获取连接,需要从Redis连接工厂获取连接
RedisConnection conn = factory.getConnection();
//bind = true 执行绑定
if (bind) {
RedisConnection connectionToBind = conn;
......
connHolder = new RedisConnectionHolder(connectionToBind);
//绑定核心代码: 将获取的连接和当前线程绑定起来
TransactionSynchronizationManager.bindResource(factory, connHolder);
......
return connHolder.getConnection();
}
return conn;
}
里面有个核心类RedisConnectionHolder,我们看一下
RedisConnectionHolder connHolder =
(RedisConnectionHolder)
TransactionSynchronizationManager.getResource(factory);
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
里面有一个核心方法doGetResource
(actualKey),大家很容易猜测这里涉及到一个map结构,如果我们看源码,也确实是这样一个结构。
@Nullable
private static Object doGetResource(Object actualKey) {
Map map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
resources是一个ThreadLocal类型,这里会涉及到根据RedisConnectionFactory获取到连接connection的逻辑,如果下一次是同一个actualKey,那么就直接使用已经存在的连接,而不需要新建一个连接。第一次这里map为null,就直接返回了,然后回到doGetConnection方法,由于这里bind为true,我们会执行TransactionSynchronizationManager.bindResource(factory,
connHolder);,也就是将连接和当前线程绑定了起来。
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
......
}
③ 我们回到executePipelined,在获取到连接工厂,将连接和当前线程绑定起来以后,就开始需要正式去执行命令了, 这里会调用execute方法
@Override
@Nullable
public T execute(RedisCallback action) {
return execute(action, isExposeConnection());
}
这里我们注意到execute方法的入参为RedisCallbackaction,RedisCallback对应的doInRedis操作如下,这里在后面的调用过程中会涉及到回调。
connection.openPipeline();
boolean PipelinedClosed = false;
try {
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the Pipeline");
}
List closePipeline = connection.closePipeline(); PipelinedClosed = true;
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!PipelinedClosed) {
connection.closePipeline();
}
}
我们再来看execute(action,
isExposeConnection())方法,这里最终会调用
execute(RedisCallbackaction, boolean exposeConnection, boolean Pipeline)方法。
@Nullable
public T execute(RedisCallback action, boolean exposeConnection, boolean Pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
//获取对应的连接工厂
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
// only bind resources in case of potential transaction synchronization
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
//获取对应的连接(enableTransactionSupport=false)
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean PipelineStatus = connToUse.isPipelined();
if (Pipeline && !PipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
//核心方法,这里就开始执行回调操作
T result = action.doInRedis(connToExpose);
// close Pipeline
if (Pipeline && !PipelineStatus) {
connToUse.closePipeline();
}
// TODO: any other connection processing?
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
}
}
我们看到这里最开始也是获取对应的连接工厂,然后获取对应的连接
(enableTransactionSupport=false),具体调用是
RedisConnectionUtils.getConnection(factory)方法,最终会调用
RedisConnection
doGetConnection(RedisConnectionFactory factory, boolean allowCreate,
boolean bind, boolean enableTransactionSupport),此时bind为false
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
//直接获取与当前线程绑定的Redis连接
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder != null) {
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
......
return conn;
}
前面我们分析过一次,这里调用
RedisConnectionHolder connHolder =
(RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);会获取到之前和当前线程绑定的Redis,而不会新创建一个连接。
然后会去执行T result = action.
doInRedis(connToExpose),这里的action为RedisCallback,执行doInRedis为:
//开启Pipeline功能
connection.openPipeline();
boolean PipelinedClosed = false;
try {
//执行Redis命令
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the Pipeline");
}
List closePipeline = connection.closePipeline(); PipelinedClosed = true;
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!PipelinedClosed) {
connection.closePipeline();
}
}
这里最开始会开启Pipeline功能,然后执行
Object result = executeSession(session);
private Object executeSession(SessionCallback session) {
return session.execute(this);
}
这里会调用我们自定义的execute方法
@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback {
private final List userIds;
private final Integer iconId;
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
SetOperations setOperations = (SetOperations) operations.opsForSet();
userIds.forEach(userId -> {
String redisKey = "aaa:" + userId;
String value = String.valueOf(iconId);
setOperations.add(redisKey, value);
});
return null;
}
}
进入到foreach循环,执行DefaultSetOperations的add方法。
@Override
public Long add(K key, V... values) {
byte[] rawKey = rawKey(key);
byte[][] rawValues = rawValues((Object[]) values);
//这里的connection.sAdd是后续回调要执行的方法
return execute(connection -> connection.sAdd(rawKey, rawValues), true);
}
这里会继续执行redisTemplate的execute方法,里面最终会调用我们之前分析过的T
execute(RedisCallbackaction, boolean exposeConnection, boolean
Pipeline)方法。
@Nullable
public T execute(RedisCallback action, boolean exposeConnection, boolean Pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = null;
try {
......
//再次执行回调方法,这里执行的Redis基本数据结构对应的操作命令
T result = action.doInRedis(connToExpose);
......
// TODO: any other connection processing?
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
}
}
这里会继续执行T result =
action.doInRedis(connToExpose);,这里其实执行的doInRedis方法为:
connection -> connection.sAdd(rawKey, rawValues)
4.4.2 Pipeline方式下执行命令的流程分析:
① 接着上面的流程分析,这里的sAdd方法实际调用的是DefaultStringRedisConnection的sAdd方法
@Override
public Long sAdd(byte[] key, byte[]... values) {
return convertAndReturn(delegate.sAdd(key, values), identityConverter);
}
② 这里会进一步调用
DefaultedRedisConnection的sAdd方法
@Override
@Deprecated
default Long sAdd(byte[] key, byte[]... values) {
return setCommands().sAdd(key, values);
}
③ 接着调用LettuceSetCommands的sAdd方法
@Override
public Long sAdd(byte[] key, byte[]... values) {
Assert.notNull(key, "Key must not be null!");
Assert.notNull(values, "Values must not be null!");
Assert.noNullElements(values, "Values must not contain null elements!");
try {
// 如果开启了 Pipelined 模式,获取的是 异步连接,进行异步操作
if (isPipelined()) { Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
return null;
}
if (isQueueing()) {
transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
return null;
}
//常规模式下,使用的是同步操作
return getConnection().sadd(key, values);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
这里我们开启了Pipeline, 实际会调用
Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key,
values)));
也就是获取异步连接getAsyncConnection,然后进行异步操作sadd,而常规模式下,使用的是同步操作,所以在Pipeline模式下,执行效率更高。
从上面的获取连接和具体命令执行相关源码分析可以得出使用Lettuce客户端Pipeline模式高效的根本原因:
五、项目中遇到的坑
前面介绍了涉及到批量操作,可以使用Redis Pipelining机制,那是不是任何批量操作相关的场景都可以使用呢,比如list类型数据的批量移除操作,我们的代码最开始是这么写的:
public void deleteSet(String updateKey, Set userIds) {
if (CollectionUtils.isEmpty(userIds)) {
return;
}
redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey));
}
@AllArgsConstructor
private static class DeleteListCallBack implements SessionCallback {
private Set userIds;
private String updateKey;
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
ListOperations listOperations = (ListOperations) operations.opsForList();
userIds.forEach(userId -> listOperations.remove(updateKey, 1, userId.toString()));
return null;
}
}
在数据量比较小的时候没有出现问题,直到有一条收到了Redis的内存和cpu利用率的告警消息,我们发现这么使用是有问题的,核心原因在于list的lrem操作的时间复杂度是O(N+M),其中N是list的长度,
M是要移除的元素的个数,而我们这里还是一个一个移除的,当然会导致Redis数据积压和cpu每秒ops升高导致cpu利用率飚高。也就是说,即使使用Pipeline进行批量操作,但是由于单次操作很耗时,是会导致整个Redis出现问题的。
后面我们进行了优化,选用了list的ltrim命令,一次命令执行批量remove操作:
public void deleteSet(String updateKey, Set deviceIds) {
if (CollectionUtils.isEmpty(deviceIds)) {
return;
}
int maxSize = 10000;
redisTemplate.opsForList().trim(updateKey, maxSize + 1, -1);
}
由于ltrim本身的时间复杂度为O(M), 其中M要移除的元素的个数,相比于原始方案的lrem,效率提升很多,可以不需要使用Redis Pipeline,优化结果使得Redis内存利用率和cpu利用率都极大程度得到缓解。
六、Redisson 对 Redis Pipeline 特性支持
在redisson官方文档中额外特性介绍中有说到批量命令执行这个特性, 也就是多个命令在一次网络调用中集中发送,该特性是RBatch这个类支持的,从这个类的描述来看,主要是为Redis Pipeline这个特性服务的,并且主要是通过队列和异步实现的。
/**
* Interface for using Redis Pipeline feature.
*
* All method invocations on objects got through this interface
* are batched to separate queue and could be executed later
* with execute()
or executeAsync()
methods.
*
*
* @author Nikita Koksharov
*
*/
public interface RBatch {
/**
* Returns stream instance by name
*
* @param type of key
* @param type of value
* @param name of stream
* @return RStream object
*/
RStreamAsync getStream(String name);
/**
* Returns stream instance by name
* using provided codec
for entries.
*
* @param type of key
* @param type of value
* @param name - name of stream
* @param codec - codec for entry
* @return RStream object
*/
RStreamAsync getStream(String name, Codec codec);
......
/**
* Returns list instance by name.
*
* @param type of object
* @param name - name of object
* @return List object
*/
RListAsync getList(String name);
RListAsync getList(String name, Codec codec);
......
/**
* Executes all operations accumulated during async methods invocations.
*
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
*
* @return List with result object for each command
* @throws RedisException in case of any error
*
*/
BatchResult execute() throws RedisException;
/**
* Executes all operations accumulated during async methods invocations asynchronously.
*
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
*
* @return List with result object for each command
*/
RFuture> commands = new LinkedBlockingDeque();
volatile boolean readOnlyMode = true;
public Deque> executeAsync() {
......
RPromise> list列表里面
List> executeAsync() {
......
RPromise