#!/usr/bin/env python3
"""
LLM Performance Test Tool

Benchmarks local and cloud LLMs through an OpenAI-compatible
``/chat/completions`` API.  Measures TTFT (time to first token), TPS
(tokens per second) and total latency, and exposes a small Flask JSON
API plus an HTML front end.  Configuration, test cases and results are
persisted as JSON files under ``./data``.
"""
import copy
import os
import json
import time
import uuid
import statistics
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

from flask import Flask, render_template, request, jsonify, send_from_directory
import requests

app = Flask(__name__)
app.config['SECRET_KEY'] = 'llm-perf-test-secret-key'

# Directory holding all persisted JSON state.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
os.makedirs(DATA_DIR, exist_ok=True)

# Paths of the persisted configuration, test-case and result files.
CONFIG_FILE = os.path.join(DATA_DIR, 'config.json')
TEST_CASES_FILE = os.path.join(DATA_DIR, 'test_cases.json')
RESULTS_FILE = os.path.join(DATA_DIR, 'results.json')

# Default API configuration (points at a local Ollama instance).
DEFAULT_CONFIG = {
    "api_base": "http://localhost:11434/v1",
    "api_key": "",
    "model": "qwen2.5:latest",
    "timeout": 60,
    "max_tokens": 512,
    "temperature": 0.7
}

# Built-in test cases, used until the user saves their own set.
# (Prompt strings are user-facing payload data and are kept verbatim.)
DEFAULT_TEST_CASES = [
    {
        "id": "tc_001",
        "name": "简单问答",
        "prompt": "你好,请介绍一下自己。",
        "expected_length": 100
    },
    {
        "id": "tc_002",
        "name": "代码生成",
        "prompt": "写一个Python函数计算斐波那契数列的前n项。",
        "expected_length": 200
    },
    {
        "id": "tc_003",
        "name": "长文本理解",
        "prompt": "请总结以下段落的主要观点:人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车,AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率,还创造了新的可能性。",
        "expected_length": 150
    },
    {
        "id": "tc_004",
        "name": "创意写作",
        "prompt": "写一首关于春天的四行短诗。",
        "expected_length": 100
    },
    {
        "id": "tc_005",
        "name": "逻辑推理",
        "prompt": "如果A大于B,B大于C,那么A和C的关系是什么?请解释你的推理过程。",
        "expected_length": 120
    }
]

# In-memory cache of recent test runs.  results_lock guards the cache
# AND serializes read-modify-write access to RESULTS_FILE, so two
# concurrent requests cannot silently drop each other's results.
results_cache = {}
results_lock = Lock()


def load_config():
    """Load the saved config, filling in any keys missing from disk
    with their DEFAULT_CONFIG values."""
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            config = json.load(f)
        for key, value in DEFAULT_CONFIG.items():
            config.setdefault(key, value)
        return config
    return DEFAULT_CONFIG.copy()


def save_config(config):
    """Persist the configuration dict to CONFIG_FILE."""
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=2)


def load_test_cases():
    """Load saved test cases, or a deep copy of the defaults.

    A deep copy is returned so callers can mutate individual case
    dicts without corrupting DEFAULT_TEST_CASES.
    """
    if os.path.exists(TEST_CASES_FILE):
        with open(TEST_CASES_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return copy.deepcopy(DEFAULT_TEST_CASES)


def save_test_cases(test_cases):
    """Persist the list of test cases to TEST_CASES_FILE."""
    with open(TEST_CASES_FILE, 'w', encoding='utf-8') as f:
        json.dump(test_cases, f, ensure_ascii=False, indent=2)


def load_results():
    """Load all historical test results (empty list if none saved)."""
    if os.path.exists(RESULTS_FILE):
        with open(RESULTS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return []


def _write_results(results):
    """Overwrite RESULTS_FILE with *results*. Caller must hold results_lock."""
    with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)


def save_result(result):
    """Append one test result to RESULTS_FILE.

    The read-modify-write cycle is done under results_lock so that
    concurrent /api/run-test requests cannot lose each other's rows.
    """
    with results_lock:
        results = load_results()
        results.append(result)
        _write_results(results)


def generate_id():
    """Return a short (8-char) random identifier."""
    return str(uuid.uuid4())[:8]


def call_llm_api(config, prompt, stream=False):
    """Issue a single chat-completion request and measure its performance.

    Args:
        config: dict with api_base, api_key, model, timeout, max_tokens,
            temperature (missing keys fall back to sensible defaults).
        prompt: the user message to send.
        stream: when True, use SSE streaming so TTFT can be measured.

    Returns:
        dict with keys: success, ttft (ms), tps, total_time (s),
        output, tokens_in, tokens_out — and 'error' on failure.
    """
    api_base = config.get('api_base', '').rstrip('/')
    api_key = config.get('api_key', '')
    model = config.get('model', '')
    timeout = config.get('timeout', 60)
    max_tokens = config.get('max_tokens', 512)
    temperature = config.get('temperature', 0.7)

    headers = {'Content-Type': 'application/json'}
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key

    payload = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'max_tokens': max_tokens,
        'temperature': temperature,
        'stream': stream
    }

    url = api_base + '/chat/completions'
    start_time = time.time()
    first_token_time = None
    output_text = ''
    tokens_in = 0
    tokens_out = 0

    try:
        if stream:
            # Streaming mode: read server-sent events so we can record
            # the timestamp of the first content chunk (TTFT).
            # The with-block guarantees the connection is released even
            # when we break out early on '[DONE]'.
            with requests.post(url, headers=headers, json=payload,
                               timeout=timeout, stream=True) as response:
                response.raise_for_status()
                for line in response.iter_lines():
                    if not line:
                        continue
                    line = line.decode('utf-8')
                    if not line.startswith('data: '):
                        continue
                    data = line[6:]
                    if data == '[DONE]':
                        break
                    try:
                        chunk = json.loads(data)
                    except json.JSONDecodeError:
                        # Ignore malformed keep-alive / partial frames.
                        continue
                    if 'choices' in chunk and len(chunk['choices']) > 0:
                        delta = chunk['choices'][0].get('delta', {})
                        content = delta.get('content', '')
                        if content:
                            if first_token_time is None:
                                first_token_time = time.time()
                            output_text += content

            total_time = time.time() - start_time
            # If no token ever arrived, fall back to the full duration.
            ttft = (first_token_time - start_time
                    if first_token_time is not None else total_time)

            # Streaming responses carry no usage block; estimate tokens
            # with the rough 4-chars-per-token heuristic.
            tokens_in = len(prompt) // 4
            tokens_out = len(output_text) // 4

            # TPS is computed over the generation phase only (after TTFT).
            generation_time = total_time - ttft
            tps = tokens_out / generation_time if generation_time > 0 else 0
        else:
            # Non-streaming mode: one blocking request.
            response = requests.post(url, headers=headers, json=payload,
                                     timeout=timeout)
            response.raise_for_status()
            total_time = time.time() - start_time

            data = response.json()
            if 'choices' in data and len(data['choices']) > 0:
                output_text = data['choices'][0].get('message', {}).get('content', '')

            # Prefer the server-reported usage; fall back to estimates.
            usage = data.get('usage', {})
            tokens_in = usage.get('prompt_tokens', len(prompt) // 4)
            tokens_out = usage.get('completion_tokens', len(output_text) // 4)

            # Without streaming, TTFT is indistinguishable from total time.
            ttft = total_time
            tps = tokens_out / total_time if total_time > 0 else 0

        return {
            'success': True,
            'ttft': round(ttft * 1000, 2),  # milliseconds
            'tps': round(tps, 2),
            'total_time': round(total_time, 3),
            'output': output_text,
            'tokens_in': tokens_in,
            'tokens_out': tokens_out
        }
    except Exception as e:
        # Network errors, HTTP errors and bad JSON all surface here;
        # the caller folds failures into the success-rate statistic.
        return {
            'success': False,
            'error': str(e),
            'ttft': 0,
            'tps': 0,
            'total_time': 0,
            'output': '',
            'tokens_in': 0,
            'tokens_out': 0
        }


def run_single_test(config, test_case, iterations=1, stream=False):
    """Run one test case *iterations* times and aggregate statistics.

    Returns a dict with the raw per-iteration results plus avg/min/max
    TTFT and TPS and the success rate over all iterations.
    """
    results = []
    for i in range(iterations):
        result = call_llm_api(config, test_case['prompt'], stream=stream)
        result['iteration'] = i + 1
        results.append(result)

    # Only successful iterations contribute to the timing statistics.
    successful = [r for r in results if r['success']]
    if successful:
        ttfts = [r['ttft'] for r in successful]
        tpss = [r['tps'] for r in successful]
        stats = {
            'avg_ttft': round(statistics.mean(ttfts), 2),
            'avg_tps': round(statistics.mean(tpss), 2),
            'avg_total_time': round(
                statistics.mean([r['total_time'] for r in successful]), 3),
            'min_ttft': round(min(ttfts), 2),
            'max_ttft': round(max(ttfts), 2),
            'min_tps': round(min(tpss), 2),
            'max_tps': round(max(tpss), 2),
            'success_rate': round(len(successful) / len(results) * 100, 1)
        }
    else:
        stats = {
            'avg_ttft': 0,
            'avg_tps': 0,
            'avg_total_time': 0,
            'success_rate': 0
        }

    return {
        'test_case': test_case,
        'iterations': iterations,
        'stream_mode': stream,
        'results': results,
        'statistics': stats
    }


def run_concurrent_tests(config, test_cases, concurrency=1,
                         iterations_per_case=1, stream=False):
    """Run several test cases, optionally in parallel threads.

    With concurrency > 1 each test case runs in its own worker thread
    (completion order is nondeterministic); otherwise cases run
    sequentially in the given order.
    """
    all_results = []

    def run_test(tc):
        return run_single_test(config, tc, iterations_per_case, stream)

    if concurrency > 1:
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = {executor.submit(run_test, tc): tc for tc in test_cases}
            for future in as_completed(futures):
                all_results.append(future.result())
    else:
        for tc in test_cases:
            all_results.append(run_test(tc))

    return all_results


@app.route('/')
def index():
    """Serve the HTML front end."""
    return render_template('index.html')


@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the current configuration."""
    return jsonify(load_config())


@app.route('/api/config', methods=['POST'])
def update_config():
    """Replace the stored configuration with the posted JSON body."""
    config = request.json
    save_config(config)
    return jsonify({'status': 'success'})


@app.route('/api/test-cases', methods=['GET'])
def get_test_cases():
    """Return all test cases."""
    return jsonify(load_test_cases())


@app.route('/api/test-cases', methods=['POST'])
def update_test_cases():
    """Replace the stored test-case list with the posted JSON body."""
    test_cases = request.json
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})


# BUG FIX: the route previously lacked the <case_id> converter, so Flask
# called delete_test_case() without its required argument (HTTP 500).
@app.route('/api/test-cases/<case_id>', methods=['DELETE'])
def delete_test_case(case_id):
    """Delete the test case with the given id (no-op if absent)."""
    test_cases = load_test_cases()
    test_cases = [tc for tc in test_cases if tc['id'] != case_id]
    save_test_cases(test_cases)
    return jsonify({'status': 'success'})


@app.route('/api/run-test', methods=['POST'])
def run_test():
    """Run the selected test cases and return + persist a summary.

    Request JSON: config (optional, defaults to stored config),
    test_case_ids (optional, defaults to all), iterations,
    concurrency, stream.
    """
    data = request.json
    config = data.get('config', load_config())
    test_case_ids = data.get('test_case_ids', [])
    iterations = data.get('iterations', 1)
    concurrency = data.get('concurrency', 1)
    stream = data.get('stream', False)

    # Resolve the selected test cases; an empty id list means "all".
    all_test_cases = load_test_cases()
    if test_case_ids:
        selected_cases = [tc for tc in all_test_cases
                          if tc['id'] in test_case_ids]
    else:
        selected_cases = all_test_cases

    if not selected_cases:
        return jsonify({'error': 'No test cases selected'}), 400

    test_id = generate_id()

    start_time = time.time()
    results = run_concurrent_tests(config, selected_cases, concurrency,
                                   iterations, stream)
    end_time = time.time()

    summary = {
        'test_id': test_id,
        'timestamp': datetime.now().isoformat(),
        'config': config,
        'test_cases_count': len(selected_cases),
        'iterations': iterations,
        'concurrency': concurrency,
        'stream_mode': stream,
        'total_duration': round(end_time - start_time, 2),
        'results': results
    }

    # Persist to disk, then cache in memory for fast single-result lookup.
    save_result(summary)
    with results_lock:
        results_cache[test_id] = summary

    return jsonify(summary)


@app.route('/api/results', methods=['GET'])
def get_results():
    """Return all historical test results."""
    return jsonify(load_results())


# BUG FIX: route previously lacked the <test_id> converter (HTTP 500).
@app.route('/api/results/<test_id>', methods=['GET'])
def get_result(test_id):
    """Return a single test result, checking the memory cache first."""
    with results_lock:
        if test_id in results_cache:
            return jsonify(results_cache[test_id])

    # Cache miss: fall back to the results file.
    results = load_results()
    for result in results:
        if result.get('test_id') == test_id:
            return jsonify(result)

    return jsonify({'error': 'Result not found'}), 404


# BUG FIX: route previously lacked the <test_id> converter (HTTP 500).
@app.route('/api/results/<test_id>', methods=['DELETE'])
def delete_result(test_id):
    """Delete one test result from both the file store and the cache."""
    with results_lock:
        results = [r for r in load_results() if r.get('test_id') != test_id]
        _write_results(results)
        results_cache.pop(test_id, None)
    return jsonify({'status': 'success'})


@app.route('/api/verify-config', methods=['POST'])
def verify_config():
    """Check that the posted API configuration is reachable.

    First probes GET /models; if that fails, falls back to a tiny
    chat-completion call to confirm the endpoint actually works.
    """
    config = request.json
    try:
        api_base = config.get('api_base', '').rstrip('/')
        api_key = config.get('api_key', '')
        model = config.get('model', '')

        headers = {'Content-Type': 'application/json'}
        if api_key:
            headers['Authorization'] = 'Bearer ' + api_key

        # Cheap probe: list models.
        url = api_base + '/models'
        response = requests.get(url, headers=headers, timeout=10)

        if response.status_code == 200:
            return jsonify({'status': 'success',
                            'message': 'Connection successful'})

        # Some servers don't expose /models; try a minimal completion.
        test_payload = {
            'model': model,
            'messages': [{'role': 'user', 'content': 'Hi'}],
            'max_tokens': 5
        }
        test_response = requests.post(api_base + '/chat/completions',
                                      headers=headers, json=test_payload,
                                      timeout=10)
        if test_response.status_code == 200:
            return jsonify({'status': 'success',
                            'message': 'Connection successful'})
        return jsonify({'status': 'error',
                        'message': 'API returned status '
                                   + str(test_response.status_code)})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})


if __name__ == '__main__':
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger
    # to the network — disable debug before deploying anywhere shared.
    app.run(host='0.0.0.0', port=8001, debug=True)