这篇文章给大家分享的是有关redis如何通过pipeline提升吞吐量的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

案例目标

简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。

案例背景

应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;

然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现:

一次数据推送会对 redis 产生近30次读写操作!

在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;

优化过程 主要针对业务代码做的优化,其中redis 操作经过大量合并,最终降低到原来的1/5,而系统吞吐量也提升明显。

其中,redis pipeline(管道机制) 的应用是一个关键手段。

pipeline的解释

Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。

管道技术使用广泛,例如许多POP3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。 Redis很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作Redis)

普通请求模型

[图-pipeline1]

Pipeline请求模型

[图-pipeline2]

从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次IO操作等待;

而Pipeline 化之后所有的请求合并为一次IO,除了时延可以降低之外,还能大幅度提升系统吞吐量。

代码实例

说明

本地开启50个线程,每个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。

相关常量

//并发任务privatestaticfinalinttaskCount=50;//pipeline大小privatestaticfinalintbatchSize=10;//每个任务处理命令数privatestaticfinalintcmdCount=1000;privatestaticfinalbooleanusePipeline=true;

初始化连接

JedisPoolConfigpoolConfig=newJedisPoolConfig();poolConfig.setMaxActive(200);poolConfig.setMaxIdle(100);poolConfig.setMaxWait(2000);poolConfig.setTestOnBorrow(false);poolConfig.setTestOnReturn(false);jedisPool=newJedisPool(poolConfig,host,port);

并发启动任务,统计执行时间

publicstaticvoidmain(String[]args)throwsInterruptedException{init();flushDB();longt1=System.currentTimeMillis();ExecutorServiceexecutor=Executors.newCachedThreadPool();CountDownLatchlatch=newCountDownLatch(taskCount);for(inti=0;i<taskCount;i++){executor.submit(newDemoTask(i,latch));}latch.await();executor.shutdownNow();longt2=System.currentTimeMillis();System.out.println("executionfinishtime(s):"+(t2-t1)/1000.0);}

DemoTask 封装了执行key写入的细节,区分不同场景

publicvoidrun(){logger.info("Task[{}]start.",id);try{if(usePipeline){runWithPipeline();}else{runWithNonPipeline();}}finally{latch.countDown();}logger.info("Task[{}]end.",id);}

不使用Pipeline的场景比较简单,循环执行set操作

for(inti=0;i<cmdCount;i++){Jedisjedis=get();try{jedis.set(key(i),UUID.randomUUID().toString());}finally{if(jedis!=null){jedisPool.returnResource(jedis);}}if(i%batchSize==0){logger.info("Task[{}]process--{}",id,i);}}

使用Pipeline,需要处理分段,如10个作为一批命令执行

for(inti=0;i<cmdCount;){Jedisjedis=get();try{Pipelinepipeline=jedis.pipelined();intj;for(j=0;j<batchSize;j++){if(i+j<cmdCount){pipeline.set(key(i+j),UUID.randomUUID().toString());}else{break;}}pipeline.sync();logger.info("Task[{}]pipeline--{}",id,i+j);i+=j;}finally{if(jedis!=null){jedisPool.returnResource(jedis);}}}

运行结果

不使用Pipeline,整体执行26s;而使用Pipeline优化后的代码,执行时间仅需要3s!

NoPipeline-stat

Pipeline-stat

注意事项

pipeline机制可以优化吞吐量,但无法提供原子性/事务保障,而这个可以通过Redis-Multi等命令实现。

感谢各位的阅读!关于“redis如何通过pipeline提升吞吐量”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!