<properties>
<java.version>1.8java.version>
<spring-cloud.version>2021.0.1spring-cloud.version>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-loadbalancerartifactId>
dependency>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
dependency>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-openfeignartifactId>
dependency>
dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-dependenciesartifactId>
<version>${spring-cloud.version}version>
<type>pomtype>
<scope>importscope>
dependency>
dependencies>
dependencyManagement>
配置文件
配置文件需要配置eureka服务器信息,使用了ribbon负载均衡,需要配置负载均衡相关信息
server:
port: 8889
contextPath: /feign
spring:
#项目名字
application:
name: feign
# eureka client配置
eureka:
client:
registryFetchIntervalSeconds: 5
serviceUrl:
defaultZone: http://localhost:8098/eureka/ #eureka服务端提供的注册地址 参考服务端配置的这个路径
instance:
hostname: feign #此实例注册到eureka服务端的唯一的实例ID
prefer-ip-address: true #是否显示IP地址
leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒)
leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
health-check-url-path: /actuator/health
ribbon:
MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试
MaxAutoRetriesNextServer: 3 #切换实例的重试次数
OkToRetryOnAllOperations: false #对所有 *** 作请求都进行重试,如果是get则可以,如果是post,put等 *** 作没有实现幂等的情况下是很危险的,所以设置为false
ConnectTimeout: 5000 #请求连接的超时时间
ReadTimeout: 6000 #请求处理的超时时间
# 调用USER-MGT微服务时使用随机策略
USER-MGT:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
启动类增加注解
增加@EnableFeignClients注解,自动加载Feign的配置
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class FeignApplication {
public static void main(String[] args) {
SpringApplication.run(FeignApplication.class, args);
}
}
定义feign调用标签
加入客户端要调用user-mgt微服务的一个接口获取当前的端口号,可以使用@FeignClient定义客户端
@FeignClient("user-mgt")
public interface UserMgtClient {
@RequestMapping(method = RequestMethod.GET, value = "/sequence/number/port")
String getPort();
}
调用目标接口
可以通过属性注入的方式将上面定义的UserMgtClient进行注入
@RestController
@RequestMapping("/feign")
@Slf4j
public class FeignTestController {
@Autowired
private UserMgtClient userMgtClient;
@RequestMapping(value = "/server/port", method = RequestMethod.GET, produces = "application/json")
public String getServerPort(){
String port = userMgtClient.getPort();
log.info("port={}",port);
return port;
}
}
调用过程
对上面的调用过程做一个回顾
首先userMgtClient.getPort()会通过JDK代理的方式进行调用,ReflectiveFeign#invoke的实现中会判断是否调用的是常用的类方法equals、hashCode和toString,如果是直接返回,否则会获取具体的代理目标类来调用目标方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
// 获取代理类,调用目标方法
return dispatch.get(method).invoke(args);
}
dispatch.get(method).invoke(args)会调用对应的处理器SynchronousMethodHandler#invoke方法,他会新建RestTemplate,执行调用,如果失败了,会进行一次重试
public Object invoke(Object[] argv) throws Throwable {
// 新建RestTemplate
RequestTemplate template = buildTemplateFromArgs.create(argv);
// 获取options
Options options = findOptions(argv);
// 克隆重试器
Retryer retryer = this.retryer.clone();
while (true) {
try {
// 执行调用和解码
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
// 重试
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
// 重试后依然报错,抛出异常
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
executeAndDecode(template, options)
方法中首先构造对目标接口调用的HTTP请求,这里可以扩展请求定制,通过实现RequestInterceptor接口可以为请求添加header属性等信息,如果微服务鉴权需要token,可以使用这种方式;然后进行客户端的调用,比通过异步任务的方式对响应进行解码
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
// 构造对目标接口调用的HTTP请求
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
// 客户端调用
response = client.execute(request, options);
// 构造响应
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
// 解码响应信息
if (decoder != null)
return decoder.decode(response, metadata.returnType());
// 解码响应采用异步的方式,通过CompletableFuture实现
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
// 等待解码结果
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
// 异步执行结果反馈
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
Request targetRequest(RequestTemplate template) {
// 通过实现RequestInterceptor接口可以为请求添加header属性等信息,如果微服务鉴权需要token,可以使用
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
// 执行目标对象的apply方法
return target.apply(template);
}
client.execute(request, options)
客户端执行方法是调用的核心,
public Response execute(Request request, Options options) throws IOException {
// 创建原始URI
URI originalUri = URI.create(request.url());
// 获取URI的HOST
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
// 获取hint
String hint = this.getHint(serviceId);
// 构造负载均衡请求
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint));
// 获取支持生命周期的处理器
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
// 生命周期开始
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
// 根据负载均衡策略获取实例
ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest);
// 构造负载均衡响应
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);
String message;
if (instance == null) { // 没有负载均衡实例
message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
// 生命周期结束
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse));
});
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build();
} else { // 存在负载均衡实例
// 重新构造负载均衡请求,将serviceId替换为IP+端口
message = this.loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = this.buildRequest(request, message);
// 请求处理
return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors);
}
}
LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing
执行负载均衡处理的具体处理,主要完成负载均衡生命周期的处理,执行HTTP请求
static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,
Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse,
Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean loadBalanced) throws IOException {
// 生命周期开始
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
try {
// 执行实际的HTTP请求
Response response = feignClient.execute(feignRequest, options);
if (loadBalanced) { // 启用了负载均衡
// 生命周期结束
supportedLifecycleProcessors.forEach(
lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, lbResponse, buildResponseData(response))));
}
return response;
}
catch (Exception exception) {
if (loadBalanced) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
}
throw exception;
}
}
feignClient.execute(feignRequest, options)
客户端执行HTTP请求,主要是建立HTTP的连接,然后调用,再对返回结果进行封装
public Response execute(Request request, Options options) throws IOException {
// 建立HTTP连接,执行请求
HttpURLConnection connection = convertAndSend(request, options);
// 转换返回结果
return convertResponse(connection, request);
}
配置加载
首先,要使用Feign,需要在启动类添加@EnableFeignClients注解,这个注解里面的@Import会加载FeignClientsRegistrar类,该类中会进行Feign客户端相关的配置
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
Class<?>[] defaultConfiguration() default {};
Class<?>[] clients() default {};
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)