import os
import uuid
import json
import threading
import time
from flask import (
Flask,
request,
render_template,
send_from_directory,
url_for,
Response,
)
from werkzeug.utils import secure_filename
from flask_limiter import Limiter
import subprocess
import openai
import logging
import magic
import hashlib
import re
import ipaddress
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Initialize the Flask application
app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "/tmp"
app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg"}
app.config["ALLOWED_MIME_TYPES"] = {"image/png", "image/jpeg"}
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 # 1 mb
# limiter function
def limiterFn() -> str:
if request.headers.getlist("X-Forwarded-For"):
return request.headers.getlist("X-Forwarded-For")[0]
return request.remote_addr
def limiterExemptFn() -> bool:
ip = ipaddress.ip_address(request.remote_addr or "127.0.0.1")
return ip.is_private
# Initialize the rate limiter
limiter = Limiter(
key_func=limiterFn,
app=app,
default_limits=["20 per minute"], # Allow 10 requests per minute per IP
default_limits_exempt_when=limiterExemptFn,
)
# Whitelisted commands to avoid harmful command execution
ALLOWED_COMMANDS = ["convert", "gm", "mogrify"]
# Blacklisted commands to prevent harmful command execution
BLACKLISTED_COMMANDS = BLACKLISTED_COMMANDS = [
"rm",
"mv",
"cp",
"shutdown",
"reboot",
"py",
"webapp",
"env",
"chmod",
"chown",
"nc",
"ping",
"nslookup",
"finger",
"traceroute",
"ip",
"ifconfig",
"apk",
"apt-get",
"dpkg",
"dnf",
"yum",
"python",
"python3",
"python3.8",
"pip",
"pipx",
"kill",
"ps",
"top",
"curl",
"wget",
"bash",
# "sh ", # change to add space after sh... so wont trigger commands like sha256
"history",
"hosts",
"bin",
".p*",
".*y",
"sleep",
"perl",
]
# Read API keys from Docker secrets
api_key = os.getenv("OPENAI_API_KEY")
api_key_path = os.getenv("OPENAI_API_KEY_FILE")
# Read the API key from the file
if api_key:
openai.api_key = api_key.strip()
elif api_key_path:
with open(api_key_path, "r") as file:
openai.api_key = file.read().strip()
else:
raise ValueError("Error 101") # API key file path not set in environment variables
# Perform file validation to check allowed file
def allowed_file(filename):
return (
"." in filename
and filename.rsplit(".", 1)[1].lower() in app.config["ALLOWED_EXTENSIONS"]
)
# Perform file validation to check allowed MIME type
def allowed_mime_type(file_path):
mime = magic.Magic(mime=True)
file_mime_type = mime.from_file(file_path)
return file_mime_type in app.config["ALLOWED_MIME_TYPES"]
# Perform file validation to check allowed file signature
def allowed_file_signature(file_path):
with open(file_path, "rb") as file:
header = file.read(8)
if header.startswith(b"\x89PNG"):
return True
elif header[:3] == b"\xff\xd8\xff":
return True
return False
# Generate GraphicsMagick command using OpenAI
def gm(instructions, inputfile, outputfile, hashfile):
# Construct the prompt
prompt = (
f"Take in an instruction and return a GraphicsMagick command that can be executed "
f"to carry out the instruction. The input file is {inputfile} while the output file is {outputfile}. "
f"In addition, the hashed user output file is {hashfile}. "
f'Return the prompt in a JSON format like this: {{"command": <executable GraphicsMagick command>}}'
)
# Log the prompt and instructions
logger.debug(f"[prompt] {prompt}")
logger.debug(f"[instructions] {instructions}")
# Generate the ImageMagick command using OpenAI GPT
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": prompt},
{"role": "user", "content": instructions},
],
max_tokens=100,
)
# Extract and return the command from the JSON response
response_text = response["choices"][0]["message"]["content"].strip()
try:
command_response = json.loads(response_text)
logger.debug(f"[LLM RETURNED: command_response] {command_response}")
command = command_response.get("command", "")
# Ensure the command does not contain harmful characters
if any(op in command for op in ["&&", "|", "&", "||"]):
raise ValueError("Not allowed characters in command")
# Ensure the command is in the allowed list
if not any(command.startswith(allowed_cmd) for allowed_cmd in ALLOWED_COMMANDS):
raise ValueError("Command allowed, but some error in processing")
# Ensure the command does not contain blacklisted commands
if any(blacklisted_cmd in command for blacklisted_cmd in BLACKLISTED_COMMANDS):
raise ValueError("Harmful command detected")
return command_response
except json.JSONDecodeError:
logger.error(f"Failed to decode JSON response: {response_text}")
raise ValueError(f"Failed to decode JSON response: {response_text}")
# Clean up files after processing
# CSIT team do decide if the cleanup delay. default is in 30sec
def cleanup_files(file_paths, delay=30):
def delayed_cleanup():
time.sleep(delay)
for file_path in file_paths:
try:
if os.path.exists(file_path):
os.remove(file_path)
logger.debug(f"Removed file: {file_path}")
except Exception as e:
logger.error(f"Error removing file {file_path}: {e}")
try:
with open("/app/output.txt", "w") as f:
f.write("This is for troubleshooting purposes.")
logger.debug(f"Cleaned up output.txt")
except Exception as e:
logger.error(f"Error resetting output.txt: {e}")
threading.Thread(target=delayed_cleanup).start()
@app.route("/", methods=["GET", "POST"])
@limiter.limit("10 per minute") # Apply rate limiting per route
def index():
if request.method == "POST":
# Check if the post request has the file part
if "file" not in request.files:
return "No file part"
file = request.files["file"]
# If user does not select file, browser also submits an empty part without filename
if file.filename == "":
return "No selected file"
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(
app.config["UPLOAD_FOLDER"], f"{uuid.uuid4().hex}_{filename}"
)
file.save(file_path)
# Generate a unique hash for the user (based on the file or timestamp)
user_hash = hashlib.md5(
f"{uuid.uuid4().hex}_{filename}".encode()
).hexdigest()
hash_txt_path = f"/app/hash_{user_hash}.txt"
user_input = request.form["user_input"]
try:
output_file = f"{file_path}_output.png"
command_response = gm(user_input, file_path, output_file, hash_txt_path)
command = command_response["command"]
# Write the generated command or output to the unique hash.txt file
with open(hash_txt_path, "w") as f:
f.write(command)
# Log the command
logger.debug(f"[Generated Command] {command}")
# Execute the generated command
subprocess.run(command, env={}, shell=True, check=True)
# Generate URLs for the input and output images
input_img_url = url_for(
"uploaded_file", filename=os.path.basename(file_path)
)
output_img_url = url_for(
"uploaded_file", filename=os.path.basename(output_file)
)
cleanup_files([file_path, output_file, hash_txt_path])
# Return unique hash file for the user
result = f"""
<h3>Original Image:</h3>
<img src="{input_img_url}" alt="Original Image" width="600" height="600">
<h3>Processed Image:</h3>
<img src="{output_img_url}" alt="Processed Image" width="600" height="600">
<a href="/hash/{user_hash}.txt" target="_blank">View your hash.txt</a>
"""
except subprocess.CalledProcessError as e:
result = f"Error executing command: {e}"
# Write the generated command to output.txt as a demonstration
output_txt_path = "/app/output.txt"
with open(output_txt_path, "w") as f:
f.write(command)
os.remove(file_path)
except ValueError as e:
# Handle invalid commands and write to hash.txt
user_hash = hashlib.md5(
f"{uuid.uuid4().hex}_{filename}".encode()
).hexdigest()
hash_txt_path = f"/app/hash_{user_hash}.txt"
# Write the error message to the unique hash.txt file
with open(hash_txt_path, "w") as f:
f.write(f"Error in command generation: {e}")
# Cleanup the uploaded file and return a user-friendly message
cleanup_files([file_path, hash_txt_path])
result = f"""
<h3>Error:</h3>
<p>Command not allowed or invalid. Please check your input.</p>
<a href="/hash/{user_hash}.txt" target="_blank">View your error details in output</a>
"""
return render_template("index.html", result=result)
return render_template("index.html")
@app.route("/tmp/<filename>")
def uploaded_file(filename):
return send_from_directory(app.config["UPLOAD_FOLDER"], filename)
def censor_hash(content):
# Regular expression to match MD5 hashes (32 hex digits)
censored_content = re.sub(r"hash_[a-fA-F0-9]{32}", "hash_***", content)
return censored_content
# Route to serve each user's hash file
@app.route("/hash/<user_hash>.txt")
def serve_hash_file(user_hash):
hash_txt_path = f"/app/hash_{user_hash}.txt"
try:
# Open and read the file content
with open(hash_txt_path, "r") as file:
content = file.read()
# Censor the content. Change all `hash_<md5>.txt` to `hash_***.txt`
censored_content = censor_hash(content)
# Return the censored content as a response
return Response(censored_content, mimetype="text/plain")
except Exception as e:
default_content = (
"This is for troubleshooting purposes. Unable to find hash file."
)
return Response(default_content, mimetype="text/plain")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)