222 lines
7.1 KiB
Python
222 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import hmac
|
|
import hashlib
|
|
import requests
|
|
import sys
|
|
import time
|
|
|
|
# === Configuration (environment variables with built-in fallbacks) ===

# Host name of the Freebox. There is no fallback: this is None when the
# FREEBOX_HOST environment variable is not set.
FREEBOX_HOST = os.getenv("FREEBOX_HOST")
# Root URL to which every API path used in this script is appended.
API_BASE = "https://{}/api/latest".format(FREEBOX_HOST)

# Identity fields sent to the Freebox in the authorization request.
APP_ID = os.getenv("FREEBOX_APP_ID", "fr.dotty.updatecert")
APP_NAME = os.getenv("FREEBOX_APP_NAME", "Certificate Auto Updater")
APP_VERSION = os.getenv("FREEBOX_APP_VERSION", "0.0.1")
DEVICE_NAME = os.getenv("FREEBOX_DEVICE_NAME", "zem")

# Domain whose certificate is uploaded.
DOMAIN = os.getenv("DOMAIN", "box.dotty.fr")

# Let's Encrypt files for DOMAIN (adjust the directory to your environment).
_LE_LIVE_DIR = "/etc/letsencrypt/live/" + DOMAIN
LE_CERT_PATH = _LE_LIVE_DIR + "/cert.pem"
LE_KEY_PATH = _LE_LIVE_DIR + "/privkey.pem"
LE_CHAIN_PATH = _LE_LIVE_DIR + "/chain.pem"

# Passed as ``verify=`` to every requests call in this script.
REQUESTS_VERIFY = True
|
|
|
|
|
|
# === Utility to read files ===
|
|
def read_file(filepath):
    """Return the entire text content of the file at *filepath*.

    The file is decoded explicitly as UTF-8 so the result does not depend
    on the locale of the process running the script.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file content as a single string.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the content is not valid UTF-8.
    """
    with open(filepath, encoding="utf-8") as handle:
        return handle.read()
|
|
|
|
|
|
# === Freebox API authentication / session logic ===
|
|
|
|
|
|
def api_get(path, session_token=None, timeout=30):
    """Send a GET request to the Freebox API and return the decoded JSON body.

    Args:
        path: Path appended verbatim to ``API_BASE`` (e.g. ``"/login/"``).
        session_token: Optional session token. When truthy it is sent in the
            ``X-Fbx-App-Auth`` header; otherwise no auth header is sent.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and the script could hang.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within *timeout*.
    """
    headers = {"X-Fbx-App-Auth": session_token} if session_token else {}
    resp = requests.get(
        API_BASE + path,
        headers=headers,
        timeout=timeout,
        verify=REQUESTS_VERIFY,
    )
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def api_post(path, session_token, data, timeout=30):
    """Send an authenticated JSON POST to the Freebox API.

    Args:
        path: Path appended verbatim to ``API_BASE``.
        session_token: Session token sent in the ``X-Fbx-App-Auth`` header.
        data: JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and the script could hang.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within *timeout*.
    """
    headers = {"Content-Type": "application/json", "X-Fbx-App-Auth": session_token}
    resp = requests.post(
        API_BASE + path,
        headers=headers,
        json=data,
        timeout=timeout,
        verify=REQUESTS_VERIFY,
    )
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def api_put(path, session_token, data, timeout=30):
    """Send an authenticated JSON PUT to the Freebox API.

    Unlike the other ``api_*`` helpers in this file, this one returns the
    raw response object rather than its decoded JSON body.

    Args:
        path: Path appended verbatim to ``API_BASE``.
        session_token: Session token sent in the ``X-Fbx-App-Auth`` header.
        data: JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and the script could hang.

    Returns:
        The ``requests.Response`` object of the successful request.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within *timeout*.
    """
    headers = {"Content-Type": "application/json", "X-Fbx-App-Auth": session_token}
    resp = requests.put(
        API_BASE + path,
        headers=headers,
        json=data,
        timeout=timeout,
        verify=REQUESTS_VERIFY,
    )
    resp.raise_for_status()
    return resp
|
|
|
|
|
|
def api_delete(path, session_token, data, timeout=30):
    """Send an authenticated DELETE (with a JSON body) to the Freebox API.

    Args:
        path: Path appended verbatim to ``API_BASE``.
        session_token: Session token sent in the ``X-Fbx-App-Auth`` header.
        data: JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and the script could hang.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within *timeout*.
    """
    headers = {"Content-Type": "application/json", "X-Fbx-App-Auth": session_token}
    resp = requests.delete(
        API_BASE + path,
        headers=headers,
        json=data,
        timeout=timeout,
        verify=REQUESTS_VERIFY,
    )
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def authorize_app(session_token=None):
    """Request authorization for this application from the Freebox.

    Posts the application identity (``APP_ID``, ``APP_NAME``,
    ``APP_VERSION``, ``DEVICE_NAME``) to ``/login/authorize/`` without any
    auth header, prints the decoded response and returns it.

    NOTE(review): the printed body presumably contains the app_token issued
    by the Freebox, so the output of this call should be treated as
    sensitive -- confirm against the Freebox API documentation.

    Args:
        session_token: Unused; kept so existing callers keep working.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within 30 seconds.
        ValueError: If a successful response does not contain valid JSON.
    """
    payload = {
        "app_id": APP_ID,
        "app_name": APP_NAME,
        "app_version": APP_VERSION,
        "device_name": DEVICE_NAME,
    }
    resp = requests.post(
        API_BASE + "/login/authorize/",
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=30,
        verify=REQUESTS_VERIFY,
    )
    try:
        # Decode once; the body is both printed and returned.
        body = resp.json()
    except ValueError:
        # A non-JSON body (e.g. an HTML error page) must not hide the real
        # HTTP failure behind a decode error.
        resp.raise_for_status()
        raise
    print(body)
    resp.raise_for_status()
    return body
|
|
|
|
|
|
def fetch_challenge():
    """Fetch the login challenge from ``GET /login/``.

    Returns:
        A ``(challenge, password_salt)`` tuple taken from the ``result``
        object of the response.
    """
    login_info = api_get("/login/")["result"]
    return login_info["challenge"], login_info["password_salt"]
|
|
|
|
|
|
def open_session(app_token, challenge, password_salt):
    """Open a Freebox API session and return its session token.

    The password sent to ``/login/session/`` is the hexadecimal
    HMAC-SHA1 digest of *challenge*, keyed with *app_token*.

    Args:
        app_token: Application token used as the HMAC key.
        challenge: Challenge string used as the HMAC message.
        password_salt: Unused by this computation; kept so existing callers
            keep working.

    Returns:
        The ``session_token`` string from the ``result`` object of the
        response.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the server does not answer within 30 seconds.
    """
    signature = hmac.new(
        app_token.encode("utf-8"),
        msg=challenge.encode("utf-8"),
        digestmod=hashlib.sha1,
    ).hexdigest()

    resp = requests.post(
        API_BASE + "/login/session/",
        json={"app_id": APP_ID, "password": signature},
        # Without a timeout, requests waits indefinitely on a silent server.
        timeout=30,
        verify=REQUESTS_VERIFY,
    )
    resp.raise_for_status()
    return resp.json()["result"]["session_token"]
|
|
|
|
|
|
def close_session(session_token):
    """Log out of the Freebox API by posting an empty body to ``/login/logout/``.

    Args:
        session_token: Token of the session to close.
    """
    api_post("/login/logout/", session_token, {})
|
|
|
|
|
|
def ensure_app_authorized():
    """Request application authorization and wait until it is resolved.

    Calls ``authorize_app()`` and then polls
    ``/login/authorize/{track_id}`` every 2 seconds, at most 60 times,
    printing each status.

    The app_token is not stored here; every run re-authorizes. Persisting
    it (e.g. in a file) would avoid that.

    Returns:
        True once the reported status is ``"granted"``.

    Raises:
        RuntimeError: If the Freebox does not report success for the
            authorization request, if the status becomes ``"denied"``, or
            if authorization is not granted in time.
    """
    resp = authorize_app()
    # The response flag is spelled "success"; a failed request has no usable
    # track_id, so stop here instead of polling.
    if not resp.get("success"):
        raise RuntimeError(f"Freebox rejected the authorization request: {resp}")

    track_id = resp["result"]["track_id"]
    print("Authorization requested, track_id =", track_id)

    # Poll authorization status.
    for _ in range(60):
        time.sleep(2)
        status = api_get(f"/login/authorize/{track_id}")
        st = status["result"]["status"]
        print("Authorization status:", st)
        if st == "granted":
            return True
        if st == "denied":
            raise RuntimeError("Freebox user denied authorization")
        if st == "timeout":
            # The Freebox itself gave up waiting; further polling is useless.
            break
    raise RuntimeError("Authorization timed out")
|
|
|
|
|
|
# === Main flow: login => upload cert => logout ===
|
|
|
|
|
|
def main():
    """Upload the Let's Encrypt certificate for ``DOMAIN`` to the Freebox.

    Flow: obtain the app token from ``FREEBOX_APP_TOKEN`` (requesting
    authorization when it is missing), open a session, read the
    certificate, key and chain files, delete and re-create the owned
    domain, import the certificate, set the domain as default, and finally
    close the session.

    Exits with status 1 when ``FREEBOX_HOST`` is not set or when no app
    token is available. Errors raised by the HTTP helpers or by reading the
    certificate files propagate to the caller; the session is closed first
    when one was opened.
    """
    # Without a host every URL would point at "https://None/...", which only
    # fails later with a confusing connection error.
    if not FREEBOX_HOST:
        print("ERROR: FREEBOX_HOST is not set. Abort.")
        sys.exit(1)

    app_token = os.environ.get("FREEBOX_APP_TOKEN")
    if not app_token:
        print("No FREEBOX_APP_TOKEN found; requesting authorization.")
        ensure_app_authorized()
        # The token is expected to be supplied through the environment after
        # the initial authorization; it is not taken from the response here.
        app_token = os.environ.get("FREEBOX_APP_TOKEN")
        if not app_token:
            print("ERROR: app_token still not provided. Abort.")
            sys.exit(1)

    # Step 1: get challenge / salt
    challenge, password_salt = fetch_challenge()
    # Step 2: open session
    session_token = open_session(app_token, challenge, password_salt)
    print("Session opened, token:", session_token)

    try:
        # Step 3: read the certificate files
        cert_pem = read_file(LE_CERT_PATH)
        key_pem = read_file(LE_KEY_PATH)
        chain_pem = read_file(LE_CHAIN_PATH)

        # Step 4: upload through the reverse-engineered import endpoint
        # NOTE(review): "rsa" is hard-coded; confirm it matches the type of
        # the private key being uploaded.
        import_payload = {
            "key_type": "rsa",
            "cert_pem": cert_pem,
            "key_pem": key_pem,
            "intermediates": chain_pem,
        }

        print(
            "Delete response:",
            api_delete(f"/domain/owned/{DOMAIN}", session_token, {"id": DOMAIN}),
        )

        # No cleaner way to "update" the certificate was found than removing
        # the domain and adding it back with the new certificate.
        api_post("/domain/owned/", session_token, {"id": DOMAIN})
        resp = api_post(
            f"/domain/owned/{DOMAIN}/import_cert", session_token, import_payload
        )
        print("Upload response:", resp)
        api_put("/domain/config/", session_token, {"default_domain": DOMAIN})
        print(
            "Get config after default set:", api_get("/domain/config/", session_token)
        )
    finally:
        # Step 5: logout / close session
        close_session(session_token)
        print("Session closed")
|
|
|
|
|
|
# Run the certificate update only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|