#!/usr/bin/env python3
"""
LLM Performance Test Tool
支持本地和云端大模型性能测试兼容 OpenAI API
"""
import os
import json
import time
import uuid
import statistics
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from flask import Flask, render_template, request, jsonify, send_from_directory
import requests
app = Flask(__name__)
# Allow the secret key to come from the environment; fall back to the
# historical hard-coded value so existing deployments keep working.
app.config['SECRET_KEY'] = os.environ.get('LLM_PERF_SECRET_KEY',
                                          'llm-perf-test-secret-key')

# Data storage directory (created at import time; exist_ok avoids the
# check-then-create race of an exists()/makedirs() pair).
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
os.makedirs(DATA_DIR, exist_ok=True)

# Persistent file paths
CONFIG_FILE = os.path.join(DATA_DIR, 'config.json')
TEST_CASES_FILE = os.path.join(DATA_DIR, 'test_cases.json')
RESULTS_FILE = os.path.join(DATA_DIR, 'results.json')

# Default configuration: an OpenAI-compatible endpoint (local Ollama).
DEFAULT_CONFIG = {
    "api_base": "http://localhost:11434/v1",
    "api_key": "",
    "model": "qwen2.5:latest",
    "timeout": 60,
    "max_tokens": 512,
    "temperature": 0.7
}
# Default test cases, used when data/test_cases.json does not exist yet.
# Each case carries: a stable id, a display name, the prompt sent verbatim
# to the model, and expected_length -- a rough expected answer length in
# characters (informational only; nothing in this file enforces it).
DEFAULT_TEST_CASES = [
    {
        "id": "tc_001",
        "name": "简单问答",  # simple Q&A
        "prompt": "你好,请介绍一下自己。",
        "expected_length": 100
    },
    {
        "id": "tc_002",
        "name": "代码生成",  # code generation
        "prompt": "写一个Python函数计算斐波那契数列的前n项。",
        "expected_length": 200
    },
    {
        "id": "tc_003",
        "name": "长文本理解",  # long-text comprehension
        "prompt": "请总结以下段落的主要观点人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率还创造了新的可能性。",
        "expected_length": 150
    },
    {
        "id": "tc_004",
        "name": "创意写作",  # creative writing
        "prompt": "写一首关于春天的四行短诗。",
        "expected_length": 100
    },
    {
        "id": "tc_005",
        "name": "逻辑推理",  # logical reasoning
        "prompt": "如果A大于BB大于C那么A和C的关系是什么请解释你的推理过程。",
        "expected_length": 120
    }
]

# In-memory cache of completed test runs, keyed by test_id.
results_cache = {}
# Guards results_cache against concurrent access from worker threads.
results_lock = Lock()
def load_config():
    """Load the saved configuration, falling back to DEFAULT_CONFIG.

    Missing keys are filled in from DEFAULT_CONFIG.  A missing, unreadable,
    or corrupted config file yields a copy of the defaults instead of
    crashing the request that triggered the load (the original let a
    json.JSONDecodeError propagate).
    """
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except (OSError, json.JSONDecodeError):
            return DEFAULT_CONFIG.copy()
        if not isinstance(config, dict):
            # A stored null/array would break every .get() downstream.
            return DEFAULT_CONFIG.copy()
        # Stored values win; defaults fill in anything missing.
        return {**DEFAULT_CONFIG, **config}
    return DEFAULT_CONFIG.copy()
def save_config(config):
    """Persist *config* to CONFIG_FILE as pretty-printed UTF-8 JSON."""
    serialized = json.dumps(config, ensure_ascii=False, indent=2)
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
def load_test_cases():
    """Return the stored test cases, or a copy of the defaults if none saved."""
    if not os.path.exists(TEST_CASES_FILE):
        return DEFAULT_TEST_CASES.copy()
    with open(TEST_CASES_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)
def save_test_cases(test_cases):
    """Persist *test_cases* to TEST_CASES_FILE as pretty-printed UTF-8 JSON."""
    serialized = json.dumps(test_cases, ensure_ascii=False, indent=2)
    with open(TEST_CASES_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
def load_results():
    """Return the list of historical test results (empty if none saved)."""
    if not os.path.exists(RESULTS_FILE):
        return []
    with open(RESULTS_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)
def save_result(result):
    """Append one test result to RESULTS_FILE (full read-modify-write)."""
    history = load_results()
    history.append(result)
    with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
def generate_id():
    """Return a short random identifier: the first 8 hex chars of a UUID4."""
    return uuid.uuid4().hex[:8]
def call_llm_api(config, prompt, stream=False):
    """Call an OpenAI-compatible chat-completions endpoint and time it.

    Args:
        config: dict with api_base, api_key, model, timeout, max_tokens,
            temperature (missing keys fall back to sensible defaults).
        prompt: the user message to send.
        stream: when True, consume the SSE stream so TTFT (time to first
            token) can actually be measured.

    Returns:
        dict with keys: success, ttft (ms), tps (tokens/s), total_time (s),
        output, tokens_in, tokens_out -- plus 'error' when success is False.
    """
    api_base = config.get('api_base', '').rstrip('/')
    api_key = config.get('api_key', '')
    model = config.get('model', '')
    timeout = config.get('timeout', 60)
    max_tokens = config.get('max_tokens', 512)
    temperature = config.get('temperature', 0.7)

    headers = {'Content-Type': 'application/json'}
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key

    payload = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': max_tokens,
        'temperature': temperature,
        'stream': stream
    }
    url = api_base + '/chat/completions'

    start_time = time.time()
    first_token_time = None
    output_text = ''
    tokens_in = 0
    tokens_out = 0
    try:
        if stream:
            # Streaming mode: read SSE "data: ..." lines to measure TTFT.
            response = requests.post(url, headers=headers, json=payload,
                                     timeout=timeout, stream=True)
            response.raise_for_status()
            for line in response.iter_lines():
                if not line:
                    continue
                line = line.decode('utf-8')
                if not line.startswith('data: '):
                    continue
                data = line[6:]
                if data == '[DONE]':
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    # Skip malformed/partial SSE payloads.  The original bare
                    # `except: pass` also swallowed programming errors.
                    continue
                choices = chunk.get('choices') or []
                if choices:
                    content = choices[0].get('delta', {}).get('content', '')
                    if content:
                        if first_token_time is None:
                            first_token_time = time.time()
                        output_text += content
            total_time = time.time() - start_time
            ttft = (first_token_time - start_time
                    if first_token_time is not None else total_time)
            # Streaming responses carry no usage info: rough ~4 chars/token.
            tokens_in = len(prompt) // 4
            tokens_out = len(output_text) // 4
            generation_time = total_time - ttft
            tps = tokens_out / generation_time if generation_time > 0 else 0
        else:
            # Non-streaming: one blocking request; TTFT ~= total time.
            response = requests.post(url, headers=headers, json=payload,
                                     timeout=timeout)
            response.raise_for_status()
            total_time = time.time() - start_time
            data = response.json()
            choices = data.get('choices') or []
            if choices:
                output_text = choices[0].get('message', {}).get('content', '')
            # Prefer the server-reported usage; estimate when absent.
            usage = data.get('usage', {})
            tokens_in = usage.get('prompt_tokens', len(prompt) // 4)
            tokens_out = usage.get('completion_tokens', len(output_text) // 4)
            ttft = total_time
            tps = tokens_out / total_time if total_time > 0 else 0
        return {
            'success': True,
            'ttft': round(ttft * 1000, 2),  # milliseconds
            'tps': round(tps, 2),
            'total_time': round(total_time, 3),
            'output': output_text,
            'tokens_in': tokens_in,
            'tokens_out': tokens_out
        }
    except Exception as e:
        # Report any network/HTTP/parse failure as a failed sample so a
        # multi-iteration run keeps going.
        return {
            'success': False,
            'error': str(e),
            'ttft': 0,
            'tps': 0,
            'total_time': 0,
            'output': '',
            'tokens_in': 0,
            'tokens_out': 0
        }
def run_single_test(config, test_case, iterations=1, stream=False):
    """Run one test case *iterations* times and aggregate statistics.

    Returns a dict with the test case itself, the raw per-iteration results
    and summary statistics computed over the successful iterations only.
    """
    results = []
    for i in range(iterations):
        result = call_llm_api(config, test_case['prompt'], stream=stream)
        result['iteration'] = i + 1
        results.append(result)

    successful = [r for r in results if r['success']]
    if successful:
        # Hoist the metric lists once instead of rebuilding them per stat.
        ttfts = [r['ttft'] for r in successful]
        tps_values = [r['tps'] for r in successful]
        durations = [r['total_time'] for r in successful]
        stats = {
            'avg_ttft': round(statistics.mean(ttfts), 2),
            'avg_tps': round(statistics.mean(tps_values), 2),
            'avg_total_time': round(statistics.mean(durations), 3),
            'min_ttft': round(min(ttfts), 2),
            'max_ttft': round(max(ttfts), 2),
            'min_tps': round(min(tps_values), 2),
            'max_tps': round(max(tps_values), 2),
            'success_rate': round(len(successful) / len(results) * 100, 1)
        }
    else:
        # Keep the same key set as the success branch so consumers never
        # have to special-case an all-failed run (the original omitted the
        # min/max keys here).
        stats = {
            'avg_ttft': 0,
            'avg_tps': 0,
            'avg_total_time': 0,
            'min_ttft': 0,
            'max_ttft': 0,
            'min_tps': 0,
            'max_tps': 0,
            'success_rate': 0
        }
    return {
        'test_case': test_case,
        'iterations': iterations,
        'stream_mode': stream,
        'results': results,
        'statistics': stats
    }
def run_concurrent_tests(config, test_cases, concurrency=1, iterations_per_case=1, stream=False):
    """Run several test cases, optionally in parallel worker threads.

    With concurrency > 1 the per-case results are collected in completion
    order; otherwise cases run sequentially in list order.
    """
    def _execute(case):
        return run_single_test(config, case, iterations_per_case, stream)

    if concurrency <= 1:
        # Sequential path: one case at a time.
        return [_execute(case) for case in test_cases]

    collected = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = [pool.submit(_execute, case) for case in test_cases]
        for future in as_completed(pending):
            collected.append(future.result())
    return collected
@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html')
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the current (stored + defaults) configuration as JSON."""
    return jsonify(load_config())
@app.route('/api/config', methods=['POST'])
def update_config():
    """Replace the stored configuration with the posted JSON object."""
    config = request.json
    if not isinstance(config, dict):
        # Reject null / non-object bodies; the original would happily
        # persist them and corrupt config.json for every later load.
        return jsonify({'error': 'config must be a JSON object'}), 400
    save_config(config)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases', methods=['GET'])
def get_test_cases():
    """Return all test cases (stored or the built-in defaults) as JSON."""
    return jsonify(load_test_cases())
@app.route('/api/test-cases', methods=['POST'])
def update_test_cases():
    """Replace all stored test cases with the posted JSON array."""
    test_cases = request.json
    if not isinstance(test_cases, list):
        # Guard against null / object bodies corrupting test_cases.json.
        return jsonify({'error': 'test cases must be a JSON array'}), 400
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases/<case_id>', methods=['DELETE'])
def delete_test_case(case_id):
    """Remove the test case whose id equals *case_id* (no-op if absent)."""
    remaining = [tc for tc in load_test_cases() if tc['id'] != case_id]
    save_test_cases(remaining)
    return jsonify({'status': 'success'})
@app.route('/api/run-test', methods=['POST'])
def run_test():
    """Run the selected test cases, persist and return a summary.

    Body (all optional): config, test_case_ids, iterations, concurrency,
    stream.  An empty selection yields 400.
    """
    data = request.json or {}  # tolerate an empty/absent JSON body

    config = data.get('config') or load_config()
    test_case_ids = data.get('test_case_ids', [])
    # Clamp run parameters to sane minimums so 0/negative client values
    # cannot produce empty or broken runs.
    iterations = max(1, int(data.get('iterations', 1)))
    concurrency = max(1, int(data.get('concurrency', 1)))
    stream = bool(data.get('stream', False))

    # Resolve the selected test cases; an empty id list means "all".
    all_test_cases = load_test_cases()
    if test_case_ids:
        selected_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids]
    else:
        selected_cases = all_test_cases
    if not selected_cases:
        return jsonify({'error': 'No test cases selected'}), 400

    test_id = generate_id()

    start_time = time.time()
    results = run_concurrent_tests(config, selected_cases, concurrency, iterations, stream)
    end_time = time.time()

    summary = {
        'test_id': test_id,
        'timestamp': datetime.now().isoformat(),
        'config': config,
        'test_cases_count': len(selected_cases),
        'iterations': iterations,
        'concurrency': concurrency,
        'stream_mode': stream,
        'total_duration': round(end_time - start_time, 2),
        'results': results
    }
    # Persist to disk and keep a hot copy for /api/results/<id>.
    save_result(summary)
    with results_lock:
        results_cache[test_id] = summary
    return jsonify(summary)
@app.route('/api/results', methods=['GET'])
def get_results():
    """Return every persisted test result as a JSON array."""
    return jsonify(load_results())
@app.route('/api/results/<test_id>', methods=['GET'])
def get_result(test_id):
    """Return one test result by id: in-memory cache first, then disk."""
    with results_lock:
        cached = results_cache.get(test_id)
    if cached is not None:
        return jsonify(cached)
    # Fall back to the persisted history file.
    match = next((r for r in load_results() if r.get('test_id') == test_id), None)
    if match is not None:
        return jsonify(match)
    return jsonify({'error': 'Result not found'}), 404
@app.route('/api/results/<test_id>', methods=['DELETE'])
def delete_result(test_id):
    """Delete a stored test result from both the results file and the cache."""
    remaining = [r for r in load_results() if r.get('test_id') != test_id]
    with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(remaining, f, ensure_ascii=False, indent=2)
    # Drop the hot copy too (no-op when it was never cached).
    with results_lock:
        results_cache.pop(test_id, None)
    return jsonify({'status': 'success'})
@app.route('/api/verify-config', methods=['POST'])
def verify_config():
    """Check whether the posted API configuration can reach its endpoint.

    Probes GET /models first; if that does not answer 200, falls back to a
    minimal chat completion.  Always responds 200 with a status/message
    JSON body (errors are reported in-band).
    """
    config = request.json
    try:
        api_base = config.get('api_base', '').rstrip('/')
        api_key = config.get('api_key', '')
        model = config.get('model', '')

        headers = {'Content-Type': 'application/json'}
        if api_key:
            headers['Authorization'] = 'Bearer ' + api_key

        # Cheapest probe: the model-list endpoint.
        models_response = requests.get(api_base + '/models', headers=headers, timeout=10)
        if models_response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})

        # Some servers do not expose /models; try a tiny completion instead.
        probe_payload = {
            'model': model,
            'messages': [{'role': 'user', 'content': 'Hi'}],
            'max_tokens': 5
        }
        probe_response = requests.post(api_base + '/chat/completions',
                                       headers=headers, json=probe_payload, timeout=10)
        if probe_response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})
        return jsonify({'status': 'error', 'message': 'API returned status ' + str(probe_response.status_code)})
    except Exception as e:
        # Boundary handler: surface any connection/parse failure in-band.
        return jsonify({'status': 'error', 'message': str(e)})
if __name__ == '__main__':
    # NOTE(review): binds to all interfaces with debug=True -- the Werkzeug
    # debugger allows remote code execution, so this is only acceptable on a
    # trusted network; confirm before any real deployment.
    app.run(host='0.0.0.0', port=8001, debug=True)