Update for Python 3.6 compatibility

This commit is contained in:
OpenClaw 2026-03-03 00:00:13 +08:00
parent ebbc56fc4a
commit 4f8048bd8d
2 changed files with 318 additions and 235 deletions

536
app.py
View File

@ -21,7 +21,8 @@ app.config['SECRET_KEY'] = 'llm-perf-test-secret-key'
# 数据存储目录
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
os.makedirs(DATA_DIR, exist_ok=True)
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
# 配置文件路径
CONFIG_FILE = os.path.join(DATA_DIR, 'config.json')
@ -47,26 +48,33 @@ DEFAULT_TEST_CASES = [
"expected_length": 100
},
{
"id": "tc_002",
"id": "tc_002",
"name": "代码生成",
"prompt": "写一个Python函数计算斐波那契数列的前n项。",
"prompt": "写一个Python函数计算斐波那契数列的前n项。",
"expected_length": 200
},
{
"id": "tc_003",
"name": "长文本理解",
"prompt": """请总结以下段落的主要观点:\n\n人工智能AI是计算机科学的一个分支致力于创造能够执行通常需要人类智能的任务的系统。这些任务包括视觉感知、语音识别、决策制定和语言翻译等。机器学习是AI的一个子集它使计算机能够从数据中学习并改进而无需明确编程。深度学习是机器学习的一种特定方法使用人工神经网络来模拟人脑的工作方式。近年来随着计算能力的提升和大数据的可用性AI技术取得了显著进展在医疗诊断、自动驾驶汽车、自然语言处理等领域展现出巨大潜力。然而AI的发展也引发了关于隐私、就业和伦理等方面的担忧需要社会各界共同探讨和制定相应的规范。""",
"prompt": "请总结以下段落的主要观点人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率还创造了新的可能性。",
"expected_length": 150
},
{
"id": "tc_004",
"name": "创意写作",
"prompt": "写一个关于未来城市的短篇科幻故事约300字。",
"expected_length": 400
"prompt": "写一首关于春天的四行短诗。",
"expected_length": 100
},
{
"id": "tc_005",
"name": "逻辑推理",
"prompt": "如果A大于BB大于C那么A和C的关系是什么请解释你的推理过程。",
"expected_length": 120
}
]
# 全局锁
# 内存中的结果缓存
results_cache = {}
results_lock = Lock()
@ -74,7 +82,12 @@ def load_config():
"""加载配置"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return {**DEFAULT_CONFIG, **json.load(f)}
config = json.load(f)
# 合并默认配置
for key, value in DEFAULT_CONFIG.items():
if key not in config:
config[key] = value
return config
return DEFAULT_CONFIG.copy()
@ -99,7 +112,7 @@ def save_test_cases(test_cases):
def load_results():
"""加载历史测试结果"""
"""加载历史结果"""
if os.path.exists(RESULTS_FILE):
with open(RESULTS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
@ -108,177 +121,196 @@ def load_results():
def save_result(result):
    """Append one test result to the results file.

    The complete read-modify-write cycle runs under ``results_lock`` so that
    concurrent callers cannot interleave and overwrite each other's entries,
    and the result is appended exactly once.

    Args:
        result: Record to append; it is serialized with ``json.dump``.
    """
    with results_lock:
        results = load_results()
        results.append(result)
        with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
def stream_chat_completion(api_base, api_key, model, messages, max_tokens, temperature, timeout):
def generate_id():
    """Return a short random ID: the first 8 characters of a fresh UUID4 string."""
    token = str(uuid.uuid4())
    return token[:8]
def _extract_stream_content(data):
    """Return the text delta carried by one SSE ``data:`` payload, or '' if none.

    Malformed JSON or an unexpected chunk shape yields '' so that a single bad
    chunk is skipped instead of aborting the whole stream.
    """
    try:
        chunk = json.loads(data)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return ''
    if not isinstance(chunk, dict):
        return ''
    choices = chunk.get('choices')
    if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
        return ''
    delta = choices[0].get('delta')
    if not isinstance(delta, dict):
        return ''
    content = delta.get('content')
    return content if isinstance(content, str) else ''


def call_llm_api(config, prompt, stream=False):
    """POST ``prompt`` to ``<api_base>/chat/completions`` and measure timing.

    Args:
        config: Mapping read for ``api_base``, ``api_key``, ``model``,
            ``timeout`` (default 60), ``max_tokens`` (default 512) and
            ``temperature`` (default 0.7).
        prompt: Text sent as the single user message.
        stream: When true, the response is consumed as a stream and the time
            to first token is measured; otherwise one blocking request is
            made and TTFT is reported as the total time.

    Returns:
        A dict (never a tuple) with keys ``success``, ``ttft`` (milliseconds),
        ``tps``, ``total_time`` (seconds), ``output``, ``tokens_in`` and
        ``tokens_out``. On any exception ``success`` is False, ``error``
        holds the message and every metric is zero / empty.
    """
    api_base = config.get('api_base', '').rstrip('/')
    api_key = config.get('api_key', '')
    model = config.get('model', '')
    timeout = config.get('timeout', 60)
    max_tokens = config.get('max_tokens', 512)
    temperature = config.get('temperature', 0.7)

    headers = {
        'Content-Type': 'application/json'
    }
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key

    payload = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': max_tokens,
        'temperature': temperature,
        'stream': stream
    }

    url = api_base + '/chat/completions'

    start_time = time.time()
    first_token_time = None
    output_text = ''
    tokens_in = 0
    tokens_out = 0

    try:
        if stream:
            # Streaming mode: measure the time to the first content chunk.
            chunks = []
            response = requests.post(url, headers=headers, json=payload,
                                     timeout=timeout, stream=True)
            try:
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode('utf-8')
                    if not line.startswith('data: '):
                        continue
                    data = line[6:]
                    if data == '[DONE]':
                        break
                    content = _extract_stream_content(data)
                    if content:
                        if first_token_time is None:
                            first_token_time = time.time()
                        chunks.append(content)
            finally:
                # Leaving the loop early (on [DONE]) does not drain the body,
                # so release the connection explicitly.
                response.close()
            output_text = ''.join(chunks)

            total_time = time.time() - start_time
            ttft = first_token_time - start_time if first_token_time else total_time

            # Streaming chunks are not read for usage data: estimate ~4 chars per token.
            tokens_in = len(prompt) // 4
            tokens_out = len(output_text) // 4

            # TPS covers only the generation phase after the first token.
            generation_time = total_time - ttft
            tps = tokens_out / generation_time if generation_time > 0 else 0
        else:
            # Non-streaming mode: a single blocking request.
            response = requests.post(url, headers=headers, json=payload, timeout=timeout)
            response.raise_for_status()
            end_time = time.time()
            total_time = end_time - start_time

            data = response.json()
            if 'choices' in data and len(data['choices']) > 0:
                message = data['choices'][0].get('message') or {}
                # A null content must not break the length-based fallback below.
                output_text = message.get('content') or ''

            # Prefer the server-reported token usage, falling back to the estimate.
            usage = data.get('usage') or {}
            tokens_in = usage.get('prompt_tokens', len(prompt) // 4)
            tokens_out = usage.get('completion_tokens', len(output_text) // 4)

            # Without streaming the first token cannot be observed separately.
            ttft = total_time
            tps = tokens_out / total_time if total_time > 0 else 0

        return {
            'success': True,
            'ttft': round(ttft * 1000, 2),  # converted to milliseconds
            'tps': round(tps, 2),
            'total_time': round(total_time, 3),
            'output': output_text,
            'tokens_in': tokens_in,
            'tokens_out': tokens_out
        }
    except Exception as e:
        # Benchmark boundary: report the failure as data instead of raising.
        return {
            'success': False,
            'error': str(e),
            'ttft': 0,
            'tps': 0,
            'total_time': 0,
            'output': '',
            'tokens_in': 0,
            'tokens_out': 0
        }
def run_single_test(api_config, test_case, run_index=0):
"""运行单个测试"""
messages = [{"role": "user", "content": test_case["prompt"]}]
result = stream_chat_completion(
api_base=api_config["api_base"],
api_key=api_config["api_key"],
model=api_config["model"],
messages=messages,
max_tokens=api_config.get("max_tokens", 512),
temperature=api_config.get("temperature", 0.7),
timeout=api_config.get("timeout", 60)
)
result["test_case_id"] = test_case["id"]
result["test_case_name"] = test_case["name"]
result["run_index"] = run_index
result["timestamp"] = datetime.now().isoformat()
return result
def run_batch_tests(api_config, test_cases, runs_per_case=1, concurrency=1):
"""批量运行测试"""
all_tasks = []
for test_case in test_cases:
for i in range(runs_per_case):
all_tasks.append((api_config, test_case, i))
def run_single_test(config, test_case, iterations=1, stream=False):
"""运行单个测试用例"""
results = []
completed = 0
total = len(all_tasks)
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {executor.submit(run_single_test, *task): task for task in all_tasks}
for i in range(iterations):
result = call_llm_api(config, test_case['prompt'], stream=stream)
result['iteration'] = i + 1
results.append(result)
# 计算统计数据
successful = [r for r in results if r['success']]
if successful:
avg_ttft = statistics.mean([r['ttft'] for r in successful])
avg_tps = statistics.mean([r['tps'] for r in successful])
avg_total_time = statistics.mean([r['total_time'] for r in successful])
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
completed += 1
print(f"Progress: {completed}/{total}")
except Exception as e:
print(f"Test failed: {e}")
return results
def calculate_statistics(results):
"""计算统计数据"""
successful = [r for r in results if r.get("success")]
failed = [r for r in results if not r.get("success")]
if not successful:
return {"error": "No successful tests"}
ttfts = [r["ttft_ms"] for r in successful]
tpss = [r["tps"] for r in successful]
times = [r["total_time_ms"] for r in successful]
stats = {
"total_tests": len(results),
"successful": len(successful),
"failed": len(failed),
"success_rate": round(len(successful) / len(results) * 100, 2),
"ttft": {
"avg": round(statistics.mean(ttfts), 2),
"min": round(min(ttfts), 2),
"max": round(max(ttfts), 2),
"median": round(statistics.median(ttfts), 2)
},
"tps": {
"avg": round(statistics.mean(tpss), 2),
"min": round(min(tpss), 2),
"max": round(max(tpss), 2),
"median": round(statistics.median(tpss), 2)
},
"total_time": {
"avg": round(statistics.mean(times), 2),
"min": round(min(times), 2),
"max": round(max(times), 2)
stats = {
'avg_ttft': round(avg_ttft, 2),
'avg_tps': round(avg_tps, 2),
'avg_total_time': round(avg_total_time, 3),
'min_ttft': round(min([r['ttft'] for r in successful]), 2),
'max_ttft': round(max([r['ttft'] for r in successful]), 2),
'min_tps': round(min([r['tps'] for r in successful]), 2),
'max_tps': round(max([r['tps'] for r in successful]), 2),
'success_rate': round(len(successful) / len(results) * 100, 1)
}
else:
stats = {
'avg_ttft': 0,
'avg_tps': 0,
'avg_total_time': 0,
'success_rate': 0
}
}
return stats
return {
'test_case': test_case,
'iterations': iterations,
'stream_mode': stream,
'results': results,
'statistics': stats
}
# ==================== Flask Routes ====================
def run_concurrent_tests(config, test_cases, concurrency=1, iterations_per_case=1, stream=False):
    """Run every test case, optionally fanning out over a thread pool.

    Args:
        config: API configuration forwarded to ``run_single_test``.
        test_cases: Iterable of test cases to execute.
        concurrency: Worker thread count; anything not greater than 1 runs
            the cases sequentially in input order.
        iterations_per_case: Forwarded to ``run_single_test``.
        stream: Forwarded to ``run_single_test``.

    Returns:
        A list with one ``run_single_test`` result per test case. When a
        thread pool is used the results are in completion order.
    """
    def execute(case):
        return run_single_test(config, case, iterations_per_case, stream)

    # Sequential path.
    if not concurrency > 1:
        return [execute(case) for case in test_cases]

    # Threaded path: collect results as each future finishes.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = [pool.submit(execute, case) for case in test_cases]
        return [finished.result() for finished in as_completed(pending)]
@app.route('/')
def index():
@ -286,119 +318,169 @@ def index():
return render_template('index.html')
@app.route('/api/config', methods=['GET', 'POST'])
def config_api():
    """Configuration endpoint: GET returns the config, any other method saves the JSON body."""
    if request.method != 'GET':
        # Persist the request body as the new configuration.
        save_config(request.json)
        return jsonify({"status": "success"})
    return jsonify(load_config())
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the result of ``load_config`` as a JSON response."""
    current = load_config()
    return jsonify(current)
@app.route('/api/test-cases', methods=['GET', 'POST', 'PUT', 'DELETE'])
def test_cases_api():
    """Test case management endpoint, dispatched on the HTTP method.

    GET lists all cases, POST adds the JSON body under a freshly generated
    id, PUT replaces the case whose id matches the JSON body, and DELETE
    removes the case named by the ``id`` query parameter.
    """
    method = request.method

    if method == 'GET':
        return jsonify(load_test_cases())

    if method == 'POST':
        cases = load_test_cases()
        created = request.json
        created['id'] = f"tc_{uuid.uuid4().hex[:6]}"
        cases.append(created)
        save_test_cases(cases)
        return jsonify({"status": "success", "id": created['id']})

    if method == 'PUT':
        replacement = request.json
        cases = load_test_cases()
        # Swap in the first case with a matching id; no match leaves the list as is.
        for position, existing in enumerate(cases):
            if existing['id'] == replacement['id']:
                cases[position] = replacement
                break
        save_test_cases(cases)
        return jsonify({"status": "success"})

    if method == 'DELETE':
        target_id = request.args.get('id')
        remaining = [case for case in load_test_cases() if case['id'] != target_id]
        save_test_cases(remaining)
        return jsonify({"status": "success"})
@app.route('/api/config', methods=['POST'])
def update_config():
    """Save the JSON request body via ``save_config`` and report success."""
    save_config(request.json)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases', methods=['GET'])
def get_test_cases():
    """Return the result of ``load_test_cases`` as a JSON response."""
    cases = load_test_cases()
    return jsonify(cases)
@app.route('/api/test-cases', methods=['POST'])
def update_test_cases():
    """Save the JSON request body via ``save_test_cases`` and report success."""
    save_test_cases(request.json)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases/<case_id>', methods=['DELETE'])
def delete_test_case(case_id):
    """Drop every test case whose id equals ``case_id`` and save the remainder."""
    remaining = []
    for case in load_test_cases():
        if case['id'] != case_id:
            remaining.append(case)
    save_test_cases(remaining)
    return jsonify({'status': 'success'})
@app.route('/api/run-test', methods=['POST'])
def run_test_api():
"""运行测试 API"""
def run_test():
"""运行测试"""
data = request.json
api_config = data.get('config', load_config())
config = data.get('config', load_config())
test_case_ids = data.get('test_case_ids', [])
runs_per_case = data.get('runs_per_case', 1)
iterations = data.get('iterations', 1)
concurrency = data.get('concurrency', 1)
stream = data.get('stream', False)
# 获取要运行的测试用例
# 加载选中的测试用例
all_test_cases = load_test_cases()
if test_case_ids:
test_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids]
selected_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids]
else:
test_cases = all_test_cases
selected_cases = all_test_cases
if not test_cases:
return jsonify({"error": "No test cases selected"}), 400
if not selected_cases:
return jsonify({'error': 'No test cases selected'}), 400
# 生成测试ID
test_id = generate_id()
# 运行测试
results = run_batch_tests(api_config, test_cases, runs_per_case, concurrency)
start_time = time.time()
results = run_concurrent_tests(config, selected_cases, concurrency, iterations, stream)
end_time = time.time()
# 计算统计
stats = calculate_statistics(results)
# 汇总结果
summary = {
'test_id': test_id,
'timestamp': datetime.now().isoformat(),
'config': config,
'test_cases_count': len(selected_cases),
'iterations': iterations,
'concurrency': concurrency,
'stream_mode': stream,
'total_duration': round(end_time - start_time, 2),
'results': results
}
# 保存结果
test_run = {
"id": f"run_{uuid.uuid4().hex[:8]}",
"timestamp": datetime.now().isoformat(),
"config": api_config,
"stats": stats,
"results": results
}
save_result(test_run)
save_result(summary)
return jsonify(test_run)
# 缓存结果
with results_lock:
results_cache[test_id] = summary
return jsonify(summary)
@app.route('/api/results', methods=['GET'])
def get_results_api():
def get_results():
"""获取历史测试结果"""
return jsonify(load_results())
@app.route('/api/results/<result_id>', methods=['GET'])
def get_result_detail_api(result_id):
"""获取单个测试结果详情"""
@app.route('/api/results/<test_id>', methods=['GET'])
def get_result(test_id):
"""获取单个测试结果"""
# 先查缓存
with results_lock:
if test_id in results_cache:
return jsonify(results_cache[test_id])
# 再查文件
results = load_results()
for result in results:
if result.get('id') == result_id:
if result.get('test_id') == test_id:
return jsonify(result)
return jsonify({"error": "Result not found"}), 404
return jsonify({'error': 'Result not found'}), 404
@app.route('/api/results/<result_id>', methods=['DELETE'])
def delete_result_api(result_id):
@app.route('/api/results/<test_id>', methods=['DELETE'])
def delete_result(test_id):
"""删除测试结果"""
results = load_results()
results = [r for r in results if r.get('test_id') != test_id]
with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
with results_lock:
results = load_results()
results = [r for r in results if r.get('id') != result_id]
with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return jsonify({"status": "success"})
if test_id in results_cache:
del results_cache[test_id]
return jsonify({'status': 'success'})
@app.route('/api/verify-config', methods=['POST'])
def verify_config():
    """Check whether the posted API configuration can reach the service.

    Probes ``<api_base>/models`` first; if that does not answer with status
    200, falls back to a minimal chat completion request. Any exception is
    reported in the JSON payload rather than raised.
    """
    config = request.json
    try:
        base_url = config.get('api_base', '').rstrip('/')
        token = config.get('api_key', '')
        model_name = config.get('model', '')

        request_headers = {'Content-Type': 'application/json'}
        if token:
            request_headers['Authorization'] = 'Bearer ' + token

        # First probe: list the available models.
        models_response = requests.get(base_url + '/models',
                                       headers=request_headers, timeout=10)
        if models_response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})

        # Fallback probe: a tiny completion call.
        probe_body = {
            'model': model_name,
            'messages': [{'role': 'user', 'content': 'Hi'}],
            'max_tokens': 5
        }
        chat_response = requests.post(base_url + '/chat/completions',
                                      headers=request_headers, json=probe_body, timeout=10)
        if chat_response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})
        return jsonify({'status': 'error',
                        'message': 'API returned status ' + str(chat_response.status_code)})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
if __name__ == '__main__':
    # Seed the default config and test cases when their files do not exist yet.
    if not os.path.exists(CONFIG_FILE):
        save_config(DEFAULT_CONFIG)
    if not os.path.exists(TEST_CASES_FILE):
        save_test_cases(DEFAULT_TEST_CASES)

    # The server binds to all interfaces. With debug mode on, Flask enables
    # the Werkzeug interactive debugger, which lets anyone who can reach the
    # port execute code, so debug is opt-in (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=8001, debug=debug_enabled)

View File

@ -1,8 +1,9 @@
flask==3.0.0
requests==2.31.0
openai==1.6.0
plotly==5.18.0
pandas==2.1.4
numpy==1.26.2
gunicorn==21.2.0
python-dotenv==1.0.0
flask==2.0.3
requests==2.27.1
openai==0.28.0
plotly==5.3.0
pandas==1.1.5
numpy==1.19.5
gunicorn==20.1.0
python-dotenv==0.19.0
werkzeug==2.0.3