Skip to main content

#!/usr/bin/env python3 """Kimi OAuth token-refreshing proxy for Claude Code (ADR-201).

Sits between Claude Code and api.kimi.com, injecting a fresh OAuth access token on every request. This allows sessions to run indefinitely despite the ~15-minute token expiry.

Usage: kimi-proxy # Start on default port 18462 kimi-proxy --port 9000 # Custom port kimi-proxy --check # Check if proxy is running kimi-proxy --stop # Stop running proxy kimi-proxy --watchdog # Start with auto-restart watchdog kimi-proxy --stop-all # Stop proxy and watchdog

Architecture: Claude Code -> localhost:18462 -> kimi-proxy -> api.kimi.com/coding (injects fresh OAuth token)

Installation: ln -sf $(realpath kimi-proxy.py) ~/.local/bin/kimi-proxy chmod +x ~/.local/bin/kimi-proxy """ import http.client import http.server import json import os import signal import socket import ssl import sys import time import urllib.request from pathlib import Path from socketserver import ThreadingMixIn

# --- Configuration ---

# Local listener and upstream endpoint.
DEFAULT_PORT = 18462
KIMI_HOST = "api.kimi.com"
KIMI_BASE_PATH = "/coding"

# On-disk state: OAuth credentials and the proxy's PID file.
CREDENTIALS_FILE = Path.home().joinpath(".kimi", "credentials", "kimi-code.json")
PID_FILE = Path.home().joinpath(".local", "run", "kimi-proxy.pid")

# OAuth client settings.
CLIENT_ID = "17e5f671-d194-4dfb-9706-5516cb48c098"
OAUTH_HOST = "https://auth.kimi.com"

# Token refresh policy.
REFRESH_THRESHOLD = 300  # Refresh if <5 min remaining
MAX_REFRESH_RETRIES = 3
RETRY_BACKOFF_BASE = 2  # seconds -- exponential: 2, 4, 8

# Watchdog policy.
WATCHDOG_CHECK_INTERVAL = 10  # seconds between watchdog health checks
WATCHDOG_MAX_RESTART_DELAY = 60  # max backoff between restarts
WATCHDOG_PID_FILE = Path.home().joinpath(".local", "run", "kimi-watchdog.pid")

# --- Token Management (reused from kimi-token) ---

# In-memory copy of the current access token and the epoch time it expires at.
_cached_token = None
_cached_expires = 0

# Not referenced by any code visible in this file.
_last_health_check = 0

# Shared status record: written by the token/refresh code and at server start,
# read by the /health endpoint.
_health_status = dict(ok=True, error=None, last_refresh=0, uptime_start=0)

def _do_token_refresh(refresh_tok):
    """Execute a single token refresh attempt against the OAuth server.

    Args:
        refresh_tok: Refresh token to exchange for a new access token.

    Returns:
        A credentials dict with ``access_token``, ``refresh_token``,
        ``expires_at`` (absolute epoch seconds), ``scope`` and ``token_type``.

    Raises:
        Whatever ``urllib.request.urlopen`` raises on network/HTTP failure,
        ``ValueError`` on a non-JSON body, and ``KeyError`` if the response
        lacks ``access_token`` or ``expires_in``.
    """
    from urllib.parse import urlencode

    # urlencode percent-escapes the values; interpolating the raw token into
    # the form body would corrupt tokens containing '+', '/', '=' or '&'.
    form = urlencode({
        "client_id": CLIENT_ID,
        "grant_type": "refresh_token",
        "refresh_token": refresh_tok,
    }).encode()
    req = urllib.request.Request(
        f"{OAUTH_HOST}/api/oauth/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        result = json.loads(resp.read())

    return {
        "access_token": result["access_token"],
        # A server that does not rotate refresh tokens may omit this field;
        # keep using the current one instead of failing with KeyError.
        "refresh_token": result.get("refresh_token") or refresh_tok,
        "expires_at": time.time() + float(result["expires_in"]),
        "scope": result.get("scope", "kimi-code"),
        "token_type": result.get("token_type", "Bearer"),
    }

def _is_permanent_failure(exc):
    """Return True when a refresh error should not be retried.

    An error counts as permanent if its lower-cased text contains "401",
    "403" or "invalid_grant", or if it carries a ``code`` attribute equal
    to 401 or 403 (as urllib HTTP errors do).
    """
    text = str(exc).lower()
    if any(marker in text for marker in ("401", "403", "invalid_grant")):
        return True
    # Falls back to the numeric status carried by HTTP error objects.
    return getattr(exc, "code", None) in (401, 403)

def get_fresh_token():
    """Return a valid OAuth access token, refreshing it when close to expiry.

    Lookup order:
      1. the in-memory cache, if more than REFRESH_THRESHOLD seconds remain;
      2. the credentials file, under the same condition;
      3. a refresh against the OAuth server, retried up to
         MAX_REFRESH_RETRIES times with exponential backoff on transient
         errors.

    ``_health_status`` is updated to reflect the outcome.

    Returns:
        The access token string.

    Raises:
        RuntimeError: credentials file or refresh token missing, a permanent
            auth failure, or all retries exhausted.
        ValueError / KeyError: the credentials file is malformed.
    """
    global _cached_token, _cached_expires, _health_status

    now = time.time()
    if _cached_token and (_cached_expires - now) > REFRESH_THRESHOLD:
        return _cached_token

    # Read from file
    if not CREDENTIALS_FILE.exists():
        _health_status["ok"] = False
        _health_status["error"] = "No credentials file. Run 'kimi' CLI to login."
        raise RuntimeError(_health_status["error"])

    creds = json.loads(CREDENTIALS_FILE.read_text())
    expires_at = creds.get("expires_at", 0)
    remaining = expires_at - now

    if remaining > REFRESH_THRESHOLD:
        _cached_token = creds["access_token"]
        _cached_expires = expires_at
        _health_status["ok"] = True
        _health_status["error"] = None
        return _cached_token

    # Refresh with retry
    refresh_tok = creds.get("refresh_token", "")
    if not refresh_tok:
        _health_status["ok"] = False
        _health_status["error"] = "No refresh token. Run 'kimi' CLI to login."
        raise RuntimeError(_health_status["error"])

    last_exc = None
    for attempt in range(MAX_REFRESH_RETRIES):
        try:
            # Only the network exchange is retried.
            new_creds = _do_token_refresh(refresh_tok)
        except Exception as e:
            last_exc = e
            if _is_permanent_failure(e):
                _health_status["ok"] = False
                _health_status["error"] = f"Permanent auth failure: {e}. Run 'kimi' CLI to re-login."
                print(f"[kimi-proxy] PERMANENT refresh failure: {e}", file=sys.stderr)
                print("[kimi-proxy] Run 'kimi' CLI to re-authenticate.", file=sys.stderr)
                raise RuntimeError(_health_status["error"]) from e

            # Transient failure -- retry with backoff
            delay = RETRY_BACKOFF_BASE ** (attempt + 1)
            print(f"[kimi-proxy] Refresh attempt {attempt + 1}/{MAX_REFRESH_RETRIES} failed: {e}", file=sys.stderr)
            if attempt < MAX_REFRESH_RETRIES - 1:
                print(f"[kimi-proxy] Retrying in {delay}s...", file=sys.stderr)
                time.sleep(delay)
        else:
            # Cache first: the token is valid even if persisting it fails.
            _cached_token = new_creds["access_token"]
            _cached_expires = new_creds["expires_at"]
            _health_status["ok"] = True
            _health_status["error"] = None
            _health_status["last_refresh"] = time.time()

            # A write error must not be treated as a refresh failure: the
            # refresh token has already been spent, so retrying the exchange
            # with it would discard a perfectly good access token.
            try:
                CREDENTIALS_FILE.write_text(json.dumps(new_creds, ensure_ascii=False))
            except OSError as e:
                print(f"[kimi-proxy] WARNING: could not persist refreshed credentials: {e}", file=sys.stderr)

            if attempt > 0:
                print(f"[kimi-proxy] Token refreshed after {attempt + 1} attempts", file=sys.stderr)
            else:
                print("[kimi-proxy] Token refreshed, valid for ~15m", file=sys.stderr)
            return _cached_token

    # All retries exhausted
    _health_status["ok"] = False
    _health_status["error"] = f"Token refresh failed after {MAX_REFRESH_RETRIES} attempts: {last_exc}"
    raise RuntimeError(_health_status["error"])

def _invalidate_cached_token():
    """Drop the in-memory token so the next get_fresh_token() skips the cache.

    Only the module-level cache variables are reset. get_fresh_token() then
    re-reads the credentials file, and contacts the auth server only if the
    token stored there is within REFRESH_THRESHOLD seconds of expiry.
    """
    global _cached_token, _cached_expires
    _cached_token, _cached_expires = None, 0
    print("[kimi-proxy] Cached token invalidated -- will force refresh", file=sys.stderr)

# --- Proxy Handler ---

class KimiProxyHandler(http.server.BaseHTTPRequestHandler):
    """Proxies requests to api.kimi.com with fresh OAuth tokens.

    GET and POST requests are forwarded to ``KIMI_HOST`` under
    ``KIMI_BASE_PATH`` with the current access token in ``x-api-key``.
    ``GET /health`` is answered locally. An upstream 401 triggers one
    re-authentication and a single retry of the request.
    """

    def log_message(self, format, *args):
        """Silence the base handler's logging.

        BaseHTTPRequestHandler routes both access and error lines through
        this method, so overriding it with a no-op suppresses both.
        """
        pass

    def do_POST(self):
        """Forward a POST request upstream."""
        self._proxy_request("POST")

    def do_GET(self):
        """Serve /health locally; forward every other GET upstream."""
        if self.path == "/health":
            self._handle_health()
            return
        self._proxy_request("GET")

    def do_OPTIONS(self):
        """Handle CORS preflight."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "*")
        self.end_headers()

    # -- small response helpers -------------------------------------------

    def _send_json(self, code, payload):
        """Send *payload* as a JSON response with HTTP status *code*."""
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(payload).encode())

    def _send_proxy_error(self, code, message):
        """Send a proxy-generated error in the ``{"error": {...}}`` envelope."""
        self._send_json(code, {"error": {"type": "proxy_error", "message": message}})

    def _handle_health(self):
        """Health check: 200 if a token can be obtained, 503 otherwise."""
        try:
            get_fresh_token()
            code = 200
            status = {
                "status": "healthy",
                "token_valid": True,
                "token_expires_in": int(_cached_expires - time.time()),
                "last_refresh": _health_status.get("last_refresh", 0),
                "uptime": int(time.time() - _health_status.get("uptime_start", time.time())),
                "pid": os.getpid(),
            }
        except Exception as e:
            code = 503
            status = {
                "status": "unhealthy",
                "token_valid": False,
                "error": str(e),
                "recovery": "Run 'kimi' CLI to re-authenticate, then restart claude-kimi",
                "pid": os.getpid(),
            }
        self._send_json(code, status)

    # -- upstream plumbing -------------------------------------------------

    def _build_upstream_headers(self, token):
        """Return the headers sent upstream: auth, content type, anthropic-*."""
        headers = {
            "x-api-key": token,
            "Content-Type": self.headers.get("Content-Type", "application/json"),
        }
        for name in ("anthropic-version", "anthropic-beta"):
            if self.headers.get(name):
                headers[name] = self.headers[name]
        return headers

    def _read_request_body(self):
        """Read the request body per Content-Length.

        Returns the body bytes, or None after answering 400 when the header
        is not a non-negative integer. A negative length must be rejected:
        ``rfile.read(-1)`` would block until the client closes the socket.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        if length < 0:
            self._send_proxy_error(400, "Invalid Content-Length header")
            return None
        return self.rfile.read(length) if length else b""

    def _relay_response(self, resp):
        """Copy the upstream status, headers and body to the client."""
        self.send_response(resp.status)
        is_streaming = False
        for header, value in resp.getheaders():
            h = header.lower()
            if h == "transfer-encoding":
                continue  # http.client already de-chunks the body
            if h == "content-type" and "text/event-stream" in value:
                is_streaming = True
            self.send_header(header, value)
        self.end_headers()

        if is_streaming:
            # read1() returns as soon as data is available, so SSE events
            # are passed on without waiting for a full buffer.
            while chunk := resp.read1(65536):
                self.wfile.write(chunk)
                self.wfile.flush()
        else:
            self.wfile.write(resp.read())

    def _forward(self, method, body, token, *, reauth_on_401, is_retry):
        """Send one request upstream and relay the answer.

        Args:
            method: HTTP method to use upstream.
            body: Request body bytes.
            token: Access token placed in ``x-api-key``.
            reauth_on_401: If True, an upstream 401 is consumed and NOT
                relayed, so the caller can re-authenticate and retry.
            is_retry: True for the post-re-auth attempt; selects the retry
                log/error wording and records a persistent 401 in
                ``_health_status``.

        Returns:
            The upstream status code, or None if the exchange failed.
        """
        conn = http.client.HTTPSConnection(
            KIMI_HOST, context=ssl.create_default_context(), timeout=300
        )
        relaying = False
        try:
            conn.request(
                method,
                KIMI_BASE_PATH + self.path,
                body=body,
                headers=self._build_upstream_headers(token),
            )
            resp = conn.getresponse()

            if resp.status == 401:
                if reauth_on_401:
                    resp.read()  # consume the 401 response
                    return 401
                if is_retry:
                    # Second 401 -- token is truly invalid, don't loop
                    print("[kimi-proxy] STILL 401 after re-auth -- refresh token may be revoked", file=sys.stderr)
                    print("[kimi-proxy] Run 'kimi' CLI to re-authenticate.", file=sys.stderr)
                    _health_status["ok"] = False
                    _health_status["error"] = "Persistent 401 after re-auth. Run 'kimi' to re-login."

            relaying = True
            self._relay_response(resp)
            return resp.status
        except Exception as e:
            label = "Upstream error on retry" if is_retry else "Upstream error"
            if relaying:
                # The status line may already be on the wire; emitting a 502
                # now would inject a second status line into the body.
                print(f"[kimi-proxy] {label} while relaying response: {e}", file=sys.stderr)
            else:
                try:
                    self._send_proxy_error(502, f"{label}: {e}")
                except Exception:
                    pass  # best effort: the client has probably gone away
            return None
        finally:
            conn.close()

    def _proxy_request(self, method, _retry_auth=True):
        """Forward the current request upstream with a fresh token.

        Args:
            method: HTTP method to use upstream.
            _retry_auth: When True, an upstream 401 invalidates the cached
                token and the request is retried once.
        """
        try:
            token = get_fresh_token()
        except Exception as e:
            self._send_proxy_error(503, f"Token refresh failed: {e}")
            return

        body = self._read_request_body()
        if body is None:
            return

        status = self._forward(method, body, token, reauth_on_401=_retry_auth, is_retry=False)

        # Self-healing: if Kimi returns 401, the cached token is stale.
        # Invalidate it and retry the request once.
        if status == 401 and _retry_auth:
            print("[kimi-proxy] Upstream 401 -- invalidating cached token and re-authenticating", file=sys.stderr)
            _invalidate_cached_token()
            self._retry_with_fresh_token(method, body)

    def _retry_with_fresh_token(self, method, body):
        """Retry a request after an upstream 401; whatever comes back is relayed."""
        try:
            token = get_fresh_token()
        except Exception as e:
            self._send_proxy_error(503, f"Token re-auth failed after upstream 401: {e}")
            return

        status = self._forward(method, body, token, reauth_on_401=False, is_retry=True)
        if status is not None and status < 400:
            print("[kimi-proxy] Re-auth successful -- request completed with fresh token", file=sys.stderr)

class ThreadedHTTPServer(ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request on its own thread.

    Threading lets several long-lived streaming responses run concurrently.
    """

    # Allow rebinding the listening address right after a restart.
    allow_reuse_address = True
    # Request threads are daemonic, so they do not block process exit.
    daemon_threads = True

# --- Lifecycle ---

def is_running(port=DEFAULT_PORT):
    """Check if proxy is already running.

    Args:
        port: TCP port probed on 127.0.0.1 when the PID file gives no answer.

    Returns:
        The PID (> 0) if the PID file names a process that can be signalled;
        -1 if something accepts connections on the port; 0 otherwise.

    A PID file that is unreadable, malformed, or names a dead process is
    removed as a side effect.
    """
    if PID_FILE.exists():
        try:
            pid = int(PID_FILE.read_text().strip())
            os.kill(pid, 0)  # Signal 0: existence check only, nothing is sent
            return pid
        # ProcessLookupError is a subclass of OSError, so OSError covers the
        # "no such process" case.
        except (OSError, ValueError):
            PID_FILE.unlink(missing_ok=True)

    # Also check port
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            s.connect(("127.0.0.1", port))
            return -1  # Something is on the port
    except OSError:  # includes ConnectionRefusedError and socket.timeout
        return 0

def stop_proxy():
    """Send SIGTERM to the proxy named in the PID file.

    Returns:
        True if the signal was delivered (the PID file is removed and a
        confirmation printed); False if there was no PID file or it was
        unusable, in which case "No kimi-proxy running" is printed.
    """
    if PID_FILE.exists():
        try:
            target = int(PID_FILE.read_text().strip())
            os.kill(target, signal.SIGTERM)
            PID_FILE.unlink(missing_ok=True)
            print(f"Stopped kimi-proxy (PID {target})")
        except (OSError, ValueError):
            # Stale or corrupt PID file: discard it and report "not running".
            PID_FILE.unlink(missing_ok=True)
        else:
            return True
    print("No kimi-proxy running")
    return False

def start_proxy(port=DEFAULT_PORT, foreground=False):
    """Start the proxy server.

    Args:
        port: Port to listen on (bound to 127.0.0.1 only).
        foreground: If False, fork and detach; the parent returns once it has
            reported the child's PID. If True, serve in the calling process.

    Returns None immediately when a proxy is already running or the port is
    taken. In the serving process this call blocks until the server stops.

    Raises:
        OSError: the listening socket could not be bound.
    """
    running = is_running(port)
    if running:
        if running > 0:
            print(f"kimi-proxy already running (PID {running}) on port {port}")
        else:
            print(f"Port {port} already in use")
        return

    if not foreground:
        # Daemonize
        pid = os.fork()
        if pid > 0:
            # Parent: wait briefly and verify child started
            time.sleep(0.3)
            if is_running(port):
                print(f"kimi-proxy started on port {port} (PID {pid})")
            else:
                print(f"kimi-proxy started (PID {pid})", file=sys.stderr)
            return
        # Child: detach
        os.setsid()
        # Redirect stdin/stdout to /dev/null
        devnull = os.open(os.devnull, os.O_RDWR)
        os.dup2(devnull, 0)
        os.dup2(devnull, 1)
        # Keep stderr for error logging
        log_path = Path.home() / ".local" / "run" / "kimi-proxy.log"
        log_path.parent.mkdir(parents=True, exist_ok=True)
        log_fd = os.open(str(log_path), os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
        os.dup2(log_fd, 2)
        os.close(devnull)
        os.close(log_fd)

    # Write PID file
    PID_FILE.parent.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(os.getpid()))

    def cleanup(signum, frame):
        PID_FILE.unlink(missing_ok=True)
        sys.exit(0)

    signal.signal(signal.SIGTERM, cleanup)
    signal.signal(signal.SIGINT, cleanup)

    _health_status["uptime_start"] = time.time()
    try:
        server = ThreadedHTTPServer(("127.0.0.1", port), KimiProxyHandler)
    except OSError:
        # Bind failed: don't leave a PID file pointing at a process that is
        # about to exit.
        PID_FILE.unlink(missing_ok=True)
        raise

    if foreground:
        print(f"kimi-proxy listening on 127.0.0.1:{port} (foreground)")
        print("Press Ctrl+C to stop")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # serve_forever() has already returned, so there is nothing for
        # shutdown() to stop; server_close() releases the listening socket.
        server.server_close()
        PID_FILE.unlink(missing_ok=True)

def run_watchdog(port=DEFAULT_PORT):
    """Supervise the proxy and relaunch it whenever it is found not running.

    The calling process forks: the parent prints the watchdog PID and
    returns, while the child detaches, records its PID in WATCHDOG_PID_FILE
    and loops forever. On each pass it asks is_running(port); if the proxy is
    down it is relaunched (with exponential backoff between consecutive
    attempts), otherwise /health is polled and a "healthy" answer resets the
    backoff counter. SIGTERM/SIGINT stop the proxy too and remove the
    watchdog PID file.
    """
    import subprocess

    # Daemonize: only the forked child carries on past this point.
    child = os.fork()
    if child > 0:
        time.sleep(0.3)
        print(f"kimi-watchdog started (PID {child})")
        return

    os.setsid()
    null_fd = os.open(os.devnull, os.O_RDWR)
    os.dup2(null_fd, 0)
    os.dup2(null_fd, 1)
    log_file = Path.home() / ".local" / "run" / "kimi-watchdog.log"
    log_file.parent.mkdir(parents=True, exist_ok=True)
    err_fd = os.open(str(log_file), os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    os.dup2(err_fd, 2)  # stderr becomes the watchdog log
    os.close(null_fd)
    os.close(err_fd)

    # Record the watchdog's own PID.
    WATCHDOG_PID_FILE.parent.mkdir(parents=True, exist_ok=True)
    WATCHDOG_PID_FILE.write_text(str(os.getpid()))

    def _on_signal(signum, frame):
        # Stopping the watchdog also stops the proxy it supervises.
        supervised = is_running(port)
        if supervised and supervised > 0:
            try:
                os.kill(supervised, signal.SIGTERM)
            except OSError:
                pass
        WATCHDOG_PID_FILE.unlink(missing_ok=True)
        sys.exit(0)

    for signo in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signo, _on_signal)

    failures = 0      # consecutive launches since the last healthy report
    launched_at = 0   # time of the most recent launch attempt

    print(f"[kimi-watchdog] Started, monitoring proxy on port {port}", file=sys.stderr)

    while True:
        if is_running(port):
            # Something is there -- ask it how it is doing.
            try:
                with urllib.request.urlopen(
                    urllib.request.Request(f"http://127.0.0.1:{port}/health"),
                    timeout=5,
                ) as reply:
                    report = json.loads(reply.read())
                if report.get("status") == "healthy":
                    failures = 0
            except Exception:
                # A failed probe is ignored; the next pass checks again.
                pass

            time.sleep(WATCHDOG_CHECK_INTERVAL)
            continue

        # Proxy is down. Backoff before relaunch: 1s, 2s, 4s, ... capped.
        if failures > 0:
            wait = min(2 ** (failures - 1), WATCHDOG_MAX_RESTART_DELAY)
            # More than five minutes since the last launch: start over.
            if launched_at and (time.time() - launched_at) > 300:
                failures = 0
                wait = 0
            if wait:
                print(f"[kimi-watchdog] Waiting {wait}s before restart (attempt {failures + 1})", file=sys.stderr)
                time.sleep(wait)

        print(f"[kimi-watchdog] Starting proxy on port {port}", file=sys.stderr)
        # Launch this same script; without --foreground it daemonizes itself.
        try:
            subprocess.run(
                [sys.executable, __file__, "--port", str(port)],
                timeout=5,
            )
        except subprocess.TimeoutExpired:
            pass  # not treated as an error; the next pass re-checks is_running()
        except Exception as exc:
            print(f"[kimi-watchdog] Failed to start proxy: {exc}", file=sys.stderr)

        launched_at = time.time()
        failures += 1
        time.sleep(2)  # Give it a moment to bind

def stop_watchdog():
    """Send SIGTERM to the watchdog named in WATCHDOG_PID_FILE.

    Returns:
        True if the signal was delivered (the PID file is removed and a
        confirmation printed); False if there was no PID file or it was
        unusable.
    """
    if WATCHDOG_PID_FILE.exists():
        try:
            target = int(WATCHDOG_PID_FILE.read_text().strip())
            os.kill(target, signal.SIGTERM)
            WATCHDOG_PID_FILE.unlink(missing_ok=True)
            print(f"Stopped kimi-watchdog (PID {target})")
        except (OSError, ValueError):
            # Stale or corrupt PID file: discard it.
            WATCHDOG_PID_FILE.unlink(missing_ok=True)
        else:
            return True
    return False

def is_watchdog_running():
    """Return the watchdog's PID if its PID file names a live process, else 0.

    A PID file that is malformed or names a process that cannot be signalled
    is removed.
    """
    if not WATCHDOG_PID_FILE.exists():
        return 0
    try:
        candidate = int(WATCHDOG_PID_FILE.read_text().strip())
        os.kill(candidate, 0)  # signal 0 only tests for existence
    except (OSError, ValueError):
        WATCHDOG_PID_FILE.unlink(missing_ok=True)
        return 0
    return candidate

def main():
    """Command-line entry point.

    Modes (checked in this order):
      --check      Report proxy/watchdog status. Exits 0 if the proxy (or
                   something on the port) is up, 1 if not, 2 if the proxy
                   reports itself unhealthy and no watchdog is running.
      --stop-all   Stop the watchdog, then the proxy.
      --stop       Stop the proxy; exits 0 on success, 1 otherwise.
      --watchdog   Start the proxy daemon, then the watchdog.
      (default)    Start the proxy, daemonized unless --foreground.
    """
    import argparse
    import urllib.error

    parser = argparse.ArgumentParser(description="Kimi OAuth token-refreshing proxy")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Port (default: {DEFAULT_PORT})")
    parser.add_argument("--check", action="store_true", help="Check if proxy is running")
    parser.add_argument("--stop", action="store_true", help="Stop running proxy")
    parser.add_argument("--foreground", action="store_true", help="Run in foreground (don't daemonize)")
    parser.add_argument("--watchdog", action="store_true", help="Start with watchdog (auto-restart on crash)")
    parser.add_argument("--stop-all", action="store_true", help="Stop proxy and watchdog")
    args = parser.parse_args()

    if args.check:
        pid = is_running(args.port)
        wd_pid = is_watchdog_running()
        if pid:
            # Try health endpoint for richer status
            try:
                req = urllib.request.Request(f"http://127.0.0.1:{args.port}/health")
                try:
                    with urllib.request.urlopen(req, timeout=3) as resp:
                        health = json.loads(resp.read())
                except urllib.error.HTTPError as err:
                    # An unhealthy proxy answers 503 with a JSON body, and
                    # urlopen raises on that status. Read the body from the
                    # error so the UNHEALTHY report below is reachable.
                    health = json.loads(err.read())
                status = health.get("status", "unknown")
                if status == "healthy":
                    expires_in = health.get("token_expires_in", 0)
                    uptime = health.get("uptime", 0)
                    print(f"kimi-proxy healthy (PID {health.get('pid', pid)}) on port {args.port}")
                    print(f" Token expires in: {expires_in // 60}m {expires_in % 60}s")
                    print(f" Uptime: {uptime // 3600}h {(uptime % 3600) // 60}m")
                else:
                    print(f"kimi-proxy UNHEALTHY (PID {health.get('pid', pid)})")
                    print(f" Error: {health.get('error', 'unknown')}")
                    print(f" Recovery: {health.get('recovery', 'restart')}")
                    if not wd_pid:
                        # SystemExit is not an Exception subclass, so it
                        # passes through the handler below.
                        sys.exit(2)
            except Exception:
                if pid > 0:
                    print(f"kimi-proxy running (PID {pid}) on port {args.port} (health check failed)")
                else:
                    print(f"Port {args.port} in use (unknown process)")
        else:
            print("kimi-proxy not running")
        if wd_pid:
            print(f" Watchdog: active (PID {wd_pid})")
        else:
            print(" Watchdog: not running")
        sys.exit(0 if pid else 1)

    if args.stop_all:
        stop_watchdog()
        stop_proxy()
        return

    if args.stop:
        sys.exit(0 if stop_proxy() else 1)

    if args.watchdog:
        # Start proxy first, then watchdog to monitor it
        start_proxy(args.port, foreground=False)
        time.sleep(0.5)
        run_watchdog(args.port)
        return

    start_proxy(args.port, args.foreground)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()