|
| 1 | +"""Create a simple firmware sign and verify server.""" |
| 2 | + |
| 3 | +# Copyright The Mbed TLS Contributors |
| 4 | +# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later |
| 5 | +# |
| 6 | + |
| 7 | +# The the script assumes that gpg agent chaching should be |
| 8 | +# disabled. The password provided by the user is used to confirm |
| 9 | +# that the user is authorised to sign the firmware. |
| 10 | + |
| 11 | +# Please configure gpg-agent with `max-cache-ttl 0` when deploying. |
| 12 | + |
| 13 | +import flask |
| 14 | +import random |
| 15 | +import os |
| 16 | +import string |
| 17 | +import shlex |
| 18 | +import shutil |
| 19 | +import subprocess |
| 20 | +import pathlib |
| 21 | + |
| 22 | +from typing import Text , Tuple |
| 23 | + |
| 24 | + |
| 25 | +################ /* Configuration Parameters ################# |
| 26 | +app = flask.Flask(__name__) |
| 27 | + |
| 28 | +# Allow the host to set the IP for the server3 |
| 29 | +server_ip = "0.0.0.0" # Will launch the server but dl links won't work. |
| 30 | +server_port = "5000" # Port to bind to. |
| 31 | +cleanup_on_startup = True # Cleanup the temporary directory on launch. |
| 32 | + |
| 33 | +################ Configuration Parameters */ ################# |
| 34 | + |
| 35 | +# ENV overrides |
| 36 | +if "MBEDTLS_FW_SIGN_SERVER_IP" in os.environ: |
| 37 | + server_ip = os.environ["MBEDTLS_FW_SIGN_SERVER_IP"] |
| 38 | + |
| 39 | +if "MBEDTLS_FW_SIGN_SERVER_PORT" in os.environ: |
| 40 | + server_port = os.environ["MBEDTLS_FW_SIGN_SERVER_PORT"] |
| 41 | + |
| 42 | +download_pfix = "http://{}:{}/".format(server_ip, server_port) |
| 43 | +sign_cmd = ("gpg --detach-sign --pinentry-mode loopback" |
| 44 | + " --passphrase '{pasw}' --armor --batch {tarb}") |
| 45 | +verify_cmd = "gpg --verify {sig} {tarb}" |
| 46 | +sum_cmd = "sha256sum {tarb}" |
| 47 | +zip_cmd = "zip -r {name}.zip ." |
| 48 | + |
| 49 | + |
| 50 | +def do_shell_exec(exec_string: str) -> Tuple[int, str, str]: |
| 51 | + """Helper function to do shell execution. |
| 52 | +
|
| 53 | + exec_string - String to execute (as is - function will split) |
| 54 | + expected_result - Expected return code. |
| 55 | + """ |
| 56 | + |
| 57 | + shell_process = subprocess.Popen( |
| 58 | + shlex.split(exec_string), |
| 59 | + stdin=subprocess.PIPE, |
| 60 | + stdout=subprocess.PIPE, |
| 61 | + stderr=subprocess.PIPE) |
| 62 | + (shell_stdout, shell_stderr) = shell_process.communicate() |
| 63 | + |
| 64 | + return (shell_process.returncode, |
| 65 | + shell_stdout.decode("utf-8"), |
| 66 | + shell_stderr.decode("utf-8")) |
| 67 | + |
| 68 | + |
| 69 | +def randomise_path(name: str) -> str: |
| 70 | + """Attach a pseudo-ranom postfix of an underscore and 3 uppercase characters.""" |
| 71 | + pfix = "".join(random.choices(string.ascii_uppercase, k=3)) |
| 72 | + return os.path.join("tmp", "{}_{}".format(name, pfix)) |
| 73 | + |
| 74 | + |
| 75 | +@app.route('/') |
| 76 | +def main() -> flask.typing.ResponseReturnValue: |
| 77 | + return flask.render_template("index.html") |
| 78 | + |
| 79 | + |
| 80 | +@app.route('/<path>/<filename>') |
| 81 | +def download(path: str, filename: str) -> flask.typing.ResponseReturnValue: |
| 82 | + path = os.path.join("tmp", path) |
| 83 | + return flask.send_from_directory(path, filename, as_attachment=True) |
| 84 | + |
| 85 | + |
| 86 | +@app.route('/sign', methods=['POST']) |
| 87 | +def sign() -> flask.typing.ResponseReturnValue: |
| 88 | + if flask.request.method == 'POST': |
| 89 | + |
| 90 | + # Accept the file |
| 91 | + f = flask.request.files['rel_file'] |
| 92 | + f.save(f.filename) |
| 93 | + |
| 94 | + pwd = flask.request.form.get("passw") |
| 95 | + |
| 96 | + # Update the names with the new tmp directory |
| 97 | + artf_name = f.filename |
| 98 | + artf_basename = artf_name.split(".")[0] |
| 99 | + sha_fname = artf_name + ".sha256.txt" |
| 100 | + sign_fname = artf_name + ".asc" |
| 101 | + tmp_workdir = randomise_path(artf_basename) |
| 102 | + |
| 103 | + if cleanup_on_startup: |
| 104 | + shutil.rmtree(tmp_workdir) |
| 105 | + archive_name = artf_basename + ".zip" |
| 106 | + |
| 107 | + # Create a workdir |
| 108 | + pathlib.Path(tmp_workdir).mkdir(parents=True, exist_ok=True) |
| 109 | + |
| 110 | + artf_name = os.path.join(tmp_workdir, artf_name) |
| 111 | + sha_fname = os.path.join(tmp_workdir, sha_fname) |
| 112 | + sign_fname = os.path.join(tmp_workdir, sign_fname) |
| 113 | + archive_name = os.path.join(tmp_workdir, archive_name) |
| 114 | + |
| 115 | + # Move the tarballs |
| 116 | + os.rename(f.filename, artf_name) |
| 117 | + |
| 118 | + # Calculate the sha256 |
| 119 | + ret_code, _stdout, _sterr = do_shell_exec(sum_cmd.format(tarb=artf_name)) |
| 120 | + if ret_code == 0: |
| 121 | + with open(sha_fname, "w") as F: |
| 122 | + F.write(_stdout) |
| 123 | + sha = shlex.split(_stdout)[0] |
| 124 | + else: |
| 125 | + raise Exception("Shasum failed!") |
| 126 | + |
| 127 | + # Sign the tarball |
| 128 | + ret_code, _stdout, _sterr = do_shell_exec(sign_cmd.format(pasw=pwd, tarb=artf_name)) |
| 129 | + # If password is incorrect or other error exit. |
| 130 | + if ret_code != 0: |
| 131 | + return flask.render_template("signed.html", |
| 132 | + artf_name="Not Authorised", |
| 133 | + artf_url=download_pfix, |
| 134 | + sha="Not Authorised", |
| 135 | + sha_url=download_pfix, |
| 136 | + sign_name="Not Authorised", |
| 137 | + sign_url=download_pfix, |
| 138 | + zip_name="Not Authorised", |
| 139 | + zip_url=download_pfix) |
| 140 | + # Zip everything |
| 141 | + cwd = os.getcwd() |
| 142 | + os.chdir(tmp_workdir) |
| 143 | + ret_code, _stdout, _sterr = do_shell_exec( |
| 144 | + zip_cmd.format(name=artf_basename)) |
| 145 | + if ret_code != 0: |
| 146 | + raise Exception("zip Failed") |
| 147 | + os.chdir(cwd) |
| 148 | + |
| 149 | + # Calculate the download urls |
| 150 | + sha_url = download_pfix + "/".join(sha_fname.split("/")[1:]) |
| 151 | + artf_url = download_pfix + "/".join(artf_name.split("/")[1:]) |
| 152 | + sign_url = download_pfix + "/".join(sign_fname.split("/")[1:]) |
| 153 | + archive_url = download_pfix + "/".join(archive_name.split("/")[1:]) |
| 154 | + |
| 155 | + # Return the results page |
| 156 | + return flask.render_template("signed.html", |
| 157 | + artf_name=os.path.basename(artf_name), |
| 158 | + artf_url=artf_url, |
| 159 | + sha=sha, sha_url=sha_url, |
| 160 | + sign_name=os.path.basename(sign_fname), |
| 161 | + sign_url=sign_url, |
| 162 | + zip_name=os.path.basename(archive_name), |
| 163 | + zip_url=archive_url) |
| 164 | + |
| 165 | + |
| 166 | +@app.route('/verify', methods=['POST']) |
| 167 | +def verify() -> flask.typing.ResponseReturnValue: |
| 168 | + if flask.request.method == 'POST': |
| 169 | + # Create a workdir |
| 170 | + tmp_workdir = randomise_path("verification") |
| 171 | + pathlib.Path(tmp_workdir).mkdir(parents=True, exist_ok=True) |
| 172 | + |
| 173 | + # Accept the files |
| 174 | + f = flask.request.files['rel_file'] |
| 175 | + f.save(f.filename) |
| 176 | + |
| 177 | + s = flask.request.files['sig_file'] |
| 178 | + s.save(s.filename) |
| 179 | + |
| 180 | + artf_fname = os.path.join(tmp_workdir, f.filename) |
| 181 | + sig_fname = os.path.join(tmp_workdir, s.filename) |
| 182 | + |
| 183 | + # Move the files |
| 184 | + os.rename(f.filename, artf_fname) |
| 185 | + os.rename(s.filename, sig_fname) |
| 186 | + |
| 187 | + # Verify the archive's signature |
| 188 | + ret_code, _stdout, _sterr = do_shell_exec( |
| 189 | + verify_cmd.format(sig=sig_fname, tarb=artf_fname)) |
| 190 | + verified = "Success" if ret_code == 0 else "Failed" |
| 191 | + |
| 192 | + # Return the result |
| 193 | + return flask.render_template("verification.html", |
| 194 | + verified=verified, |
| 195 | + result=_sterr) |
| 196 | + |
| 197 | +if __name__ == '__main__': |
| 198 | + app.run(host=server_ip, debug=False) |
0 commit comments