import os
import uuid
import json
import threading
import time
from flask import (
from werkzeug.utils import secure_filename
from flask_limiter import Limiter
import subprocess
import openai
import logging
import magic
import hashlib
import re
import ipaddress
# Set up logging
logger = logging.getLogger(__name__)
# Initialize the Flask application
app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "/tmp"
app.config["ALLOWED_EXTENSIONS"] = {"png", "jpg", "jpeg"}
app.config["ALLOWED_MIME_TYPES"] = {"image/png", "image/jpeg"}
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 # 1 mb
# limiter function
def limiterFn() -> str:
if request.headers.getlist("X-Forwarded-For"):
return request.headers.getlist("X-Forwarded-For")[0]
return request.remote_addr
def limiterExemptFn() -> bool:
ip = ipaddress.ip_address(request.remote_addr or "")
return ip.is_private
# Initialize the rate limiter
limiter = Limiter(
default_limits=["20 per minute"], # Allow 10 requests per minute per IP
# Whitelisted commands to avoid harmful command execution
ALLOWED_COMMANDS = ["convert", "gm", "mogrify"]
# Blacklisted commands to prevent harmful command execution
# "sh ", # change to add space after sh... so wont trigger commands like sha256
# Read API keys from Docker secrets
api_key = os.getenv("OPENAI_API_KEY")
api_key_path = os.getenv("OPENAI_API_KEY_FILE")
# Read the API key from the file
if api_key:
openai.api_key = api_key.strip()
elif api_key_path:
with open(api_key_path, "r") as file:
openai.api_key =
raise ValueError("Error 101") # API key file path not set in environment variables
# Perform file validation to check allowed file
def allowed_file(filename):
return (
"." in filename
and filename.rsplit(".", 1)[1].lower() in app.config["ALLOWED_EXTENSIONS"]
# Perform file validation to check allowed MIME type
def allowed_mime_type(file_path):
mime = magic.Magic(mime=True)
file_mime_type = mime.from_file(file_path)
return file_mime_type in app.config["ALLOWED_MIME_TYPES"]
# Perform file validation to check allowed file signature
def allowed_file_signature(file_path):
with open(file_path, "rb") as file:
header =
if header.startswith(b"\x89PNG"):
return True
elif header[:3] == b"\xff\xd8\xff":
return True
return False
# Generate GraphicsMagick command using OpenAI
def gm(instructions, inputfile, outputfile, hashfile):
# Construct the prompt
prompt = (
f"Take in an instruction and return a GraphicsMagick command that can be executed "
f"to carry out the instruction. The input file is {inputfile} while the output file is {outputfile}. "
f"In addition, the hashed user output file is {hashfile}. "
f'Return the prompt in a JSON format like this: {{"command": <executable GraphicsMagick command>}}'
# Log the prompt and instructions
logger.debug(f"[prompt] {prompt}")
logger.debug(f"[instructions] {instructions}")
# Generate the ImageMagick command using OpenAI GPT
response = openai.ChatCompletion.create(
{"role": "user", "content": prompt},
{"role": "user", "content": instructions},
# Extract and return the command from the JSON response
response_text = response["choices"][0]["message"]["content"].strip()
command_response = json.loads(response_text)
logger.debug(f"[LLM RETURNED: command_response] {command_response}")
command = command_response.get("command", "")
# Ensure the command does not contain harmful characters
if any(op in command for op in ["&&", "|", "&", "||"]):
raise ValueError("Not allowed characters in command")
# Ensure the command is in the allowed list
if not any(command.startswith(allowed_cmd) for allowed_cmd in ALLOWED_COMMANDS):
raise ValueError("Command allowed, but some error in processing")
# Ensure the command does not contain blacklisted commands
if any(blacklisted_cmd in command for blacklisted_cmd in BLACKLISTED_COMMANDS):
raise ValueError("Harmful command detected")
return command_response
except json.JSONDecodeError:
logger.error(f"Failed to decode JSON response: {response_text}")
raise ValueError(f"Failed to decode JSON response: {response_text}")
# Clean up files after processing
# CSIT team do decide if the cleanup delay. default is in 30sec
def cleanup_files(file_paths, delay=30):
def delayed_cleanup():
for file_path in file_paths:
if os.path.exists(file_path):
logger.debug(f"Removed file: {file_path}")
except Exception as e:
logger.error(f"Error removing file {file_path}: {e}")
with open("/app/output.txt", "w") as f:
f.write("This is for troubleshooting purposes.")
logger.debug(f"Cleaned up output.txt")
except Exception as e:
logger.error(f"Error resetting output.txt: {e}")
@app.route("/", methods=["GET", "POST"])
@limiter.limit("10 per minute") # Apply rate limiting per route
def index():
if request.method == "POST":
# Check if the post request has the file part
if "file" not in request.files:
return "No file part"
file = request.files["file"]
# If user does not select file, browser also submits an empty part without filename
if file.filename == "":
return "No selected file"
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(
app.config["UPLOAD_FOLDER"], f"{uuid.uuid4().hex}_{filename}"
# Generate a unique hash for the user (based on the file or timestamp)
user_hash = hashlib.md5(
hash_txt_path = f"/app/hash_{user_hash}.txt"
user_input = request.form["user_input"]
output_file = f"{file_path}_output.png"
command_response = gm(user_input, file_path, output_file, hash_txt_path)
command = command_response["command"]
# Write the generated command or output to the unique hash.txt file
with open(hash_txt_path, "w") as f:
# Log the command
logger.debug(f"[Generated Command] {command}")
# Execute the generated command, env={}, shell=True, check=True)
# Generate URLs for the input and output images
input_img_url = url_for(
"uploaded_file", filename=os.path.basename(file_path)
output_img_url = url_for(
"uploaded_file", filename=os.path.basename(output_file)
cleanup_files([file_path, output_file, hash_txt_path])
# Return unique hash file for the user
result = f"""
<h3>Original Image:</h3>
<img src="{input_img_url}" alt="Original Image" width="600" height="600">
<h3>Processed Image:</h3>
<img src="{output_img_url}" alt="Processed Image" width="600" height="600">
<a href="/hash/{user_hash}.txt" target="_blank">View your hash.txt</a>
except subprocess.CalledProcessError as e:
result = f"Error executing command: {e}"
# Write the generated command to output.txt as a demonstration
output_txt_path = "/app/output.txt"
with open(output_txt_path, "w") as f:
except ValueError as e:
# Handle invalid commands and write to hash.txt
user_hash = hashlib.md5(
hash_txt_path = f"/app/hash_{user_hash}.txt"
# Write the error message to the unique hash.txt file
with open(hash_txt_path, "w") as f:
f.write(f"Error in command generation: {e}")
# Cleanup the uploaded file and return a user-friendly message
cleanup_files([file_path, hash_txt_path])
result = f"""
<p>Command not allowed or invalid. Please check your input.</p>
<a href="/hash/{user_hash}.txt" target="_blank">View your error details in output</a>
return render_template("index.html", result=result)
return render_template("index.html")
def uploaded_file(filename):
return send_from_directory(app.config["UPLOAD_FOLDER"], filename)
def censor_hash(content):
# Regular expression to match MD5 hashes (32 hex digits)
censored_content = re.sub(r"hash_[a-fA-F0-9]{32}", "hash_***", content)
return censored_content
# Route to serve each user's hash file
def serve_hash_file(user_hash):
hash_txt_path = f"/app/hash_{user_hash}.txt"
# Open and read the file content
with open(hash_txt_path, "r") as file:
content =
# Censor the content. Change all `hash_<md5>.txt` to `hash_***.txt`
censored_content = censor_hash(content)
# Return the censored content as a response
return Response(censored_content, mimetype="text/plain")
except Exception as e:
default_content = (
"This is for troubleshooting purposes. Unable to find hash file."
return Response(default_content, mimetype="text/plain")
if __name__ == "__main__":"", port=5000)