Feign使用和原理的总结

Feign使用和原理的总结,第1张

Feign总结 使用 引入依赖
 <properties>
        <java.version>1.8java.version>
        <spring-cloud.version>2021.0.1spring-cloud.version>
    properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.cloudgroupId>
            <artifactId>spring-cloud-starter-loadbalancerartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.cloudgroupId>
            <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
        dependency>
        
        <dependency>
            <groupId>org.springframework.cloudgroupId>
            <artifactId>spring-cloud-starter-openfeignartifactId>
        dependency>
    dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloudgroupId>
                <artifactId>spring-cloud-dependenciesartifactId>
                <version>${spring-cloud.version}version>
                <type>pomtype>
                <scope>importscope>
            dependency>
        dependencies>
    dependencyManagement>
配置文件

配置文件需要配置eureka服务器信息,使用了ribbon负载均衡,需要配置负载均衡相关信息

server:
  port: 8889
  contextPath: /feign

spring:
  #项目名字
  application:
    name: feign

# eureka client配置
eureka:
  client:
    registryFetchIntervalSeconds: 5
    serviceUrl:
      defaultZone: http://localhost:8098/eureka/  #eureka服务端提供的注册地址 参考服务端配置的这个路径
  instance:
    hostname: feign #此实例注册到eureka服务端的唯一的实例ID
    prefer-ip-address: true #是否显示IP地址
    leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒)
    leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
    health-check-url-path: /actuator/health

ribbon:
  MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试
  MaxAutoRetriesNextServer: 3 #切换实例的重试次数
  OkToRetryOnAllOperations: false  #对所有 *** 作请求都进行重试,如果是get则可以,如果是post,put等 *** 作没有实现幂等的情况下是很危险的,所以设置为false
  ConnectTimeout: 5000  #请求连接的超时时间
  ReadTimeout: 6000 #请求处理的超时时间

# 调用USER-MGT微服务时使用随机策略
USER-MGT:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
启动类增加注解

增加@EnableFeignClients注解,自动加载Feign的配置

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class FeignApplication {

    public static void main(String[] args) {
        SpringApplication.run(FeignApplication.class, args);
    }

}
定义feign调用标签

加入客户端要调用user-mgt微服务的一个接口获取当前的端口号,可以使用@FeignClient定义客户端

@FeignClient("user-mgt")
public interface UserMgtClient {
    @RequestMapping(method = RequestMethod.GET, value = "/sequence/number/port")
    String getPort();
}
调用目标接口

可以通过属性注入的方式将上面定义的UserMgtClient进行注入

@RestController
@RequestMapping("/feign")
@Slf4j
public class FeignTestController {
    @Autowired
    private UserMgtClient userMgtClient;

    @RequestMapping(value = "/server/port", method = RequestMethod.GET, produces = "application/json")
    public String getServerPort(){
        String port = userMgtClient.getPort();
        log.info("port={}",port);
        return port;
    }
}
调用过程

对上面的调用过程做一个回顾

首先userMgtClient.getPort()会通过JDK代理的方式进行调用,ReflectiveFeign#invoke的实现中会判断是否调用的是常用的类方法equals、hashCode和toString,如果是直接返回,否则会获取具体的代理目标类来调用目标方法

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  if ("equals".equals(method.getName())) {
    try {
      Object otherHandler =
          args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
      return equals(otherHandler);
    } catch (IllegalArgumentException e) {
      return false;
    }
  } else if ("hashCode".equals(method.getName())) {
    return hashCode();
  } else if ("toString".equals(method.getName())) {
    return toString();
  }
  // 获取代理类,调用目标方法
  return dispatch.get(method).invoke(args);
}

dispatch.get(method).invoke(args)会调用对应的处理器SynchronousMethodHandler#invoke方法,他会新建RestTemplate,执行调用,如果失败了,会进行一次重试

public Object invoke(Object[] argv) throws Throwable {
    // 新建RestTemplate
  RequestTemplate template = buildTemplateFromArgs.create(argv);
    // 获取options
  Options options = findOptions(argv);
    // 克隆重试器
  Retryer retryer = this.retryer.clone();
  while (true) {
    try {
        // 执行调用和解码
      return executeAndDecode(template, options);
    } catch (RetryableException e) {
      try {
          // 重试
        retryer.continueOrPropagate(e);
      } catch (RetryableException th) {
          // 重试后依然报错,抛出异常
        Throwable cause = th.getCause();
        if (propagationPolicy == UNWRAP && cause != null) {
          throw cause;
        } else {
          throw th;
        }
      }
      if (logLevel != Logger.Level.NONE) {
        logger.logRetry(metadata.configKey(), logLevel);
      }
      continue;
    }
  }
}

executeAndDecode(template, options)方法中首先构造对目标接口调用的HTTP请求,这里可以扩展请求定制,通过实现RequestInterceptor接口可以为请求添加header属性等信息,如果微服务鉴权需要token,可以使用这种方式;然后进行客户端的调用,比通过异步任务的方式对响应进行解码

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    // 构造对目标接口调用的HTTP请求
  Request request = targetRequest(template);

  if (logLevel != Logger.Level.NONE) {
    logger.logRequest(metadata.configKey(), logLevel, request);
  }

  Response response;
  long start = System.nanoTime();
  try {
      // 客户端调用
    response = client.execute(request, options);
      // 构造响应
    response = response.toBuilder()
        .request(request)
        .requestTemplate(template)
        .build();
  } catch (IOException e) {
    if (logLevel != Logger.Level.NONE) {
      logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
    }
    throw errorExecuting(request, e);
  }
  long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

  // 解码响应信息
  if (decoder != null)
    return decoder.decode(response, metadata.returnType());
  // 解码响应采用异步的方式,通过CompletableFuture实现
  CompletableFuture<Object> resultFuture = new CompletableFuture<>();
  asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
      metadata.returnType(),
      elapsedTime);

  try {
      // 等待解码结果
    if (!resultFuture.isDone())
      throw new IllegalStateException("Response handling not done");
      // 异步执行结果反馈
    return resultFuture.join();
  } catch (CompletionException e) {
    Throwable cause = e.getCause();
    if (cause != null)
      throw cause;
    throw e;
  }
}

Request targetRequest(RequestTemplate template) {
    // 通过实现RequestInterceptor接口可以为请求添加header属性等信息,如果微服务鉴权需要token,可以使用
    for (RequestInterceptor interceptor : requestInterceptors) {
      interceptor.apply(template);
    }
    // 执行目标对象的apply方法
    return target.apply(template);
  }

client.execute(request, options)客户端执行方法是调用的核心,

public Response execute(Request request, Options options) throws IOException {
    // 创建原始URI
    URI originalUri = URI.create(request.url());
    // 获取URI的HOST
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 获取hint
    String hint = this.getHint(serviceId);
    // 构造负载均衡请求
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint));
    // 获取支持生命周期的处理器
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    // 生命周期开始
    supportedLifecycleProcessors.forEach((lifecycle) -> {
        lifecycle.onStart(lbRequest);
    });
    // 根据负载均衡策略获取实例
    ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest);
    // 构造负载均衡响应
    org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);
    String message;
    if (instance == null) { // 没有负载均衡实例
        message = "Load balancer does not contain an instance for the service " + serviceId;
        if (LOG.isWarnEnabled()) {
            LOG.warn(message);
        }
        // 生命周期结束
        supportedLifecycleProcessors.forEach((lifecycle) -> {
            lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse));
        });
        return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build();
    } else { // 存在负载均衡实例
        // 重新构造负载均衡请求,将serviceId替换为IP+端口
        message = this.loadBalancerClient.reconstructURI(instance, originalUri).toString();
        Request newRequest = this.buildRequest(request, message);
        // 请求处理
        return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors);
    }
}

LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing执行负载均衡处理的具体处理,主要完成负载均衡生命周期的处理,执行HTTP请求

static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,
      Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,
      org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse,
      Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean loadBalanced) throws IOException {
    // 生命周期开始
   supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
   try {
       // 执行实际的HTTP请求
      Response response = feignClient.execute(feignRequest, options);
      if (loadBalanced) { // 启用了负载均衡
          // 生命周期结束
         supportedLifecycleProcessors.forEach(
               lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
                     lbRequest, lbResponse, buildResponseData(response))));
      }
      return response;
   }
   catch (Exception exception) {
      if (loadBalanced) {
         supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
               new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
      }
      throw exception;
   }
}

feignClient.execute(feignRequest, options)客户端执行HTTP请求,主要是建立HTTP的连接,然后调用,再对返回结果进行封装

public Response execute(Request request, Options options) throws IOException {
    // 建立HTTP连接,执行请求
  HttpURLConnection connection = convertAndSend(request, options);
    // 转换返回结果
  return convertResponse(connection, request);
}
配置加载

首先,要使用Feign,需要在启动类添加@EnableFeignClients注解,这个注解里面的@Import会加载FeignClientsRegistrar类,该类中会进行Feign客户端相关的配置

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
   String[] value() default {};
   String[] basePackages() default {};
   Class<?>[] basePackageClasses() default {};
   Class<?>[] defaultConfiguration() default {};
   Class<?>[] clients() default {};
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/720029.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存