作者:向昊
前置知识
cse的通信是基于vert.x来搞的,所以我们首先得了解下里面的几个概念:
Verticle:You can think of verticle instances as a bit like actors in the Actor Model. A typical verticle-based Vert.x application will be composed of many verticle instances in each Vert.x instance.参考:https://vertx.io/docs/apidocs/io/vertx/core/Verticle.html
所以我们知道干活的就是这个家伙,它就是这个模式中的工具人
Route:可以看成是一个条件集合(可以指定url的匹配规则),它用这些条件来判断一个http请求或失败是否应该被路由到指定的Handler
Router:可以看成一个核心的控制器,管理着Route
VertxHttpDispatcher:是cse里的类,可以看成是请求分发处理器,即一个请求过来了怎么处理都是由它来管理的。
初始化
RestServerVerticle
经过一系列流程最终会调用这个方法:
io.vertx.core.impl.DeploymentManager#doDeploy():注意如果在这个地方打断点,可能会进多次。因为上面也提到过我们的操作都是基于Verticle的,cse中有2种Verticle,一种是org.apache.servicecomb.foundation.vertx.client.ClientVerticle一种是org.apache.servicecomb.transport.rest.vertx.RestServerVerticle,这篇文章我们主要分析接受请求的流程,即着眼于RestServerVerticle,至于ClientVerticle的分析,先挖个坑,以后填上~
调用栈如下:
VertxHttpDispatcher
由上图可知,会调用如下方法:
// org.apache.servicecomb.transport.rest.vertx.RestServerVerticle#start
public void start(Promise<Void> startPromise) throws Exception {
// ...
Router mainRouter = Router.router(vertx);
mountAccessLogHandler(mainRouter);
mountCorsHandler(mainRouter);
initDispatcher(mainRouter);
// ...
}
在这里我们看到了上文提到的Router,继续看initDispatcher(mainRouter)这个方法:
// org.apache.servicecomb.transport.rest.vertx.RestServerVerticle#initDispatcher
private void initDispatcher(Router mainRouter) {
List<VertxHttpDispatcher> dispatchers = SPIServiceUtils.getSortedService(VertxHttpDispatcher.class);
for (VertxHttpDispatcher dispatcher : dispatchers) {
if (dispatcher.enabled()) {
dispatcher.init(mainRouter);
}
}
}
首先通过SPI方式获取所有VertxHttpDispatcher,然后循环调用其init方法,由于分析的不是边缘服务,即这里我们没有自定义VertxHttpDispatcher。
Router
接着上文分析,会调用如下方法:
// org.apache.servicecomb.transport.rest.vertx.VertxRestDispatcher
public void init(Router router) {
// cookies handler are enabled by default start from 3.8.3
String pattern = DynamicPropertyFactory.getInstance().getStringProperty(KEY_PATTERN, null).get();
if(pattern == null) {
router.route().handler(createBodyHandler());
router.route().failureHandler(this::failureHandler).handler(this::onRequest);
} else {
router.routeWithRegex(pattern).handler(createBodyHandler());
router.routeWithRegex(pattern).failureHandler(this::failureHandler).handler(this::onRequest);
}
}
由于一般不会主动去设置servicecomb.http.dispatcher.rest.pattern这个配置,即pattern为空,所以这个时候是没有特定url的匹配规则,即会匹配所有的url
我们需要注意handler(this::onRequest)这段代码,这个代码就是接受到请求后的处理。
处理请求
经过上面的初始化后,咱们的准备工作已经准备就绪,这个时候突然来了一个请求
(GET http://127.0.0.1:18088/agdms/v1/stock-apps/query?pkgName=test),便会触发上面提到的回调,如下:
// org.apache.servicecomb.transport.rest.vertx.VertxRestDispatcher#onRequest
protected void onRequest(RoutingContext context) {
if (transport == null) {
transport = CseContext.getInstance().getTransportManager().findTransport(Const.RESTFUL);
}
HttpServletRequestEx requestEx = new VertxServerRequestToHttpServletRequest(context);
HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());
VertxRestInvocation vertxRestInvocation = new VertxRestInvocation();
context.put(RestConst.REST_PRODUCER_INVOCATION, vertxRestInvocation);
vertxRestInvocation.invoke(transport, requestEx, responseEx, httpServerFilters);
}
最主要的就是那个invoke方法:
// org.apache.servicecomb.common.rest.RestProducerInvocation#invoke
public void invoke(Transport transport, HttpServletRequestEx requestEx, HttpServletResponseEx responseEx,
List<HttpServerFilter> httpServerFilters) {
this.transport = transport;
this.requestEx = requestEx;
this.responseEx = responseEx;
this.httpServerFilters = httpServerFilters;
requestEx.setAttribute(RestConst.REST_REQUEST, requestEx);
try {
findRestOperation();
} catch (InvocationException e) {
sendFailResponse(e);
return;
}
scheduleInvocation();
}
这里看似简单,其实后背隐藏着大量的逻辑,下面来简单分析下findRestOperation()和scheduleInvocation()这2个方法。
findRestOperation
从名字我们也可以看出这个方法主要是寻找出对应的OperationId
// org.apache.servicecomb.common.rest.RestProducerInvocation#findRestOperation
protected void findRestOperation() {
MicroserviceMeta selfMicroserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
findRestOperation(selfMicroserviceMeta);
}
SCBEngine.getInstance().getProducerMicroserviceMeta():这个是获取该服务的一些信息,项目启动时,会将本服务的基本信息注册到注册中心上去。相关代码可以参考:org.apache.servicecomb.serviceregistry.RegistryUtils#init。
本服务信息如下:
我们主要关注这个参数:intfSchemaMetaMgr,即我们在契约中定义的接口,或者是代码中的Controller下的方法。
findRestOperation(selfMicroserviceMeta):首先通过上面的microserviceMeta获取该服务下所有对外暴露的url,然后根据请求的RequestURI和Method来获取OperationLocator,进而对restOperationMeta进行赋值,其内容如下:
可以看到这个restOperationMeta里面的内容十分丰富,和我们接口是完全对应的。
scheduleInvocation
现在我们知道了请求所对应的Operation相关信息了,那么接下来就要进行调用了。但是调用前还要进行一些前置动作,比如参数的校验、流控等等。
现在选取关键代码进行分析:
createInvocation:这个就是创建一个Invocation,Invocation在cse中还是一个比较重要的概念。它分为服务端和消费端,它们之间的区别还是挺大的。创建服务端的Invocation时候它会加载服务端相关的Handler,同理消费端会加载消费端相关的Handler。这次我们创建的是服务端的Invocation,即它会加载org.apache.servicecomb.qps.ProviderQpsFlowControlHandler、org.apache.servicecomb.qps.ProviderQpsFlowControlHandler、org.apache.servicecomb.core.handler.impl.ProducerOperationHandler这3个Handler(当然这些都是可配置的,不过最后一个是默认加载的,具体可以参考这篇文章:浅析CSE中Handler)
runOnExecutor:这个方法超级重要,咱们也详细分析下,最终调用如下:
// org.apache.servicecomb.common.rest.AbstractRestInvocation#invoke
public void invoke() {
try {
Response response = prepareInvoke();
if (response != null) {
sendResponseQuietly(response);
return;
}
doInvoke();
} catch (Throwable e) {
LOGGER.error("unknown rest exception.", e);
sendFailResponse(e);
}
}
prepareInvoke:这个方法主要是执行HttpServerFilter里面的方法,具体可以参考:浅析CSE中的Filter执行时机。如果response不为空就直接返回了。像参数校验就是这个org.apache.servicecomb.common.rest.filter.inner.ServerRestArgsFilter的功能,一般报400 bad request就可以进去跟跟代码了
doInvoke:类似责任链模式,会调用上面说的3个Handler,前面2个Handler咱们不详细分析了,直接看最后一个Handler,即org.apache.servicecomb.core.handler.impl.ProducerOperationHandler
// org.apache.servicecomb.core.handler.impl.ProducerOperationHandler#handle
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
SwaggerProducerOperation producerOperation =
invocation.getOperationMeta().getExtData(Const.PRODUCER_OPERATION);
if (producerOperation == null) {
asyncResp.producerFail(
ExceptionUtils.producerOperationNotExist(invocation.getSchemaId(),
invocation.getOperationName()));
return;
}
producerOperation.invoke(invocation, asyncResp);
}
producerOperation是在启动流程中赋值的,具体代码可以参考:org.apache.servicecomb.core.definition.schema.ProducerSchemaFactory#getOrCreateProducerSchema,其内容如下:
可以看到,这其下内容对应的就是我们代码中接口对应的方法。
接着会调用org.apache.servicecomb.swagger.engine.SwaggerProducerOperation#invoke方法:
// org.apache.servicecomb.swagger.engine.SwaggerProducerOperation#invoke
public void invoke(SwaggerInvocation invocation, AsyncResponse asyncResp) {
if (CompletableFuture.class.equals(producerMethod.getReturnType())) {
completableFutureInvoke(invocation, asyncResp);
return;
}
syncInvoke(invocation, asyncResp);
}
由于我们的同步调用,即直接看syncInvoke方法即可:
public void syncInvoke(SwaggerInvocation invocation, AsyncResponse asyncResp) {
ContextUtils.setInvocationContext(invocation);
Response response = doInvoke(invocation);
ContextUtils.removeInvocationContext();
asyncResp.handle(response);
}
咱们一般上下文传递信息就是这行代码"搞的鬼":ContextUtils.setInvocationContext(invocation),然后再看doInvoke方法:
public Response doInvoke(SwaggerInvocation invocation) {
Response response = null;
try {
invocation.onBusinessMethodStart();
Object[] args = argumentsMapper.toProducerArgs(invocation);
for (ProducerInvokeExtension producerInvokeExtension : producerInvokeExtenstionList) {
producerInvokeExtension.beforeMethodInvoke(invocation, this, args);
}
Object result = producerMethod.invoke(producerInstance, args);
response = responseMapper.mapResponse(invocation.getStatus(), result);
invocation.onBusinessMethodFinish();
invocation.onBusinessFinish();
} catch (Throwable e) {
if (shouldPrintErrorLog(e)){
LOGGER.error("unexpected error operation={}, message={}",
invocation.getInvocationQualifiedName(), e.getMessage());
}
invocation.onBusinessMethodFinish();
invocation.onBusinessFinish();
response = processException(invocation, e);
}
return response;
}
producerInvokeExtenstionList:根据SPI加载ProducerInvokeExtension相关类,系统会自动加载org.apache.servicecomb.swagger.invocation.validator.ParameterValidator,顾名思义这个就是校验请求参数的。如校验@Notnull、@Max(50)这些标签。
producerMethod.invoke(producerInstance, args):通过反射去调用到具体的方法上!
这样整个流程差不多完结了,剩下的就是响应转换和返回响应信息。
总结
这样我们大概了解到了我们的服务是怎么接受和处理请求的,即请求进入我们服务后,首先会获取服务信息,然后根据请求的路径和方法去匹配具体的接口,然后经过Handler和Filter的处理,再通过反射调用到我们的业务代码上,最后返回响应。
整体流程看似简单但是背后隐藏了大量的逻辑,本文也是摘取相对重要的流程进行分析,还有很多地方没有分析到的,比如在调用runOnExecutor之前会进行线程切换,还有同步调用和异步调用的区别以及服务启动时候初始化的逻辑等等。这些内容也是比较有意思,值得深挖。