From 4f8048bd8df02a9f48a4c769b1ef92cb118c6b33 Mon Sep 17 00:00:00 2001 From: OpenClaw Date: Tue, 3 Mar 2026 00:00:13 +0800 Subject: [PATCH] Update for Python 3.6 compatibility --- app.py | 536 +++++++++++++++++++++++++++-------------------- requirements.txt | 17 +- 2 files changed, 318 insertions(+), 235 deletions(-) diff --git a/app.py b/app.py index 2f5f5d7..54bd431 100644 --- a/app.py +++ b/app.py @@ -21,7 +21,8 @@ app.config['SECRET_KEY'] = 'llm-perf-test-secret-key' # 数据存储目录 DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') -os.makedirs(DATA_DIR, exist_ok=True) +if not os.path.exists(DATA_DIR): + os.makedirs(DATA_DIR) # 配置文件路径 CONFIG_FILE = os.path.join(DATA_DIR, 'config.json') @@ -47,26 +48,33 @@ DEFAULT_TEST_CASES = [ "expected_length": 100 }, { - "id": "tc_002", + "id": "tc_002", "name": "代码生成", - "prompt": "写一个Python函数,计算斐波那契数列的前n项。", + "prompt": "写一个Python函数计算斐波那契数列的前n项。", "expected_length": 200 }, { "id": "tc_003", "name": "长文本理解", - "prompt": """请总结以下段落的主要观点:\n\n人工智能(AI)是计算机科学的一个分支,致力于创造能够执行通常需要人类智能的任务的系统。这些任务包括视觉感知、语音识别、决策制定和语言翻译等。机器学习是AI的一个子集,它使计算机能够从数据中学习并改进,而无需明确编程。深度学习是机器学习的一种特定方法,使用人工神经网络来模拟人脑的工作方式。近年来,随着计算能力的提升和大数据的可用性,AI技术取得了显著进展,在医疗诊断、自动驾驶汽车、自然语言处理等领域展现出巨大潜力。然而,AI的发展也引发了关于隐私、就业和伦理等方面的担忧,需要社会各界共同探讨和制定相应的规范。""", + "prompt": "请总结以下段落的主要观点:人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车,AI技术已经深入到我们日常生活的方方面面。它不仅提高了效率,还创造了新的可能性。", "expected_length": 150 }, { "id": "tc_004", "name": "创意写作", - "prompt": "写一个关于未来城市的短篇科幻故事,约300字。", - "expected_length": 400 + "prompt": "写一首关于春天的四行短诗。", + "expected_length": 100 + }, + { + "id": "tc_005", + "name": "逻辑推理", + "prompt": "如果A大于B,B大于C,那么A和C的关系是什么?请解释你的推理过程。", + "expected_length": 120 } ] -# 全局锁 +# 内存中的结果缓存 +results_cache = {} results_lock = Lock() @@ -74,7 +82,12 @@ def load_config(): """加载配置""" if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r', encoding='utf-8') as f: - return {**DEFAULT_CONFIG, **json.load(f)} + config = json.load(f) + # 合并默认配置 + for key, value in DEFAULT_CONFIG.items(): + if key not in config: + config[key] = value + return config return DEFAULT_CONFIG.copy() @@ -99,7 +112,7 @@ def save_test_cases(test_cases): def load_results(): - """加载历史测试结果""" + """加载历史结果""" if os.path.exists(RESULTS_FILE): with open(RESULTS_FILE, 'r', encoding='utf-8') as f: return json.load(f) @@ -108,177 +121,196 @@ def load_results(): def save_result(result): """保存测试结果""" - with results_lock: - results = load_results() - results.append(result) - with open(RESULTS_FILE, 'w', encoding='utf-8') as f: - json.dump(results, f, ensure_ascii=False, indent=2) + results = load_results() + results.append(result) + with open(RESULTS_FILE, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) -def stream_chat_completion(api_base, api_key, model, messages, max_tokens, temperature, timeout): +def generate_id(): + """生成唯一ID""" + return str(uuid.uuid4())[:8] + + +def call_llm_api(config, prompt, stream=False): """ - 流式调用 LLM API,实时计算 TTFT 和 TPS + 调用 LLM API + 返回: (ttft, tps, total_time, output_text, tokens_in, tokens_out) """ + api_base = config.get('api_base', '').rstrip('/') + api_key = config.get('api_key', '') + model = config.get('model', '') + timeout = config.get('timeout', 60) + max_tokens = config.get('max_tokens', 512) + temperature = config.get('temperature', 0.7) + headers = { - "Content-Type": "application/json" + 'Content-Type': 'application/json' } if api_key: - headers["Authorization"] = f"Bearer {api_key}" + headers['Authorization'] = 'Bearer ' + api_key payload = { - "model": model, - "messages": messages, - "max_tokens": max_tokens, - "temperature": temperature, - "stream": True + 'model': model, + 'messages': [{'role': 'user', 'content': prompt}], + 'max_tokens': max_tokens, + 'temperature': temperature, + 'stream': stream } - url = f"{api_base}/chat/completions" + url = api_base + '/chat/completions' - first_token_time = None start_time = time.time() - total_tokens = 0 - content_chunks = [] + first_token_time = None + output_text = '' + tokens_in = 0 + tokens_out = 0 try: - response = requests.post(url, headers=headers, json=payload, - timeout=timeout, stream=True) - response.raise_for_status() - - for line in response.iter_lines(): - if not line: - continue + if stream: + # 流式模式 - 测量 TTFT + response = requests.post(url, headers=headers, json=payload, + timeout=timeout, stream=True) + response.raise_for_status() - line_str = line.decode('utf-8') - if line_str.startswith('data: '): - data_str = line_str[6:] - if data_str == '[DONE]': - break - - try: - data = json.loads(data_str) - delta = data.get('choices', [{}])[0].get('delta', {}) - content = delta.get('content', '') - - if content: - if first_token_time is None: - first_token_time = time.time() - content_chunks.append(content) - total_tokens += len(content) # 近似token数 - except json.JSONDecodeError: - continue - - end_time = time.time() - - # 计算指标 - ttft = (first_token_time - start_time) * 1000 if first_token_time else 0 # ms - total_time = (end_time - start_time) * 1000 # ms - tps = total_tokens / (total_time / 1000) if total_time > 0 else 0 + for line in response.iter_lines(): + if line: + line = line.decode('utf-8') + if line.startswith('data: '): + data = line[6:] + if data == '[DONE]': + break + try: + chunk = json.loads(data) + if 'choices' in chunk and len(chunk['choices']) > 0: + delta = chunk['choices'][0].get('delta', {}) + content = delta.get('content', '') + if content: + if first_token_time is None: + first_token_time = time.time() + output_text += content + except: + pass + + total_time = time.time() - start_time + ttft = first_token_time - start_time if first_token_time else total_time + + # 估算 token 数量 + tokens_in = len(prompt) // 4 + tokens_out = len(output_text) // 4 + + # 计算 TPS + generation_time = total_time - ttft + tps = tokens_out / generation_time if generation_time > 0 else 0 + + else: + # 非流式模式 + response = requests.post(url, headers=headers, json=payload, timeout=timeout) + response.raise_for_status() + + end_time = time.time() + total_time = end_time - start_time + + data = response.json() + if 'choices' in data and len(data['choices']) > 0: + output_text = data['choices'][0].get('message', {}).get('content', '') + + # 获取 token 使用量 + usage = data.get('usage', {}) + tokens_in = usage.get('prompt_tokens', len(prompt) // 4) + tokens_out = usage.get('completion_tokens', len(output_text) // 4) + + # 非流式模式下 TTFT 约等于总时间 + ttft = total_time + tps = tokens_out / total_time if total_time > 0 else 0 return { - "success": True, - "ttft_ms": round(ttft, 2), - "total_time_ms": round(total_time, 2), - "tps": round(tps, 2), - "total_chars": sum(len(c) for c in content_chunks), - "content": ''.join(content_chunks) + 'success': True, + 'ttft': round(ttft * 1000, 2), # 转换为毫秒 + 'tps': round(tps, 2), + 'total_time': round(total_time, 3), + 'output': output_text, + 'tokens_in': tokens_in, + 'tokens_out': tokens_out } except Exception as e: return { - "success": False, - "error": str(e) + 'success': False, + 'error': str(e), + 'ttft': 0, + 'tps': 0, + 'total_time': 0, + 'output': '', + 'tokens_in': 0, + 'tokens_out': 0 } -def run_single_test(api_config, test_case, run_index=0): - """运行单个测试""" - messages = [{"role": "user", "content": test_case["prompt"]}] - - result = stream_chat_completion( - api_base=api_config["api_base"], - api_key=api_config["api_key"], - model=api_config["model"], - messages=messages, - max_tokens=api_config.get("max_tokens", 512), - temperature=api_config.get("temperature", 0.7), - timeout=api_config.get("timeout", 60) - ) - - result["test_case_id"] = test_case["id"] - result["test_case_name"] = test_case["name"] - result["run_index"] = run_index - result["timestamp"] = datetime.now().isoformat() - - return result - - -def run_batch_tests(api_config, test_cases, runs_per_case=1, concurrency=1): - """批量运行测试""" - all_tasks = [] - for test_case in test_cases: - for i in range(runs_per_case): - all_tasks.append((api_config, test_case, i)) - +def run_single_test(config, test_case, iterations=1, stream=False): + """运行单个测试用例""" results = [] - completed = 0 - total = len(all_tasks) - with ThreadPoolExecutor(max_workers=concurrency) as executor: - futures = {executor.submit(run_single_test, *task): task for task in all_tasks} + for i in range(iterations): + result = call_llm_api(config, test_case['prompt'], stream=stream) + result['iteration'] = i + 1 + results.append(result) + + # 计算统计数据 + successful = [r for r in results if r['success']] + + if successful: + avg_ttft = statistics.mean([r['ttft'] for r in successful]) + avg_tps = statistics.mean([r['tps'] for r in successful]) + avg_total_time = statistics.mean([r['total_time'] for r in successful]) - for future in as_completed(futures): - try: - result = future.result() - results.append(result) - completed += 1 - print(f"Progress: {completed}/{total}") - except Exception as e: - print(f"Test failed: {e}") - - return results - - -def calculate_statistics(results): - """计算统计数据""" - successful = [r for r in results if r.get("success")] - failed = [r for r in results if not r.get("success")] - - if not successful: - return {"error": "No successful tests"} - - ttfts = [r["ttft_ms"] for r in successful] - tpss = [r["tps"] for r in successful] - times = [r["total_time_ms"] for r in successful] - - stats = { - "total_tests": len(results), - "successful": len(successful), - "failed": len(failed), - "success_rate": round(len(successful) / len(results) * 100, 2), - "ttft": { - "avg": round(statistics.mean(ttfts), 2), - "min": round(min(ttfts), 2), - "max": round(max(ttfts), 2), - "median": round(statistics.median(ttfts), 2) - }, - "tps": { - "avg": round(statistics.mean(tpss), 2), - "min": round(min(tpss), 2), - "max": round(max(tpss), 2), - "median": round(statistics.median(tpss), 2) - }, - "total_time": { - "avg": round(statistics.mean(times), 2), - "min": round(min(times), 2), - "max": round(max(times), 2) + stats = { + 'avg_ttft': round(avg_ttft, 2), + 'avg_tps': round(avg_tps, 2), + 'avg_total_time': round(avg_total_time, 3), + 'min_ttft': round(min([r['ttft'] for r in successful]), 2), + 'max_ttft': round(max([r['ttft'] for r in successful]), 2), + 'min_tps': round(min([r['tps'] for r in successful]), 2), + 'max_tps': round(max([r['tps'] for r in successful]), 2), + 'success_rate': round(len(successful) / len(results) * 100, 1) + } + else: + stats = { + 'avg_ttft': 0, + 'avg_tps': 0, + 'avg_total_time': 0, + 'success_rate': 0 } - } - return stats + return { + 'test_case': test_case, + 'iterations': iterations, + 'stream_mode': stream, + 'results': results, + 'statistics': stats + } -# ==================== Flask Routes ==================== +def run_concurrent_tests(config, test_cases, concurrency=1, iterations_per_case=1, stream=False): + """并发运行多个测试用例""" + all_results = [] + + def run_test(tc): + return run_single_test(config, tc, iterations_per_case, stream) + + if concurrency > 1: + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = {executor.submit(run_test, tc): tc for tc in test_cases} + for future in as_completed(futures): + result = future.result() + all_results.append(result) + else: + for tc in test_cases: + result = run_test(tc) + all_results.append(result) + + return all_results + @app.route('/') def index(): @@ -286,119 +318,169 @@ def index(): return render_template('index.html') -@app.route('/api/config', methods=['GET', 'POST']) -def config_api(): - """配置管理 API""" - if request.method == 'GET': - return jsonify(load_config()) - else: - new_config = request.json - save_config(new_config) - return jsonify({"status": "success"}) +@app.route('/api/config', methods=['GET']) +def get_config(): + """获取配置""" + return jsonify(load_config()) -@app.route('/api/test-cases', methods=['GET', 'POST', 'PUT', 'DELETE']) -def test_cases_api(): - """测试用例管理 API""" - if request.method == 'GET': - return jsonify(load_test_cases()) - - elif request.method == 'POST': - test_cases = load_test_cases() - new_case = request.json - new_case['id'] = f"tc_{uuid.uuid4().hex[:6]}" - test_cases.append(new_case) - save_test_cases(test_cases) - return jsonify({"status": "success", "id": new_case['id']}) - - elif request.method == 'PUT': - updated_case = request.json - test_cases = load_test_cases() - for i, tc in enumerate(test_cases): - if tc['id'] == updated_case['id']: - test_cases[i] = updated_case - break - save_test_cases(test_cases) - return jsonify({"status": "success"}) - - elif request.method == 'DELETE': - case_id = request.args.get('id') - test_cases = load_test_cases() - test_cases = [tc for tc in test_cases if tc['id'] != case_id] - save_test_cases(test_cases) - return jsonify({"status": "success"}) +@app.route('/api/config', methods=['POST']) +def update_config(): + """更新配置""" + config = request.json + save_config(config) + return jsonify({'status': 'success'}) + + +@app.route('/api/test-cases', methods=['GET']) +def get_test_cases(): + """获取测试用例""" + return jsonify(load_test_cases()) + + +@app.route('/api/test-cases', methods=['POST']) +def update_test_cases(): + """更新测试用例""" + test_cases = request.json + save_test_cases(test_cases) + return jsonify({'status': 'success'}) + + +@app.route('/api/test-cases/', methods=['DELETE']) +def delete_test_case(case_id): + """删除测试用例""" + test_cases = load_test_cases() + test_cases = [tc for tc in test_cases if tc['id'] != case_id] + save_test_cases(test_cases) + return jsonify({'status': 'success'}) @app.route('/api/run-test', methods=['POST']) -def run_test_api(): - """运行测试 API""" +def run_test(): + """运行测试""" data = request.json - api_config = data.get('config', load_config()) + config = data.get('config', load_config()) test_case_ids = data.get('test_case_ids', []) - runs_per_case = data.get('runs_per_case', 1) + iterations = data.get('iterations', 1) concurrency = data.get('concurrency', 1) + stream = data.get('stream', False) - # 获取要运行的测试用例 + # 加载选中的测试用例 all_test_cases = load_test_cases() if test_case_ids: - test_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids] + selected_cases = [tc for tc in all_test_cases if tc['id'] in test_case_ids] else: - test_cases = all_test_cases + selected_cases = all_test_cases - if not test_cases: - return jsonify({"error": "No test cases selected"}), 400 + if not selected_cases: + return jsonify({'error': 'No test cases selected'}), 400 + + # 生成测试ID + test_id = generate_id() # 运行测试 - results = run_batch_tests(api_config, test_cases, runs_per_case, concurrency) + start_time = time.time() + results = run_concurrent_tests(config, selected_cases, concurrency, iterations, stream) + end_time = time.time() - # 计算统计 - stats = calculate_statistics(results) + # 汇总结果 + summary = { + 'test_id': test_id, + 'timestamp': datetime.now().isoformat(), + 'config': config, + 'test_cases_count': len(selected_cases), + 'iterations': iterations, + 'concurrency': concurrency, + 'stream_mode': stream, + 'total_duration': round(end_time - start_time, 2), + 'results': results + } # 保存结果 - test_run = { - "id": f"run_{uuid.uuid4().hex[:8]}", - "timestamp": datetime.now().isoformat(), - "config": api_config, - "stats": stats, - "results": results - } - save_result(test_run) + save_result(summary) - return jsonify(test_run) + # 缓存结果 + with results_lock: + results_cache[test_id] = summary + + return jsonify(summary) @app.route('/api/results', methods=['GET']) -def get_results_api(): +def get_results(): """获取历史测试结果""" return jsonify(load_results()) -@app.route('/api/results/', methods=['GET']) -def get_result_detail_api(result_id): - """获取单个测试结果详情""" +@app.route('/api/results/', methods=['GET']) +def get_result(test_id): + """获取单个测试结果""" + # 先查缓存 + with results_lock: + if test_id in results_cache: + return jsonify(results_cache[test_id]) + + # 再查文件 results = load_results() for result in results: - if result.get('id') == result_id: + if result.get('test_id') == test_id: return jsonify(result) - return jsonify({"error": "Result not found"}), 404 + + return jsonify({'error': 'Result not found'}), 404 -@app.route('/api/results/', methods=['DELETE']) -def delete_result_api(result_id): +@app.route('/api/results/', methods=['DELETE']) +def delete_result(test_id): """删除测试结果""" + results = load_results() + results = [r for r in results if r.get('test_id') != test_id] + with open(RESULTS_FILE, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + with results_lock: - results = load_results() - results = [r for r in results if r.get('id') != result_id] - with open(RESULTS_FILE, 'w', encoding='utf-8') as f: - json.dump(results, f, ensure_ascii=False, indent=2) - return jsonify({"status": "success"}) + if test_id in results_cache: + del results_cache[test_id] + + return jsonify({'status': 'success'}) + + +@app.route('/api/verify-config', methods=['POST']) +def verify_config(): + """验证 API 配置是否可用""" + config = request.json + + try: + api_base = config.get('api_base', '').rstrip('/') + api_key = config.get('api_key', '') + model = config.get('model', '') + + headers = {'Content-Type': 'application/json'} + if api_key: + headers['Authorization'] = 'Bearer ' + api_key + + # 尝试获取模型列表或进行简单调用 + url = api_base + '/models' + response = requests.get(url, headers=headers, timeout=10) + + if response.status_code == 200: + return jsonify({'status': 'success', 'message': 'Connection successful'}) + else: + # 尝试一个简单的 completion 调用来验证 + test_payload = { + 'model': model, + 'messages': [{'role': 'user', 'content': 'Hi'}], + 'max_tokens': 5 + } + test_response = requests.post(api_base + '/chat/completions', + headers=headers, json=test_payload, timeout=10) + if test_response.status_code == 200: + return jsonify({'status': 'success', 'message': 'Connection successful'}) + else: + return jsonify({'status': 'error', 'message': 'API returned status ' + str(test_response.status_code)}) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}) if __name__ == '__main__': - # 初始化默认配置和测试用例 - if not os.path.exists(CONFIG_FILE): - save_config(DEFAULT_CONFIG) - if not os.path.exists(TEST_CASES_FILE): - save_test_cases(DEFAULT_TEST_CASES) - app.run(host='0.0.0.0', port=8001, debug=True) diff --git a/requirements.txt b/requirements.txt index 64fae30..05a1f4d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ -flask==3.0.0 -requests==2.31.0 -openai==1.6.0 -plotly==5.18.0 -pandas==2.1.4 -numpy==1.26.2 -gunicorn==21.2.0 -python-dotenv==1.0.0 +flask==2.0.3 +requests==2.27.1 +openai==0.28.0 +plotly==5.3.0 +pandas==1.1.5 +numpy==1.19.5 +gunicorn==20.1.0 +python-dotenv==0.19.0 +werkzeug==2.0.3