18 July 2019

[toc]

函数接口的意义

函数式接口 函数描述符 lamdba例子 reference
Predicate T->boolean [test] (List list)->list.isEmpty() https://blog.csdn.net/w605283073/article/details/89410918lamdba
Function<T,R> T->R [apply] (String s)->s.length() https://www.pianshen.com/article/2042114445/
Consumer T→void [void] (Apple a)->System.out.println(a.getWeight()) https://blog.csdn.net/qq_28410283/article/details/80618456
Supplier ()->T [get] ()→new Apple(10) https://blog.csdn.net/qq_28410283/article/details/80625482

lamdba表达式的格式

  • (params) -> expression

    () -> System.out.println("Hello Lambda Expressions");
    
  • (params) -> statement

    (int even, int odd) -> even + odd
    
  • (params) -> { statements } [ ()->{}替代匿名内部类 ]

    // Java 8之前:
    new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("Before Java8, too much code for too little to do");
    }
    }).start();
      
    //Java 8方式:
    new Thread( () -> System.out.println("In Java8, Lambda expression rocks !!") ).start();
    

sort演进

//代码传递
public class AppleComparator implements Comparator<Apple> { 
    public int compare(Apple a1, Apple a2){ 
    	return a1.getWeight().compareTo(a2.getWeight()); 
    }
} 
inventory.sort(new AppleComparator());

//匿名类
inventory.sort(new Comparator<Apple>() {
    public int compare(Apple a1, Apple a2){        
    	return a1.getWeight().compareTo(a2.getWeight());    
    } 
});

//lamdba表达式
inventory.sort((Apple a1, Apple a2)-> a1.getWeight().compareTo(a2.getWeight()) ); 

//方法引用
//替代那些转发参数的lambda表达式的语法糖
inventory.sort(comparing(Apple::getWeight)); 

//复合比较器
inventory.sort(comparing(Apple::getWeight).reversed().thenComparing(Apple::getCountry)); 

//空和数字的比较
list.stream().sorted(Comparator.nullsLast(Integer::compareTo).thenComparing(Integer::compareTo)).forEach(System.out::println);
或者
list.sort(Comparator.nullsFirst(Integer::compareTo).thenComparing(Integer::compareTo));
list.forEach(System.out::println);

//自定义比较器
public static void main(String[] args) {
    List<Integer> list = Lists.newArrayList();
 	list.add(1);list.add(2);list.add(3);list.add(4);   
 	list.add(5);list.add(0);list.add(null);list.add(null);
 	
 	//i1 =2,i1 =1
 	list.sort((i1, i2) -> {
        if(i1 == null || i1 == 0) {
            return -1;
 	 } else if(i2 == null || i2 == 0) {
            return 1;
	 }
        return i2 - i1;
 	});
 	for (Integer i : list) {
        System.out.println(i);
 	}
}

//复合谓词(negate[非],and,or)===  a.or(b).and(c)===(a||b) && c
Predicate<Apple> redAndHeavyAppleOrGreen =     redApple.and(a -> a.getWeight() > 150) .or(a -> "green".equals(a.getColor()));

java8-Stream

filter,map,reduce(归约),find,match,sort,limit,skip(扔掉前几个元素),flatMap(Arrays::stream)(把流扁平化为单个流)

查找:allMatch、anyMatch、noneMatch、findFirst和findAny

  • flatMap

    //题目:给定列表[1, 2, 3]和列表[3, 4],应 该返回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]
      
    思路:你可以使用两个map来迭代这两个列表,并生成数对。但这样会返回一个Stream- <Stream<Integer[]>>。你需要让生成的流扁平化,以得到一个Stream<Integer[]>。
      
    List<Integer> numbers1 = Arrays.asList(1, 2, 3);
    List<Integer> numbers2 = Arrays.asList(3, 4);
      
    List<int[]> pairs =numbers1.stream().flatMap(i -> numbers2.stream().map(j -> new int[]{i, j}) ).collect(toList()); 
      
    //list嵌套
    List flatList=listList.stream().flatMap(Collection::stream).collect(Collectors.toList());
    
  • reduce

    reduce接受两个参数:一个初始值,这里是0;一个BinaryOperator<T>来将两个元素结合起来产生一个新值,这里我们用的是 lambda (a, b) -> a + b。 
    int sum = numbers.stream().reduce(0, Integer::sum);//和mapToInt 
    或者
    int sum=numbers.stream().mapToInt(Info::getNo).sum;
    int sum=numbers.stream.mapToInt(Integer::intValue).sum();
      
      
    //reduce还有一个重载的变体,不接受初始值,但是会返回一个Optional的对象
    Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));
      
    //算菜品个数
    int count = menu.stream().map(d -> 1) .reduce(0, (a, b) -> a + b); 
    或者long count = menu.stream().count();
      
    //给每笔订单增加12%的税费
    List costBeforeTax = Arrays.asList(100, 200, 300, 400, 500);
    double bill = costBeforeTax.stream().map((cost) -> cost + .12*cost).reduce((sum, cost) -> sum + cost).get();
    
  • 最大,最小,平均值,总和===mapToInt

    IntStream、LongStream 和 DoubleStream 等流的类中,有个非常有用的方法叫做 summaryStatistics() 。可以返回 IntSummaryStatistics、LongSummaryStatistics 或者 DoubleSummaryStatistics,描述流中元素的各种摘要数据。
      
    //获取数字的个数、最小值、最大值、总和以及平均值
    List<Integer> primes = Arrays.asList(2, 3, 5, 7, 11, 13, 17, 19, 23, 29);
    IntSummaryStatistics stats = primes.stream().mapToInt((x) -> x).summaryStatistics();
    System.out.println("Highest prime number in List : " + stats.getMax());
    System.out.println("Lowest prime number in List : " + stats.getMin());
    System.out.println("Sum of all prime numbers : " + stats.getSum());
    System.out.println("Average of all prime numbers : " + stats.getAverage());
      
    或者
    int sum = Arrays.stream(numbers).sum(); 
    Optional<Transaction> optional =list.stream().min(comparing(Transaction::getValue)); 
    
  • allMatch

    boolean isHealthy = menu.stream()   .allMatch(d -> d.getCalories() < 1000); 
    
  • 偶数流

    IntStream evenNumbers = IntStream.rangeClosed(1, 100) .filter(n -> n % 2 == 0); 
    
  • toMap

    Map<String, String> collect = userList.stream().collect(Collectors.toMap(User::getId, User::getName));
      
    //2和3是等价的
    Map<String, User> collect1 = userList.stream().collect(Collectors.toMap(User::getId, t -> t));
    Map<String, User> collect2 = userList.stream().collect(Collectors.toMap(User::getId, Function.identity()));
      
    //roleNames是list的string串,把他作为key,value是空的list
    Map<String, List<IamV2UserRoleDTO>> roleMap = roleNameStrings.stream().collect(toMap(Function.identity(), d -> new ArrayList()));
      
    //处理value,key不变
    Map<String, List<String>> collect1 = roles.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> e.getValue().stream().map(IamV2UserRoleDTO::getRealName).collect(toList())));
      
    //第三个参数是key重复的时候如何处理
    Map<String, String> collect = userList.stream().collect(Collectors.toMap(User::getId, User::getName, (n1, n2) -> n1 + n2));
    Map<Long, User> maps = userList.stream().collect(Collectors.toMap(User::getId, Function.identity(), (key1, key2) -> key2));
      
    //第四个参数是我们希望返回的map类型,treemap可以根据key排序
    TreeMap<String, String> collect1 = userList.stream().collect(Collectors.toMap(User::getId, User::getName, (n1, n2) -> n1, TreeMap::new));
    
  • 合并两个map===4种方式

    Map<String, List<IamV2UserRoleDTO>> collect = Stream.of(roleMap, userMap)
    .flatMap(x -> x.entrySet().stream())
    .collect(toMap(Map.Entry::getKey,Map.Entry::getValue, (value1, value2) -> {
                            value1.addAll(value2);
                            return value1;
                        }));
                          
    refence:https://blog.csdn.net/tangyaya8/article/details/91399650
    
  • groupingby

    Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream().collect( groupingBy(Dish::getType, groupingBy(dish -> 
    { 
    	if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
    	else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
     	else return CaloricLevel.FAT; 
    } ) ) );
      
    Map<String, List<List<String>>> map2 = conditions.stream().collect(Collectors.groupingBy(Condition::getCondName, Collectors.mapping(Condition::getCondValue, Collectors.toList())));
      
    Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting())); 
      
    Map<String, Apple> collect3 = list.stream().collect(Collectors.groupingBy(Apple::getName, Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparingInt(Apple::getWeight)), Optional::get)));
      
    Map<String, Integer> collect4 = list.stream().collect(Collectors.groupingBy(Apple::getName, Collectors.summingInt(Apple::getWeight)));
    
  • 遍历map

    Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator(); while (it.hasNext()) { Map.Entry<Integer, Integer> entry = it.next(); System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue()); }
      
    //或者
    map.forEach((key, value) -> { System.out.println(key + ":" + value); });
    
  • joining

    //每次都创建一个String对象,优化下(joining内部是stringbuilder)
    String shortMenu = menu.stream().map(Dish::getName).collect(joining()); 
    String collect2 = list.stream().map(Apple::getName).collect(Collectors.joining(","));
    
  • 文件生成一个流

    long uniqueWords = 0;
      
    try(Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset())){
    	uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" "))).distinct() .count();
    } catch(IOException e){ }
    
  • map转list

    List<CandidateInfo> list=map.entrySet().stream().map(entry->entry.getValue()).flatMap(List::stream).collect(Collectors.toList());
    

高级特性

  • Future

    ExecutorService executorService=Executors.newCachedThreadPool();
    Future<String> future = executorService.submit(new Callable<String>() {
        public String call() {
            return "";
     }
    });
    try {
        future.get(1, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        e.printStackTrace();
    }
    //不能表述多个future结果的依赖性
    
  • CompletableFuture

    CompletableFuture<String> completableFuture=new CompletableFuture();
    new Thread(()->{completableFuture.complete("");}).start();
    外边用Future对象接,获取值的时候future.get();
      
    CompletableFuture.supplyAsync(()->{return "";});
    

代码写法优化

  • 链路追踪traceId【龙湖core包里面有了】【请求在线程池分发,龙湖core没有支持,需要重新封装一下线程池,把traceId带进去】

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }
    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }
    
  • 替换if,else【策略模板模式】===接口,抽象类,实现类

    public interface CandidateRegisterProcessor {
       
        /**
         * 责任链的检查顺序,小的在前面,default必须在最后
         */
        int a = 0;
        int b = 1;
        int c = Integer.MAX_VALUE;
       
        int getPriority();
       
        boolean match(Context context);
       
        Result register(Context context);
    }
       
    public abstract class CandidateAbstractRegisterProcessor implements CandidateRegisterProcessor {
        //可以做macth和register的公共逻辑
    }
       
       
    @Service
    public class CandidateMultipleRegisterProcessor extends CandidateAbstractRegisterProcessor {
       
        @Override
        public int getPriority() {
            return CandidateRegisterProcessor.REGISTER_PROCESS_PRIORITY_MULTIPLE;
        }
       
        @Override
        public boolean match(CandidateRegisterContext context) {
            return true;
        }
       
       
        @Override
        protected CandidateRegisterResult doRegister(CandidateRegisterContext context) {
            return null;
        }
       
       
    }
       
       
    public class CandidateRegisterHelper {
       
        private static void checkOrInitParam(CandidateRegisterContext context) {
            Preconditions.checkArgument(context.getOrder() != null, "order不能为空");
            Preconditions.checkArgument(context.getRequest() != null, "request不能为空");
        }
       
        private static void preHandleRegisterProcessors(List<CandidateRegisterProcessor> processors) {
            Safes.of(processors)
                    .sort(Comparator.comparingInt(CandidateRegisterProcessor::getPriority));
        }
       
        public static CandidateRegisterResult processRegister(CandidateRegisterContext context, List<CandidateRegisterProcessor> processors) {
            checkOrInitParam(context);
            preHandleRegisterProcessors(processors);
            return Safes.of(processors).stream()
                    .filter(processor -> processor.match(context))
                    .findFirst()
                    .orElseThrow(() -> new RuntimeException("[候补锁号]未找到匹配的Processor"))
                    .register(context);
        }
       
    }
       
       
    //业务调用
    @Component
    public class CandidateMatchOrderConsumer implements MessageListener {
        @Resource
        private List<CandidateRegisterProcessor> candidateRegisterProcessors;
       
        //mq接收数据,循环所有message===入参vo和接口列表
        CandidateRegisterResult result = CandidateRegisterHelper.processRegister(
            new CandidateRegisterContext(order, request),
            candidateRegisterProcessors);
    }
    
    @Override
        public boolean match(RocketMessageExt<Map<String, Object>> map) {
            // common deal
            return this.doMatch(map);
        }
       
       
        @Override
        public void deal(RocketMessageExt<Map<String, Object>> map) {
            // common deal
            this.doDeal(map);
        }
       
       
        protected abstract boolean doMatch(RocketMessageExt<Map<String, Object>> map);
       
        protected abstract void doDeal(RocketMessageExt<Map<String, Object>> map);
    
  • guaua cache(业务获取 + 定期刷新)

    从LoadingCache查询的正规方式是使用get(K)方法。这个方法要么返回已经缓存的值,要么使用CacheLoader向缓存原子地加载新值(通过load(String key) 方法加载)。

    简单使用:https://www.cnblogs.com/fnlingnzb-learner/p/11022152.html

    package com.longfor.plm.todo.util;
       
    import com.google.common.base.Splitter;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import org.apache.commons.collections.CollectionUtils;
    import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
       
    import javax.annotation.PostConstruct;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
       
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
       
    public class LoadCacheUtils {
       
        private static final int EXPIRE = 24 * 60 * 60 * 1000;
       
        private static final String SPLITTER = "ABC123abc";
       
        private static final ScheduledExecutorService singleExecutor = Executors
                .newSingleThreadScheduledExecutor();
       
        @PostConstruct
        public void init() {
            System.out.println("2");
            this.refreshCache();
            singleExecutor.scheduleAtFixedRate(this::refreshCache, 60, 60, TimeUnit.SECONDS);
        }
       
        public void refreshCache() {
            try {
                System.out.println("3");
                // 1
                Set<String> doctorKeys = doctorCache.asMap().keySet();
                if (CollectionUtils.isNotEmpty(doctorKeys)) {
                    for (String key : doctorKeys) {
                        List<String> split = Splitter.on(SPLITTER).splitToList(key);
                        Optional<Doctor> doctor = Optional
                                .ofNullable(db.queryByCode(split.get(0), split.get(1)));
                        doctorCache.put(key, doctor);
                    }
                }
            } catch (Exception e) {
            }
        }
       
        private LoadingCache<String, Optional<Doctor>> doctorCache = CacheBuilder.newBuilder()
                .expireAfterWrite(EXPIRE, MILLISECONDS).maximumSize(20000)
                .build(new CacheLoader<String, Optional<Doctor>>() {
                    @Override
                    public Optional<Doctor> load(String hos$doctorCode) throws Exception {
                        System.out.println("4");
                        List<String> split = Splitter.on(SPLITTER).splitToList(hos$doctorCode);
                        return Optional.ofNullable(
                                db.queryByCode(split.get(0), split.get(1)));
                    }
                });
       
        //业务方法,获取数据
        public Optional<Doctor> loadDoctor(String hosCode, String doctorCode) {
            return new Loader<Optional<Doctor>>() {
                @Override
                public Optional<Doctor> load0(String key) {
                    try {
                        return doctorCache.get(hosCode + SPLITTER + doctorCode);
                    } catch (ExecutionException e) {
                        return Optional.ofNullable(db.queryByCode(hosCode, doctorCode));
                    }
                }
            }.load(hosCode);
        }
       
        private static abstract class Loader<T> {
       
            public T load(String key) {
                long start = System.currentTimeMillis();
                try {
                    return load0(key);
                } finally {
                    //监控
                }
            }
       
            abstract T load0(String key);
        }
       
        class Doctor{}
       
    }
    
    //业务调用
    Optional<Doctor> docOpt = LoadCacheUtils.loadDoctor(hosCode, doctorId);
    if (docOpt == null || !docOpt.isPresent() || docOpt.get() == null) {
           
    }
    
  • 判断Preconditions

    Preconditions.checkArgument(StringUtils.isNotBlank(result.getCardNo()), "卡号不能为空!!");
    
  • 限流==单机(单位时间内的请求数或者并发数),分布式

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface AccessLimit {
       
        /**
         * 默认每秒限制访问次数
         * @return
         */
        double defaultLimit() default 10;
    }
    
    package com.longfor.plm.todo.util;
       
    import com.google.common.base.Joiner;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import com.google.common.util.concurrent.RateLimiter;
    import org.springframework.stereotype.Component;
    import org.springframework.web.method.HandlerMethod;
    import org.springframework.web.servlet.HandlerInterceptor;
    import org.springframework.web.servlet.ModelAndView;
       
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.lang.reflect.Method;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
       
    @Component
    public class LimitInterceptor implements HandlerInterceptor {
        //配置中心数据
        private  List<Object> configs = Lists.newArrayList();
        //业务请求量数据,configs转换来的
        private Map<String, RateLimiter> limiterMap = Maps.newConcurrentMap();
        private static Joiner UNDERLINE_JOINER = Joiner.on("_").skipNulls();
       
        /** 获取令牌失败resCode */
        private static final int RATE_LIMIT_ERROR_CODE = 999;
        /** 获取令牌失败msg */
        private static final String RATE_LIMIT_ERROR_MSG = "请求过于频繁,请稍后再试";
       
        @Override
        public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object _handler) throws Exception {
            try {
                if (!(_handler instanceof HandlerMethod)) {
                    return true;
                }
                HandlerMethod handler = (HandlerMethod) _handler;
                Method method = handler.getMethod();
                boolean accessLimit = method.isAnnotationPresent(AccessLimit.class);
                if (!accessLimit) {
                    return true;
                }
                return doRateLimit(httpServletRequest, httpServletResponse, method);
            } catch (Throwable e) {
                return true;
            }
        }
       
        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
       
        }
       
        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
       
        }
       
       
        private boolean doRateLimit(HttpServletRequest request, HttpServletResponse response, Method method) {
            String uri = request.getRequestURI();
            String key = genKey(uri, "请求限制维度");
            RateLimiter rateLimiter = limiterMap.get(key);
            if (rateLimiter == null) {
                // build new RateLimiter
                double limit = calcLimit(uri, method);
                if (limit < 0) {
                    limit = 0.1;
                }
                rateLimiter = RateLimiter.create(limit);
                limiterMap.putIfAbsent(key, rateLimiter);
            }
       
            boolean acquire = rateLimiter.tryAcquire();
            if (!acquire) {
                WebUtil.responseJson(APIResponse.error(RATE_LIMIT_ERROR_CODE, RATE_LIMIT_ERROR_MSG), response);
                return false;
            }
            return true;
        }
       
        private String genKey(String uri, String hosCode) {
            return UNDERLINE_JOINER.join(uri, hosCode);
        }
       
       
        private double calcLimit(String uri, Method method) {
            Optional<Object> interfaceOpt = Safes.of(configs).stream().filter(config -> 1==1).findAny();
            AccessLimit accessLimit = method.getAnnotation(AccessLimit.class);
            double defaultLimit = accessLimit.defaultLimit();
            if (!interfaceOpt.isPresent()) {
                return defaultLimit;
            }
            Object hospitalOfflineInterface = interfaceOpt.get();
       
            return xx.get().getLimit();
        }
       
    }
    
    public static void responseJson(object value,ServletResponse response){
    	response.setContentType("application/json;charset=UTF-8");
    	try{
    		JsonUtils.instance.writeValue(response.getWriter,value);
    	}catch(Exception e){
    		throw UnsafeUtils.throwException(e);
    	}
    }
    
  • 降级

    • 业务返回数据降级
    • hystrix降级
    • nginx降级(兜底数据)
  • adaptor抽象赋值代码块

  • Safes集合封装类

    import com.google.common.base.Preconditions;
    import com.google.common.collect.Lists;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    import java.math.BigDecimal;
    import java.util.*;
    import java.util.function.Consumer;
       
    /**
     * 避免一些异常情况的工具类
     *
     */
    public class Safes {
       
        final private static Logger logger = LoggerFactory.getLogger(Safes.class);
       
        public static <K, V> Map<K, V> of(Map<K, V> source) {
            return java.util.Optional.ofNullable(source).orElse(new HashMap<K, V>());
        }
       
        public static <T> Iterator<T> of(Iterator<T> source) {
            return java.util.Optional.ofNullable(source).orElse(new ArrayList<T>().iterator());
        }
       
        public static <T> Collection<T> of(Collection<T> source) {
            return Optional.ofNullable(source).orElse(Lists.newArrayList());
        }
       
        public static <T> Iterable<T> of(Iterable<T> source) {
            return java.util.Optional.ofNullable(source).orElse(new ArrayList<T>());
        }
       
        public static <T> List<T> of(List<T> source) {
            return java.util.Optional.ofNullable(source).orElse(new ArrayList<T>());
        }
       
        public static <T> Set<T> of(Set<T> source) {
            return java.util.Optional.ofNullable(source).orElse(new HashSet<T>());
        }
       
        public static BigDecimal of(BigDecimal source) {
            return java.util.Optional.ofNullable(source).orElse(BigDecimal.ZERO);
        }
       
        public static BigDecimal toBigDecimal(String source, BigDecimal defaultValue) {
            Preconditions.checkNotNull(defaultValue);
            try {
                return new BigDecimal(StringUtils.trimToEmpty(source));
            } catch (Throwable t) {
                logger.warn("未能识别的boolean类型, source:{}", source, t);
                return defaultValue;
            }
        }
       
        public static int toInt(String source, int defaultValue) {
            if (StringUtils.isBlank(source)) {
                return defaultValue;
            }
            try {
                return Integer.parseInt(StringUtils.trimToEmpty(source));
            } catch (Throwable t) {
                logger.warn("未能识别的整形 {}", source);
                return defaultValue;
            }
        }
       
        public static long toLong(String source, long defaultValue) {
            if (StringUtils.isBlank(source)) {
                return defaultValue;
            }
            try {
                return Long.parseLong(StringUtils.trimToEmpty(source));
            } catch (Throwable t) {
                logger.warn("未能识别的长整形 {}", source);
                return defaultValue;
            }
        }
       
        public static boolean toBoolean(String source, boolean defaultValue) {
            if (StringUtils.isBlank(source)) {
                return defaultValue;
            }
            try {
                return Boolean.parseBoolean(StringUtils.trimToEmpty(source));
            } catch (Throwable t) {
                logger.warn("未能识别的boolean类型, source:{}", source, t);
                return defaultValue;
            }
        }
       
        public static void run(Runnable runnable, Consumer<Throwable> error) {
            try {
                runnable.run();
            } catch (Throwable t) {
                error.accept(t);
            }
        }
       
    }
    
  • 有没有必要封装一层http?==监控,mock不同环境的数据(代码不全,仅供参考其实现思想)

    现在项目里面是基于boot的,不知道有没有一些监控或者转发之类的功能,以后测试环境机器多了,可以通过封装的http设置自己想要访问的mock数据源,同时用异步请求,并且用切面记录http的一些数据(时间啊,参数啊之类的)

    public ListenableFuture<String> post(HBaseQuery baseQuery,
                                         HosInterface hosInterface,
                                         Object param, Date timeStamp) {
        // 0. build a promise
        String hosCode = baseQuery.getHosCode();
        SettableFuture<String> result = SettableFuture.create();
        String url = hosInterface.dynamicRouter(hospitalService.load(hosCode));
        long enqueueStart = System.currentTimeMillis();
       
        // 1. build a processor
        Consumer<Runnable> processor = this.buildProcessor(hosCode,
                enqueueStart, baseQuery, param, timeStamp, url, hosInterface, result);
       
        // 2. acquire limiter in different queue
        limiters.get(hosInterface.limiterGroup)
                .acquire(System.currentTimeMillis(), calExpire(hosCode, hosInterface),
                        hosCode, hosInterface, true,
                        exceptionWorker(hosInterface.limiterGroup, hosCode, result), processor);
       
        return result;
    }
    
    private Consumer<Runnable> buildProcessor(String hosCode, long enqueueStart,
                                              HBaseQuery baseQuery, Object param,
                                              Date timeStamp, String url,
                                              HosInterface hosInterface, SettableFuture<String> result) {
        return release -> {
            Metrics.timer("hospitalAdaptor_enqueue_timer").tag("hos", hosCode).get()
                    .update(System.currentTimeMillis() - enqueueStart, TimeUnit.MILLISECONDS);
       
            // 2. build request
            String postData = format(baseQuery, param, timeStamp);
            HttpRequest request = RequestBuilder.createPost(url)
                    .setBody(postData).build();
       
            long start = System.currentTimeMillis();
            // 3. request async
            switchClient(hosInterface).asyncRequest(request, new AsyncHandlerBase() {
                @Override
                public void onRequestCompleted(AsyncHttpResponse asyncResponse) {
       
                    release.run();
                    String bodyAsString = asyncResponse.getBodyAsString();
                    long cost = System.currentTimeMillis() - start;
                    logger.info("schema: [{}-{}], url:[{}], request:[{}], response:[{}], cost:[{} ms]",
                            hosInterface, hosCode, url, compress(postData), compress(bodyAsString), cost);
                    if (InterfaceLimiterGroup.LIS == hosInterface.limiterGroup) {
                        Metrics.timer("postRequestToLis_succ_count")
                                .tag("hos", hosCode)
                                .tag("mode", hosInterface.mode.name())
                                .tag("interface", hosInterface.getMonitorTagName()).get()
                                .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                    } else {
                        Metrics.timer("postRequestToHis_succ_count")
                                .tag("hos", hosCode)
                                .tag("mode", hosInterface.mode.name())
                                .tag("interface", hosInterface.getMonitorTagName()).get()
                                .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                    }
                    // 处理非正常返回的请求
                    if (asyncResponse.getStatusLine().getStatusCode() != 200) {
                        StatusLine status = asyncResponse.getStatusLine();
                        if (needMonitor(baseQuery, hosInterface)) {
                            if (InterfaceLimiterGroup.LIS == hosInterface.limiterGroup) {
                                Metrics.timer("postRequestToLis_error_count").tag("hos", hosCode).tag("mode",
                                        hosInterface.mode.name())
                                        .tag("interface", hosInterface.getMonitorTagName()).get()
                                        .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                            } else {
                                Metrics.timer("postRequestToHis_error_count").tag("hos", hosCode).tag("mode",
                                        hosInterface.mode.name())
                                        .tag("interface", hosInterface.getMonitorTagName()).get()
                                        .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                            }
                        }
                        result.setException(new IllegalStateException(
                                MessageFormat.format("{0}, 对方接口响应码错误, 错误码:{1}, 响应信息:{2}",
                                        hosCode, status.getStatusCode(), status.getReasonPhrase())));
                        return;
                    }
       
                    // 监控返回码
                    monitorResponseCode(bodyAsString, hosCode, hosInterface);
       
                    result.set(bodyAsString);
                }
       
       
                @Override
                public void onThrowable(Throwable throwable) {
                    release.run(); // 释放信号量
                    logger.warn("schema: [{}-{}], url:[{}], request:[{}], response: error",
                            hosInterface, hosCode, url, compress(postData), throwable);
                    if (InterfaceLimiterGroup.LIS == hosInterface.limiterGroup) {
                        Metrics.timer("postRequestToLis_succ_count")
                                .tag("hos", hosCode)
                                .tag("mode", hosInterface.mode.name())
                                .tag("interface", hosInterface.getMonitorTagName()).get()
                                .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                    } else {
                        Metrics.timer("postRequestToHis_succ_count")
                                .tag("hos", hosCode)
                                .tag("mode", hosInterface.mode.name())
                                .tag("interface", hosInterface.getMonitorTagName()).get()
                                .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                    }
       
                    if (needMonitor(baseQuery, hosInterface)) {
                        if (InterfaceLimiterGroup.LIS == hosInterface.limiterGroup) {
                            Metrics.timer("postRequestToLis_error_count").tag("hos", hosCode).tag("mode",
                                    hosInterface.mode.name())
                                    .tag("interface", hosInterface.getMonitorTagName()).get()
                                    .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                        } else {
                            Metrics.timer("postRequestToHis_error_count").tag("hos", hosCode).tag("mode",
                                    hosInterface.mode.name())
                                    .tag("interface", hosInterface.getMonitorTagName()).get()
                                    .update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
                        }
                    }
       
                    result.setException(new IllegalStateException(throwable));
                }
            });
        };
    }
    
    public void acquire(long startTime, long expire, String hosCode, HosInterface hosInterface, boolean acquire,
                        Consumer<Throwable> reject, Consumer<Runnable> acquired) {
        if (!acquire) {//不需要限流
            acquired.accept(() -> {});
            return;
        }
       
        if (checkAboard(hosCode, hosInterface)) {
            reject.accept(new RejectedExecutionException(
                    MessageFormat.format("当前医院[{0}]接口[{1}]访问时队列已满", hosCode, hosInterface.name())));
            return;
        }
        // check expire
        // 请求等待超时一般发生在业务高峰期内
        long curTime = System.currentTimeMillis();
        if (curTime > expire) {
            // 等待时间格式,最多十来秒,精确到毫秒
            String waitTimeFormat = "ss.SSS";
            try {
                reject.accept(new RejectedExecutionException(
                        MessageFormat.format("当前医院[{0}]接口[{1}]等待超时, 累计等待时间:[{2}]",
                                hosCode, hosInterface.name(), DateFormatUtil.formatFree(new Date(curTime - startTime), waitTimeFormat))));
            } catch (Exception e) {
                logger.info("unexpected reject error", e);
                reject.accept(new RejectedExecutionException(
                        MessageFormat.format("当前医院[{0}]访问QPS过高, 等待超时, 该请求麻烦你拿回去重试", hosCode)));
            }
            return;
        }
       
        if(managedHttpService.isHosInterLimitOpen()) {
            Semaphore semaphore = this.tryAcquire(hosCode, hosInterface);
            if (semaphore != null) {
                Metrics.counter("his_concurrent_request").tag("hosCode", hosCode).get().inc();
                acquired.accept(() -> {
                    semaphore.release();
                    Metrics.counter("his_concurrent_request").tag("hosCode", hosCode).get().dec();
                });
                return;
            }
        } else {
            boolean success = this.getOrInitLimiter(hosCode).tryAcquire();
            if (success) {
                Metrics.counter("his_concurrent_request").tag("hosCode", hosCode).get().inc();
                acquired.accept(() -> {
                    this.limiters.get(hosCode).release();
                    Metrics.counter("his_concurrent_request").tag("hosCode", hosCode).get().dec();
                });
                return;
            }
        }
       
        AtomicLong counter = this.getOrInitCounter(hosCode, hosInterface);
        counter.incrementAndGet();
        // single sentry schedule thread
        XTimer.setTimeout(() -> {
            // 特别注意!!!! incr 和 decr之间不能有任何代码!!!
            // 如果修改了计数器的配置,则两次调用getOrInitCounter获取到的实例可能会不一样,所以这里需要维持counter的引用
            counter.decrementAndGet();
            acquire(startTime, expire, hosCode, hosInterface, true, reject, acquired);
        }, ACQUIRE_INTERVAL, TimeUnit.MILLISECONDS, MoreExecutors.directExecutor());
    }
    
  • 统一返回值

    ResponseBodyAdvice

  • 异步completableFuture(for循环网络请求)

    当然最好的方式就是支持批量查询,findUserData已经支持了,但是没有进行批量查询,而本话题其实主要想引用异步,所以先暂时只关注cpmpletableFuture

    首先 iamService.findUserData可以批量查询,我们for循环查询浪费了网络资源,如果for循环的话,切换成异步好一些

    orgCodes.stream().forEach(code -> {
        Map<String, List> users = new HashMap<>();
        roleInfos.stream().forEach(roleInfo -> {
            long starta = System.currentTimeMillis();
            BaseResponse userDataResponse = iamService.findUserData(code, Arrays.asList(roleInfo.getId()));
            log.info("单个项目人员查询耗时,{}",System.currentTimeMillis()-starta);
            if (userDataResponse.getCode().equals(ProjectConstant.REQUEST_SUCCESS_CODE) && userDataResponse.getData() != null) {
                List<IamV2UserRoleDTO> data = (List<IamV2UserRoleDTO>) userDataResponse.getData();
                if (users.containsKey(roleInfo.getRoleName())) {
                    users.get(roleInfo.getRoleName()).addAll(data);
                } else {
                    users.put(roleInfo.getRoleName(), data);
                }
            }
        });
        result.put(code, users);
    });
    

    改造成

    为什么不是list直接groupingby当userMap使用,因为构造users的对象是嵌套在roleInfos里面,需要两个map合并,否则数据不对

    orgCodes.stream().forEach(code -> {
                Map<String, List> users = new HashMap<>();
                //for http 换成completableFutures
                List<IamV2UserRoleDTO> list=new ArrayList<>();
                List<CompletableFuture<List<IamV2UserRoleDTO>>> completableFutureList=deal(code,roleInfos);
                if(CollectionUtils.isNotEmpty(completableFutureList)){
                    completableFutureList.stream().forEach(ee->{
                        try {
                            List<IamV2UserRoleDTO> ll=ee.get(5, TimeUnit.SECONDS);
                            list.addAll(ll);
                        } catch (Exception eeee) {
                            eeee.printStackTrace();
                        }
                    });
                }
                Map<String, List<IamV2UserRoleDTO>> userMap = list.stream().collect(groupingBy(IamV2UserRoleDTO::getRoleName));
       
                Map<String, List<IamV2UserRoleDTO>> roleMap = roleInfos.stream().collect(toMap(RoleInfo::getRoleName, d -> new ArrayList()));
       
                Map<String, List<IamV2UserRoleDTO>> collect = Stream.of(roleMap, userMap).flatMap(x -> x.entrySet().stream()).collect(toMap(Map.Entry::getKey,Map.Entry::getValue,
                        (value1, value2) -> {
                            value1.addAll(value2);
                            return value1;
                        }));
       
    //            roleMap.forEach((key,value)->userMap.merge(key,value,(v1,v2)->{v1.addAll(v2);return v1;}));
                   
                result.put(code, userMap);
            });
       
    private List<CompletableFuture<List<IamV2UserRoleDTO>>> deal(String code,List<RoleInfo> roleInfos) {
        List<CompletableFuture<List<IamV2UserRoleDTO>>> completableFutureList=new ArrayList<>();
        roleInfos.stream().filter(Objects::nonNull).forEach(ee->{
            completableFutureList.add(getDataFuture(code,ee.getId()));
        });
        return completableFutureList;
    }
       
    private CompletableFuture<List<IamV2UserRoleDTO>> getDataFuture(String code,String id) {
        return CompletableFuture.supplyAsync(()->getData(code,id),
                new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors()*2,Runtime.getRuntime().availableProcessors()*2+1,3,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10),r->new Thread(r,"123")));
    }
       
    private List<IamV2UserRoleDTO> getData(String code,String id) {
        BaseResponse userDataResponse = iamService.findUserData(code, Arrays.asList(id));
        return (List<IamV2UserRoleDTO>)userDataResponse.getData();
    }
    

    如果是单个http请求,可以这么处理

    public static void supplyAsync() throws Exception {        
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            try {
                //业务
            } catch (InterruptedException e) {
            }
             
            return System.currentTimeMillis();
        });
       
        long time = future.get();
    }
    
  • 没有返回值的方法扔到线程池里面去执行

    submit返回future,execute返回void

    ExecutorService executorService=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors()*2,Runtime.getRuntime().availableProcessors()*2+1,3,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10),r->new Thread(r,"123"));
       
    Safes.of(list).forEach(xxx ->
            executorService.submit(() -> crawl0(xxx)));
    
  • 并行====parallel或者countDownLatch(for循环本地数据)

    CountDownLatch countDownLatch = new CountDownLatch(files.length);
    ExecutorService tp = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2 + 1, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), r -> new Thread(r, "123"));
    List<Map<String, Goods>> goodsList = new CopyOnWriteArrayList<Map<String, Goods>>();
       
    Lists.newArrayList(files).stream().forEach(file -> tp.execute(() -> {
        Map<String, Goods> expensive5Goods = getExpensive5Goods(file);
        goodsList.add(expensive5Goods);
        countDownLatch.countDown();
    }));
    countDownLatch.await();
    tp.shutdown();
       
    或者
    List<Map<String, Goods>> goodsList = Arrays.stream(files).parallel().map(RRFiles::getExpensive5Goods).collect(Collectors.toList());
    
  • sql拆分,表结构字段冗余设置

    1.梳理接口进行优化,去掉left join,只用简单sql

    2.数据库设计,部分字段冗余(迁移部分表结构字段)

    3.双写数据,重构数据库表结构的依赖关系? reference:https://quericy.me/blog/867/

    4.数据库可以把一些非索引的字段存到一个json对象作为一个字段扔到数据库里面,防止频繁的ddl字段变更

    //业务调用,或者traceinfo的值
    TraceInfo.Key.multipleParams.get(context.getOrder().getTraceInfo())
    //业务调用,设置traceinfo的值
    TraceInfo.Key.multipleParams.put(candidateOrder.getMainOrder().getTraceInfo(), TraceInfo.Key.multipleParams.name());
    
    package com.longfor.plm.project.util;
       
    import com.google.common.collect.Maps;
    import lombok.Data;
       
    import java.io.Serializable;
    import java.util.Map;
    import java.util.Optional;
    @Data
    public class TraceInfo implements Serializable {
       
        /**
         * 机器编码
         */
        private String machineCode;
        /**
         * 微信下单的openId
         */
        private String openId;
       
        /**
         * app下单的cid
         */
        private String cid;
       
        /**
         * 扩展信息字段
         */
        private Map<String, String> extra;
       
        /**
         * 扩展字段 key
         */
        public enum Key implements TracePut {
            contactIDCardType("联系人证件类型"),
            contactIDCardNo("联系人证件号码"),
            addressInBeijing("在京住址")
            ;
       
            @Override
            public String get(TraceInfo traceInfo) {
                if (traceInfo == null || traceInfo.getExtra() == null) {
                    return null;
                }
                return traceInfo.getExtra().get(this.name());
            }
       
            @Override
            public void put(TraceInfo traceInfo, String value) {
                Map<String, String> extra = Optional.ofNullable(traceInfo.getExtra()).orElse(Maps.newHashMap());
                extra.put(this.name(), value);
                traceInfo.setExtra(extra);
            }
       
            private String desc;
       
            Key(String des) {
            }
        }
       
       
    }
       
    interface TracePut {
        String get(TraceInfo traceInfo);
       
        void put(TraceInfo traceInfo, String value);
    }
    
  • mybtis分页插件ipagehelper(已经存在代码中),mybatis生成代码插件gengerator

    pagehelper取代page信息在代码里面赋值

    generator自动生成dao,mapper,dto(https://blog.csdn.net/qq_43583597/article/details/89294963),因为通用mapper大家有可能不是很熟悉

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE generatorConfiguration
            PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
            "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
    <generatorConfiguration>
        <!-- 数据库驱动:选择你的本地硬盘上面的数据库驱动包-->
        <classPathEntry  location="D:\lhproject\mysql-connector-java-5.1.45-bin.jar"/>
        <context id="DB2Tables"  targetRuntime="MyBatis3">
            <commentGenerator>
                <property name="suppressDate" value="true"/>
                <!-- 是否去除自动生成的注释 true:是 : false:否 -->
                <property name="suppressAllComments" value="true"/>
            </commentGenerator>
            <!--数据库链接URL,用户名、密码 -->
            <jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://10.231.129.164:3306/plm-project-uat" userId="plmuser" password="vIB1i@Rau7MMDm!M">
            </jdbcConnection>
            <javaTypeResolver>
                <property name="forceBigDecimals" value="false"/>
            </javaTypeResolver>
            <!-- 生成模型的包名和位置-->
            <javaModelGenerator targetPackage="com.longfor.plm.project.testGenerator.entity" targetProject="src/main/java">
                <property name="enableSubPackages" value="true"/>
                <property name="trimStrings" value="true"/>
            </javaModelGenerator>
            <!-- 生成映射文件的包名和位置-->
            <sqlMapGenerator targetPackage="main.resources.test.mapping" targetProject="src">
                <!-- enableSubPackages:是否让schema作为包的后缀 -->
                <property name="enableSubPackages" value="false" />
            </sqlMapGenerator>
            <!-- 生成DAO的包名和位置-->
            <javaClientGenerator type="XMLMAPPER" targetPackage="com.longfor.plm.project.testGenerator.dao" targetProject="src/main/java">
                <property name="enableSubPackages" value="true"/>
            </javaClientGenerator>
            <!-- 要生成的表 tableName是数据库中的表名或视图名 domainObjectName是实体类名-->
            <table tableName="plan_info"
                   domainObjectName="PlanInfo"
                   enableCountByExample="false"
                   enableUpdateByExample="false"
                   enableDeleteByExample="false"
                   enableSelectByExample="false"
                   selectByExampleQueryId="false">
       
            </table>
       
        </context>
    </generatorConfiguration>
    


blog comments powered by Disqus