llm_perf_test/app.py

487 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
LLM Performance Test Tool
支持本地和云端大模型性能测试,兼容 OpenAI API
"""
import os
import json
import time
import uuid
import statistics
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from flask import Flask, render_template, request, jsonify, send_from_directory
import requests
app = Flask(__name__)
app.config['SECRET_KEY'] = 'llm-perf-test-secret-key'

# Directory for persisted configuration, test cases and results.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
# exist_ok avoids the race between an existence check and the creation.
os.makedirs(DATA_DIR, exist_ok=True)

# JSON files backing the app's persistent state.
CONFIG_FILE = os.path.join(DATA_DIR, 'config.json')
TEST_CASES_FILE = os.path.join(DATA_DIR, 'test_cases.json')
RESULTS_FILE = os.path.join(DATA_DIR, 'results.json')

# Default OpenAI-compatible API configuration (local Ollama endpoint).
DEFAULT_CONFIG = {
    "api_base": "http://localhost:11434/v1",
    "api_key": "",
    "model": "qwen2.5:latest",
    "timeout": 60,
    "max_tokens": 512,
    "temperature": 0.7
}
# Built-in test cases, used when data/test_cases.json does not exist yet.
# Each entry has: a unique id, a display name, the prompt sent verbatim to
# the model, and a rough expected response length (informational only —
# nothing in this file enforces it).
DEFAULT_TEST_CASES = [
    {
        "id": "tc_001",
        "name": "简单问答",
        "prompt": "你好,请介绍一下自己。",
        "expected_length": 100
    },
    {
        "id": "tc_002",
        "name": "代码生成",
        "prompt": "写一个Python函数计算斐波那契数列的前n项。",
        "expected_length": 200
    },
    {
        "id": "tc_003",
        "name": "长文本理解",
        "prompt": "请总结以下段落的主要观点人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率还创造了新的可能性。",
        "expected_length": 150
    },
    {
        "id": "tc_004",
        "name": "创意写作",
        "prompt": "写一首关于春天的四行短诗。",
        "expected_length": 100
    },
    {
        "id": "tc_005",
        "name": "逻辑推理",
        "prompt": "如果A大于BB大于C那么A和C的关系是什么请解释你的推理过程。",
        "expected_length": 120
    }
]

# In-memory cache of completed runs, keyed by test_id. Guarded by
# results_lock because Flask may serve requests from multiple threads.
results_cache = {}
results_lock = Lock()
def load_config():
    """Return the persisted API configuration, backfilled with defaults.

    Keys present in the stored file win; any key missing from the file is
    filled in from DEFAULT_CONFIG. Returns a fresh copy of the defaults
    when no config file exists yet.
    """
    if not os.path.exists(CONFIG_FILE):
        return DEFAULT_CONFIG.copy()
    with open(CONFIG_FILE, 'r', encoding='utf-8') as fp:
        config = json.load(fp)
    for key, fallback in DEFAULT_CONFIG.items():
        config.setdefault(key, fallback)
    return config
def save_config(config):
    """Write *config* to CONFIG_FILE as pretty-printed UTF-8 JSON."""
    with open(CONFIG_FILE, 'w', encoding='utf-8') as fp:
        fp.write(json.dumps(config, ensure_ascii=False, indent=2))
def load_test_cases():
    """Return the stored test cases, or a copy of the defaults if none saved."""
    try:
        with open(TEST_CASES_FILE, 'r', encoding='utf-8') as fp:
            return json.load(fp)
    except FileNotFoundError:
        return DEFAULT_TEST_CASES.copy()
def save_test_cases(test_cases):
    """Write *test_cases* to TEST_CASES_FILE as pretty-printed UTF-8 JSON."""
    with open(TEST_CASES_FILE, 'w', encoding='utf-8') as fp:
        fp.write(json.dumps(test_cases, ensure_ascii=False, indent=2))
def load_results():
    """Return the list of all saved test runs (empty list if none yet)."""
    try:
        with open(RESULTS_FILE, 'r', encoding='utf-8') as fp:
            return json.load(fp)
    except FileNotFoundError:
        return []
def save_result(result):
    """Append one test-run summary to the results file.

    The load/append/rewrite cycle is guarded by results_lock: without it,
    two concurrent test runs can each read the old file and the second
    write silently drops the first run's result.
    """
    with results_lock:
        results = load_results()
        results.append(result)
        with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
def generate_id():
    """Return a short unique identifier (first 8 hex digits of a UUID4)."""
    return uuid.uuid4().hex[:8]
def call_llm_api(config, prompt, stream=False):
    """Issue one chat-completion request against an OpenAI-compatible API.

    Args:
        config: dict with 'api_base', 'api_key', 'model', 'timeout',
            'max_tokens', 'temperature' (missing keys get safe defaults).
        prompt: user message, sent as a single-turn conversation.
        stream: when True, use SSE streaming so TTFT can be measured
            from the first content chunk.

    Returns:
        dict with keys: 'success', 'ttft' (ms), 'tps' (tokens/s),
        'total_time' (s), 'output', 'tokens_in', 'tokens_out';
        on failure 'success' is False and an 'error' message is added.
    """
    api_base = config.get('api_base', '').rstrip('/')
    api_key = config.get('api_key', '')
    model = config.get('model', '')
    timeout = config.get('timeout', 60)
    max_tokens = config.get('max_tokens', 512)
    temperature = config.get('temperature', 0.7)

    headers = {'Content-Type': 'application/json'}
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key
    payload = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': max_tokens,
        'temperature': temperature,
        'stream': stream
    }
    url = api_base + '/chat/completions'

    start_time = time.time()
    first_token_time = None
    output_text = ''
    tokens_in = 0
    tokens_out = 0
    try:
        if stream:
            # Streaming mode: read SSE lines so the first token can be timed.
            # Context manager guarantees the connection is released even if
            # we break out early on '[DONE]' or an exception is raised.
            with requests.post(url, headers=headers, json=payload,
                               timeout=timeout, stream=True) as response:
                response.raise_for_status()
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode('utf-8')
                    if not line.startswith('data: '):
                        continue
                    data = line[6:]
                    if data == '[DONE]':
                        break
                    try:
                        chunk = json.loads(data)
                    except json.JSONDecodeError:
                        # Skip only malformed/partial chunks; anything else
                        # should propagate instead of being silently eaten.
                        continue
                    choices = chunk.get('choices') or []
                    if choices:
                        content = choices[0].get('delta', {}).get('content', '')
                        if content:
                            if first_token_time is None:
                                first_token_time = time.time()
                            output_text += content
            total_time = time.time() - start_time
            ttft = first_token_time - start_time if first_token_time else total_time
            # The stream carries no usage stats: estimate ~4 chars per token.
            tokens_in = len(prompt) // 4
            tokens_out = len(output_text) // 4
            generation_time = total_time - ttft
            tps = tokens_out / generation_time if generation_time > 0 else 0
        else:
            # Non-streaming mode: one blocking request.
            response = requests.post(url, headers=headers, json=payload, timeout=timeout)
            response.raise_for_status()
            total_time = time.time() - start_time
            data = response.json()
            if data.get('choices'):
                output_text = data['choices'][0].get('message', {}).get('content', '')
            # Prefer server-reported token usage, fall back to estimation.
            usage = data.get('usage', {})
            tokens_in = usage.get('prompt_tokens', len(prompt) // 4)
            tokens_out = usage.get('completion_tokens', len(output_text) // 4)
            # Without streaming, the first token arrives with the full body.
            ttft = total_time
            tps = tokens_out / total_time if total_time > 0 else 0
        return {
            'success': True,
            'ttft': round(ttft * 1000, 2),  # milliseconds
            'tps': round(tps, 2),
            'total_time': round(total_time, 3),
            'output': output_text,
            'tokens_in': tokens_in,
            'tokens_out': tokens_out
        }
    except Exception as e:
        # Timeouts, connection failures and HTTP errors all land here;
        # report a failed sample rather than aborting the whole test run.
        return {
            'success': False,
            'error': str(e),
            'ttft': 0,
            'tps': 0,
            'total_time': 0,
            'output': '',
            'tokens_in': 0,
            'tokens_out': 0
        }
def run_single_test(config, test_case, iterations=1, stream=False):
    """Run one test case *iterations* times and aggregate the metrics.

    Returns a dict with the raw per-iteration results plus summary
    statistics (avg/min/max TTFT and TPS, average total time, success
    rate as a percentage).
    """
    results = []
    for i in range(iterations):
        result = call_llm_api(config, test_case['prompt'], stream=stream)
        result['iteration'] = i + 1
        results.append(result)

    successful = [r for r in results if r['success']]
    if successful:
        ttfts = [r['ttft'] for r in successful]
        tps_values = [r['tps'] for r in successful]
        total_times = [r['total_time'] for r in successful]
        stats = {
            'avg_ttft': round(statistics.mean(ttfts), 2),
            'avg_tps': round(statistics.mean(tps_values), 2),
            'avg_total_time': round(statistics.mean(total_times), 3),
            'min_ttft': round(min(ttfts), 2),
            'max_ttft': round(max(ttfts), 2),
            'min_tps': round(min(tps_values), 2),
            'max_tps': round(max(tps_values), 2),
            'success_rate': round(len(successful) / len(results) * 100, 1)
        }
    else:
        # Same key set as the success branch so downstream consumers never
        # have to special-case an all-failed run (previously min/max keys
        # were missing here).
        stats = {
            'avg_ttft': 0,
            'avg_tps': 0,
            'avg_total_time': 0,
            'min_ttft': 0,
            'max_ttft': 0,
            'min_tps': 0,
            'max_tps': 0,
            'success_rate': 0
        }
    return {
        'test_case': test_case,
        'iterations': iterations,
        'stream_mode': stream,
        'results': results,
        'statistics': stats
    }
def run_concurrent_tests(config, test_cases, concurrency=1, iterations_per_case=1, stream=False):
    """Run every test case, fanning out over a thread pool when concurrency > 1.

    With concurrency > 1 results are appended in completion order,
    otherwise cases run sequentially in the given order.
    """
    def _run(tc):
        return run_single_test(config, tc, iterations_per_case, stream)

    if concurrency <= 1:
        return [_run(tc) for tc in test_cases]

    collected = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = [pool.submit(_run, tc) for tc in test_cases]
        for future in as_completed(pending):
            collected.append(future.result())
    return collected
@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html')
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the currently persisted API configuration as JSON."""
    current = load_config()
    return jsonify(current)
@app.route('/api/config', methods=['POST'])
def update_config():
    """Persist the configuration posted as a JSON object."""
    config = request.json
    # request.json can be None (missing/non-JSON body); reject anything
    # that is not an object so load_config() never reads back garbage.
    if not isinstance(config, dict):
        return jsonify({'error': 'Config must be a JSON object'}), 400
    save_config(config)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases', methods=['GET'])
def get_test_cases():
    """Return all stored test cases as JSON."""
    cases = load_test_cases()
    return jsonify(cases)
@app.route('/api/test-cases', methods=['POST'])
def update_test_cases():
    """Replace the stored test cases with the posted JSON array."""
    test_cases = request.json
    # Guard against a missing/non-array body so one bad POST cannot
    # corrupt the store every other endpoint reads.
    if not isinstance(test_cases, list):
        return jsonify({'error': 'Test cases must be a JSON array'}), 400
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})
@app.route('/api/test-cases/<case_id>', methods=['DELETE'])
def delete_test_case(case_id):
    """Remove the test case whose id equals *case_id* (no-op if absent)."""
    remaining = [tc for tc in load_test_cases() if tc['id'] != case_id]
    save_test_cases(remaining)
    return jsonify({'status': 'success'})
@app.route('/api/run-test', methods=['POST'])
def run_test():
    """Run the selected test cases and return (and persist) a summary.

    Body fields (all optional): config, test_case_ids, iterations,
    concurrency, stream. With no ids, every stored case is run.
    """
    # request.json is None on an empty/non-JSON body; the original code
    # crashed with AttributeError here. Every field has a default.
    data = request.json or {}
    # An explicit null/empty config also falls back to the saved one.
    config = data.get('config') or load_config()
    test_case_ids = data.get('test_case_ids', [])
    iterations = data.get('iterations', 1)
    concurrency = data.get('concurrency', 1)
    stream = data.get('stream', False)

    # Resolve the selected test cases (all of them when no ids given).
    all_test_cases = load_test_cases()
    if test_case_ids:
        selected_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids]
    else:
        selected_cases = all_test_cases
    if not selected_cases:
        return jsonify({'error': 'No test cases selected'}), 400

    test_id = generate_id()
    start_time = time.time()
    results = run_concurrent_tests(config, selected_cases, concurrency, iterations, stream)
    end_time = time.time()

    summary = {
        'test_id': test_id,
        'timestamp': datetime.now().isoformat(),
        'config': config,
        'test_cases_count': len(selected_cases),
        'iterations': iterations,
        'concurrency': concurrency,
        'stream_mode': stream,
        'total_duration': round(end_time - start_time, 2),
        'results': results
    }
    # Persist to disk and keep a fast in-memory copy for GET lookups.
    save_result(summary)
    with results_lock:
        results_cache[test_id] = summary
    return jsonify(summary)
@app.route('/api/results', methods=['GET'])
def get_results():
    """Return the full history of saved test runs as JSON."""
    history = load_results()
    return jsonify(history)
@app.route('/api/results/<test_id>', methods=['GET'])
def get_result(test_id):
    """Return one test run, checking the in-memory cache before disk."""
    with results_lock:
        cached = results_cache.get(test_id)
    if cached is not None:
        return jsonify(cached)
    # Fall back to the on-disk history.
    match = next((r for r in load_results() if r.get('test_id') == test_id), None)
    if match is not None:
        return jsonify(match)
    return jsonify({'error': 'Result not found'}), 404
@app.route('/api/results/<test_id>', methods=['DELETE'])
def delete_result(test_id):
    """Delete one test run from both the history file and the cache."""
    kept = [r for r in load_results() if r.get('test_id') != test_id]
    with open(RESULTS_FILE, 'w', encoding='utf-8') as fp:
        json.dump(kept, fp, ensure_ascii=False, indent=2)
    with results_lock:
        results_cache.pop(test_id, None)
    return jsonify({'status': 'success'})
@app.route('/api/verify-config', methods=['POST'])
def verify_config():
    """Check that the posted API configuration can reach the endpoint.

    Probes GET /models first; if that is not available, falls back to a
    minimal chat completion. Always answers with a status/message JSON
    body (HTTP 200) so the UI can display the outcome directly.
    """
    # Tolerate a missing/non-JSON body instead of raising a 500 on .get().
    config = request.json or {}
    try:
        api_base = config.get('api_base', '').rstrip('/')
        api_key = config.get('api_key', '')
        model = config.get('model', '')
        headers = {'Content-Type': 'application/json'}
        if api_key:
            headers['Authorization'] = 'Bearer ' + api_key
        # Cheapest probe first: the model-listing endpoint.
        response = requests.get(api_base + '/models', headers=headers, timeout=10)
        if response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})
        # Some servers do not expose /models; try a tiny completion instead.
        test_payload = {
            'model': model,
            'messages': [{'role': 'user', 'content': 'Hi'}],
            'max_tokens': 5
        }
        test_response = requests.post(api_base + '/chat/completions',
                                      headers=headers, json=test_payload, timeout=10)
        if test_response.status_code == 200:
            return jsonify({'status': 'success', 'message': 'Connection successful'})
        return jsonify({'status': 'error', 'message': 'API returned status ' + str(test_response.status_code)})
    except Exception as e:
        # Connection/timeout errors are expected outcomes of a probe;
        # report them to the client rather than crashing the request.
        return jsonify({'status': 'error', 'message': str(e)})
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug debugger and auto-reload;
    # combined with host='0.0.0.0' (all interfaces) this must not be exposed
    # beyond a trusted network — confirm before deploying.
    app.run(host='0.0.0.0', port=8001, debug=True)