作者:向昊
上集我们大概了解到了服务端是怎么处理请求的,那么发送请求又是个什么样的流程了?本文主要分析使用cse
提供的RestTemplate
的场景,其实cse
提供的rpc
注解(RpcReference
)的方式最后的调用逻辑和RestTemplate
是殊途同归的。
使用
使用cse
提供的RestTemplate
时候,是这样初始化的:
RestTemplate restTemplate = RestTemplateBuilder.create();
restTemplate.getForObject("cse://appId:serviceName/xxx", Object.class);
我们可以注意到2个怪异的地方:
RestTemplate
是通过RestTemplateBuilder.create()
来获取的,而不是用的Spring
里提供的。- 请求路径开头是
cse
而不是我们常见的http
、https
且需要加上服务所属的应用ID
和服务名称。
解析
根据url匹配RestTemplate
首先看下RestTemplateBuilder.create()
,它返回的是org.apache.servicecomb.provider.springmvc.reference.RestTemplateWrapper
,是cse
提供的一个包装类。
// org.apache.servicecomb.provider.springmvc.reference.RestTemplateWrapper
// 用于同时支持cse调用和非cse调用
class RestTemplateWrapper extends RestTemplate {
private final List<AcceptableRestTemplate> acceptableRestTemplates = new ArrayList<>();
final RestTemplate defaultRestTemplate = new RestTemplate();
RestTemplateWrapper() {
acceptableRestTemplates.add(new CseRestTemplate());
}
RestTemplate getRestTemplate(String url) {
for (AcceptableRestTemplate template : acceptableRestTemplates) {
if (template.isAcceptable(url)) {
return template;
}
}
return defaultRestTemplate;
}
}
-
AcceptableRestTemplate
:这个类是一个抽象类,也是继承RestTemplate
的,目前其子类就是CseRestTemplate
,我们也可以看到在初始化的时候会默认往acceptableRestTemplates
中添加一个CseRestTemplate
。 -
回到使用的地方
restTemplate.getForObject
:这个方法会委托给如下方法:public <T> T getForObject(String url, Class<T> responseType, Object... urlVariables) throws RestClientException { return getRestTemplate(url).getForObject(url, responseType, urlVariables); }
可以看到首先会调用
getRestTemplate(url)
,即会调用template.isAcceptable(url)
,如果匹配到了就返回CseRestTemplate
,否则就返回常规的RestTemplate
。那么再看下isAcceptable()
这个方法:
到这里我们就清楚了路径中的cse://
的作用了,就是为了使用CseRestTemplate
来发起请求,也理解了为啥RestTemplateWrapper
可以同时支持cse
调用和非cse
调用。
委托调用
从上面可知,我们的cse
调用其实都是委托给CseRestTemplate
了。在构造CseRestTemplate
的时候会初始化几个东西:
public CseRestTemplate() {
setMessageConverters(Arrays.asList(new CseHttpMessageConverter()));
setRequestFactory(new CseClientHttpRequestFactory());
setUriTemplateHandler(new CseUriTemplateHandler());
}
这里需要重点关注new CseClientHttpRequestFactory()
:
public class CseClientHttpRequestFactory implements ClientHttpRequestFactory {
@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
return new CseClientHttpRequest(uri, httpMethod);
}
}
最终委托到了CseClientHttpRequest
这个类,这里就是重头戏了!
我们先把注意力拉回到这句话:restTemplate.getForObject("cse://appId:serviceName/xxx", Object.class)
,从上面我们知道其逻辑是先根据url
找到对应的RestTemplate
,然后调用getForObject
这个方法,最终这个方法会调用到:org.springframework.web.client.RestTemplate#doExecute
:
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
}
createRequest(url, method)
:会调用getRequestFactory().createRequest(url, method)
,即最终会调用到我们初始化CseClientHttpRequest
是塞的RequestFactory
,所以这里会返回ClientHttpRequest
这个类。request.execute()
:这个方法会委托到org.apache.servicecomb.provider.springmvc.reference.CseClientHttpRequest#execute
这个方法上。
至此我们知道前面的调用最终会委托到CseClientHttpRequest#execute这个方法上
cse调用
接着上文分析:
public ClientHttpResponse execute() {
path = findUriPath(uri);
requestMeta = createRequestMeta(method.name(), uri);
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri.getRawSchemeSpecificPart());
queryParams = queryStringDecoder.parameters();
Object[] args = this.collectArguments();
// 异常流程,直接抛异常出去
return this.invoke(args);
}
-
createRequestMeta(method.name(), uri)
:这里主要是根据microserviceName
去获取调用服务的信息,并会将获取的信息放入到Map
中。服务信息如下:
-
可以看到里面的信息很丰富,例如应用名、服务名、还有接口对应的yaml信息等。
-
this.collectArguments()
:这里隐藏了一个校验点,就是会校验传入的参数是否符合对方接口的定义。主要是通过这个方法:org.apache.servicecomb.common.rest.codec.RestCodec#restToArgs
,如果不符合真个流程就结束了。
准备invocation
从上面分析可知,获取到接口所需的参数后就会调用这个方法:org.apache.servicecomb.provider.springmvc.reference.CseClientHttpRequest#invoke
:
private CseClientHttpResponse invoke(Object[] args) {
Invocation invocation = prepareInvocation(args);
Response response = doInvoke(invocation);
if (response.isSuccessed()) {
return new CseClientHttpResponse(response);
}
throw ExceptionFactory.convertConsumerException(response.getResult());
}
-
prepareInvocation(args)
:这个方法会准备好Invocation
,这个Invocation
在上集已经分析过了,不过上集中的它是为服务端服务的,那么咱们这块当然就得为消费端工作了protected Invocation prepareInvocation(Object[] args) { Invocation invocation = InvocationFactory.forConsumer(requestMeta.getReferenceConfig(), requestMeta.getOperationMeta(), args); return invocation; }
从名字也可以看出它是为消费端服务的,其实无论是
forProvider
还是forConsumer
,它们最主要的区别就是加载的Handler
不同,这次加载的Handler
如下:class org.apache.servicecomb.qps.ConsumerQpsFlowControlHandler
(流控)class org.apache.servicecomb.loadbalance.LoadbalanceHandler
(负载)class org.apache.servicecomb.bizkeeper.ConsumerBizkeeperHandler
(容错)class org.apache.servicecomb.core.handler.impl.TransportClientHandler
(调用,默认加载的)前面3个
Handler
可以参考下这个微服务治理专栏 -
doInvoke(invocation)
:初始化好了invocation
后就开始调用了。最终会调用到这个方法上:org.apache.servicecomb.core.provider.consumer.InvokerUtils#innerSyncInvoke
至此,这些动作就是
cse
中RestTemplate
和rpc
调用的不同之处。不过可以清楚的看到RestTemplate
的方式是只支持同步的,即innerSyncInvoke
,但是rpc
是可以支持异步的,即reactiveInvoke
public static Response innerSyncInvoke(Invocation invocation) { invocation.next(respExecutor::setResponse); }
到这里我们知道了,消费端发起请求还是得靠invocation
的责任链驱动
启动invocation责任链
好了,咱们的老朋友又出现了:invocation.next
,这个方法是个典型的责任链模式,其链条就是上面说的那4个Handler
。前面3个就不分析了,直接跳到TransportClientHandler
。
// org.apache.servicecomb.core.handler.impl.TransportClientHandler
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
Transport transport = invocation.getTransport();
transport.send(invocation, asyncResp);
}
-
invocation.getTransport()
:获取请求地址,即最终发送请求的时候还是以ip:port
的形式。 -
transport.send(invocation, asyncResp)
:调用链为org.apache.servicecomb.transport.rest.vertx.VertxRestTransport#send
->
org.apache.servicecomb.transport.rest.client.RestTransportClient#send
(这里会初始化HttpClientWithContext
,下面会分析)->
org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke
(真正发送请求的地方)public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception { createRequest(ipPort, path); clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName()); RestClientRequestImpl restClientRequest = new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler); invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest); Buffer requestBodyBuffer = restClientRequest.getBodyBuffer(); HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer); invocation.getInvocationStageTrace().startClientFiltersRequest(); // 触发filter.beforeSendRequest方法 for (HttpClientFilter filter : httpClientFilters) { if (filter.enabled()) { filter.beforeSendRequest(invocation, requestEx); } } // 从业务线程转移到网络线程中去发送 // httpClientWithContext.runOnContext }
-
createRequest(ipPort, path)
:根据参数初始化HttpClientRequest clientRequest
,初始化的时候会传入一个创建一个responseHandler
,即对响应的处理。注意org.apache.servicecomb.common.rest.filter.HttpClientFilter#afterReceiveResponse
的调用就是在这里埋下伏笔的,是通过回调org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#processResponseBody
这个方法触发的(在创建responseHandler
时候创建的)且
org.apache.servicecomb.common.rest.filter.HttpClientFilter#beforeSendRequest
:这个方法的触发我们也可以很清楚的看到在发送请求执行的。 -
requestEx
:注意它的类型是HttpServletRequestEx
,虽然名字里面带有Servlet
,但是打开它的方法可以发现有很多我们在tomcat
中那些常用的方法都直接抛出异常了,这也是一个易错点! -
httpClientWithContext.runOnContext
:用来发送请求的逻辑,不过这里还是有点绕的,下面重点分析下
-
httpClientWithContext.runOnContext
首先看下HttpClientWithContext
的定义:
public class HttpClientWithContext {
public interface RunHandler {
void run(HttpClient httpClient);
}
private HttpClient httpClient;
private Context context;
public HttpClientWithContext(HttpClient httpClient, Context context) {
this.httpClient = httpClient;
this.context = context;
}
public void runOnContext(RunHandler handler) {
context.runOnContext((v) -> {
handler.run(httpClient);
});
}
}
-
从上面可知发送请求调用的是这个方法:
runOnContext
,参数为RunHandler
接口,然后是以lambda
的方式传入的,lambda
的参数为httpClient
,这个httpClient
又是在HttpClientWithContext
的构造函数中初始化的。这个构造函数是在org.apache.servicecomb.transport.rest.client.RestTransportClient#send
这个方法中初始化的(调用org.apache.servicecomb.transport.rest.client.RestTransportClient#findHttpClientPool
这个方法)。但是我们观察调用的地方:
// 从业务线程转移到网络线程中去发送 httpClientWithContext.runOnContext(httpClient -> { clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout()); processServiceCombHeaders(invocation, operationMeta); try { restClientRequest.end(); } catch (Throwable e) { LOGGER.error(invocation.getMarker(), "send http request failed, local:{}, remote: {}.", getLocalAddress(), ipPort, e); fail((ConnectionBase) clientRequest.connection(), e); } });
其实在这块逻辑中
HttpClient
是没有被用到的,实际上发送请求的动作是restClientRequest.end()
触发的,restClientRequest
是cse
中的类RestClientRequestImpl
,然后它包装了HttpClientRequest
(vertx
中提供的),即restClientRequest.end()
最终还是委托到了HttpClientRequest.end()
上了。那么这个
HttpClientRequest
是怎么被初始化的了?它是在createRequest(ipPort, path)
这个方法中初始化的,即在调用org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke
方法入口处。初始化的逻辑如下:
clientRequest = httpClientWithContext.getHttpClient().request(method, requestOptions, this::handleResponse)
httpClientWithContext.getHttpClient()
:这个方法返回的是HttpClient
,上面说的HttpClient
作用就体现出来了,用来初始化了我们发送请求的关键先生:HttpClientRequest
。那么至此我们发送请求的整体逻辑大概就清晰了。
总结
无论是采用RestTemplate
的方式还是采用rpc
注解的方式来发送请求,其底层逻辑其实是一样的。即首先根据请求信息匹配到对方的服务信息,然后经过一些列的Handler
处理,如限流、负载、容错等等(这也是一个很好的扩展机制),最终会走到TransportClientHandler
这个Handler
,然后根据条件去初始化发送的request
,经过HttpClientFilter
的处理后就会委托给vertx
的HttpClientRequest
来真正的发出请求。