Files
codex-controller-loop/scripts/release-orchestrator.py
eric ebb6b488fe
Some checks failed
distribution-gate / distribution-gate (push) Failing after 1m56s
feat: 0.1.0
2026-04-04 18:41:34 +02:00

502 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""Deterministic release artifact orchestration for codex-controller-loop."""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List
# Repository root: this file lives in <root>/scripts/, so parents[1] is <root>.
ROOT = Path(__file__).resolve().parents[1]
# Orchestrator settings (artifact name, path templates, defaults) — keys used
# below include artifact_name, dist_root, default_profile, *_filename, etc.
CONFIG_PATH = ROOT / "scripts" / "release-orchestrator.config.json"
# Distribution contract document; its "contract_version" is stamped into all
# emitted metadata.
CONTRACT_PATH = ROOT / "docs" / "distribution-contract.json"
def run(cmd: List[str], cwd: Path | None = None, env: Dict[str, str] | None = None) -> str:
    """Execute *cmd* and return its stdout with surrounding whitespace stripped.

    Both streams are captured; a non-zero exit status raises
    ``subprocess.CalledProcessError`` (which carries the captured output).
    """
    # capture_output=True is the modern spelling of stdout=PIPE, stderr=PIPE.
    proc = subprocess.run(cmd, check=True, cwd=cwd, env=env, capture_output=True, text=True)
    return proc.stdout.strip()
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    Reads in 1 MiB chunks so arbitrarily large artifacts hash in constant
    memory.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as handle:
        while chunk := handle.read(1 << 20):
            hasher.update(chunk)
    return hasher.hexdigest()
def posix(path: Path) -> str:
    """Render *path* with forward-slash separators regardless of host OS."""
    return path.as_posix()
def collect_directory_checksums(package_dir: Path) -> List[Dict[str, str]]:
    """Checksum every regular file under *package_dir*.

    Returns one ``{"path", "sha256"}`` record per file, with paths relative to
    *package_dir* in POSIX form, sorted by full path for deterministic output.
    """
    return [
        {
            "path": posix(entry.relative_to(package_dir)),
            "sha256": sha256_file(entry),
        }
        for entry in sorted(package_dir.rglob("*"))
        if entry.is_file()
    ]
def verify_checksums(
    package_dir: Path,
    checksum_path: Path,
    artifact_path: Path,
    checksum_payload: Dict[str, Any],
) -> None:
    """Cross-check the package directory and artifact against *checksum_payload*.

    Raises RuntimeError when a manifested file is missing or altered, when an
    on-disk file is absent from the manifest (the checksum file itself is
    exempt — it cannot list its own hash), or when the artifact's recorded
    SHA-256 does not match the file on disk.
    """
    checksum_file = posix(checksum_path.relative_to(package_dir))
    expected = {record["path"]: record["sha256"] for record in checksum_payload["files"]}

    # Pass 1: every manifested entry must exist and hash to its recorded value.
    for relative_path, want in expected.items():
        candidate = package_dir / relative_path
        if not candidate.exists():
            raise RuntimeError(f"checksum manifest referenced missing file: {relative_path}")
        if sha256_file(candidate) != want:
            raise RuntimeError(f"checksum mismatch for file: {relative_path}")

    # Pass 2: every file on disk (except the checksum manifest) must be listed.
    for candidate in sorted(package_dir.rglob("*")):
        if not candidate.is_file():
            continue
        relative_path = posix(candidate.relative_to(package_dir))
        if relative_path == checksum_file or relative_path in expected:
            continue
        raise RuntimeError(f"manifest missing checksum entry for file: {relative_path}")

    # Finally, the packaged artifact itself.
    if sha256_file(artifact_path) != checksum_payload["artifact_sha256"]:
        raise RuntimeError(f"artifact checksum mismatch: {artifact_path.name}")
def read_package_version() -> str:
    """Extract the crate version from the repository's Cargo.toml.

    FIX: the previous implementation returned the first ``version = "..."``
    line found anywhere in the file, which could wrongly pick up a dependency
    stanza such as ``[dependencies.serde]\\nversion = "1.0"`` appearing before
    ``[package]``. We now track the current TOML section and prefer the
    version declared under ``[package]`` (or ``[workspace.package]``); the
    old first-match behaviour is kept as a fallback for manifests without a
    recognised section.

    Raises:
        RuntimeError: if no version line can be found at all.
    """
    cargo_toml = ROOT / "Cargo.toml"
    version_re = re.compile(r'^version\s*=\s*"([^"]+)"')
    section = ""
    fallback: str | None = None
    for raw_line in cargo_toml.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        header = re.match(r"^\[(.+)\]$", line)
        if header:
            section = header.group(1).strip()
            continue
        m = version_re.match(line)
        if not m:
            continue
        if section in ("package", "workspace.package"):
            return m.group(1)
        if fallback is None:
            fallback = m.group(1)  # preserve legacy first-match behaviour
    if fallback is not None:
        return fallback
    raise RuntimeError("version not found in Cargo.toml")
def load_json(path: Path) -> Dict[str, Any]:
    """Parse the UTF-8 JSON document at *path* into a dict."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def write_json(path: Path, payload: Dict[str, Any]) -> None:
    """Write *payload* to *path* as stable JSON (sorted keys, 2-space indent).

    Parent directories are created on demand; a trailing newline is appended
    so the file is POSIX-text friendly.
    """
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(rendered + "\n", encoding="utf-8")
def slugify_token(value: str) -> str:
    """Reduce *value* to a filesystem-safe token.

    Runs of characters outside [A-Za-z0-9._-] collapse to a single hyphen,
    then leading/trailing hyphens and dots are trimmed. Falls back to
    "unknown" when nothing survives.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "-", value.strip()).strip("-.")
    return collapsed if collapsed else "unknown"
def target_platform(target: str) -> str:
    """Map a Rust target triple to a coarse platform label.

    Checks "windows" first, then Apple markers; anything else is treated as
    linux (matching the original fallthrough).
    """
    if "windows" in target:
        return "windows"
    is_mac = "apple" in target or "darwin" in target
    return "darwin" if is_mac else "linux"
def collect_targets(targets: Iterable[str] | None, profile: str) -> List[str]:
    """Resolve the list of target triples to build.

    Explicit *targets* win; otherwise the host triple is parsed from
    ``rustc -vV``. *profile* is accepted for interface stability but no
    longer consulted.

    FIX: the final fallback was a redundant conditional whose two branches
    both produced "x86_64-unknown-linux-gnu"; it is now a plain constant.
    """
    if targets:
        return list(targets)
    host_info = run(["rustc", "-vV"], cwd=ROOT)
    for line in host_info.splitlines():
        if line.startswith("host: "):
            return [line.split(":", 1)[1].strip()]
    # Last resort when rustc output is unparseable.
    return ["x86_64-unknown-linux-gnu"]
def build_profile_args(profile: str) -> List[str]:
    """Translate a cargo profile name into CLI flags.

    "release" has its own dedicated flag; every other profile goes through
    ``--profile <name>``.
    """
    return ["--release"] if profile == "release" else ["--profile", profile]
@dataclass(frozen=True)
class SourceInputs:
    """Immutable bundle of resolved inputs for one orchestration run.

    Frozen so the inputs cannot drift between the build, packaging, and
    indexing steps. Note: freezing only prevents attribute rebinding —
    `contract` and `targets` are still mutable containers.
    """

    source_dir: Path          # repository root the build runs from (ROOT)
    contract_version: str     # "contract_version" from the distribution contract
    contract: Dict[str, Any]  # parsed distribution-contract.json document
    version: str              # artifact version (CLI flag or Cargo.toml)
    profile: str              # cargo build profile, e.g. "release"
    targets: List[str]        # Rust target triples to build for
    dist_revision: str        # distribution revision label
    toolchain: str            # full toolchain label (from `rustc --version` or CLI)
    toolchain_slug: str       # filesystem-safe token derived from the toolchain
    git_sha_full: str         # full HEAD commit hash
    git_sha_short: str        # abbreviated HEAD commit hash
    source_date_epoch: str    # epoch seconds driving deterministic timestamps
def build_entry_entries(
    inputs: SourceInputs, config: Dict[str, Any], args: argparse.Namespace
) -> List[Dict[str, Any]]:
    """Build, stage, package, and verify one distribution artifact per target.

    For each target triple this:
      1. runs ``cargo build --locked`` (skipped with --no-build),
      2. stages the compiled binary and a provenance document into a package
         directory,
      3. creates a deterministic tarball (sorted names, fixed owner/mtime),
      4. writes the manifest and checksum documents next to the tarball,
      5. re-verifies every recorded checksum via verify_checksums.

    Returns index entries describing the produced artifacts, for merge_index.

    BUG FIX: the manifest used to be written, have its hash recorded in the
    checksum manifest, and then be REWRITTEN with the checksum payload
    embedded under manifest["content"]["checksums"]. That second write
    changed the manifest's bytes, so verify_checksums was guaranteed to raise
    "checksum mismatch" for the manifest file — a document cannot embed
    another document that contains its own hash. The manifest is now written
    exactly once, before checksums are collected, and references the checksum
    file by name instead of embedding it.
    """
    index_entries: List[Dict[str, Any]] = []
    build_env = os.environ.copy()
    # Deterministic builds: cargo and tar honour SOURCE_DATE_EPOCH.
    build_env["SOURCE_DATE_EPOCH"] = inputs.source_date_epoch
    # One reproducible timestamp reused by every generated document.
    generated_at = datetime.fromtimestamp(
        int(inputs.source_date_epoch), tz=timezone.utc
    ).isoformat()
    for target in inputs.targets:
        build_cmd = [
            "cargo",
            "build",
            "--locked",
            "--target",
            target,
            *build_profile_args(inputs.profile),
        ]
        if not args.no_build:
            run(build_cmd, cwd=inputs.source_dir, env=build_env)
        # cargo places "release" builds under target/<triple>/release; custom
        # profiles live under their own name.
        build_path = (
            inputs.source_dir
            / "target"
            / target
            / ("release" if inputs.profile == "release" else inputs.profile)
            / config["artifact_name"]
        )
        if not build_path.exists():
            raise FileNotFoundError(f"missing compiled artifact: {build_path}")
        artifact_dir = Path(
            config["artifact_dir_template"].format(
                dist_root=config["dist_root"],
                contract_version=inputs.contract_version,
                version=inputs.version,
                target=target,
                profile=inputs.profile,
                toolchain=inputs.toolchain_slug,
                gitsha=inputs.git_sha_short,
                dist_revision=inputs.dist_revision,
            )
        )
        package_dir = artifact_dir / "package"
        bin_dir = package_dir / "bin"
        bin_dir.mkdir(parents=True, exist_ok=True)
        staged_binary = bin_dir / config["artifact_name"]
        staged_binary.write_bytes(build_path.read_bytes())
        try:
            staged_binary.chmod(0o755)
        except OSError:
            # Best effort: some filesystems reject chmod; the tarball still ships.
            pass
        artifact_name = config["artifact_filename_template"].format(
            artifact_name=config["artifact_name"],
            version=inputs.version,
            target=target,
            profile=inputs.profile,
            toolchain=inputs.toolchain_slug,
            gitsha=inputs.git_sha_short,
            dist_revision=inputs.dist_revision,
            ext=config["artifact_ext"],
        )
        artifact_file = artifact_dir / artifact_name
        manifest_path = package_dir / config["manifest_filename"]
        checksum_path = package_dir / config["checksums_filename"]
        provenance_path = package_dir / config["provenance_filename"]
        provenance = {
            "schema_version": "distribution-provenance-v1",
            "contract_version": inputs.contract_version,
            "artifact": {
                "name": config["artifact_name"],
                "target": target,
                "profile": inputs.profile,
                "dist_revision": inputs.dist_revision,
                "toolchain": inputs.toolchain,
                "git": {
                    "full": inputs.git_sha_full,
                    "short": inputs.git_sha_short,
                },
            },
            "build_inputs": {
                "source_date_epoch": inputs.source_date_epoch,
                "build_environment": {
                    "RUSTFLAGS": os.getenv("RUSTFLAGS", ""),
                    "CARGO_NET_OFFLINE": os.getenv("CARGO_NET_OFFLINE", ""),
                    "CARGO_TERM_COLOR": os.getenv("CARGO_TERM_COLOR", ""),
                },
                "build_command": build_cmd,
            },
            "build_artifact": {
                "binary_name": config["artifact_name"],
                "package_root": posix(package_dir.relative_to(inputs.source_dir)),
                "manifest_file": config["manifest_filename"],
                "checksums_file": config["checksums_filename"],
            },
            "generated_at": generated_at,
        }
        # Provenance is written BEFORE tar so it ships inside the tarball;
        # manifest/checksums are written after (they describe the tarball and
        # therefore cannot be inside it).
        provenance_path.write_text(
            json.dumps(provenance, indent=2, sort_keys=True) + "\n", encoding="utf-8"
        )
        # Reproducible tarball: sorted names, numeric root ownership, pinned
        # mtime, ustar format, and GZIP=-n to suppress gzip's embedded timestamp.
        run(
            [
                "tar",
                "--sort=name",
                "--owner=0",
                "--group=0",
                "--numeric-owner",
                f"--mtime=@{inputs.source_date_epoch}",
                "--format=ustar",
                "-czf",
                str(artifact_file),
                "-C",
                str(package_dir),
                ".",
            ],
            cwd=inputs.source_dir,
            env={**build_env, "GZIP": "-n"},
        )
        artifact_sha256 = sha256_file(artifact_file)
        manifest = {
            "schema_version": "distribution-manifest-v1",
            "contract_version": inputs.contract_version,
            "artifact": {
                "name": config["artifact_name"],
                "filename": artifact_name,
                "path": posix(artifact_file.relative_to(inputs.source_dir)),
                "size_bytes": artifact_file.stat().st_size,
                "sha256": artifact_sha256,
            },
            "artifact_version": inputs.version,
            "target": target,
            "platform": target_platform(target),
            "profile": inputs.profile,
            "toolchain": inputs.toolchain,
            "dist_revision": inputs.dist_revision,
            "git": {
                "revision": inputs.git_sha_full,
                "short": inputs.git_sha_short,
            },
            "build_time_inputs": {
                "source_date_epoch": inputs.source_date_epoch,
                "build_command": build_cmd,
                "target": target,
                "profile": inputs.profile,
                "artifact_name": config["artifact_name"],
                "rustc": inputs.toolchain,
            },
            "content": {
                "generated_by": "scripts/release-orchestrator.py",
                "checksum_file": checksum_path.name,
                "provenance_file": provenance_path.name,
            },
            "generated_at": generated_at,
        }
        # Single, final manifest write. It must not change after checksums are
        # collected below, or its recorded hash would immediately be stale.
        manifest_path.write_text(
            json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8"
        )
        checksums = collect_directory_checksums(package_dir)
        checksum_payload = {
            "schema_version": "distribution-checksums-v1",
            "generated_by": "scripts/release-orchestrator.py",
            "generated_at": generated_at,
            "artifact_file": artifact_name,
            "artifact_sha256": artifact_sha256,
            "files": checksums,
            "artifact_entrypoints": {
                "binary": posix(Path("bin") / config["artifact_name"]),
                "manifest": config["manifest_filename"],
                "checksums": config["checksums_filename"],
                "provenance": config["provenance_filename"],
            },
        }
        checksum_path.write_text(
            json.dumps(checksum_payload, indent=2, sort_keys=True) + "\n", encoding="utf-8"
        )
        # Fails loudly if anything staged above does not hash as recorded.
        verify_checksums(package_dir, checksum_path, artifact_file, checksum_payload)
        index_entries.append(
            {
                "version": inputs.version,
                "target": target,
                "profile": inputs.profile,
                "platform": target_platform(target),
                "toolchain": inputs.toolchain,
                "toolchain_slug": inputs.toolchain_slug,
                "git_rev": inputs.git_sha_full,
                "dist_revision": inputs.dist_revision,
                "source_date_epoch": inputs.source_date_epoch,
                "generated_at": generated_at,
                "artifact_file": posix(artifact_file.relative_to(ROOT)),
                "artifact_sha256": artifact_sha256,
                "manifest_file": posix(manifest_path.relative_to(ROOT)),
                "checksums_file": posix(checksum_path.relative_to(ROOT)),
            }
        )
    return index_entries
def _rebuild_release_buckets(stored: Any) -> Dict[str, Any]:
    """Convert the persisted "releases" value back into merge buckets.

    The index is written to disk as an ordered LIST of
    ``{"version", "targets": [{"target", "profiles": [{"profile",
    "artifacts": [...]}]}]}`` records, while merging operates on nested dicts
    keyed version -> target -> profile with a private "_index_key" stamped on
    each artifact. The "_index_key" is recomputed here from the fields that
    the written form preserves (toolchain_slug / git_rev / dist_revision).
    """
    if isinstance(stored, dict):
        # Legacy/empty dict form is already bucket-shaped.
        return stored
    buckets: Dict[str, Any] = {}
    for release in stored or []:
        target_map = buckets.setdefault(release["version"], {})
        for target_entry in release.get("targets", []):
            profile_map = target_map.setdefault(target_entry["target"], {})
            for profile_entry in target_entry.get("profiles", []):
                artifacts = profile_map.setdefault(profile_entry["profile"], [])
                for artifact in profile_entry.get("artifacts", []):
                    key = (
                        f"{artifact['toolchain_slug']}|"
                        f"{artifact['git_rev'][:12]}|{artifact['dist_revision']}"
                    )
                    artifacts.append({**artifact, "_index_key": key})
    return buckets


def merge_index(
    contract_version: str,
    dist_root: str,
    index_template: str,
    legacy_index_template: str | None,
    entries: List[Dict[str, Any]],
) -> None:
    """Merge *entries* into the persisted distribution index.

    Existing artifacts with the same (toolchain, git, dist_revision) key are
    replaced; everything is re-sorted so output is deterministic. A legacy
    copy is written when *legacy_index_template* is provided. No-op when
    *entries* is empty.

    BUG FIX: the index is persisted with "releases" as a LIST, but the
    previous implementation fed that list straight back into dict-based
    merging (``releases.setdefault``), crashing with AttributeError on every
    run after the first. The persisted form is now converted back into merge
    buckets via _rebuild_release_buckets before merging.
    """
    if not entries:
        return
    index_path = Path(index_template.format(dist_root=dist_root, contract_version=contract_version))
    releases: Dict[str, Any] = {}
    if index_path.exists():
        releases = _rebuild_release_buckets(load_json(index_path).get("releases", {}))
    for entry in entries:
        version_bucket = releases.setdefault(entry["version"], {})
        target_bucket = version_bucket.setdefault(entry["target"], {})
        index_key = f"{entry['toolchain_slug']}|{entry['git_rev'][:12]}|{entry['dist_revision']}"
        # Drop any prior artifact with the same identity key, then insert ours.
        profile_bucket = [
            candidate
            for candidate in target_bucket.setdefault(entry["profile"], [])
            if candidate.get("_index_key") != index_key
        ]
        profile_bucket.append({**entry, "_index_key": index_key})
        target_bucket[entry["profile"]] = sorted(
            profile_bucket,
            key=lambda candidate: candidate["_index_key"],
        )
    # Flatten the buckets into the deterministic, list-based on-disk form,
    # stripping the private "_index_key" marker.
    ordered_releases: List[Dict[str, Any]] = []
    for version in sorted(releases.keys(), key=str):
        target_map = releases[version]
        target_items = []
        for target in sorted(target_map.keys(), key=str):
            profile_map = target_map[target]
            profile_items = [
                {
                    "profile": profile,
                    "artifacts": [
                        {k: v for k, v in candidate.items() if k != "_index_key"}
                        for candidate in profile_map[profile]
                    ],
                }
                for profile in sorted(profile_map.keys(), key=str)
            ]
            target_items.append({"target": target, "profiles": profile_items})
        ordered_releases.append({"version": version, "targets": target_items})
    payload = {
        "schema_version": "distribution-index-v1",
        "contract_version": contract_version,
        "generated_at": datetime.fromtimestamp(
            int(entries[0]["source_date_epoch"]), tz=timezone.utc
        ).isoformat(),
        "releases": ordered_releases,
    }
    write_json(index_path, payload)
    if legacy_index_template:
        legacy_index_path = Path(
            legacy_index_template.format(dist_root=dist_root, contract_version=contract_version)
        )
        write_json(legacy_index_path, payload)
def parse_args() -> argparse.Namespace:
    """Parse the orchestrator's command-line options.

    All options are optional; unset values fall back to config/repository
    defaults in main().
    """
    parser = argparse.ArgumentParser(description="Generate deterministic dist artifacts.")
    parser.add_argument("--version", help="artifact version")
    parser.add_argument("--profile", default=None, help="cargo profile (release default)")
    parser.add_argument("--target", action="append", help="target triple (repeatable)")
    parser.add_argument("--dist-revision", default=None, help="distribution revision")
    parser.add_argument("--no-build", action="store_true", help="skip cargo build step")
    parser.add_argument("--toolchain", default=None, help="toolchain version label")
    return parser.parse_args()
def main() -> int:
    """Resolve all release inputs, build every artifact, and refresh the index.

    Precedence for each input: CLI flag, then environment / config default,
    then repository state (Cargo.toml, git HEAD, rustc). Returns 0 on success.
    """
    cli = parse_args()
    cfg = load_json(CONFIG_PATH)
    pact = load_json(CONTRACT_PATH)
    resolved_version = cli.version or read_package_version()
    resolved_profile = cli.profile or cfg["default_profile"]
    resolved_revision = cli.dist_revision or cfg["default_dist_revision"]
    resolved_toolchain = cli.toolchain or run(["rustc", "--version"], cwd=ROOT)
    # "rustc 1.82.0 (...)": slug the second token when a space is present.
    slug_source = resolved_toolchain.split(" ")[1] if " " in resolved_toolchain else resolved_toolchain
    slug = slugify_token(slug_source)
    head_full = run(["git", "rev-parse", "HEAD"], cwd=ROOT)
    head_short = run(["git", "rev-parse", "--short", "HEAD"], cwd=ROOT)
    # NOTE: the git fallback is an eagerly-evaluated default argument, so the
    # `git show` runs even when SOURCE_DATE_EPOCH is set (original behaviour).
    epoch = os.getenv(
        "SOURCE_DATE_EPOCH",
        run(["git", "show", "-s", "--format=%ct", "HEAD"], cwd=ROOT),
    )
    triples = collect_targets(cli.target, resolved_profile)
    pact_version = pact.get("contract_version", "1.0.0")
    inputs = SourceInputs(
        source_dir=ROOT,
        contract_version=pact_version,
        contract=pact,
        version=resolved_version,
        profile=resolved_profile,
        targets=triples,
        dist_revision=resolved_revision,
        toolchain=resolved_toolchain,
        toolchain_slug=slug,
        git_sha_full=head_full,
        git_sha_short=head_short,
        source_date_epoch=epoch,
    )
    produced = build_entry_entries(inputs, cfg, cli)
    merge_index(
        pact_version,
        cfg["dist_root"],
        cfg["index_path_template"],
        cfg.get("legacy_index_path_template"),
        produced,
    )
    return 0
if __name__ == "__main__":
    # SystemExit carries main()'s integer status code back to the shell.
    raise SystemExit(main())