This article uses Redis Cluster as its example; in practice, the same approach applies to Redis Sentinel and standalone Redis.

I. The Problem

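Because of some characteristics of Redis itself (such as replication) and of how it is used, Redis is not well suited to spanning multiple data centers, so a Redis cluster is normally deployed within a single data center. Redis Cluster is highly available on its own: even if several Redis nodes misbehave or go down, it fails over automatically, and the application recovers within a short time. A failure of the whole data center (an extreme event such as a power or network outage) is different. If the application's degradation or fault-tolerance mechanisms are weak, or the business cannot be degraded at all, important data may be lost, or the application's thread pool may fill up almost instantly and take the service down. For critical services, this is fatal.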

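To handle events such as a data-center failure, and to keep the application serving normally even in these extreme cases (the system keeps running and the data stays correct), we need a cross-data-center Redis solution.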

II. Approach and Goals

1. Approach

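  • Use CacheCloud to create two Redis Cluster deployments in two different data centers (for example, Zhaowei and Beixian). One, called major, is the primary Redis service; the other, called minor, is the standby.
  • Develop a customized client that uses the dependency-isolation feature of Netflix Hystrix. When major fails, the failure is isolated and requests are automatically forwarded to minor, without affecting the application's main thread pool. (The figure below shows Hystrix's request flow; for Hystrix usage, see http://hot66hot.iteye.com/blog/2155036)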

    

2. Goals

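  • Easy to adopt: the client is used just like the Jedis API.
  • Genuine failover across data centers.
  • Dependency isolation: even if Redis has problems, the main thread pool is not affected.
  • Reads return correct data.
  • Writes are kept as consistent as possible.
  • More configurable failover parameters (via Hystrix), such as the isolation thread-pool size and timeouts.
  • Statistics and reports are exposed, e.g. via JMX and hystrix-dashboard.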
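The isolation-plus-fallback idea behind these goals can be sketched with plain java.util.concurrent, without Hystrix. This is a minimal illustration under stated assumptions and is not the client's code. FailoverReader, the pool sizes and the timeout are hypothetical. It models only three of Hystrix's fallback triggers (thread-pool rejection, timeout, exception) and leaves out the circuit breaker and metrics.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: major and minor each get their own bounded pool (a "bulkhead"),
 * so a slow or dead major can exhaust only its own threads, never the caller's.
 */
final class FailoverReader implements AutoCloseable {
    private final ExecutorService majorPool;
    private final ExecutorService minorPool;
    private final long timeoutMillis;

    FailoverReader(int majorThreads, int minorThreads, long timeoutMillis) {
        this.majorPool = boundedPool(majorThreads);
        this.minorPool = boundedPool(minorThreads);
        this.timeoutMillis = timeoutMillis;
    }

    // Fixed-size pool with no waiting queue: when every thread is busy,
    // submit() throws RejectedExecutionException instead of making the caller wait.
    private static ExecutorService boundedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Reads from major; on rejection, timeout or exception, reads from minor instead. */
    <T> T read(Supplier<T> readMajor, Supplier<T> readMinor) {
        try {
            return call(majorPool, readMajor);
        } catch (RejectedExecutionException | TimeoutException | ExecutionException majorFailure) {
            try {
                return call(minorPool, readMinor);
            } catch (RejectedExecutionException | TimeoutException | ExecutionException minorFailure) {
                throw new IllegalStateException("both major and minor failed", minorFailure);
            }
        }
    }

    // Runs the task in the given pool and waits at most timeoutMillis for its result.
    private <T> T call(ExecutorService pool, Supplier<T> task)
            throws TimeoutException, ExecutionException {
        Callable<T> callable = task::get;
        Future<T> future = pool.submit(callable);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck worker so its thread is freed
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    @Override
    public void close() {
        majorPool.shutdownNow();
        minorPool.shutdownNow();
    }
}
```

In the real client, each call would be a Redis command such as majorPipelineCluster.get(key). Taking any Supplier here keeps the sketch testable without a Redis server.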

III. Implementation

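  • 1. Using Hystrix's dependency isolation, major and minor each run in their own thread pool, separate from the application's main thread pool.
  • 2. Client interface and initialization: because this is a customized client, there is no generic mechanism yet, so every API must be implemented by hand.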

    public interface RedisCrossRoomClient {

        // Writes go to both clusters, so the result carries one status per cluster
        MultiWriteResult<String> set(String key, String value);

        String get(String key);

    }
    

Initialization: the constructor takes two already-initialized PipelineCluster instances.

PipelineCluster is our in-house extension of JedisCluster; for this article, treat it as a JedisCluster.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
        private final Logger logger = LoggerFactory.getLogger(RedisClusterCrossRoomClientImpl.class);
        /**
         * Primary (major) cluster
         */
        private final PipelineCluster majorPipelineCluster;
        /**
         * Standby (minor) cluster
         */
        private final PipelineCluster minorPipelineCluster;

        public RedisClusterCrossRoomClientImpl(PipelineCluster majorPipelineCluster, PipelineCluster minorPipelineCluster) {
            this.majorPipelineCluster = majorPipelineCluster;
            this.minorPipelineCluster = minorPipelineCluster;
        }
    }
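  • 3. Read path: as shown in the figure below, run() normally goes to major; on any abnormal path (every arrow leading to getFallback in figure 2.1), the request goes to minor.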

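    For example, data is normally read from majorPipelineCluster. In abnormal situations (Hystrix circuit open, thread-pool rejection, timeout, or an exception), the minorPipelineCluster logic runs instead.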

    Base class

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BaseCommand {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        /**
         * Hystrix parameters for major, e.g. timeout, thread pool, circuit-breaker close/open policies.
         */
        protected static final String MAJOR_READ_COMMAND_KEY = "major_read_command";
        protected static final String MAJOR_WRITE_COMMAND_KEY = "major_write_command";
        protected static final String MAJOR_GROUP_KEY = "major_redis_group";
        protected static final String MAJOR_THREAD_POOL_KEY = "major_redis_pool";
        public static int majorTimeOut = 1000;   // milliseconds
        public static int majorThreads = 100;    // isolation thread-pool size

        /**
         * Hystrix parameters for minor, e.g. timeout, thread pool, circuit-breaker close/open policies.
         */
        protected static final String MINOR_READ_COMMAND_KEY = "minor_read_command";
        protected static final String MINOR_WRITE_COMMAND_KEY = "minor_write_command";
        protected static final String MINOR_GROUP_KEY = "minor_redis_group";
        protected static final String MINOR_THREAD_POOL_KEY = "minor_redis_pool";
        public static int minorTimeOut = 1000;   // milliseconds
        public static int minorThreads = 100;    // isolation thread-pool size

    }
    

    Read command class

    public abstract class ReadCommand<T> extends BaseCommand {

        protected abstract T readMajor();

        protected abstract T readMinor();

        public T read() {
            // 1. Count every read request
            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.ALL);

            DataComponentCommand<T> majorCommand =
                    new DataComponentCommand<T>(MAJOR_READ_COMMAND_KEY, MAJOR_GROUP_KEY, MAJOR_THREAD_POOL_KEY,
                            majorTimeOut, majorThreads) {
                        @Override
                        protected T run() throws Exception {
                            // 2. Count run() calls on major
                            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.RUN);
                            return readMajor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            // 3. Count every fallback from major
                            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_ALL);

                            RedisCrossRoomHystrixStat.counterFallBack(MAJOR_READ_COMMAND_KEY);
                            // Fallback: run the same read against minor, in minor's own thread pool
                            return new DataComponentCommand<T>(MINOR_READ_COMMAND_KEY, MINOR_GROUP_KEY, MINOR_THREAD_POOL_KEY,
                                    minorTimeOut, minorThreads) {
                                @Override
                                protected T run() throws Exception {
                                    // 4. Count fallback-run calls (reads served by minor)
                                    RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_RUN);
                                    return readMinor();
                                }

                                @Override
                                public T getBusinessFallback() throws RedisCrossRoomReadMinorFallbackException {
                                    // 5. Count fallback-of-fallback: both major and minor failed
                                    RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_FALLBACK);
                                    throw new RedisCrossRoomReadMinorFallbackException("MinorFallbackException");
                                }
                            }.execute();
                        }
                    };
            return majorCommand.execute();
        }

    }
    

    For example, the get(String key) command:

    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
        ...
        @Override
        public String get(final String key) {
            return new ReadCommand<String>() {

                @Override
                protected String readMajor() {
                    return majorPipelineCluster.get(key);
                }

                @Override
                protected String readMinor() {
                    return minorPipelineCluster.get(key);
                }
            }.read();
        }
        ...
    }
    
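  • 4. Write path goal: write to both clusters whenever possible. On failure, the client currently only isolates the fault and does no data synchronization (MQ integration is being considered for the future). For now, the write results are simply returned to the application, which is responsible for maintaining consistency.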

    The MultiWriteResult class has four fields:

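    No.  Field                         Meaning
    1    DataStatusEnum majorStatus    Execution status on the major (primary) cluster
    2    T majorResult                 Result of the Redis command on the major cluster
    3    DataStatusEnum minorStatus    Execution status on the minor (standby) cluster
    4    T minorResult                 Result of the Redis command on the minor cluster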

        

    import java.util.concurrent.Future;

    public abstract class WriteCommand<T> extends BaseCommand {

        protected abstract T writeMajor();

        protected abstract T writeMinor();

        /** Describes the command (e.g. key and value) for failure logs. */
        protected abstract String getCommandParam();

        public MultiWriteResult<T> write() {
            DataComponentCommand<T> majorCommand =
                    new DataComponentCommand<T>(MAJOR_WRITE_COMMAND_KEY, MAJOR_GROUP_KEY, MAJOR_THREAD_POOL_KEY,
                            majorTimeOut, majorThreads) {
                        @Override
                        protected T run() throws Exception {
                            return writeMajor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            // Only log the failed write; returning null marks major as FAIL below
                            logger.warn("major cross-room failed: {}", getCommandParam());
                            return null;
                        }
                    };

            DataComponentCommand<T> minorCommand =
                    new DataComponentCommand<T>(MINOR_WRITE_COMMAND_KEY, MINOR_GROUP_KEY, MINOR_THREAD_POOL_KEY,
                            minorTimeOut, minorThreads) {
                        @Override
                        protected T run() throws Exception {
                            return writeMinor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            // Only log the failed write; returning null marks minor as FAIL below
                            logger.warn("minor cross-room failed: {}", getCommandParam());
                            return null;
                        }
                    };

            // queue() submits without blocking, so both writes run in parallel in their own pools
            Future<T> majorFuture = majorCommand.queue();
            Future<T> minorFuture = minorCommand.queue();
            T majorResult = null;
            T minorResult = null;
            try {
                majorResult = majorFuture.get();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            try {
                minorResult = minorFuture.get();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            // A null result (fallback or exception) means the write failed on that cluster
            DataStatusEnum majorStatus = majorResult == null ? DataStatusEnum.FAIL : DataStatusEnum.SUCCESS;
            DataStatusEnum minorStatus = minorResult == null ? DataStatusEnum.FAIL : DataStatusEnum.SUCCESS;
            return new MultiWriteResult<T>(majorStatus, majorResult, minorStatus, minorResult);
        }

    }
    

    For example, the set command:

    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
        ...
        @Override
        public MultiWriteResult<String> set(final String key, final String value) {
            return new WriteCommand<String>() {
                @Override
                protected String writeMajor() {
                    return majorPipelineCluster.set(key, value);
                }

                @Override
                protected String writeMinor() {
                    return minorPipelineCluster.set(key, value);
                }

                @Override
                protected String getCommandParam() {
                    return String.format("set key %s value %s", key, value);
                }
            }.write();
        }
        ...
    }
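The article does not show MultiWriteResult or DataStatusEnum. The sketch below infers them from how the code above uses them: the four-argument constructor in WriteCommand, and getMajorStatus()/getMinorStatus() in the write test later on. Anything beyond those usages is a hypothetical addition, including getMajorResult()/getMinorResult(), isAllSuccess(), isPartialSuccess() and toString().

```java
/** Per-cluster write status; the article only uses SUCCESS and FAIL. */
enum DataStatusEnum {
    SUCCESS,
    FAIL
}

/** Hypothetical sketch of the double-write result: status and raw result for each cluster. */
final class MultiWriteResult<T> {
    private final DataStatusEnum majorStatus;
    private final T majorResult;
    private final DataStatusEnum minorStatus;
    private final T minorResult;

    public MultiWriteResult(DataStatusEnum majorStatus, T majorResult,
                            DataStatusEnum minorStatus, T minorResult) {
        this.majorStatus = majorStatus;
        this.majorResult = majorResult;
        this.minorStatus = minorStatus;
        this.minorResult = minorResult;
    }

    public DataStatusEnum getMajorStatus() { return majorStatus; }
    public T getMajorResult() { return majorResult; }
    public DataStatusEnum getMinorStatus() { return minorStatus; }
    public T getMinorResult() { return minorResult; }

    /** Hypothetical helper: both clusters accepted the write. */
    public boolean isAllSuccess() {
        return majorStatus == DataStatusEnum.SUCCESS && minorStatus == DataStatusEnum.SUCCESS;
    }

    /** Hypothetical helper: exactly one cluster accepted the write, so the clusters now disagree. */
    public boolean isPartialSuccess() {
        return (majorStatus == DataStatusEnum.SUCCESS) != (minorStatus == DataStatusEnum.SUCCESS);
    }

    @Override
    public String toString() {
        return "MultiWriteResult{major=" + majorStatus + "/" + majorResult
                + ", minor=" + minorStatus + "/" + minorResult + "}";
    }
}
```

An application that owns consistency could branch on isPartialSuccess() to decide what to log or repair. For a Redis SET, the successful result is the reply "OK".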
    

IV. Exposed Metrics and Reports

(1) hystrix-dashboard report: real-time charts.

(2) JMX data: statistics for major and minor, including run and fallback invocation counts and exception counts.
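The JMX side can be approximated with the JDK's own javax.management API. The sketch below assumes a collector that keeps one counter per HystrixStatCountTypeEnum value. The five values are the ones ReadCommand uses (ALL, RUN, FALLBACK_ALL, FALLBACK_RUN, FALLBACK_FALLBACK), and the enum is redeclared here only so the block compiles. CrossRoomStats, collect() and the ObjectName are hypothetical; the real RedisCrossRoomClientStatusCollector may work quite differently. A DynamicMBean is used so that the attribute set simply follows the enum.

```java
import java.lang.management.ManagementFactory;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.ObjectName;
import javax.management.ReflectionException;

/** Stand-in for the article's counter types (the five values used in ReadCommand). */
enum HystrixStatCountTypeEnum { ALL, RUN, FALLBACK_ALL, FALLBACK_RUN, FALLBACK_FALLBACK }

/** Hypothetical collector: one counter per type, exposed read-only over JMX. */
final class CrossRoomStats implements DynamicMBean {
    private final Map<HystrixStatCountTypeEnum, AtomicLong> counters =
            new EnumMap<>(HystrixStatCountTypeEnum.class);

    CrossRoomStats() {
        for (HystrixStatCountTypeEnum type : HystrixStatCountTypeEnum.values()) {
            counters.put(type, new AtomicLong());
        }
    }

    /** Creates a collector and registers it on the platform MBeanServer. */
    static CrossRoomStats register(String objectName) throws JMException {
        CrossRoomStats stats = new CrossRoomStats();
        ManagementFactory.getPlatformMBeanServer().registerMBean(stats, new ObjectName(objectName));
        return stats;
    }

    /** Called on each code path, like collectCrossRoomStatus(...) in ReadCommand. */
    void collect(HystrixStatCountTypeEnum type) {
        counters.get(type).incrementAndGet();
    }

    @Override
    public Object getAttribute(String name) throws AttributeNotFoundException {
        for (HystrixStatCountTypeEnum type : HystrixStatCountTypeEnum.values()) {
            if (type.name().equals(name)) {
                return counters.get(type).get(); // boxed to Long
            }
        }
        throw new AttributeNotFoundException(name);
    }

    @Override
    public AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String name : names) {
            try {
                list.add(new Attribute(name, getAttribute(name)));
            } catch (AttributeNotFoundException ignored) {
                // unknown names are simply left out of the result
            }
        }
        return list;
    }

    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException("counters are read-only: " + attribute.getName());
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList(); // nothing is writable
    }

    @Override
    public Object invoke(String actionName, Object[] params, String[] signature) throws ReflectionException {
        throw new ReflectionException(new NoSuchMethodException(actionName));
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        HystrixStatCountTypeEnum[] types = HystrixStatCountTypeEnum.values();
        MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[types.length];
        for (int i = 0; i < types.length; i++) {
            attrs[i] = new MBeanAttributeInfo(types[i].name(), "long",
                    "count of " + types[i].name(), true, false, false);
        }
        return new MBeanInfo(getClass().getName(), "cross-room client counters", attrs, null, null, null);
    }
}
```

Once registered, the counters appear as read-only attributes in jconsole or any other JMX client.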

V. Read Tests

1. major is healthy, but its thread pool really is too small

(1) Test code

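Method: give major a small thread pool and drive it with high request concurrency. Each request thread performs 1000 random reads and then reports back to the main thread.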

Expected result: every request returns a value (provided the key exists).

    /**
     * major thread pool saturated: pool size too small (major: 30, minor: 80, 100 concurrent request threads)
     *
     * @throws InterruptedException
     */
    @Test
    public void testRandomReadWithEnoughThreads() throws InterruptedException {
        redisClusterCrossRoomClient.setMajorThreads(30);
        redisClusterCrossRoomClient.setMinorThreads(80);
        int threadNum = 100;
        int perSize = TOTAL_SIZE / threadNum;
        int totalNeedGet = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            int start = perSize * i + 1;
            int end = perSize * (i + 1);
            Thread thread = new RandomReadThread(start, end, totalNeedGet, countDownLatch);
            thread.start();
        }
        countDownLatch.await();
        // total requests = request threads x reads per thread
        logger.info("request counter: {}", threadNum * totalNeedGet);
        logger.info("readSuccess counter: {}", readSuccessCounter.get());
    }

    class RandomReadThread extends Thread {
        private final int start;
        private final int end;
        private final int totalNeedGet;
        private final CountDownLatch countDownLatch;
        private final Random random = new Random();

        public RandomReadThread(int start, int end, int totalNeedGet, CountDownLatch countDownLatch) {
            this.start = start;
            this.end = end;
            this.totalNeedGet = totalNeedGet;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (long counter = 0; counter < totalNeedGet; counter++) {
                    if (counter % 100 == 0) {
                        logger.info("{} execute {} th, total size {}", Thread.currentThread().getName(), counter,
                                totalNeedGet);
                    }
                    // random key in [start, end)
                    int id = start + random.nextInt(end - start);
                    String key = "user:" + id;
                    try {
                        String result = redisClusterCrossRoomClient.get(key);
                        if (StringUtils.isBlank(result)) {
                            logger.warn("key {}, value is null", key);
                        } else {
                            readSuccessCounter.incrementAndGet();
                        }
                    } catch (Exception e) {
                        // a failed read still counts as an attempt, so the loop always terminates
                        logger.error(e.getMessage(), e);
                    }
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // always release the latch so the test cannot hang
                countDownLatch.countDown();
            }
        }
    }

(2) Failover:

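The major thread pool occasionally fills up and rejects tasks. The fallback logic then runs, and the request is automatically redirected to minor.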

    2016-04-25 15:07:10,658 ERROR [Thread-186] impl.RedisClusterCrossRoomClientImpl: major_redis_command could not be queued for execution and failed retrieving fallback.
    com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command could not be queued for execution and failed retrieving fallback.
        at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
        at com.netflix.hystrix.HystrixCommand.subscribeWithThreadIsolation(HystrixCommand.java:1250)
        at com.netflix.hystrix.HystrixCommand.access$1400(HystrixCommand.java:103)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:829)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
        at rx.Observable.subscribe(Observable.java:6178)
        at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
        at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
        at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
        at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
        at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:212)
    Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5a56156e rejected from java.util.concurrent.ThreadPoolExecutor@656a0adc[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 410]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
        at com.netflix.hystrix.HystrixCommand.subscribeWithThreadIsolation(HystrixCommand.java:1166)
        ... 19 more
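The Caused by line in the log above (pool size = 30, active threads = 30, queued tasks = 0) shows what a fixed-size pool without a waiting queue does once every worker is busy. It rejects the task with an AbortPolicy at once instead of blocking the caller, and that immediate rejection is what lets the fallback run right away. The following JDK-only sketch reproduces the behavior. SaturationDemo is hypothetical, and the choice of a SynchronousQueue is an assumption about the pool's configuration, consistent with queued tasks = 0 in the log.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a saturated pool with no queue rejects new work immediately. */
final class SaturationDemo {

    /**
     * Submits {@code tasks} long-running jobs to a pool of {@code threads} workers
     * and returns how many submissions were rejected.
     */
    static int countRejected(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // each accepted job blocks until released, so its worker stays busy
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // the caller gets control back at once; this is where a fallback would run
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With 30 workers and 100 submissions, the first 30 jobs occupy every thread and the remaining 70 are rejected without waiting. That mirrors the test above, where 100 request threads share a 30-thread major pool.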

(3) hystrix-dashboard: some requests rejected by the major thread pool (shown in purple) are moved via fallback to the minor thread pool and executed there.

    

(4) Final result: the number of non-empty results equals the number of requests.

    2016-04-25 15:08:43,393 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: request counter: 100000
    2016-04-25 15:08:43,393 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: readSuccess counter: 100000

2. major is unhealthy, which exhausts its thread pool or produces large numbers of exceptions, timeouts, and so on

(1) Test code:

Method: use Redis's DEBUG SLEEP <seconds> command to make Redis temporarily stop serving requests.

Expected result: every request returns a value (provided the key exists).

    /**
     * major thread pool saturated: major is unhealthy (simulated with DEBUG SLEEP)
     *
     * @throws InterruptedException
     */
    @Test
    public void testRandomReadWithMajorSleep() throws InterruptedException {
        // Put every node in major to sleep for 10 seconds.
        // DEBUG SLEEP waits for the server's reply, so each call returns only when the sleep ends
        // or the client's socket read times out; the nodes are therefore put to sleep one after another.
        Map<String, JedisPool> jedisPoolMap = majorPipelineCluster.getConnectionHandler().getNodes();
        for (JedisPool jedisPool : jedisPoolMap.values()) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.debug(DebugParams.SLEEP(10));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
        int threadNum = 20;
        int perSize = TOTAL_SIZE / threadNum;
        int totalNeedGet = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            int start = perSize * i + 1;
            int end = perSize * (i + 1);
            Thread thread = new RandomReadThread(start, end, totalNeedGet, countDownLatch);
            thread.start();
        }
        countDownLatch.await();
        logger.info("request counter: {}", (threadNum * totalNeedGet));
        logger.info("readSuccess counter: {}", readSuccessCounter.get());
    }

(2) Failover:

The major circuit breaker opens outright, so every request runs the fallback logic and is automatically redirected to minor.

    2016-04-25 14:41:48,144 ERROR [Thread-13] impl.RedisClusterCrossRoomClientImpl: major_redis_command short-circuited and failed retrieving fallback.
    com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command short-circuited and failed retrieving fallback.
        at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
        at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1614)
        at com.netflix.hystrix.HystrixCommand.access$1300(HystrixCommand.java:103)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:820)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
        at rx.Observable.subscribe(Observable.java:6178)
        at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
        at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
        at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
        at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
        at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:214)
    2016-04-25 14:41:48,145 ERROR [Thread-25] impl.RedisClusterCrossRoomClientImpl: major_redis_command short-circuited and failed retrieving fallback.
    com.netflix.hystrix.exception.HystrixRuntimeException: major_redis_command short-circuited and failed retrieving fallback.
        at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1660)
        at com.netflix.hystrix.HystrixCommand.getFallbackOrThrowException(HystrixCommand.java:1614)
        at com.netflix.hystrix.HystrixCommand.access$1300(HystrixCommand.java:103)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:820)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:803)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1080)
        at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1.call(HystrixCommand.java:1007)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable$2.call(Observable.java:153)
        at rx.Observable$2.call(Observable.java:149)
        at rx.Observable.subscribe(Observable.java:6178)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:936)
        at com.netflix.hystrix.HystrixCommand$ObservableCommand$1.call(HystrixCommand.java:932)
        at rx.Observable.subscribe(Observable.java:6178)
        at rx.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:55)
        at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:430)
        at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:477)
        at com.sohu.tv.cachecloud.client.redis.crossroom.impl.RedisClusterCrossRoomClientImpl.get(RedisClusterCrossRoomClientImpl.java:136)
        at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest$RandomReadThread.run(RedisClusterCrossRoomClientImplTest.java:214)

(3) hystrix-dashboard

The major circuit breaker is open and all requests have moved to minor. Once major recovers, traffic returns to major.
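The "short-circuited" state in the log is the circuit breaker being open. Hystrix's real rule is statistical: within a rolling window, once both the request volume and the error percentage exceed their thresholds, the circuit opens for a sleep window. After that window, a trial request decides whether the circuit closes again. The sketch below is a deliberately simplified stand-in, not Hystrix's algorithm. It opens after N consecutive failures, short-circuits to the fallback during the sleep window, and then lets a trial call through. SimpleCircuitBreaker and its thresholds are hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified circuit breaker (NOT Hystrix's rolling-window algorithm):
 * opens after failureThreshold consecutive failures, short-circuits to the fallback
 * for sleepWindowMillis, then lets a trial call through to decide whether to close.
 */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // injectable so the behavior can be tested without sleeping
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long sleepWindowMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** Calls primary unless the circuit is open; failures and short-circuits go to fallback. */
    synchronized <T> T execute(Supplier<T> primary, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < sleepWindowMillis) {
                return fallback.get(); // short-circuited: primary is not even attempted
            }
            state = State.HALF_OPEN; // sleep window elapsed: allow a trial call
        }
        try {
            T result = primary.get();
            state = State.CLOSED; // any success (including the trial) closes the circuit
            consecutiveFailures = 0;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                state = State.OPEN; // trial failed or too many failures: (re)open
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

For real use, the clock would be System::currentTimeMillis. Making execute() synchronized keeps the sketch short, but it serializes all calls, which a production breaker would avoid.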

    

(4) Final result:

The number of non-empty results equals the number of requests.

    2016-04-25 15:01:21,338 INFO  [main] crossroom.RedisClusterCrossRoomClientImplTest: request counter: 20000
    2016-04-25 15:01:21,338 INFO  [main] crossroom.RedisClusterCrossRoomClientImplTest: readSuccess counter: 20000

VI. Write Tests

1. major failure:

(1) Test code

Put major to sleep, then write 1000 keys in total with 20 threads, 50 keys per thread.

    @Test
    public void testRandomWriteWithMajorSleep() throws InterruptedException {
        // Put major to sleep for 10 seconds
        tempSleepPipelineCluster(majorPipelineCluster, 10);
        int totalWrite = 1000;
        int threadNum = 20;
        int perSize = totalWrite / threadNum;
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            // new keys above TOTAL_SIZE, so they do not overlap the read-test data
            int start = TOTAL_SIZE + perSize * i + 1;
            int end = TOTAL_SIZE + perSize * (i + 1);
            Thread thread = new RandomWriteThread(start, end, countDownLatch);
            thread.start();
        }
        countDownLatch.await();
        logger.info("total request write: {}", totalWrite);
        logger.info("writeFail counter: {}", writeFailCounter.get());
        logger.info("writePartFail counter: {}", writePartFailCounter.get());
        logger.info("writeSuccess counter: {}", writeSuccessCounter.get());
    }

    /**
     * Random writes
     *
     * @author leifu
     * @Date April 25, 2016
     * @Time 3:39:13 PM
     */
    class RandomWriteThread extends Thread {
        private final int start;
        private final int end;
        private final CountDownLatch countDownLatch;

        public RandomWriteThread(int start, int end, CountDownLatch countDownLatch) {
            this.start = start;
            this.end = end;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int id = start; id <= end; id++) {
                    String key = "user:" + id;
                    String value = String.valueOf(id);
                    MultiWriteResult<String> multiWriteResult = redisClusterCrossRoomClient.set(key, value);
                    DataStatusEnum majorStatus = multiWriteResult.getMajorStatus();
                    DataStatusEnum minorStatus = multiWriteResult.getMinorStatus();
                    if (DataStatusEnum.SUCCESS.equals(majorStatus) && DataStatusEnum.SUCCESS.equals(minorStatus)) {
                        writeSuccessCounter.incrementAndGet();      // written to both clusters
                    } else if (!DataStatusEnum.SUCCESS.equals(majorStatus) && !DataStatusEnum.SUCCESS.equals(minorStatus)) {
                        writeFailCounter.incrementAndGet();         // written to neither cluster
                    } else {
                        writePartFailCounter.incrementAndGet();     // written to exactly one cluster
                    }
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // always release the latch so the test cannot hang on an unexpected exception
                countDownLatch.countDown();
            }
        }
    }

(2) Exceptions: major times out heavily, and writes to major fail.

    2016-04-25 17:31:08,703 ERROR [main] crossroom.RedisClusterCrossRoomClientImplTest: java.net.SocketTimeoutException: Read timed out
    redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
        at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
        at redis.clients.jedis.Protocol.process(Protocol.java:142)
        at redis.clients.jedis.Protocol.read(Protocol.java:206)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:282)
        at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:208)
        at redis.clients.jedis.BinaryJedis.debug(BinaryJedis.java:2974)
        at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest.tempSleepPipelineCluster(RedisClusterCrossRoomClientImplTest.java:184)
        at com.sohu.tv.cachecloud.client.redis.crossroom.RedisClusterCrossRoomClientImplTest.testRandomWriteWithMajorSleep(RedisClusterCrossRoomClientImplTest.java:252)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
    Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:152)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at java.net.SocketInputStream.read(SocketInputStream.java:108)
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
        ... 32 more
    2016-04-25 17:31:10,777 WARN [Thread-14] sources.URLConfigurationSource: No URLs will be polled as dynamic configuration sources.
    2016-04-25 17:31:10,777 INFO [Thread-14] sources.URLConfigurationSource: To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
    2016-04-25 17:31:10,779 INFO [Thread-14] config.DynamicPropertyFactory: DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@224dc69c

(3) dashboard:

    

(4) Final result:

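1000 writes in total: 22 succeeded on both clusters, 978 succeeded on only one cluster (partial success), and none failed on both.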

    2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: total request write: 1000
    2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writeFail counter: 0
    2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writePartFail counter: 978
    2016-04-25 17:31:12,399 INFO [main] crossroom.RedisClusterCrossRoomClientImplTest: writeSuccess counter: 22

Failure logs printed during the run:

    2016-04-28 10:47:29,871 WARN [Thread-17] impl.RedisClusterCrossRoomClientImpl$1: major cross-room failed: set key user:10300 value 10300
    ...................................

VII. Open Issues and Outlook

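  • Because this is a customized client, only a subset of the Redis command API is supported so far; more commands need to be added.
  • During a failure, reads work normally, but double writes can leave major and minor inconsistent. This could be addressed with an MQ-based mechanism; for now, failed writes are only recorded in the logs.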
  • …………
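The MQ idea in the second point can be illustrated with an in-memory queue standing in for a real message queue. When a write reaches only one cluster, the missing side and a replay action are recorded instead of only being logged. A later pass replays the write once that cluster is healthy again. This is a hypothetical sketch, not the planned design: RepairQueue, PendingWrite, drainOnce() and the use of a BlockingQueue in place of an MQ are all assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

/**
 * Hypothetical repair queue: an in-memory stand-in for an MQ topic that records
 * writes which reached only one cluster, so they can be replayed later.
 */
final class RepairQueue {
    enum Side { MAJOR, MINOR }

    /** One write that is missing on one side, with the action that redoes it there. */
    static final class PendingWrite {
        final Side missingOn;
        final String description; // e.g. "set key user:10300 value 10300"
        final Runnable replay;

        PendingWrite(Side missingOn, String description, Runnable replay) {
            this.missingOn = missingOn;
            this.description = description;
            this.replay = replay;
        }
    }

    private final BlockingQueue<PendingWrite> queue = new LinkedBlockingQueue<>();

    /** Records a write that failed on {@code missingOn}, instead of only logging it. */
    void record(Side missingOn, String description, Runnable replay) {
        queue.add(new PendingWrite(missingOn, description, replay));
    }

    /**
     * Makes one pass over the queue: replays writes whose target side is healthy
     * and re-queues the rest. Returns the number of writes replayed successfully.
     */
    int drainOnce(Predicate<Side> isHealthy) {
        int replayed = 0;
        int pendingAtStart = queue.size();
        for (int i = 0; i < pendingAtStart; i++) {
            PendingWrite write = queue.poll();
            if (write == null) {
                break;
            }
            if (!isHealthy.test(write.missingOn)) {
                queue.add(write); // target cluster still down: keep for the next pass
                continue;
            }
            try {
                write.replay.run();
                replayed++;
            } catch (RuntimeException e) {
                queue.add(write); // replay failed: keep for the next pass
            }
        }
        return replayed;
    }

    int pending() {
        return queue.size();
    }
}
```

Two caveats apply to any real version. An in-memory queue is lost on restart, which is why a durable MQ would be used. Replaying a stale write can also overwrite a newer value, so ordering or versioning would need to be handled.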