487 lines
15 KiB
Python
487 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
LLM Performance Test Tool

Performance testing for local and cloud-hosted large language models,
compatible with the OpenAI API.
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import uuid
|
||
import statistics
|
||
from datetime import datetime
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from threading import Lock
|
||
|
||
from flask import Flask, render_template, request, jsonify, send_from_directory
|
||
import requests
|
||
|
||
# Flask application object that every route in this module is registered on.
app = Flask(__name__)
# Let a deployment provide its own signing key through the SECRET_KEY
# environment variable; the historical hard-coded value is kept only as a
# fallback so existing setups keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'llm-perf-test-secret-key')
|
||
|
||
# Data storage directory, located next to this module.  The module path is
# made absolute first so the location does not depend on the working directory.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
# exist_ok=True removes the check-then-create race of a separate exists() test.
os.makedirs(DATA_DIR, exist_ok=True)

# Paths of the JSON files kept inside DATA_DIR.
CONFIG_FILE = os.path.join(DATA_DIR, 'config.json')
TEST_CASES_FILE = os.path.join(DATA_DIR, 'test_cases.json')
RESULTS_FILE = os.path.join(DATA_DIR, 'results.json')
|
||
|
||
# Default configuration, used when no config file exists and to back-fill
# keys that a saved config file lacks.
DEFAULT_CONFIG = dict(
    api_base="http://localhost:11434/v1",
    api_key="",
    model="qwen2.5:latest",
    timeout=60,
    max_tokens=512,
    temperature=0.7,
)
|
||
|
||
# Default test cases, returned when no test-case file has been saved yet.
# Each entry carries an id, a display name, the prompt text and an
# "expected_length" value.
DEFAULT_TEST_CASES = [
    dict(
        id="tc_001",
        name="简单问答",
        prompt="你好,请介绍一下自己。",
        expected_length=100,
    ),
    dict(
        id="tc_002",
        name="代码生成",
        prompt="写一个Python函数计算斐波那契数列的前n项。",
        expected_length=200,
    ),
    dict(
        id="tc_003",
        name="长文本理解",
        prompt="请总结以下段落的主要观点:人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车,AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率,还创造了新的可能性。",
        expected_length=150,
    ),
    dict(
        id="tc_004",
        name="创意写作",
        prompt="写一首关于春天的四行短诗。",
        expected_length=100,
    ),
    dict(
        id="tc_005",
        name="逻辑推理",
        prompt="如果A大于B,B大于C,那么A和C的关系是什么?请解释你的推理过程。",
        expected_length=120,
    ),
]
|
||
|
||
# In-memory cache of test summaries keyed by test id; results_lock is held
# by the routes whenever they read or modify the cache.
results_lock = Lock()
results_cache = dict()
|
||
|
||
|
||
def load_config():
    """Return the saved configuration, back-filled with default values.

    When CONFIG_FILE exists its JSON content is loaded and every key of
    DEFAULT_CONFIG that the file lacks is added; otherwise a copy of
    DEFAULT_CONFIG is returned.
    """
    if not os.path.exists(CONFIG_FILE):
        return DEFAULT_CONFIG.copy()

    with open(CONFIG_FILE, 'r', encoding='utf-8') as handle:
        loaded = json.load(handle)

    # Merge in defaults without overriding anything the file provides.
    for name, default in DEFAULT_CONFIG.items():
        loaded.setdefault(name, default)
    return loaded
|
||
|
||
|
||
def save_config(config):
    """Write *config* to CONFIG_FILE as indented, non-ASCII-escaped JSON."""
    with open(CONFIG_FILE, mode='w', encoding='utf-8') as handle:
        json.dump(config, handle, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def load_test_cases():
    """Return the saved test cases, or the built-in defaults.

    When TEST_CASES_FILE exists its JSON content is returned as-is.
    Otherwise a fresh list of copies of the DEFAULT_TEST_CASES entries is
    returned.
    """
    if os.path.exists(TEST_CASES_FILE):
        with open(TEST_CASES_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    # list.copy() is shallow: it would hand out the module-level dicts
    # themselves, so a caller editing a case would silently change the
    # defaults.  Copy every entry as well.
    return [dict(case) for case in DEFAULT_TEST_CASES]
|
||
|
||
|
||
def save_test_cases(test_cases):
    """Write *test_cases* to TEST_CASES_FILE as indented, non-ASCII-escaped JSON."""
    with open(TEST_CASES_FILE, mode='w', encoding='utf-8') as handle:
        json.dump(test_cases, handle, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def load_results():
    """Return the stored result history, or an empty list if none was saved."""
    if not os.path.exists(RESULTS_FILE):
        return []
    with open(RESULTS_FILE, 'r', encoding='utf-8') as handle:
        history = json.load(handle)
    return history
|
||
|
||
|
||
def save_result(result):
    """Append *result* to the stored history and rewrite RESULTS_FILE."""
    history = load_results()
    history.append(result)
    with open(RESULTS_FILE, mode='w', encoding='utf-8') as handle:
        json.dump(history, handle, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def generate_id():
    """Return an 8-character identifier taken from a random UUID4."""
    # The first eight characters of str(uuid4()) are its first eight hex
    # digits (the first dash only appears after them), so .hex[:8] is the
    # same value.
    return uuid.uuid4().hex[:8]
|
||
|
||
|
||
def _stream_delta(raw_line):
    """Extract the generated text carried by one raw server-sent-event line.

    Returns the text fragment, '' when the line carries no usable text
    (blank keep-alive, non-data line, malformed or unexpected JSON), or
    None when the line is the ``[DONE]`` end-of-stream marker.
    """
    if not raw_line:
        return ''
    line = raw_line.decode('utf-8')
    # Accept both "data: {...}" and "data:{...}".
    if not line.startswith('data:'):
        return ''
    data = line[len('data:'):].strip()
    if data == '[DONE]':
        return None
    try:
        chunk = json.loads(data)
        choices = chunk.get('choices')
        if not choices:
            return ''
        content = (choices[0].get('delta') or {}).get('content')
    except (ValueError, AttributeError, TypeError, KeyError, IndexError):
        # Best effort: a chunk that cannot be interpreted is skipped rather
        # than failing the whole measurement.
        return ''
    return content if isinstance(content, str) else ''


def call_llm_api(config, prompt, stream=False):
    """Send one chat-completion request and measure its performance.

    Args:
        config: mapping read for 'api_base', 'api_key', 'model', 'timeout',
            'max_tokens' and 'temperature' (defaults: '', '', '', 60, 512,
            0.7).
        prompt: text sent as the single user message.
        stream: when True the response is consumed as a stream so the time
            to first token can be measured separately.

    Returns:
        A dict with the keys 'success', 'ttft' (milliseconds), 'tps',
        'total_time' (seconds), 'output', 'tokens_in' and 'tokens_out'.
        On any failure 'success' is False, an 'error' message is added and
        all metrics are zero; no exception is propagated.
    """
    api_base = config.get('api_base', '').rstrip('/')
    api_key = config.get('api_key', '')
    timeout = config.get('timeout', 60)

    headers = {'Content-Type': 'application/json'}
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key

    payload = {
        'model': config.get('model', ''),
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': config.get('max_tokens', 512),
        'temperature': config.get('temperature', 0.7),
        'stream': stream,
    }
    url = api_base + '/chat/completions'

    # perf_counter is monotonic, so the measured intervals cannot be
    # distorted by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    try:
        if stream:
            # Streaming mode: record when the first text fragment arrives.
            first_token_time = None
            fragments = []
            # The context manager releases the connection even when the
            # loop stops early at the [DONE] marker or an error occurs.
            with requests.post(url, headers=headers, json=payload,
                               timeout=timeout, stream=True) as response:
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    fragment = _stream_delta(raw_line)
                    if fragment is None:
                        break
                    if fragment:
                        if first_token_time is None:
                            first_token_time = time.perf_counter()
                        fragments.append(fragment)
                total_time = time.perf_counter() - start_time

            output_text = ''.join(fragments)
            # Without any fragment the whole duration counts as TTFT.
            ttft = first_token_time - start_time if first_token_time else total_time

            # Token counts are estimated as one token per four characters.
            tokens_in = len(prompt) // 4
            tokens_out = len(output_text) // 4

            # Throughput is measured over the generation phase only.
            generation_time = total_time - ttft
            tps = tokens_out / generation_time if generation_time > 0 else 0
        else:
            # Non-streaming mode: a single blocking request.
            response = requests.post(url, headers=headers, json=payload, timeout=timeout)
            response.raise_for_status()
            total_time = time.perf_counter() - start_time

            data = response.json()
            output_text = ''
            if data.get('choices'):
                message = data['choices'][0].get('message') or {}
                # A null content field is treated as empty output.
                output_text = message.get('content') or ''

            # Prefer the reported usage; fall back to the character-based
            # estimate when it is missing or null.
            usage = data.get('usage') or {}
            tokens_in = usage.get('prompt_tokens')
            if tokens_in is None:
                tokens_in = len(prompt) // 4
            tokens_out = usage.get('completion_tokens')
            if tokens_out is None:
                tokens_out = len(output_text) // 4

            # Without streaming the first token cannot be observed, so TTFT
            # is reported as the total time.
            ttft = total_time
            tps = tokens_out / total_time if total_time > 0 else 0

        return {
            'success': True,
            'ttft': round(ttft * 1000, 2),  # converted to milliseconds
            'tps': round(tps, 2),
            'total_time': round(total_time, 3),
            'output': output_text,
            'tokens_in': tokens_in,
            'tokens_out': tokens_out
        }

    except Exception as e:
        # Measurement boundary: every failure is reported as a result
        # record instead of being raised to the caller.
        return {
            'success': False,
            'error': str(e),
            'ttft': 0,
            'tps': 0,
            'total_time': 0,
            'output': '',
            'tokens_in': 0,
            'tokens_out': 0
        }
|
||
|
||
|
||
def run_single_test(config, test_case, iterations=1, stream=False):
    """Run one test case *iterations* times and summarise the measurements.

    Args:
        config: API configuration passed through to call_llm_api.
        test_case: mapping whose 'prompt' entry is sent on every iteration.
        iterations: number of sequential calls to make.
        stream: forwarded to call_llm_api.

    Returns:
        A dict with 'test_case', 'iterations', 'stream_mode', the per-call
        'results' (each tagged with a 1-based 'iteration') and
        'statistics'.  The statistics always contain the same keys; they
        are all zero when no call succeeded.
    """
    results = []
    for i in range(iterations):
        result = call_llm_api(config, test_case['prompt'], stream=stream)
        result['iteration'] = i + 1
        results.append(result)

    # Statistics are computed over successful calls only.
    successful = [r for r in results if r['success']]

    if successful:
        ttfts = [r['ttft'] for r in successful]
        tps_values = [r['tps'] for r in successful]
        total_times = [r['total_time'] for r in successful]

        stats = {
            'avg_ttft': round(statistics.mean(ttfts), 2),
            'avg_tps': round(statistics.mean(tps_values), 2),
            'avg_total_time': round(statistics.mean(total_times), 3),
            'min_ttft': round(min(ttfts), 2),
            'max_ttft': round(max(ttfts), 2),
            'min_tps': round(min(tps_values), 2),
            'max_tps': round(max(tps_values), 2),
            'success_rate': round(len(successful) / len(results) * 100, 1)
        }
    else:
        # Same key set as the success branch so consumers never hit a
        # missing min/max field.
        stats = {
            'avg_ttft': 0,
            'avg_tps': 0,
            'avg_total_time': 0,
            'min_ttft': 0,
            'max_ttft': 0,
            'min_tps': 0,
            'max_tps': 0,
            'success_rate': 0
        }

    return {
        'test_case': test_case,
        'iterations': iterations,
        'stream_mode': stream,
        'results': results,
        'statistics': stats
    }
|
||
|
||
|
||
def run_concurrent_tests(config, test_cases, concurrency=1, iterations_per_case=1, stream=False):
    """Run every test case, optionally several at a time.

    Args:
        config: API configuration passed to run_single_test.
        test_cases: iterable of test-case mappings.
        concurrency: number of worker threads; values of 1 or less run the
            cases one after another in the calling thread.
        iterations_per_case: iterations forwarded to run_single_test.
        stream: forwarded to run_single_test.

    Returns:
        A list with one run_single_test result per test case, in the same
        order as *test_cases* regardless of the concurrency level.
    """
    def run_test(tc):
        return run_single_test(config, tc, iterations_per_case, stream)

    if concurrency > 1:
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            # map() yields results in submission order, so the concurrent
            # path returns the same ordering as the sequential one instead
            # of whatever order the calls happened to finish in.
            return list(executor.map(run_test, test_cases))

    return [run_test(tc) for tc in test_cases]
|
||
|
||
|
||
@app.route('/')
def index():
    """Serve the home page by rendering the index.html template."""
    page = render_template('index.html')
    return page
|
||
|
||
|
||
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the current configuration (as produced by load_config) as JSON."""
    current = load_config()
    return jsonify(current)
|
||
|
||
|
||
@app.route('/api/config', methods=['POST'])
def update_config():
    """Replace the saved configuration with the posted JSON object.

    Responds with {'status': 'success'} after saving, or with HTTP 400 and
    an error message when the body is not a JSON object.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body, so every bad request ends in the same 400 response below.
    config = request.get_json(silent=True)
    if not isinstance(config, dict):
        # Persisting anything but an object would make load_config fail on
        # every later request.
        return jsonify({'status': 'error',
                        'message': 'Request body must be a JSON object'}), 400
    save_config(config)
    return jsonify({'status': 'success'})
|
||
|
||
|
||
@app.route('/api/test-cases', methods=['GET'])
def get_test_cases():
    """Return the test cases (as produced by load_test_cases) as JSON."""
    cases = load_test_cases()
    return jsonify(cases)
|
||
|
||
|
||
@app.route('/api/test-cases', methods=['POST'])
def update_test_cases():
    """Replace the saved test cases with the posted JSON array.

    Responds with {'status': 'success'} after saving, or with HTTP 400 and
    an error message when the body is not a JSON array of objects.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body, so every bad request ends in the same 400 response below.
    test_cases = request.get_json(silent=True)
    is_case_list = isinstance(test_cases, list) and all(
        isinstance(case, dict) for case in test_cases
    )
    if not is_case_list:
        # The other routes index each stored case like a mapping, so reject
        # any other shape before it reaches the file.
        return jsonify({'status': 'error',
                        'message': 'Request body must be a JSON array of test case objects'}), 400
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})
|
||
|
||
|
||
@app.route('/api/test-cases/<case_id>', methods=['DELETE'])
def delete_test_case(case_id):
    """Remove the test case whose 'id' equals *case_id* and save the rest.

    Always responds with {'status': 'success'}, including when no case
    matched.
    """
    test_cases = load_test_cases()
    # .get() keeps entries without an 'id' instead of raising KeyError,
    # matching how delete_result filters on 'test_id'.
    test_cases = [tc for tc in test_cases if tc.get('id') != case_id]
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})
|
||
|
||
|
||
@app.route('/api/run-test', methods=['POST'])
def run_test():
    """Run the selected test cases and return, store and cache the summary.

    The JSON body may contain 'config' (defaults to the saved
    configuration), 'test_case_ids' (empty or missing selects every case),
    'iterations', 'concurrency' and 'stream'.

    Responds with the summary object, or with HTTP 400 and
    {'error': ...} when the body is invalid or no test case is selected.
    """
    # silent=True yields None instead of raising on a missing or malformed body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Only read the saved configuration when the request does not bring one.
    config = data.get('config')
    if config is None:
        config = load_config()
    if not isinstance(config, dict):
        return jsonify({'error': 'config must be a JSON object'}), 400

    test_case_ids = data.get('test_case_ids', [])
    stream = data.get('stream', False)

    try:
        iterations = int(data.get('iterations', 1))
        concurrency = int(data.get('concurrency', 1))
    except (TypeError, ValueError):
        return jsonify({'error': 'iterations and concurrency must be integers'}), 400
    if iterations < 1:
        return jsonify({'error': 'iterations must be at least 1'}), 400
    # Anything below two workers means sequential execution.
    concurrency = max(1, concurrency)

    # Load the selected test cases.
    all_test_cases = load_test_cases()
    if test_case_ids:
        selected_cases = [tc for tc in all_test_cases if tc.get('id') in test_case_ids]
    else:
        selected_cases = all_test_cases

    if not selected_cases:
        return jsonify({'error': 'No test cases selected'}), 400

    # Generate the test id.
    test_id = generate_id()

    # Run the tests and time the whole batch.
    start_time = time.time()
    results = run_concurrent_tests(config, selected_cases, concurrency, iterations, stream)
    end_time = time.time()

    # Assemble the summary.
    # NOTE(review): 'config' is stored and returned verbatim, including any
    # api_key it contains - confirm that this is intended.
    summary = {
        'test_id': test_id,
        'timestamp': datetime.now().isoformat(),
        'config': config,
        'test_cases_count': len(selected_cases),
        'iterations': iterations,
        'concurrency': concurrency,
        'stream_mode': stream,
        'total_duration': round(end_time - start_time, 2),
        'results': results
    }

    # Persist the summary.
    save_result(summary)

    # Cache the summary.
    with results_lock:
        results_cache[test_id] = summary

    return jsonify(summary)
|
||
|
||
|
||
@app.route('/api/results', methods=['GET'])
def get_results():
    """Return the stored result history (as produced by load_results) as JSON."""
    history = load_results()
    return jsonify(history)
|
||
|
||
|
||
@app.route('/api/results/<test_id>', methods=['GET'])
def get_result(test_id):
    """Return one test summary by id.

    The in-memory cache is consulted first, then the stored history.
    Responds with HTTP 404 and {'error': 'Result not found'} when neither
    holds the id.
    """
    # Cache lookup.
    with results_lock:
        is_cached = test_id in results_cache
        cached = results_cache.get(test_id)
    if is_cached:
        return jsonify(cached)

    # Fall back to the result file.
    stored = next(
        (entry for entry in load_results() if entry.get('test_id') == test_id),
        None,
    )
    if stored is None:
        return jsonify({'error': 'Result not found'}), 404
    return jsonify(stored)
|
||
|
||
|
||
@app.route('/api/results/<test_id>', methods=['DELETE'])
def delete_result(test_id):
    """Delete a test summary from the result file and from the cache.

    Always responds with {'status': 'success'}, including when the id was
    not present.
    """
    remaining = [entry for entry in load_results() if entry.get('test_id') != test_id]
    with open(RESULTS_FILE, mode='w', encoding='utf-8') as handle:
        json.dump(remaining, handle, indent=2, ensure_ascii=False)

    # Drop the cached copy, if any.
    with results_lock:
        results_cache.pop(test_id, None)

    return jsonify({'status': 'success'})
|
||
|
||
|
||
@app.route('/api/verify-config', methods=['POST'])
def verify_config():
    """Check whether the posted API configuration is reachable.

    First requests the '/models' endpoint; if that does not answer with
    status 200, a minimal chat completion is attempted instead.  Responds
    with a 'status' of 'success' or 'error' plus a 'message'; any exception
    is reported as an error message.
    """
    config = request.json

    try:
        base_url = config.get('api_base', '').rstrip('/')
        token = config.get('api_key', '')
        model_name = config.get('model', '')

        headers = {'Content-Type': 'application/json'}
        if token:
            headers['Authorization'] = 'Bearer ' + token

        # First attempt: list the models.
        models_reply = requests.get(base_url + '/models', headers=headers, timeout=10)
        if models_reply.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})

        # Second attempt: a tiny completion call.
        probe = {
            'model': model_name,
            'messages': [{'role': 'user', 'content': 'Hi'}],
            'max_tokens': 5
        }
        chat_reply = requests.post(base_url + '/chat/completions',
                                   headers=headers, json=probe, timeout=10)
        if chat_reply.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})

        return jsonify({'status': 'error',
                        'message': 'API returned status ' + str(chat_reply.status_code)})

    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
|
||
|
||
|
||
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which lets any
    # client that can reach the server execute code.  Because the server
    # listens on all interfaces, debug mode is opt-in: set FLASK_DEBUG=1
    # for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=8001, debug=debug_enabled)
|