Files
CTFd/tests/api/v1/test_files.py
gkr 2e06f92c64
Some checks are pending
Linting / Linting (3.11) (push) Waiting to run
Mirror core-theme / mirror (push) Waiting to run
init CTFd source
2025-12-25 09:39:21 +08:00

243 lines
8.2 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import pathlib
import shutil
from io import BytesIO
from CTFd.models import ChallengeFiles, Challenges, Files
from tests.helpers import (
create_ctfd,
destroy_ctfd,
gen_challenge,
gen_file,
login_as_user,
)
def test_api_files_get_non_admin():
app = create_ctfd()
with app.app_context():
chal = gen_challenge(app.db)
gen_file(
app.db,
location="0bf1a55a5cd327c07af15df260979668/bird.swf",
challenge_id=chal.id,
)
with app.test_client() as client:
# test_api_files_get_non_admin
"""Can a user get /api/v1/files if not admin"""
r = client.get("/api/v1/files", json="")
assert r.status_code == 403
# test_api_files_post_non_admin
"""Can a user post /api/v1/files if not admin"""
r = client.post("/api/v1/files")
assert r.status_code == 403
# test_api_file_get_non_admin
"""Can a user get /api/v1/files/<file_id> if not admin"""
r = client.get("/api/v1/files/1", json="")
assert r.status_code == 403
# test_api_file_delete_non_admin
"""Can a user delete /api/v1/files/<file_id> if not admin"""
r = client.delete("/api/v1/files/1", json="")
assert r.status_code == 403
destroy_ctfd(app)
def test_api_files_get_admin():
"""Can a user get /api/v1/files if admin"""
app = create_ctfd()
with app.app_context():
with login_as_user(app, "admin") as client:
r = client.get("/api/v1/files", json="")
assert r.status_code == 200
destroy_ctfd(app)
def test_api_files_post_admin():
"""Can a user post /api/v1/files if admin"""
app = create_ctfd()
with app.app_context():
with login_as_user(app, name="admin") as client:
with client.session_transaction() as sess:
nonce = sess.get("nonce")
r = client.post(
"/api/v1/files",
content_type="multipart/form-data",
data={
"file": (BytesIO(b"test file content"), "test.txt"),
"nonce": nonce,
},
)
assert r.status_code == 200
f = Files.query.filter_by(id=1).first()
assert f.sha1sum == "9032bbc224ed8b39183cb93b9a7447727ce67f9d"
os.remove(os.path.join(app.config["UPLOAD_FOLDER"] + "/" + f.location))
destroy_ctfd(app)
def test_api_file_get_admin():
"""Can a user get /api/v1/files/<file_id> if admin"""
app = create_ctfd()
with app.app_context():
chal = gen_challenge(app.db)
f = gen_file(
app.db,
location="0bf1a55a5cd327c07af15df260979668/bird.swf",
challenge_id=chal.id,
)
assert Files.query.count() == 1
assert ChallengeFiles.query.count() == 1
assert f in chal.files
with login_as_user(app, "admin") as client:
r = client.get("/api/v1/files/1", json="")
assert r.status_code == 200
destroy_ctfd(app)
def test_api_file_delete_admin():
"""Can a user delete /api/v1/files/<file_id> if admin"""
app = create_ctfd()
with app.app_context():
chal = gen_challenge(app.db)
path = os.path.join(
app.config["UPLOAD_FOLDER"], "0bf1a55a5cd327c07af15df260979668", "bird.swf"
)
try:
# Create a fake file
os.makedirs(os.path.dirname(path))
open(path, "a").close()
f = gen_file(
app.db,
location="0bf1a55a5cd327c07af15df260979668/bird.swf",
challenge_id=chal.id,
)
assert Files.query.count() == 1
assert ChallengeFiles.query.count() == 1
assert f in chal.files
# Make sure the file was created
assert os.path.exists(path)
with login_as_user(app, "admin") as client:
r = client.delete("/api/v1/files/1", json="")
assert r.status_code == 200
assert Files.query.count() == 0
assert ChallengeFiles.query.count() == 0
chal = Challenges.query.filter_by(id=1).first()
assert f not in chal.files
# Make sure the API call deleted the file
assert os.path.exists(path) is False
finally:
# Always make sure the file is deleted
shutil.rmtree(os.path.dirname(path), ignore_errors=True)
destroy_ctfd(app)
def test_api_file_custom_location():
"""
Test file uploading with custom location
"""
app = create_ctfd()
with app.app_context():
with login_as_user(app, name="admin") as client:
with client.session_transaction() as sess:
nonce = sess.get("nonce")
r = client.post(
"/api/v1/files",
content_type="multipart/form-data",
data={
"file": (BytesIO(b"test file content"), "test.txt"),
"location": "testing/asdf.txt",
"nonce": nonce,
},
)
assert r.status_code == 200
f = Files.query.filter_by(id=1).first()
assert f.sha1sum == "9032bbc224ed8b39183cb93b9a7447727ce67f9d"
assert f.location == "testing/asdf.txt"
r = client.get("/files/" + f.location)
assert r.get_data(as_text=True) == "test file content"
r = client.get("/api/v1/files/1")
response = r.get_json()
assert (
response["data"]["sha1sum"]
== "9032bbc224ed8b39183cb93b9a7447727ce67f9d"
)
assert response["data"]["location"] == "testing/asdf.txt"
# Test deletion
r = client.delete("/api/v1/files/1", json="")
assert r.status_code == 200
assert Files.query.count() == 0
target = pathlib.Path(app.config["UPLOAD_FOLDER"]) / f.location
assert target.exists() is False
# Test invalid locations
invalid_paths = [
"testing/prefix/asdf.txt",
"/testing/asdf.txt",
"asdf.txt",
]
for path in invalid_paths:
r = client.post(
"/api/v1/files",
content_type="multipart/form-data",
data={
"file": (BytesIO(b"test file content"), "test.txt"),
"location": path,
"nonce": nonce,
},
)
assert r.status_code == 400
destroy_ctfd(app)
def test_api_file_overwrite_by_location():
"""
Test file overwriting with a specific location
"""
app = create_ctfd()
with app.app_context():
with login_as_user(app, name="admin") as client:
with client.session_transaction() as sess:
nonce = sess.get("nonce")
r = client.post(
"/api/v1/files",
content_type="multipart/form-data",
data={
"file": (BytesIO(b"test file content"), "test.txt"),
"location": "testing/asdf.txt",
"nonce": nonce,
},
)
assert r.status_code == 200
f = Files.query.filter_by(id=1).first()
r = client.get("/files/" + f.location)
assert r.get_data(as_text=True) == "test file content"
r = client.post(
"/api/v1/files",
content_type="multipart/form-data",
data={
"file": (BytesIO(b"testing new uploaded file content"), "test.txt"),
"location": "testing/asdf.txt",
"nonce": nonce,
},
)
assert r.status_code == 200
f = Files.query.filter_by(id=1).first()
r = client.get("/files/" + f.location)
assert f.sha1sum == "0ee7eb85ac0b8d8ae03f3080589157cde553b13f"
assert r.get_data(as_text=True) == "testing new uploaded file content"
destroy_ctfd(app)