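This article uses Redis Cluster as the example; in practice the same approach applies to Redis Sentinel and standalone Redis.
I. Existing Problems
Because of some of Redis's own characteristics (replication, for example) and the way it is typically used, Redis is not well suited to being deployed across data centers, so a Redis cluster is normally deployed inside a single data center. Redis Cluster is already highly available by itself: if a few Redis nodes misbehave or go down, it fails over automatically, and from the application's point of view service is restored within a short time. A data-center failure (power loss, network outage, and similar extreme cases) is different. If the application's degradation or fault-tolerance logic is weak, or the business cannot be degraded at all, important data may be lost, or the application's thread pool may be exhausted almost instantly and the service becomes unavailable. For important services this can be fatal.
To handle data-center failures and keep the application serving normally in such extreme cases (the system keeps running and the data stays correct), a cross-data-center solution for Redis is needed.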
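II. Approach and Goals:
1. Approach
- Use CacheCloud to provision two Redis Cluster deployments in two different data centers (for example, Zhaowei and Beixian): one named major, which serves as the primary Redis service, and one named minor, which serves as the standby Redis service.
- Develop a customized client that relies on the dependency isolation provided by Netflix's Hystrix component: when major fails, the fault is isolated and requests are automatically forwarded to minor, without affecting the application's main thread pool. (The Hystrix request flow is shown in the figure below; for Hystrix usage, see http://hot66hot.iteye.com/blog/2155036)
2. Goals:
- Easy client integration, just like using the Jedis API.
- Real failover across data centers.
- Dependency isolation: even if Redis has problems, the main thread pool is not affected.
- Reads return correct data.
- Writes are kept as consistent as possible.
- More configurable failover parameters (Hystrix), such as the isolation thread pool size and timeouts.
- Exposed statistics and reports, such as JMX and hystrix-dashboard.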
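III. Implementation:
- 1. Using Hystrix's dependency isolation, run major and minor in separate thread pools (both isolated from the application's main thread pool).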
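The isolation idea in step 1 can be illustrated without Hystrix. The following is a minimal sketch using only `java.util.concurrent`, not the client's actual implementation: the class `IsolatedPoolsSketch` and its methods are hypothetical names chosen for illustration. It assumes each dependency gets a fixed-size pool with no waiting queue, so a saturated pool rejects new work immediately instead of blocking the caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the client described in this article.
final class IsolatedPoolsSketch {

    /** Fixed-size pool without a waiting queue: when all threads are busy, submit() is rejected. */
    static ThreadPoolExecutor boundedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** Returns true if the pool accepted the task, false if it was rejected because the pool is full. */
    static boolean trySubmit(ExecutorService pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    /** Saturates a 2-thread "major" pool and checks that a separate "minor" pool still accepts work. */
    static boolean minorUnaffectedWhenMajorIsFull() {
        ThreadPoolExecutor major = boundedPool(2);
        ThreadPoolExecutor minor = boundedPool(2);
        CountDownLatch release = new CountDownLatch(1);
        // Simulates a call to a hung dependency: the task blocks until released.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            boolean first = trySubmit(major, blocked);
            boolean second = trySubmit(major, blocked);
            boolean third = trySubmit(major, blocked); // both major threads are busy, so this is rejected
            boolean minorOk = trySubmit(minor, () -> { });
            return first && second && !third && minorOk;
        } finally {
            release.countDown();
            major.shutdown();
            minor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("minor unaffected: " + minorUnaffectedWhenMajorIsFull());
    }
}
```

The point of the sketch is that a stuck major only consumes major's own threads; callers get a fast rejection that they can turn into a fallback, and the minor pool and the application's own threads stay available.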
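- 2. Client interface and initialization: because this is a customized client, there is no generic mechanism yet, and every API has to be implemented by hand.
public interface RedisCrossRoomClient {

    // Writes go to both clusters, so the result carries the outcome of each one.
    MultiWriteResult<String> set(String key, String value);

    String get(String key);

}
The constructor takes two already-initialized PipelineCluster instances.
PipelineCluster is our internal extension of JedisCluster; it can be treated as JedisCluster here.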
public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
    private Logger logger = LoggerFactory.getLogger(RedisClusterCrossRoomClientImpl.class);
    /**
     * Primary cluster
     */
    private PipelineCluster majorPipelineCluster;
    /**
     * Standby cluster
     */
    private PipelineCluster minorPipelineCluster;

    public RedisClusterCrossRoomClientImpl(PipelineCluster majorPipelineCluster, PipelineCluster minorPipelineCluster) {
        this.majorPipelineCluster = majorPipelineCluster;
        this.minorPipelineCluster = minorPipelineCluster;
    }
}
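- 3. Read path: as shown in the figure below, the normal run path goes to major, and every failure path (all arrows pointing to getFallback in figure 2.1) goes to minor.
For example: under normal conditions all data is read from majorPipelineCluster; under abnormal conditions (Hystrix circuit breaker open, thread pool rejection, timeout, or exception) the minorPipelineCluster logic runs instead.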
Base class
public class BaseCommand {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Hystrix parameters for major, e.g. timeout, thread pool, and the circuit breaker close/open policies.
     */
    protected static final String MAJOR_READ_COMMAND_KEY = "major_read_command";
    protected static final String MAJOR_WRITE_COMMAND_KEY = "major_write_command";
    protected static final String MAJOR_GROUP_KEY = "major_redis_group";
    protected static final String MAJOR_THREAD_POOL_KEY = "major_redis_pool";
    public static int majorTimeOut = 1000;
    public static int majorThreads = 100;

    /**
     * Hystrix parameters for minor, e.g. timeout, thread pool, and the circuit breaker close/open policies.
     */
    protected static final String MINOR_READ_COMMAND_KEY = "minor_read_command";
    protected static final String MINOR_WRITE_COMMAND_KEY = "minor_write_command";
    protected static final String MINOR_GROUP_KEY = "minor_redis_group";
    protected static final String MINOR_THREAD_POOL_KEY = "minor_redis_pool";
    public static int minorTimeOut = 1000;
    public static int minorThreads = 100;

}
Read command class
public abstract class ReadCommand<T> extends BaseCommand {

    protected abstract T readMajor();

    protected abstract T readMinor();

    public T read() {
        // 1. Count every request
        RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.ALL);

        DataComponentCommand<T> majorCommand =
                new DataComponentCommand<T>(MAJOR_READ_COMMAND_KEY, MAJOR_GROUP_KEY, MAJOR_THREAD_POOL_KEY,
                        majorTimeOut, majorThreads) {
                    @Override
                    protected T run() throws Exception {
                        // 2. Count runs against major
                        RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.RUN);
                        return readMajor();
                    }

                    @Override
                    public T getBusinessFallback() {
                        // 3. Count fallbacks from major
                        RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_ALL);

                        RedisCrossRoomHystrixStat.counterFallBack(MAJOR_READ_COMMAND_KEY);
                        return new DataComponentCommand<T>(MINOR_READ_COMMAND_KEY, MINOR_GROUP_KEY, MINOR_THREAD_POOL_KEY,
                                minorTimeOut, minorThreads) {
                            @Override
                            protected T run() throws Exception {
                                // 4. Count runs against minor inside the fallback
                                RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_RUN);
                                return readMinor();
                            }

                            @Override
                            public T getBusinessFallback() throws RedisCrossRoomReadMinorFallbackException {
                                // 5. Count fallbacks of the fallback (minor failed as well)
                                RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_FALLBACK);
                                throw new RedisCrossRoomReadMinorFallbackException("MinorFallbackException");
                            }
                        }.execute();
                    }
                };
        return majorCommand.execute();
    }

}
For example, the get(String key) command:
public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
    ...
    @Override
    public String get(final String key) {
        return new ReadCommand<String>() {

            @Override
            protected String readMajor() {
                return majorPipelineCluster.get(key);
            }

            @Override
            protected String readMinor() {
                return minorPipelineCluster.get(key);
            }
        }.read();

    }
    ...
}
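To make the control flow of the read path easier to follow, here is a minimal sketch of the same "try major, fall back to minor" idea written against plain `java.util.concurrent` rather than Hystrix. It is an approximation under stated assumptions, not the client's code: `ReadFallbackSketch` and its methods are hypothetical names, the pool size of 4 is arbitrary, and only timeouts and exceptions trigger the fallback (there is no circuit breaker or pool rejection here).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration class: read from major, fall back to minor on timeout or exception.
final class ReadFallbackSketch {
    private final ExecutorService majorPool = Executors.newFixedThreadPool(4);
    private final ExecutorService minorPool = Executors.newFixedThreadPool(4);
    private final long timeoutMillis;

    ReadFallbackSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Runs readMajor in its own pool; any failure or timeout redirects the read to readMinor. */
    <T> T read(Supplier<T> readMajor, Supplier<T> readMinor) {
        try {
            return call(majorPool, readMajor);
        } catch (Exception majorFailure) {
            try {
                return call(minorPool, readMinor);
            } catch (Exception minorFailure) {
                // Mirrors the "fallback of the fallback" case: nothing left to try.
                throw new IllegalStateException("both major and minor reads failed", minorFailure);
            }
        }
    }

    private <T> T call(ExecutorService pool, Supplier<T> supplier) throws Exception {
        Callable<T> task = supplier::get;
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            future.cancel(true); // interrupt the worker so a hung call does not hold the thread
            throw e;
        }
    }

    void shutdown() {
        majorPool.shutdownNow();
        minorPool.shutdownNow();
    }

    public static void main(String[] args) {
        ReadFallbackSketch client = new ReadFallbackSketch(100);
        String value = client.read(
                () -> { throw new IllegalStateException("major is down"); },
                () -> "value-from-minor");
        System.out.println(value);
        client.shutdown();
    }
}
```

The caller only ever sees a value or a single final exception, which is the property the read tests later in this article check: every request gets a result as long as the key exists in at least one reachable cluster.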
- 4. Write path: the goal is to dual-write whenever possible. When a failure occurs, for now it is only isolated and no data synchronization is performed (integrating an MQ may be considered in the future). Currently the result of each write is simply returned to the application, and the application is responsible for maintaining consistency.
The MultiWriteResult class has four member variables:
No.  Field                        Meaning
1    DataStatusEnum majorStatus   Execution status on the major cluster
2    T majorResult                Result of the Redis command on the major cluster
3    DataStatusEnum minorStatus   Execution status on the minor (standby) cluster
4    T minorResult                Result of the Redis command on the minor (standby) cluster
public abstract class WriteCommand<T> extends BaseCommand {

    protected abstract T writeMajor();

    protected abstract T writeMinor();

    protected abstract String getCommandParam();

    public MultiWriteResult<T> write() {
        DataComponentCommand<T> majorCommand =
                new DataComponentCommand<T>(MAJOR_WRITE_COMMAND_KEY, MAJOR_GROUP_KEY, MAJOR_THREAD_POOL_KEY,
                        majorTimeOut, majorThreads) {
                    @Override
                    protected T run() throws Exception {
                        return writeMajor();
                    }

                    @Override
                    public T getBusinessFallback() {
                        logger.warn("major cross-room failed: {}", getCommandParam());
                        return null;
                    }
                };

        DataComponentCommand<T> minorCommand =
                new DataComponentCommand<T>(MINOR_WRITE_COMMAND_KEY, MINOR_GROUP_KEY, MINOR_THREAD_POOL_KEY,
                        minorTimeOut, minorThreads) {
                    @Override
                    protected T run() throws Exception {
                        return writeMinor();
                    }

                    @Override
                    public T getBusinessFallback() {
                        logger.warn("minor cross-room failed: {}", getCommandParam());
                        return null;
                    }
                };

        // Submit both writes asynchronously so they run in parallel in their own pools.
        Future<T> majorFuture = majorCommand.queue();
        Future<T> minorFuture = minorCommand.queue();
        T majorResult = null;
        T minorResult = null;
        try {
            majorResult = majorFuture.get();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        try {
            minorResult = minorFuture.get();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

        // A null result is treated as a failure (the fallbacks above return null).
        DataStatusEnum majorStatus = DataStatusEnum.SUCCESS;
        DataStatusEnum minorStatus = DataStatusEnum.SUCCESS;
        if (majorResult == null) {
            majorStatus = DataStatusEnum.FAIL;
        }
        if (minorResult == null) {
            minorStatus = DataStatusEnum.FAIL;
        }
        return new MultiWriteResult<T>(majorStatus, majorResult, minorStatus, minorResult);
    }

}
For example, the set command:
public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
    ...
    @Override
    public MultiWriteResult<String> set(final String key, final String value) {
        return new WriteCommand<String>() {
            @Override
            protected String writeMajor() {
                return majorPipelineCluster.set(key, value);
            }

            @Override
            protected String writeMinor() {
                return minorPipelineCluster.set(key, value);
            }

            @Override
            protected String getCommandParam() {
                return String.format("set key %s value %s", key, value);
            }
        }.write();
    }
    ...
}
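IV. Exposed Data and Reports:
(1) hystrix-dashboard report: real-time statistics charts.
(2) JMX data: statistics for major and minor, including run and fallback invocation counts and exception counts.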
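V. Read Tests:
1. The major service is healthy, but major's thread pool is too small
(1) Test code
Method: set major's thread pool to a small size and raise the request concurrency; each thread performs 1000 random reads and then returns to the main thread.
Verification: every request returns a result (provided the key exists).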
/**
 * Major thread pool saturated: pool size too small (major: 30, minor: 80, 100 concurrent request threads)
 *
 * @throws InterruptedException
 */
@Test
public void testRandomReadWithEnoughThreads() throws InterruptedException {
    redisClusterCrossRoomClient.setMajorThreads(30);
    redisClusterCrossRoomClient.setMinorThreads(80);
    int threadNum = 100;
    int perSize = TOTAL_SIZE / threadNum;
    int totalNeedGet = 1000;
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    for (int i = 0; i < threadNum; i++) {
        int start = perSize * i + 1;
        int end = perSize * (i + 1);
        Thread thread = new RandomReadThread(start, end, totalNeedGet, countDownLatch);
        thread.start();
    }
    countDownLatch.await();
    // The number of requests is threads * reads per thread, as in the second read test.
    logger.info("request counter: {}", (threadNum * totalNeedGet));
    logger.info("readSuccess counter: {}", readSuccessCounter.get());
}

class RandomReadThread extends Thread {
    private int start;
    private int end;
    private int totalNeedGet;
    private CountDownLatch countDownLatch;
    private long counter;
    public RandomReadThread(int start, int end, int totalNeedGet, CountDownLatch countDownLatch) {
        this.start = start;
        this.end = end;
        this.totalNeedGet = totalNeedGet;
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        while (true) {
            try {
                if (counter >= totalNeedGet) {
                    countDownLatch.countDown();
                    break;
                }
                if (counter % 100 == 0) {
                    logger.info("{} execute {} th, total size {}", Thread.currentThread().getName(), counter,
                            totalNeedGet);
                }
                // Pick a random key inside this thread's id range
                int id = start + new Random().nextInt(end - start);
                String key = "user:" + id;
                String result = redisClusterCrossRoomClient.get(key);
                if (StringUtils.isBlank(result)) {
                    logger.warn("key {}, value is null", key);
                } else {
                    readSuccessCounter.incrementAndGet();
                }
                counter++;
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
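(2) Failover:
major's thread pool is occasionally saturated and rejects tasks; the fallback logic then runs and the requests are automatically redirected to minor.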
2016-04-25 15:07:10,658 ERROR [Thread-186] impl.RedisClusterCrossRoomClientImpl: major_redis_command could not be queued for execution and failed retrieving fallback.
com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command could not be queued for execution and failed retrieving fallback.
    at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
    at com.netflix.hystrix.HystrixCommand.subscribeWithThreadIsolation(HystrixCommand.java:1250)
    at com.netflix.hystrix.HystrixCommand.access$1400(HystrixCommand.java:103)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:829)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
    at rx.Observable.subscribe(Observable.java:6178)
    at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
    at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
    at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:212)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5a56156e rejected from java.util.concurrent.ThreadPoolExecutor@656a0adc[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 410]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at com.netflix.hystrix.HystrixCommand.subscribeWithThreadIsolation(HystrixCommand.java:1166)
    ... 19 more
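(3) hystrix-dashboard: some of the requests rejected by major's thread pool (purple) are redirected through the fallback and executed in minor's thread pool.
(4) Final result: the number of non-empty results equals the number of requests.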
2016-04-25 15:08:43,393 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: request counter: 100000
2016-04-25 15:08:43,393 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: readSuccess counter: 100000
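2. The major service is abnormal, which exhausts major's thread pool or produces large numbers of exceptions, timeouts, and so on
(1) Test code:
Method: use Redis's DEBUG SLEEP <seconds> command to make Redis temporarily stop serving requests.
Verification: every request returns a result (provided the key exists).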
/**
 * Major thread pool saturated: major service failure (simulated with Redis DEBUG SLEEP)
 *
 * @throws InterruptedException
 */
@Test
public void testRandomReadWithMajorSleep() throws InterruptedException {
    // Put every node of major to sleep for 10 seconds
    Map<String, JedisPool> jedisPoolMap = majorPipelineCluster.getConnectionHandler().getNodes();
    for (JedisPool jedisPool : jedisPoolMap.values()) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.debug(DebugParams.SLEEP(10));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    int threadNum = 20;
    int perSize = TOTAL_SIZE / threadNum;
    int totalNeedGet = 1000;
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    for (int i = 0; i < threadNum; i++) {
        int start = perSize * i + 1;
        int end = perSize * (i + 1);
        Thread thread = new RandomReadThread(start, end, totalNeedGet, countDownLatch);
        thread.start();
    }
    countDownLatch.await();
    logger.info("request counter: {}", (threadNum * totalNeedGet));
    logger.info("readSuccess counter: {}", readSuccessCounter.get());
}
(2) Failover:
major's circuit breaker opens outright; all requests run the fallback logic and are automatically redirected to minor.
2016-04-25 14:41:48,144 ERROR [Thread-13] impl.RedisClusterCrossRoomClientImpl: major_redis_command short-circuited and failed retrieving fallback.
com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command short-circuited and failed retrieving fallback.
    at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
    at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1614)
    at com.netflix.hystrix.HystrixCommand.access$1300(HystrixCommand.java:103)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:820)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
    at rx.Observable.subscribe(Observable.java:6178)
    at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
    at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
    at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:214)
2016-04-25 14:41:48,145 ERROR [Thread-25] impl.RedisClusterCrossRoomClientImpl: major_redis_command short-circuited and failed retrieving fallback.
com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command short-circuited and failed retrieving fallback.
    at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
    at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1614)
    at com.netflix.hystrix.HystrixCommand.access$1300(HystrixCommand.java:103)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:820)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
    at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable$2.call(Observable.java:153)
    at rx.Observable$2.call(Observable.java:149)
    at rx.Observable.subscribe(Observable.java:6178)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
    at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
    at rx.Observable.subscribe(Observable.java:6178)
    at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
    at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
    at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:214)
(3) hystrix-dashboard
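major's circuit breaker has opened and all requests are redirected to minor; once major recovers, traffic returns to major.
(4) Final result:
The number of non-empty results equals the number of requests.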
2016-04-25 15:01:21,338 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: request counter: 20000
2016-04-25 15:01:21,338 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: readSuccess counter: 20000
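The breaker behavior seen in this test (major's breaker opens, traffic moves to minor, then returns after major recovers) can be sketched as a small state machine. This is a deliberately simplified illustration, not Hystrix's algorithm: Hystrix decides on an error percentage over a rolling window, whereas this sketch opens after a number of consecutive failures. `CircuitBreakerSketch`, its parameters, and the injected clock are hypothetical names used only for the example.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical illustration class: a consecutive-failure circuit breaker with a sleep window.
final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long sleepWindowMillis;
    private final LongSupplier clock;   // injected so the example can be driven deterministically
    private int consecutiveFailures;
    private long openedAt = -1;         // -1 means the breaker is closed

    CircuitBreakerSketch(int failureThreshold, long sleepWindowMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** Closed: always allowed. Open: allowed again only after the sleep window, as a trial request. */
    synchronized boolean allowRequest() {
        if (openedAt < 0) {
            return true;
        }
        return clock.getAsLong() - openedAt >= sleepWindowMillis;
    }

    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1; // a successful trial closes the breaker
    }

    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong(); // open, or restart the sleep window after a failed trial
        }
    }

    /** Calls major when the breaker allows it; otherwise, or when major throws, serves from minor. */
    <T> T call(Supplier<T> major, Supplier<T> minor) {
        if (!allowRequest()) {
            return minor.get(); // short-circuited: major is not even attempted
        }
        try {
            T result = major.get();
            recordSuccess();
            return result;
        } catch (RuntimeException e) {
            recordFailure();
            return minor.get();
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(3, 5000, System::currentTimeMillis);
        for (int i = 0; i < 5; i++) {
            String value = breaker.call(() -> { throw new IllegalStateException("major down"); },
                    () -> "from-minor");
            System.out.println(value + ", major allowed next: " + breaker.allowRequest());
        }
    }
}
```

While the breaker is open, requests skip major entirely, which is why the log above reports "short-circuited" rather than a timeout for each request.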
VI. Write Tests:
1. major failure:
(1) Test code
Put major to sleep directly; 20 threads perform 1000 writes in total, 50 keys per thread.
@Test
public void testRandomWriteWithMajorSleep() throws InterruptedException {
    // Put major to sleep for 10 seconds
    tempSleepPipelineCluster(majorPipelineCluster, 10);
    int totalWrite = 1000;
    int threadNum = 20;
    int perSize = totalWrite / threadNum;
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    for (int i = 0; i < threadNum; i++) {
        int start = TOTAL_SIZE + perSize * i + 1;
        int end = TOTAL_SIZE + perSize * (i + 1);
        Thread thread = new RandomWriteThread(start, end, countDownLatch);
        thread.start();
    }
    countDownLatch.await();
    logger.info("total request write: {}", totalWrite);
    logger.info("writeFail counter: {}", writeFailCounter.get());
    logger.info("writePartFail counter: {}", writePartFailCounter.get());
    logger.info("writeSuccess counter: {}", writeSuccessCounter.get());
}
/**
 * Random writes
 *
 * @author leifu
 * @Date April 25, 2016
 * @Time 3:39:13 PM
 */
class RandomWriteThread extends Thread {
    private int start;
    private int end;
    private CountDownLatch countDownLatch;
    public RandomWriteThread(int start, int end, CountDownLatch countDownLatch) {
        this.start = start;
        this.end = end;
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        for (int id = start; id <= end; id++) {
            try {
                String key = "user:" + id;
                String value = String.valueOf(id);
                MultiWriteResult<String> multiWriteResult = redisClusterCrossRoomClient.set(key, value);
                DataStatusEnum majorStatus = multiWriteResult.getMajorStatus();
                DataStatusEnum minorStatus = multiWriteResult.getMinorStatus();
                // Both succeeded: success; both failed: fail; otherwise: partial failure
                if (DataStatusEnum.SUCCESS.equals(majorStatus) && DataStatusEnum.SUCCESS.equals(minorStatus)) {
                    writeSuccessCounter.incrementAndGet();
                } else if (!DataStatusEnum.SUCCESS.equals(majorStatus) && !DataStatusEnum.SUCCESS.equals(minorStatus)) {
                    writeFailCounter.incrementAndGet();
                } else {
                    writePartFailCounter.incrementAndGet();
                }

                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        countDownLatch.countDown();
    }
}
(2) Exceptions: major suffers a large number of timeouts, and writes to major fail.
2016-04-25 17:31:08,703 ERROR [main] crossroom.RedisClusterCrossRoomClientImplTest: java.net.SocketTimeoutException: Read timed out
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
    at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
    at redis.clients.jedis.Protocol.process(Protocol.java:142)
    at redis.clients.jedis.Protocol.read(Protocol.java:206)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:282)
    at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:208)
    at redis.clients.jedis.BinaryJedis.debug(BinaryJedis.java:2974)
    at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest.tempSleepPipelineCluster(RedisClusterCrossRoomClientImplTest.java:184)
    at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest.testRandomWriteWithMajorSleep(RedisClusterCrossRoomClientImplTest.java:252)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.net.SocketInputStream.read(SocketInputStream.java:108)
    at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
    ... 32 more
2016-04-25 17:31:10,777 WARN [Thread-14] sources.URLConfigurationSource: No URLs will be polled as dynamic configuration sources.
2016-04-25 17:31:10,777 INFO [Thread-14] sources.URLConfigurationSource: To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2016-04-25 17:31:10,779 INFO [Thread-14] config.DynamicPropertyFactory: DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@224dc69c
(3) dashboard:
(4) Final result:
1000 writes were issued in total: 22 succeeded on both clusters, and 978 succeeded only partially (on one cluster).
2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: total request write: 1000
2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writeFail counter: 0
2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writePartFail counter: 978
2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writeSuccess counter: 22
Failure logs printed during the run:
2016-04-28 10:47:29,871 WARN [Thread-17] impl.RedisClusterCrossRoomClientImpl$1: major cross-room failed: set key user:10300 value 10300
...................................
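VII. Open Issues and Outlook
- Because this is a customized client, only part of the Redis command API is supported so far; more commands need to be added.
- During a failure, reads work normally, but dual writes can leave major and minor with inconsistent data (an MQ-based mechanism could be considered to solve this). For now the failures are only recorded in the logs.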
- …………
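As a rough illustration of the MQ idea mentioned above, a partially failed write could be recorded and replayed later on the cluster that missed it. The sketch below is speculative and is not part of the client: an in-memory queue stands in for a real MQ, and `WriteRepairSketch`, `PendingWrite`, `recordFailure`, and `replay` are all hypothetical names. It also ignores ordering, so in a real system a replayed value could overwrite a newer one unless versions or timestamps are compared.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

// Hypothetical illustration class: remembers writes that missed one cluster and replays them later.
final class WriteRepairSketch {

    /** A write that failed on the named target cluster ("major" or "minor"). */
    static final class PendingWrite {
        final String target;
        final String key;
        final String value;

        PendingWrite(String target, String key, String value) {
            this.target = target;
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a durable message queue; contents are lost on restart.
    private final Queue<PendingWrite> pending = new ConcurrentLinkedQueue<>();

    /** Called when a dual write succeeded on one cluster but failed on {@code target}. */
    void recordFailure(String target, String key, String value) {
        pending.add(new PendingWrite(target, key, value));
    }

    int pendingCount() {
        return pending.size();
    }

    /** Replays the currently queued writes once; writes that fail again are re-queued. */
    int replay(BiConsumer<String, String> majorWriter, BiConsumer<String, String> minorWriter) {
        int replayed = 0;
        int batch = pending.size();
        for (int i = 0; i < batch; i++) {
            PendingWrite write = pending.poll();
            if (write == null) {
                break;
            }
            try {
                BiConsumer<String, String> writer = "major".equals(write.target) ? majorWriter : minorWriter;
                writer.accept(write.key, write.value);
                replayed++;
            } catch (RuntimeException e) {
                pending.add(write); // target still unavailable, keep it for the next round
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        java.util.Map<String, String> major = new java.util.HashMap<>();
        java.util.Map<String, String> minor = new java.util.HashMap<>();
        WriteRepairSketch repair = new WriteRepairSketch();
        minor.put("user:10300", "10300");                    // the write reached minor only
        repair.recordFailure("major", "user:10300", "10300");
        System.out.println("replayed: " + repair.replay(major::put, minor::put));
    }
}
```

In the client described here, the natural place to call something like `recordFailure` would be where the write fallback currently only logs the failed command.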