资讯详情

Ribbon源码剖析

1. @LoadBalanced注解详解

1.1 简介

<dependency>     <groupId>org.springframework.cloud</groupId>     <artifactId>spring-cloud-starter-ribbon</artifactId> </dependency> 

**@LoadBalanced 将一个RestTemplate底层采用标志LoadBalancerClient实际执行http操作,支持负载平衡,找到相应项目下的包**

1.2 LoadBalancerAutoConfiguration

1.2.1 restTemplates

@LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); 

1.2.2 loadBalancedRestTemplateInitializer

自定义拦截器LoadBalancerInterceptor

 @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer(       final List<RestTemplateCustomizer> customizers) {    return new SmartInitializingSingleton() {       @Override       public void afterSingletonsInstantiated() {          for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {             for (RestTemplateCustomizer customizer : customizers) {                customizer.customize(restTemplate);             }          }       }    }; } 

     @Configuration  @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")  static class LoadBalancerInterceptorConfig {      @Bean   public LoadBalancerInterceptor ribbonInterceptor(     LoadBalancerClient loadBalancerClient,     LoadBalancerRequestFactory requestFactory) {    return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);   }    @Bean   @ConditionalOnMissingBean   public RestTemplateCustomizer restTemplateCustomizer(     final LoadBalancerInterceptor loadBalancerInterceptor) {    return new RestTemplateCustomizer() {     @Override     public void customize(RestTemplate restTemplate) {      List<ClientHttpRequestInterceptor> list = new ArrayList<>(        restTemplate.getInterceptors());      list.add(loadBalancerInterceptor);      restTemplate.setInterceptors(list);     }    };   }  }  

1.2.3 ClientHttpRequestInterceptor

2.RibbonLoadBlancerClient 剖析

2.1 RibbonLoadBalancerClient 初始化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tA6yvNMc-1657457021110)(C:\Users\崔小畅\AppData\Roaming\Typora\typora-user-images\image-20220709155938771.png)]

2.2 execute()具体执行请求的方法

2.2.1 getLoadBalancer

protected ILoadBalancer getLoadBalancer(String serviceId) { 
        
   return this.clientFactory.getLoadBalancer(serviceId);
}

@Override
public <C> C getInstance(String name, Class<C> type) {
   C instance = super.getInstance(name, type);
   if (instance != null) {
      return instance;
   }
   IClientConfig config = getInstance(name, IClientConfig.class);
   return instantiateWithConfig(getContext(name), type, config);
}
private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();

protected AnnotationConfigApplicationContext getContext(String name) {
   if (!this.contexts.containsKey(name)) {
      synchronized (this.contexts) {
         if (!this.contexts.containsKey(name)) {
            this.contexts.put(name, createContext(name));
         }
      }
   }
   return this.contexts.get(name);
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cFR6uN1w-1657457021112)(C:\Users\崔小畅\AppData\Roaming\Typora\typora-user-images\image-20220709165131195.png)]

2.2.1.1 ZoneAwareLoadBalancer

  • 看看ZoneAwareLoadBalancer

    什么都没有,直接调用父类DynamicServerListLoadBalancer

  • 看看DynamicServerListLoadBalancer

  • super执行一个initWithConfig()

public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) { 
        
    initWithConfig(config, rule, ping);
}
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
  • restOfInit

    • enableAndInitLearnNewServersFeature():定时更新

      public void enableAndInitLearnNewServersFeature() {
          LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
          serverListUpdater.start(updateAction);
      }
      
      public synchronized void start(final UpdateAction updateAction) {
          if (isActive.compareAndSet(false, true)) {
              final Runnable wrapperRunnable = new Runnable() {
                  @Override
                  public void run() {
                      if (!isActive.get()) {
                          if (scheduledFuture != null) {
                              scheduledFuture.cancel(true);
                          }
                          return;
                      }
                      try {
                          updateAction.doUpdate();
                          lastUpdated = System.currentTimeMillis();
                      } catch (Exception e) {
                          logger.warn("Failed one update cycle", e);
                      }
                  }
              };
              //  private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
              //  private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
      		// 默认的是1秒钟过后,会第一次执行那个Runnable线程,
      		//以后是每隔30秒执行一下那个Runnable线程,就去从eureka client刷新注册表到自己的ribbon的LoadBalancer中来。
              scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                      wrapperRunnable,
                      initialDelayMs,
                      refreshIntervalMs,
                      TimeUnit.MILLISECONDS
              );
          } else {
              logger.info("Already active, no-op");
          }
      }
      
      
       public void doUpdate() {
                DynamicServerListLoadBalancer.this.updateListOfServers();
          }
       
      
      public void updateListOfServers() {
          List<T> servers = new ArrayList();
          if (this.serverListImpl != null) {
              servers = this.serverListImpl.getUpdatedListOfServers();
              LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
              if (this.filter != null) {
                  servers = this.filter.getFilteredListOfServers((List)servers);
                  LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
              }
          }
      
          this.updateAllServerList((List)servers);
      }
      
    • updateListOfServers()更新server list

       public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    
protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                                  // servers right away instead
                                  // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}
public void setServersList(List lsrv) {
    super.setServersList(lsrv);
    List<T> serverList = (List<T>) lsrv;
    Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
    for (Server server : serverList) {
        // make sure ServerStats is created to avoid creating them on hot
        // path
        getLoadBalancerStats().getSingleServerStat(server);
        String zone = server.getZone();
        if (zone != null) {
            zone = zone.toLowerCase();
            List<Server> servers = serversInZones.get(zone);
            if (servers == null) {
                servers = new ArrayList<Server>();
                serversInZones.put(zone, servers);
            }
            servers.add(server);
        }
    }
    setServerListForZones(serversInZones);
}
2.2.1.1.1 ServerList

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oCmWVDU7-1657457021115)(C:\Users\崔小畅\AppData\Roaming\Typora\typora-user-images\image-20220709172536439.png)]

2.2.1.1.2 IRule

spring-cloud-netflix-core工程下的org.springframework.cloud.netflix.ribbon包下的RibbonClientConfiguration类中

ZoneAvoidanceRule的父类PredicateBasedRule提供啦choose() round robin轮询算法

2.2.1.1.3 IPing

2.2.1.1.4 ServerListUpdater

2.2.2 getServer: 通过负载均衡算法(round robin轮询算法),选择一个server出来

getLoadBalancerStats().getAvailableZones().size():多机房的概念

直接就是用的IRule来选择了一台服务器

IRule是哪来的勒?看2.2.1.2

轮询算法

modulo:当前有几天机器

nextIndex:AtomicInteger nextIndex = new AtomicInteger() 刚开始默认是0

比如说有三台机器

192.168.1.107:8080

192.168.1.108:8080

192.168.1.109:8080

第一次发请求过来:current = nextIndex=0 ,next = current =0+1=1%3 =1 ,cas把current 设置成1 ,return 0

第二次发请求过来:current = nextIndex=1 ,next = current =1+1=2%3 =2,cas把current 设置成2,return 1

第三次发请求过来:current = nextIndex=2,next = current =2+1=3%3 =0 ,cas把current 设置成0,return 2

第三次发请求过来:current = nextIndex=0,next = current =0+1=1%3 =1 ,cas把current 设置成1,return 0

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

2.3 最后执行请求的方法

  • RibbonLoadBlancerClient剖析 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jUlADWoM-1657457021124)(CEE153E456F843F8816375A148E55C8F)]

    • execute具体执行请求的方法

    • getLoadBalancer()方法获取ILoadBalancer 负载均衡器接口

      • 通过参数serviceId也就是服务名,到这个里面 this.clientFactory.getLoadBalancer(serviceId) 这个clientFactory就是SpringClientFactory意思就是说你要调用的每个服务对应得服务名称都有一个对应spring的applicationContext容器,比如 ServiceA服务对应的自己ApplicationContext容器获取自己的ILoadBalancer,里面包含了自己这个服务的独立的一堆组件,比如说LoadBalancer,如果要获取一个服务对应的LoadBalancer,其实就是在自己的那个ApplicationContext里面去获取那个LoadBalancer即可。根据ILoadBalancer接口类型,获取一个ILoadBalancer接口类型的实例化的bean即可

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tOpm9HSI-1657457021125)(AB3A22730DE94A429C5E8E06B775A582)]

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eCi6RDyH-1657457021125)(8B6D484413754A968B9F8382DDCF42BB)]

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nToIWA1R-1657457021126)(FCA19A554FED42B584A6511BB14DBD5C)]

      • 调用父类的getInstance()方法拿取ILoadBalancer 直接返回 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yGuV8mR3-1657457021126)(60F786012D4A4C05B0C3315969DD1428)]
      • contexts是一个map,如果这个服务名存在,就直接中map中去,如果不存在,就创建,然后存到map中 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n1JKnl9C-1657457021127)(ABF7DC1D4CBE4EF59AA652CB7E84151F)]
      • 这一块就很明确了,一个服务对应一个独立的applicationContext [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wnCyzSHu-1657457021128)(4B32A773570545EB938727320E6C6541)]
      • 流程图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z1A0sUtR-1657457021128)(B481C5CE903A45F4B3FE922CF373CBE8)]
    • getServer(loadBalancer) 通过负载均衡算法(round robin轮询算法),选择一个server出来 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2sECyWQY-1657457021129)(9B5669EF037746B2820A5B80F8668C59)]

      • 调用父类chooseServer()方法,父类BaseLoadBalander [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jdQBWhHZ-1657457021129)(5F500459AB084E7EA4AA2682016A1CB4)]
      • BaseLoadBalancer的chooseServer()方法中,直接就是用的IRule来选择了一台服务器 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7eKqGqYS-1657457021130)(B57016114BCF4C1F929B5F2E70416DC6)]
      • IRule 实例化在这,调用ZoneAvoidanceRule 父类的choose() [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6TOh8E02-1657457021130)(BE1CFFB0CF5C4E22A19D59FF47442CDA)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dy7TZsPt-1657457021131)(1C53DEB70F654FDB9AABEA5BAC63D1CC)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EBLGc1CI-1657457021131)(D8DCDDB65F9849698A9CA427B4261929)]
      • 负载均衡实际就是用的轮询算法,nextIndex是一个AtomicInteger,默认是0开始,当前的(current+1)%modulo(当前服务机器的数量)=next,然后把next更新到current里面,返回选中的机器 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cYXQluGI-1657457021132)(52292173E183450D8757CAE8F9A146FF)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w6Sbv2f5-1657457021132)(86D2935C466B422FA11B580E88A8EA98)]
    • execute(serviceId, ribbonServer, request) 最后执行请求,这个reuqest 就是在拦截器哪里调用execute() 通过 requestFactory.createRequest 创建 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gXIKS8BP-1657457021133)(B572FB9C034B469AA5C1ECBEE1BC3E58)]

      • 主要就是封装一些请求参数,比如http://ServiceA/sayHello/leo ==> 替换成http://localhost:8080/sayHello/leo,就用这个请求,基于底层的ClientHttpRequestExecution发起一次http请求 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wYLydVEh-1657457021134)(02AAAA8EE2884A1CA4119633ADC658C7)]
  • ILoadBlancer 是什么东西,干吗用的?

    • ILoadBlancer实例化,在spring-cloud-netflix-core工程下的org.springframework.cloud.netflix.ribbon包中的RibbonClientConfiguration 类中 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ohkAn6j2-1657457021134)(C332F112BEC540C7A1D22FC2E81C5A14)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kmm1R9xT-1657457021135)(3515B9AD0E104290AD8FE1934486B27E)]
    • 创建的ZoneAwareLoadBalancerZoneAwareLoadBalancer的父类是:DynamicServerListLoadBalancer,他的父类又是:BaseLoadBalancer
    • 看核心方法restOfInit [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d0dbCxWY-1657457021135)(30BEE7925B8A4CE19AED5C0F1FDE817B)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qBbD9xsd-1657457021136)(E1C8970CE29848CF89AA7491118FB521)]
    • enableAndInitLearnNewServersFeature 定时刷新从eureka client中获取注册表(定时任务),然后刷新到LoadBalancer中去, [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uk3FjxLr-1657457021136)(7709709B76E742E7BA55254EC118E980)]
      • 通过start()创建一个Runnable线程,然后用任务调度每隔30s执行Runnable线程,从eureka client 刷新注册表到自己的Ribbon的ILoadBalancer [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LAvK3Cbv-1657457021137)(3C3999E1DA2A4BCE9820943897BD1A06)]
    • updateListOfServers():通过这个方法,从eureka client那里获取到Service的server list 服务下的所有服务实例 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vC7eEaoE-1657457021138)(F56012D0B9C8431683D990C7BBC108A9)]
      • serverListImpl是通过构造函数传过来的,就是个serverList,具体在spring-cloud-netflix-eureka-client工程下的org.springframework.cloud.netflix.ribbon.eureka包下的EurekaRibbonClientConfiguration中,实例化通过@Bean 注入 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QD1yMXeo-1657457021138)(B1A1C7C7499A40B49B433971B02C64FA)]
      • 通过serverListImpl.getUpdatedListOfServers 拿到服务下的所有服务实例,getUpdatedServerList()方法,是调用的DiscoveryEnabledNIWSServerListgetUpdatedServerList()方法: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pgktXxfl-1657457021139)(FA1C87D12D404168A9AD5109B438F7B6)]
    • ILoadBlancer主要就干两件事
      • 定时任务定时更新eureka clent 根据服务名获取服务实例到ILoadBlancer
      • eureka clent根据服务名获取服务实例到ILoadBlancer
    • 流程图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-50L657sl-1657457021139)(F002ED06B8484B21B7F6F710871C7638)]
  • Ping机制

  • ribbon Ping机制 检查服务器是否存活

  • Ping机制的源码,原生的ribbon Ping机制是通过DummyPing 类实现的,但是DummyPing里面什么逻辑的都没,默认下是不生效的 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fem8yldJ-1657457021140)(2E60B41739634C0397EE99E03B9DFF27)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vjxzdo89-1657457021140)(A63F27629CD1425B851FE23F0EE38B5D)]

  • 判断服务是否存活应该是eureka的事情erueka自己会有故障发现和服务实例摘除的机制,如果某个服务实例挂了,eureka server会发现,然后摘除这个服务实例,然后所有的eureka client都会得到一个通知,eureka client本地的ribbon,不是有一个PollingServerListUpdater组件,每隔30秒去从自己本地的eureka client去拉取注册表

  • ribbon通过与eureka的整合,自动就有一套服务实例故障自动摘除的机制,源码在EurekaRibbonClientConfiguration类中 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CCn3FNEB-1657457021141)(7DF3C651E930405583E1FD0677B0EE17)]

  • 通过NIWSDiscoveryPing类实现Ping机制,isAlive() 方法判断服务是否存活,主要判断服务实例是不是UP状态 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Nyu1LTSm-1657457021141)(3BCC309F9034423DBE574F41C88B254E)]

  • 在构建ILoadBalancer 中, ZoneAwareLoadBalancer 的父类BaseLoadBalancer initWithConfig() 有个定时任务,每隔一段时间(默认30s)就会执行用IPing组件对每个server都执行一下isAlive()方法 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rKXGBTkZ-1657457021142)(1C763E0A09614B658BB4B43B9219C5D6)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hKepkXd7-1657457021143)(D5EBF9177F09452A8CB6F7F0AAD96BAF)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tMq1L9Ea-1657457021143)(E195CB5661A044768D260F5AFAD6FF9C)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yO7o0Hr5-1657457021144)(A468F27B7934464C8D8ABC224771189C)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yejdsq4C-1657457021144)(0E9FADEC45A7445491CC0875F865592A)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mozrZ3rx-1657457021145)(C06A2035FF104A7787A1E8E4A8DB559A)]

标签: d260s01传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台