当前位置: 首页 > news >正文

c做网站教程达州网站建设yufanse

c做网站教程,达州网站建设yufanse,商城二次开发,企业系统建设作者#xff1a;vivo 互联网服务器团队-Wang Fei Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务。在遇到批处理命令执行时#xff0c;Redis提供了Pipelining(管道)来提升批处理性能。本文结合实践分析了Spring Boot框架下Redis的Lettuce客户端和Redisson客户端对P… 作者vivo 互联网服务器团队-Wang Fei Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务。在遇到批处理命令执行时Redis提供了Pipelining(管道)来提升批处理性能。本文结合实践分析了Spring Boot框架下Redis的Lettuce客户端和Redisson客户端对Pipeline特性的支持原理并针对实践过程中遇到的问题进行了分析可以帮助开发者了解不同客户端对Pipeline支持原理及避免实际使用中出现问题。 一、前言 Redis 已经提供了像 mget 、mset 这种批量的命令但是某些操作根本就不支持或没有批量的操作从而与 Redis 高性能背道而驰。为此, Redis基于管道机制提供Redis Pipeline新特性。Redis Pipeline是一种通过一次性发送多条命令并在执行完后一次性将结果返回从而减少客户端与redis的通信次数来实现降低往返延时时间提升操作性能的技术。目前Redis Pipeline是被很多个版本的Redis 客户端所支持的。  二、Pipeline 底层原理分析 2.1 Redis单个命令执行基本步骤 Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务。一次Redis客户端发起的请求经过服务端的响应后大致会经历如下的步骤 客户端发起一个查询/插入请求并监听socket返回通常情况都是阻塞模式等待Redis服务器的响应。 服务端处理命令并且返回处理结果给客户端。 客户端接收到服务的返回结果程序从阻塞代码处返回。 2.2 RTT 时间 Redis客户端和服务端之间通过网络连接进行数据传输数据包从客户端到达服务器并从服务器返回数据回复客户端的时间被称之为RTT(Round Trip Time - 往返时间)。我们可以很容易就意识到Redis在连续请求服务端时如果RTT时间为250ms, 即使Redis每秒能处理100k请求但也会因为网络传输花费大量时间导致每秒最多也只能处理4个请求导致整体性能的下降。 2.3 Redis Pipeline 为了提升效率这时候Pipeline出现了。Pipelining不仅仅能够降低RRT实际上它极大的提升了单次执行的操作数。这是因为如果不使用Pipelining那么每次执行单个命令,从访问数据的结构和服务端产生应答的角度它的成本是很低的。但是从执行网络IO的角度它的成本其实是很高的。其中涉及到read()和write()的系统调用这意味着需要从用户态切换到内核态,而这个上下文的切换成本是巨大的。 当使用Pipeline时它允许多个命令的读通过一次read()操作多个命令的应答使用一次write()操作它允许客户端可以一次发送多条命令而不等待上一条命令执行的结果。不仅减少了RTT同时也减少了IO调用次数IO调用涉及到用户态到内核态之间的切换最终提升程序的执行效率与性能。如下图 要支持Pipeline其实既要服务端的支持也要客户端支持。对于服务端来说所需要的是能够处理一个客户端通过同一个TCP连接发来的多个命令可以理解为这里将多个命令切分和处理单个命令一样Redis就是这样处理的。而客户端则是要将多个命令缓存起来缓冲区满了就发送然后再写缓冲最后才处理Redis的应答。 三、Pipeline 基本使用及性能比较 下面我们以给10w个set结构分别插入一个整数值为例分别使用jedis单个命令插入、jedis使用Pipeline模式进行插入和redisson使用Pipeline模式进行插入以及测试其耗时。 Slf4j public class RedisPipelineTestDemo {public static void main(String[] args) {//连接redisJedis jedis new Jedis(10.101.17.180, 6379);//jedis逐一给每个set新增一个valueString zSetKey Pipeline-test-set;int size 100000;long begin System.currentTimeMillis();for (int i 0; i size; i) {jedis.sadd(zSetKey i, aaa);}log.info(Jedis逐一给每个set新增一个value耗时{}ms, (System.currentTimeMillis() - begin));//Jedis使用Pipeline模式 Pipeline Pipeline jedis.Pipelined();begin System.currentTimeMillis();for (int i 0; i size; i) { Pipeline.sadd(zSetKey i, bbb);} Pipeline.sync();log.info(Jedis Pipeline模式耗时{}ms, (System.currentTimeMillis() - begin));//Redisson使用Pipeline模式Config config new Config();config.useSingleServer().setAddress(redis://10.101.17.180:6379);RedissonClient redisson Redisson.create(config);RBatch redisBatch redisson.createBatch();begin System.currentTimeMillis();for (int i 0; i size; i) {redisBatch.getSet(zSetKey i).addAsync(ccc);}redisBatch.execute();log.info(Redisson Pipeline模式耗时{}ms, (System.currentTimeMillis() - begin));//关闭 Pipeline.close();jedis.close();redisson.shutdown();} } 测试结果如下: Jedis逐一给每个set新增一个value耗时162655ms Jedis Pipeline模式耗时504ms Redisson Pipeline模式耗时1399ms 我们发现使用Pipeline模式对应的性能会明显好于单个命令执行的情况。 四、项目中实际应用 在实际使用过程中有这样一个场景很多应用在节假日的时候需要更新应用图标样式在运营进行后台配置的时候, 可以根据圈选的用户标签预先计算出单个用户需要下发的图标样式并存储在Redis里面从而提升性能这里就涉及Redis的批量操作问题业务流程如下 为了提升Redis操作性能我们决定使用Redis Pipelining机制进行批量执行。 4.1 Redis 客户端对比 针对Java技术栈而言目前Redis使用较多的客户端为Jedis、Lettuce和Redisson。 目前项目主要是基于SpringBoot开发针对Redis其默认的客户端为Lettuce所以我们基于Lettuce客户端进行分析。 4.2 Spring环境下Lettuce客户端对Pipeline的实现 在Spring环境下使用Redis的Pipeline也是很简单的。spring-data-redis提供了StringRedisTemplate简化了对Redis的操作,  只需要调用StringRedisTemplate的executePipelined方法就可以了但是在参数中提供了两种回调方式SessionCallback和RedisCallback。 两种使用方式如下(这里以操作set结构为例) RedisCallback的使用方式 public void testRedisCallback() {ListInteger ids Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);Integer contentId 1;redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId));}AllArgsConstructorprivate static class InsertPipelineExecutionA implements RedisCallbackVoid {private final ListInteger ids;private final Integer contentId;Overridepublic Void doInRedis(RedisConnection connection) DataAccessException {RedisSetCommands redisSetCommands connection.setCommands();ids.forEach(id- {String redisKey aaa: id;String value String.valueOf(contentId);redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes());});return null;}} SessionCallback的使用方式 public void testSessionCallback() {ListInteger ids Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);Integer contentId 1;redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId));}AllArgsConstructorprivate static class InsertPipelineExecutionB implements SessionCallbackVoid {private final ListInteger ids;private final Integer contentId;Overridepublic K, V Void execute(RedisOperationsK, V operations) throws DataAccessException {SetOperationsString, String setOperations (SetOperationsString, String) operations.opsForSet();ids.forEach(id- {String redisKey aaa: id;String value String.valueOf(contentId);setOperations.add(redisKey, value);});return null;}} 4.3 RedisCallBack和SessionCallback之间的比较 1、RedisCallBack和SessionCallback都可以实现回调通过它们可以在同一条连接中一次执行多个redis命令。 2、RedisCallback使用的是原生RedisConnection用起来比较麻烦比如上面执行set的add操作key和value需要进行转换可读性差但原生api提供的功能比较齐全。 3、SessionCalback提供了良好的封装可以优先选择使用这种回调方式。 最终的代码实现如下 public void executeB(ListInteger userIds, Integer iconId) {redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId)); }AllArgsConstructor private static class InsertPipelineExecution implements SessionCallbackVoid {private final ListInteger userIds;private final Integer iconId;Overridepublic K, V Void execute(RedisOperationsK, V operations) throws DataAccessException {SetOperationsString, String setOperations (SetOperationsString, String) operations.opsForSet();userIds.forEach(userId - {String redisKey aaa: userId;String value String.valueOf(iconId);setOperations.add(redisKey, value);});return null;} } 4.4 源码分析 那么为什么使用Pipeline方式会对性能有较大提升呢我们现在从源码入手着重分析一下 4.4.1 Pipeline方式下获取连接相关原理分析 Overridepublic ListObject executePipelined(SessionCallback? session, Nullable RedisSerializer? resultSerializer) {Assert.isTrue(initialized, template not initialized; call afterPropertiesSet() before using it);Assert.notNull(session, Callback object must not be null);//1. 获取对应的Redis连接工厂RedisConnectionFactory factory getRequiredConnectionFactory();//2. 绑定连接过程RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);try {//3. 执行命令流程, 这里请求参数为RedisCallback, 里面有对应的回调操作return execute((RedisCallbackListObject) connection - {//具体的回调逻辑connection.openPipeline();boolean PipelinedClosed false;try {//执行命令Object result executeSession(session);if (result ! null) {throw new InvalidDataAccessApiUsageException(Callback cannot return a non-null value as it gets overwritten by the Pipeline);}ListObject closePipeline connection.closePipeline(); PipelinedClosed true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally {if (!PipelinedClosed) {connection.closePipeline();}}});} finally {RedisConnectionUtils.unbindConnection(factory);}} ① 获取对应的Redis连接工厂这里要使用Pipeline特性需要使用LettuceConnectionFactory方式这里获取的连接工厂就是LettuceConnectionFactory。 ② 绑定连接过程具体指的是将当前连接绑定到当前线程上面 核心方法为doGetConnection。 public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,boolean enableTransactionSupport) {Assert.notNull(factory, No RedisConnectionFactory specified);//核心类有缓存作用下次可以从这里获取已经存在的连接RedisConnectionHolder connHolder (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);//如果connHolder不为null, 则获取已经存在的连接, 提升性能if (connHolder ! null) {if (enableTransactionSupport) {potentiallyRegisterTransactionSynchronisation(connHolder, factory);}return connHolder.getConnection();}......//第一次获取连接需要从Redis连接工厂获取连接RedisConnection conn factory.getConnection();//bind true 执行绑定if (bind) {RedisConnection connectionToBind conn;......connHolder new RedisConnectionHolder(connectionToBind);//绑定核心代码: 将获取的连接和当前线程绑定起来TransactionSynchronizationManager.bindResource(factory, connHolder);......return connHolder.getConnection();}return conn;} 里面有个核心类RedisConnectionHolder我们看一下RedisConnectionHolder connHolder   (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); Nullablepublic static Object getResource(Object key) {Object actualKey TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value doGetResource(actualKey);if (value ! null logger.isTraceEnabled()) {logger.trace(Retrieved value [ value ] for key [ actualKey ] bound to thread [ Thread.currentThread().getName() ]);}return value;} 里面有一个核心方法doGetResource(actualKey)大家很容易猜测这里涉及到一个map结构如果我们看源码也确实是这样一个结构。 Nullableprivate static Object doGetResource(Object actualKey) {MapObject, Object map resources.get();if (map null) {return null;}Object value map.get(actualKey);// Transparently remove ResourceHolder that was marked as void...if (value instanceof ResourceHolder ((ResourceHolder) value).isVoid()) {map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}value null;}return value;} resources是一个ThreadLocal类型这里会涉及到根据RedisConnectionFactory获取到连接connection的逻辑,如果下一次是同一个actualKey那么就直接使用已经存在的连接而不需要新建一个连接。第一次这里map为null就直接返回了然后回到doGetConnection方法由于这里bind为true我们会执行TransactionSynchronizationManager.bindResource(factory, connHolder);也就是将连接和当前线程绑定了起来。 public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, Value must not be null);MapObject, Object map resources.get();// set ThreadLocal Map if none foundif (map null) {map new HashMap();resources.set(map);}Object oldValue map.put(actualKey, value);......} ③ 我们回到executePipelined在获取到连接工厂将连接和当前线程绑定起来以后就开始需要正式去执行命令了, 这里会调用execute方法 Override Nullable public T T execute(RedisCallbackT action) {return execute(action, isExposeConnection()); } 这里我们注意到execute方法的入参为RedisCallbackTactionRedisCallback对应的doInRedis操作如下,这里在后面的调用过程中会涉及到回调。 connection.openPipeline(); boolean PipelinedClosed false; try {Object result executeSession(session);if (result ! null) {throw new InvalidDataAccessApiUsageException(Callback cannot return a non-null value as it gets overwritten by the Pipeline);}ListObject closePipeline connection.closePipeline(); PipelinedClosed true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer); } finally {if (!PipelinedClosed) {connection.closePipeline();} } 我们再来看execute(action, isExposeConnection())方法这里最终会调用Texecute(RedisCallbackTaction, boolean exposeConnection, boolean Pipeline)方法。 Nullable public T T execute(RedisCallbackT action, boolean exposeConnection, boolean Pipeline) {Assert.isTrue(initialized, template not initialized; call afterPropertiesSet() before using it);Assert.notNull(action, Callback object must not be null);//获取对应的连接工厂RedisConnectionFactory factory getRequiredConnectionFactory();RedisConnection conn null;try {if (enableTransactionSupport) {// only bind resources in case of potential transaction synchronizationconn RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);} else {//获取对应的连接(enableTransactionSupportfalse) conn RedisConnectionUtils.getConnection(factory);}boolean existingConnection TransactionSynchronizationManager.hasResource(factory);RedisConnection connToUse preProcessConnection(conn, existingConnection);boolean PipelineStatus connToUse.isPipelined();if (Pipeline !PipelineStatus) {connToUse.openPipeline();}RedisConnection connToExpose (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));//核心方法这里就开始执行回调操作T result action.doInRedis(connToExpose);// close Pipelineif (Pipeline !PipelineStatus) {connToUse.closePipeline();}// TODO: any other connection processing?return postProcessResult(result, connToUse, existingConnection);} finally {RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);} } 我们看到这里最开始也是获取对应的连接工厂然后获取对应的连接(enableTransactionSupportfalse)具体调用是RedisConnectionUtils.getConnection(factory)方法最终会调用RedisConnection doGetConnection(RedisConnectionFactory factory, booleanallowCreate, boolean bind, boolean enableTransactionSupport)此时bind为false public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,boolean enableTransactionSupport) {Assert.notNull(factory, No RedisConnectionFactory specified);//直接获取与当前线程绑定的Redis连接RedisConnectionHolder connHolder (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);if (connHolder ! null) {if (enableTransactionSupport) {potentiallyRegisterTransactionSynchronisation(connHolder, factory);}return connHolder.getConnection();}......return conn; } 前面我们分析过一次这里调用RedisConnectionHolder connHolder  (RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory);会获取到之前和当前线程绑定的Redis而不会新创建一个连接。 然后会去执行T result action.doInRedis(connToExpose)这里的action为RedisCallback执行doInRedis为 //开启Pipeline功能 connection.openPipeline(); boolean PipelinedClosed false; try {//执行Redis命令Object result executeSession(session);if (result ! null) {throw new InvalidDataAccessApiUsageException(Callback cannot return a non-null value as it gets overwritten by the Pipeline);}ListObject closePipeline connection.closePipeline(); PipelinedClosed true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer); } finally {if (!PipelinedClosed) {connection.closePipeline();} } 这里最开始会开启Pipeline功能然后执行Object result executeSession(session); private Object executeSession(SessionCallback? session) {return session.execute(this); } 这里会调用我们自定义的execute方法 AllArgsConstructor private static class InsertPipelineExecution implements SessionCallbackVoid {private final ListInteger userIds;private final Integer iconId;Overridepublic K, V Void execute(RedisOperationsK, V operations) throws DataAccessException {SetOperationsString, String setOperations (SetOperationsString, String) operations.opsForSet();userIds.forEach(userId - {String redisKey aaa: userId;String value String.valueOf(iconId);setOperations.add(redisKey, value);});return null;} } 进入到foreach循环执行DefaultSetOperations的add方法。 Override public Long add(K key, V... values) {byte[] rawKey rawKey(key);byte[][] rawValues rawValues((Object[]) values);//这里的connection.sAdd是后续回调要执行的方法return execute(connection - connection.sAdd(rawKey, rawValues), true); } 这里会继续执行redisTemplate的execute方法里面最终会调用我们之前分析过的 TT execute(RedisCallbackTaction, boolean exposeConnection, boolean Pipeline)方法。 Nullable public T T execute(RedisCallbackT action, boolean exposeConnection, boolean Pipeline) {Assert.isTrue(initialized, template not initialized; call afterPropertiesSet() before using it);Assert.notNull(action, Callback object must not be null);RedisConnectionFactory factory getRequiredConnectionFactory();RedisConnection conn null;try {......//再次执行回调方法这里执行的Redis基本数据结构对应的操作命令T result action.doInRedis(connToExpose);......// TODO: any other connection processing?return postProcessResult(result, connToUse, existingConnection);} finally {RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);} } 这里会继续执行T result  action.doInRedis(connToExpose);这里其实执行的doInRedis方法为 connection - connection.sAdd(rawKey, rawValues) 4.4.2 Pipeline方式下执行命令的流程分析: ① 接着上面的流程分析这里的sAdd方法实际调用的是DefaultStringRedisConnection的sAdd方法 Override public Long sAdd(byte[] key, byte[]... values) {return convertAndReturn(delegate.sAdd(key, values), identityConverter); } ② 这里会进一步调用DefaultedRedisConnection的sAdd方法 Override Deprecated default Long sAdd(byte[] key, byte[]... values) {return setCommands().sAdd(key, values); } ③ 接着调用LettuceSetCommands的sAdd方法 Override public Long sAdd(byte[] key, byte[]... values) {Assert.notNull(key, Key must not be null!);Assert.notNull(values, Values must not be null!);Assert.noNullElements(values, Values must not contain null elements!);try {// 如果开启了 Pipelined 模式获取的是 异步连接进行异步操作if (isPipelined()) { Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));return null;}if (isQueueing()) {transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));return null;}//常规模式下使用的是同步操作return getConnection().sadd(key, values);} catch (Exception ex) {throw convertLettuceAccessException(ex);} } 这里我们开启了Pipeline, 实际会调用Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); 也就是获取异步连接getAsyncConnection然后进行异步操作sadd而常规模式下使用的是同步操作所以在Pipeline模式下执行效率更高。 从上面的获取连接和具体命令执行相关源码分析可以得出使用Lettuce客户端Pipeline模式高效的根本原因 普通模式下每执行一个命令都需要先打开一个连接命令执行完毕以后又需要关闭这个连接执行下一个命令时又需要经过连接打开和关闭的流程而Pipeline的所有命令的执行只需要经过一次连接打开和关闭。 普通模式下命令的执行是同步阻塞模式而Pipeline模式下命令的执行是异步非阻塞模式。 五、项目中遇到的坑 前面介绍了涉及到批量操作可以使用Redis Pipelining机制那是不是任何批量操作相关的场景都可以使用呢比如list类型数据的批量移除操作我们的代码最开始是这么写的 public void deleteSet(String updateKey, SetInteger userIds) {if (CollectionUtils.isEmpty(userIds)) {return;}redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey));}AllArgsConstructor private static class DeleteListCallBack implements SessionCallbackObject {private SetInteger userIds;private String updateKey;Overridepublic K, V Object execute(RedisOperationsK, V operations) throws DataAccessException {ListOperationsString, String listOperations (ListOperationsString, String) operations.opsForList();userIds.forEach(userId - listOperations.remove(updateKey, 1, userId.toString()));return null;} } 在数据量比较小的时候没有出现问题直到有一条收到了Redis的内存和cpu利用率的告警消息我们发现这么使用是有问题的核心原因在于list的lrem操作的时间复杂度是O(NM)其中N是list的长度, M是要移除的元素的个数而我们这里还是一个一个移除的当然会导致Redis数据积压和cpu每秒ops升高导致cpu利用率飚高。也就是说即使使用Pipeline进行批量操作但是由于单次操作很耗时是会导致整个Redis出现问题的。 后面我们进行了优化选用了list的ltrim命令一次命令执行批量remove操作 public void deleteSet(String updateKey, SetInteger deviceIds) {if (CollectionUtils.isEmpty(deviceIds)) {return;}int maxSize 10000;redisTemplate.opsForList().trim(updateKey, maxSize 1, -1);} 由于ltrim本身的时间复杂度为O(M), 其中M要移除的元素的个数相比于原始方案的lrem效率提升很多可以不需要使用Redis Pipeline优化结果使得Redis内存利用率和cpu利用率都极大程度得到缓解。 六、Redisson 对 Redis Pipeline 特性支持 在redisson官方文档中额外特性介绍中有说到批量命令执行这个特性 也就是多个命令在一次网络调用中集中发送该特性是RBatch这个类支持的从这个类的描述来看主要是为Redis Pipeline这个特性服务的并且主要是通过队列和异步实现的。 /*** Interface for using Redis Pipeline feature.* p* All method invocations on objects got through this interface* are batched to separate queue and could be executed later* with codeexecute()/code or codeexecuteAsync()/code methods.*** author Nikita Koksharov**/ public interface RBatch {/*** Returns stream instance by codename/code** param K type of key* param V type of value* param name of stream* return RStream object*/K, V RStreamAsyncK, V getStream(String name);/*** Returns stream instance by codename/code* using provided codecodec/code for entries.** param K type of key* param V type of value* param name - name of stream* param codec - codec for entry* return RStream object*/K, V RStreamAsyncK, V getStream(String name, Codec codec);....../*** Returns list instance by name.** param V type of object* param name - name of object* return List object*/V RListAsyncV getList(String name);V RListAsyncV getList(String name, Codec codec);....../*** Executes all operations accumulated during async methods invocations.* p* If cluster configuration used then operations are grouped by slot ids* and may be executed on different servers. Thus command execution order could be changed** return List with result object for each command* throws RedisException in case of any error**/BatchResult? execute() throws RedisException;/*** Executes all operations accumulated during async methods invocations asynchronously.* p* In cluster configurations operations grouped by slot ids* so may be executed on different servers. Thus command execution order could be changed** return List with result object for each command*/RFutureBatchResult? executeAsync();/*** Discard batched commands and release allocated buffers used for parameters encoding.*/void discard();/*** Discard batched commands and release allocated buffers used for parameters encoding.** return void*/RFutureVoid discardAsync();} 简单的测试代码如下 Slf4j public class RedisPipelineTest {public static void main(String[] args) {//Redisson使用Pipeline模式Config config new Config();config.useSingleServer().setAddress(redis://xx.xx.xx.xx:6379);RedissonClient redisson Redisson.create(config);RBatch redisBatch redisson.createBatch();int size 100000;String zSetKey Pipeline-test-set;long begin System.currentTimeMillis();//将命令放入队列中for (int i 0; i size; i) {redisBatch.getSet(zSetKey i).addAsync(ccc);}//批量执行命令redisBatch.execute();log.info(Redisson Pipeline模式耗时{}ms, (System.currentTimeMillis() - begin));//关闭redisson.shutdown();} } 核心方法分析 1.建Redisson客户端RedissonClient redisson redisson.create(config), 该方法最终会调用Reddison的构造方法Redisson(Config config)。 protected Redisson(Config config) {this.config config;Config configCopy new Config(config);connectionManager ConfigSupport.createConnectionManager(configCopy);RedissonObjectBuilder objectBuilder null;if (config.isReferenceEnabled()) {objectBuilder new RedissonObjectBuilder(this);}//新建异步命令执行器commandExecutor new CommandSyncService(connectionManager, objectBuilder);//执行删除超时任务的定时器evictionScheduler new EvictionScheduler(commandExecutor);writeBehindService new WriteBehindService(commandExecutor); } 该构造方法中会新建异步命名执行器CommandAsyncExecutor commandExecutor和用户删除超时任务的EvictionScheduler evictionScheduler。 2.创建RBatch实例RBatch redisBatch redisson.createBatch() 该方法会使用到步骤1中的commandExecutor和evictionScheduler实例对象。 Override public RBatch createBatch(BatchOptions options) {return new RedissonBatch(evictionScheduler, commandExecutor, options); }public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) {this.executorService new CommandBatchService(executor, options);this.evictionScheduler evictionScheduler; } 其中的options对象会影响后面批量执行命令的流程。 3. 异步给set集合添加元素的操作addAsync这里会具体调用RedissonSet的addAsync方法 Override public RFutureBoolean addAsync(V e) {String name getRawName(e);return commandExecutor.writeAsync(name, codec, RedisCommands.SADD_SINGLE, name, encode(e)); } 1接着调用CommandAsyncExecutor的异步写入方法writeAsync。 Override public T, R RFutureR writeAsync(String key, Codec codec, RedisCommandT command, Object... params) {RPromiseR mainPromise createPromise();NodeSource source getNodeSource(key);async(false, source, codec, command, params, mainPromise, false);return mainPromise; } 2 接着调用批量命令执行器CommandBatchService的异步发送命令。 Override public V, R void async(boolean readOnlyMode, NodeSource nodeSource,Codec codec, RedisCommandV command, Object[] params, RPromiseR mainPromise, boolean ignoreRedirect) {if (isRedisBasedQueue()) {boolean isReadOnly options.getExecutionMode() ExecutionMode.REDIS_READ_ATOMIC;RedisExecutorV, R executor new RedisQueuedBatchExecutor(isReadOnly, nodeSource, codec, command, params, mainPromise,false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType);executor.execute();} else {//执行分支RedisExecutorV, R executor new RedisBatchExecutor(readOnlyMode, nodeSource, codec, command, params, mainPromise,false, connectionManager, objectBuilder, commands, options, index, executed, referenceType);executor.execute();}} 3 接着调用了RedisBatchExecutor.execute方法和BaseRedisBatchExecutor.addBatchCommandData方法。 Override public void execute() {addBatchCommandData(params); }protected final void addBatchCommandData(Object[] batchParams) {MasterSlaveEntry msEntry getEntry(source);Entry entry commands.get(msEntry);if (entry null) {entry new Entry();Entry oldEntry commands.putIfAbsent(msEntry, entry);if (oldEntry ! null) {entry oldEntry;}}if (!readOnlyMode) {entry.setReadOnlyMode(false);}Codec codecToUse getCodec(codec);BatchCommandDataV, R commandData new BatchCommandDataV, R(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());entry.getCommands().add(commandData); } 这里的commands以主节点为KEY,以待发送命令队列列表为VALUE(Entry)保存一个MAP.然后会把命令都添加到entry的commands命令队列中 Entry结构如下面代码所示。 public static class Entry {DequeBatchCommandData?, ? commands new LinkedBlockingDeque();volatile boolean readOnlyMode true;public DequeBatchCommandData?, ? getCommands() {return commands;}public void setReadOnlyMode(boolean readOnlyMode) {this.readOnlyMode readOnlyMode;}public boolean isReadOnlyMode() {return readOnlyMode;}public void clearErrors() {for (BatchCommandData?, ? commandEntry : commands) {commandEntry.clearError();}}} 4. 批量执行命令redisBatch.execute()这里会最终调用CommandBatchService的executeAsync方法该方法完整代码如下我们下面来逐一进行拆解。 public RFutureBatchResult? executeAsync() {......RPromiseBatchResult? promise new RedissonPromise();RPromiseVoid voidPromise new RedissonPromiseVoid();if (this.options.isSkipResult() this.options.getSyncSlaves() 0) {......} else {//这里是对异步执行结果进行处理,可以先忽略, 后面会详细讲先关注批量执行命令的逻辑voidPromise.onComplete((res, ex) - {......});}AtomicInteger slots new AtomicInteger(commands.size());......//真正执行的代码入口批量执行命令for (Map.EntryMasterSlaveEntry, Entry e : commands.entrySet()) {RedisCommonBatchExecutor executor new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,connectionManager, this.options, e.getValue(), slots, referenceType);executor.execute();}return promise;} 里面会用到我们在3.3步骤所生成的commands实例。 1接着调用了基类RedisExecutor的execute方法 public void execute() {......connectionFuture.onComplete((connection, e) - {if (connectionFuture.isCancelled()) {connectionManager.getShutdownLatch().release();return;}if (!connectionFuture.isSuccess()) {connectionManager.getShutdownLatch().release();exception convertException(connectionFuture);return;}//调用RedisCommonBatchExecutor的sendCommand方法, 里面会将多个命令放到一个ListCommandData?, ? list列表里面sendCommand(attemptPromise, connection);writeFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {checkWriteFuture(writeFuture, attemptPromise, connection);}});});......} 2接着调用RedisCommonBatchExecutor的sendCommand方法里面会将多个命令放到一个Listcommanddata list列表里面。 Overrideprotected void sendCommand(RPromiseVoid attemptPromise, RedisConnection connection) {boolean isAtomic options.getExecutionMode() ! ExecutionMode.IN_MEMORY;boolean isQueued options.getExecutionMode() ExecutionMode.REDIS_READ_ATOMIC|| options.getExecutionMode() ExecutionMode.REDIS_WRITE_ATOMIC;//将多个命令放到一个ListCommandData?, ? list列表里面ListCommandData?, ? list new ArrayList(entry.getCommands().size());if (source.getRedirect() Redirect.ASK) {RPromiseVoid promise new RedissonPromiseVoid();list.add(new CommandDataVoid, Void(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));}for (CommandData?, ? c : entry.getCommands()) {if ((c.getPromise().isCancelled() || c.getPromise().isSuccess()) !isWaitCommand(c) !isAtomic) {// skip commandcontinue;}list.add(c);}......//调用RedisConnection的send方法将命令一次性发到Redis服务器端writeFuture connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() 0));} 3接着调用RedisConnection的send方法通过Netty通信发送命令到Redis服务器端执行这里也验证了Redisson客户端底层是采用Netty进行通信的。 public ChannelFuture send(CommandsData data) {return channel.writeAndFlush(data); } 5. 接收返回结果这里主要是监听事件是否完成然后组装返回结果, 核心方法是步骤4提到的CommandBatchService的executeAsync方法,里面会对返回结果进行监听和处理, 核心代码如下 public RFutureBatchResult? executeAsync() {......RPromiseBatchResult? promise new RedissonPromise();RPromiseVoid voidPromise new RedissonPromiseVoid();if (this.options.isSkipResult() this.options.getSyncSlaves() 0) {......} else {voidPromise.onComplete((res, ex) - {//对返回结果的处理executed.set(true);......ListObject responses new ArrayListObject(entries.size());int syncedSlaves 0;for (BatchCommandData?, ? commandEntry : entries) {if (isWaitCommand(commandEntry)) {syncedSlaves (Integer) commandEntry.getPromise().getNow();} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName()) !this.options.isSkipResult()) {......//获取单个命令的执行结果Object entryResult commandEntry.getPromise().getNow();......//将单个命令执行结果放到List中responses.add(entryResult);}}BatchResultObject result new BatchResultObject(responses, syncedSlaves);promise.trySuccess(result);......});}......return promise; } 这里会把单个命令的执行结果放到responses里面最终返回RPromisebatchresultpromise。 从上面的分析来看Redisson客户端对Redis Pipeline的支持也是从多个命令在一次网络通信中执行和异步处理来实现的。 七、总结 Redis提供了Pipelining进行批量操作的高级特性极大地提高了部分数据类型没有批量执行命令导致的执行耗时而引起的性能问题但是我们在使用的过程中需要考虑Pipeline操作中单个命令执行的耗时问题否则带来的效果可能适得其反。最后扩展分析了Redisson客户端对Redis Pipeline特性的支持原理可以与Lettuce客户端对Redis Pipeline支持原理进行比较加深Pipeline在不同Redis客户端实现方式的理解。 参考资料 Redis Pipelining   RedisTemplate使用Pipeline管道命令  如何使用好Redis Pipeline Redisson 管道批量发送命令流程分析 
文章转载自:
http://www.morning.plznfnh.cn.gov.cn.plznfnh.cn
http://www.morning.nwljj.cn.gov.cn.nwljj.cn
http://www.morning.hprmg.cn.gov.cn.hprmg.cn
http://www.morning.wrqw.cn.gov.cn.wrqw.cn
http://www.morning.yzxhk.cn.gov.cn.yzxhk.cn
http://www.morning.wsgyq.cn.gov.cn.wsgyq.cn
http://www.morning.kaoshou.net.gov.cn.kaoshou.net
http://www.morning.pdmc.cn.gov.cn.pdmc.cn
http://www.morning.ztnmc.cn.gov.cn.ztnmc.cn
http://www.morning.pljxz.cn.gov.cn.pljxz.cn
http://www.morning.wrkhf.cn.gov.cn.wrkhf.cn
http://www.morning.dwgcx.cn.gov.cn.dwgcx.cn
http://www.morning.gbljq.cn.gov.cn.gbljq.cn
http://www.morning.wgdnd.cn.gov.cn.wgdnd.cn
http://www.morning.tbcfj.cn.gov.cn.tbcfj.cn
http://www.morning.gpryk.cn.gov.cn.gpryk.cn
http://www.morning.srnth.cn.gov.cn.srnth.cn
http://www.morning.xrmwc.cn.gov.cn.xrmwc.cn
http://www.morning.htjwz.cn.gov.cn.htjwz.cn
http://www.morning.jppb.cn.gov.cn.jppb.cn
http://www.morning.jopebe.cn.gov.cn.jopebe.cn
http://www.morning.bztzm.cn.gov.cn.bztzm.cn
http://www.morning.pbdnj.cn.gov.cn.pbdnj.cn
http://www.morning.zfqr.cn.gov.cn.zfqr.cn
http://www.morning.tpbhf.cn.gov.cn.tpbhf.cn
http://www.morning.zbhfs.cn.gov.cn.zbhfs.cn
http://www.morning.pqcbx.cn.gov.cn.pqcbx.cn
http://www.morning.fkwgk.cn.gov.cn.fkwgk.cn
http://www.morning.xwbld.cn.gov.cn.xwbld.cn
http://www.morning.bdypl.cn.gov.cn.bdypl.cn
http://www.morning.jksgy.cn.gov.cn.jksgy.cn
http://www.morning.nlhcb.cn.gov.cn.nlhcb.cn
http://www.morning.ctrkh.cn.gov.cn.ctrkh.cn
http://www.morning.bpmnx.cn.gov.cn.bpmnx.cn
http://www.morning.kzrbn.cn.gov.cn.kzrbn.cn
http://www.morning.rfwgg.cn.gov.cn.rfwgg.cn
http://www.morning.lmdfj.cn.gov.cn.lmdfj.cn
http://www.morning.lxjxl.cn.gov.cn.lxjxl.cn
http://www.morning.rsfp.cn.gov.cn.rsfp.cn
http://www.morning.dfltx.cn.gov.cn.dfltx.cn
http://www.morning.ltrz.cn.gov.cn.ltrz.cn
http://www.morning.hrkth.cn.gov.cn.hrkth.cn
http://www.morning.rbbgh.cn.gov.cn.rbbgh.cn
http://www.morning.wkjzt.cn.gov.cn.wkjzt.cn
http://www.morning.bswxt.cn.gov.cn.bswxt.cn
http://www.morning.frcxx.cn.gov.cn.frcxx.cn
http://www.morning.grtwn.cn.gov.cn.grtwn.cn
http://www.morning.qgmbx.cn.gov.cn.qgmbx.cn
http://www.morning.qlkjh.cn.gov.cn.qlkjh.cn
http://www.morning.khtyz.cn.gov.cn.khtyz.cn
http://www.morning.kxrhj.cn.gov.cn.kxrhj.cn
http://www.morning.wbns.cn.gov.cn.wbns.cn
http://www.morning.qnhpq.cn.gov.cn.qnhpq.cn
http://www.morning.fkmqg.cn.gov.cn.fkmqg.cn
http://www.morning.rnht.cn.gov.cn.rnht.cn
http://www.morning.rgyts.cn.gov.cn.rgyts.cn
http://www.morning.bfybb.cn.gov.cn.bfybb.cn
http://www.morning.nfqyk.cn.gov.cn.nfqyk.cn
http://www.morning.ftdlg.cn.gov.cn.ftdlg.cn
http://www.morning.fqssx.cn.gov.cn.fqssx.cn
http://www.morning.qwyms.cn.gov.cn.qwyms.cn
http://www.morning.wchsx.cn.gov.cn.wchsx.cn
http://www.morning.mwqbp.cn.gov.cn.mwqbp.cn
http://www.morning.ejknty.cn.gov.cn.ejknty.cn
http://www.morning.xtqld.cn.gov.cn.xtqld.cn
http://www.morning.tkgxg.cn.gov.cn.tkgxg.cn
http://www.morning.rmrcc.cn.gov.cn.rmrcc.cn
http://www.morning.bojkosvit.com.gov.cn.bojkosvit.com
http://www.morning.zmwd.cn.gov.cn.zmwd.cn
http://www.morning.mmqhq.cn.gov.cn.mmqhq.cn
http://www.morning.gbwfx.cn.gov.cn.gbwfx.cn
http://www.morning.kxqpm.cn.gov.cn.kxqpm.cn
http://www.morning.tqwcm.cn.gov.cn.tqwcm.cn
http://www.morning.wkqrp.cn.gov.cn.wkqrp.cn
http://www.morning.lkbyq.cn.gov.cn.lkbyq.cn
http://www.morning.gmswp.cn.gov.cn.gmswp.cn
http://www.morning.qmbgb.cn.gov.cn.qmbgb.cn
http://www.morning.fydsr.cn.gov.cn.fydsr.cn
http://www.morning.nhgkm.cn.gov.cn.nhgkm.cn
http://www.morning.wffxr.cn.gov.cn.wffxr.cn
http://www.tj-hxxt.cn/news/241940.html

相关文章:

  • 网站维护和建设工作范围门户cms系统
  • wap的网站开发项目外包
  • 网站推广怎么做比较好企业宣传手册封面模板
  • o2o网站建设行业现状个人做外贸网站违法吗
  • wordpress数据量大网站访问海南百度首页广告
  • 宜兴市网站建设wordpress 预览demo
  • 做网站总费用百度seo排名优化排行
  • 建个人网站赚钱吗建筑网建设通网站作用是什么意思
  • 网站栏目wordpress自动还原
  • 外贸网站如何建设电商网站开发公司哪家好
  • 购物网站开发的意义河南外贸网站建设
  • asp 制作网站开发重庆建站网站免费
  • 高级网站开发培训中国电信全渠道运营中心
  • 网站建设案例 星座广告平面设计图片
  • 怎么用新浪云做网站ps网上教程
  • 网站开发工程师 课程大纲如何做自己微网站
  • 承德建设银行网站做外贸一般看什么网站
  • 自适应网站是什么wordpress标题关键词
  • php网站建设管理教材海西州网站建设公司
  • 招商加盟网站推广方案红色主题ppt免费模板
  • 潍坊微信网站苏州seo专家教优化网站结构
  • 服装网站建设前景分析上海中高风险地区名单最新
  • php教育网站开发工作百度竞价电话
  • 四川建设人才考试网官方网站网架公司名字大全
  • 网站开发用什么语言开发的什么是网络设计与电子商务
  • 福州建设发展集团网站视频链接怎么wordpress
  • 手机商城官网济南网站搜索引擎优化
  • 网站建设服务那一个便宜wordpress图片压缩文件
  • 帝国和织梦哪个做网站好自己做的网站别人怎么上网找到
  • 优秀英文企业网站网店设计实训报告