# Summary
In essence, [Transformers](Transformers.md) is a general-purpose framework that targets every [deep learning](深度学习@.md) task, while vLLM is specialized for the particular workload of [LLMs](LLM大模型.md) and applies task-specific optimizations, which is why it is faster:
1. Automatic (continuous) batching of incoming requests
2. Optimized [GPU kernels](GPU%20Kernel%20内核.md)
3. [PagedAttention](PagedAttention.md), i.e. paged KV-cache management (see the toy sketch below)
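A toy Python sketch of the PagedAttention idea (an illustration, not vLLM's actual implementation): the KV cache is carved into fixed-size blocks, and each sequence keeps a block table mapping its logical blocks to physical ones, so memory is claimed on demand instead of being reserved up front for the maximum sequence length.
```python
# Toy illustration of PagedAttention-style KV-cache management (not vLLM's real code).
BLOCK_SIZE = 16  # tokens stored per KV-cache block

class BlockManager:
    def __init__(self, num_blocks: int):
        self.free_blocks = list(range(num_blocks))   # pool of physical blocks
        self.block_tables = {}                       # seq_id -> list of physical block ids

    def append_token(self, seq_id: int, seq_len: int) -> None:
        """Allocate a new physical block only when the sequence crosses a block boundary."""
        table = self.block_tables.setdefault(seq_id, [])
        if seq_len % BLOCK_SIZE == 1 or not table:   # first token of a fresh block
            table.append(self.free_blocks.pop())

    def free(self, seq_id: int) -> None:
        """Return all blocks of a finished sequence to the pool (no fragmentation)."""
        self.free_blocks.extend(self.block_tables.pop(seq_id, []))

mgr = BlockManager(num_blocks=8)
for t in range(1, 20):            # a 19-token sequence only needs ceil(19/16) = 2 blocks
    mgr.append_token(seq_id=0, seq_len=t)
print(mgr.block_tables[0])        # logical blocks of sequence 0 mapped to physical blocks
mgr.free(0)
```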
# Cues
This is a variant of the classic question: **"why did the attention backend fall back from FlashAttention-2 to xFormers?"**
Short answer: **because you are running on a T4 (Turing, SM_75).**
FlashAttention-2 requires an **Ampere/Hopper-class GPU (≥ SM_80, e.g. A100/A10/L4/H100)** to enable its key instruction paths (such as `cp.async`). On **Volta (V100, SM_70)** and **Turing (T4, SM_75)** that hardware requirement is not met, so vLLM automatically falls back to the **xFormers** backend, which is exactly what the log message reports.
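A quick way to confirm this on a given machine, using PyTorch's device-capability query (SM_80 corresponds to compute capability `(8, 0)`):
```python
import torch

major, minor = torch.cuda.get_device_capability(0)  # T4 reports (7, 5), A100 reports (8, 0)
print(f"compute capability: sm_{major}{minor}")
if (major, minor) >= (8, 0):
    print("Ampere or newer: FlashAttention-2 hardware requirement is met")
else:
    print("Below SM_80: expect vLLM to fall back to the xFormers backend")
```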
# Notes
## Deployment parameters
**`gpu-memory-utilization` affects inference speed, but how much depends on the workload:**
1. **Concurrency**
    - Higher (0.9): more requests can be served at once, so throughput is higher
    - Lower (0.5): the number of concurrent requests is capped, so throughput may drop
2. **KV-cache size**
    - Higher: more KV entries can be cached, avoiding recomputation and speeding up long-text generation
    - Lower: the KV cache is constrained, which can hurt long outputs and long contexts
3. **Single-request latency**
    - The effect on an individual request is usually small; the setting mainly governs concurrent throughput
## For this setup (Qwen3-4B)
- A 4B model is small, so a utilization of 0.5 is usually enough
- Single-request latency is normally not noticeably affected
- Concurrency may drop slightly, but a moderate number of parallel requests is still supported
- For long-text generation or high concurrency, raise the value
## Recommendations
- Single requests or light concurrency: `0.5` is usually enough and the speed impact is negligible
- High concurrency or long texts: raise it (e.g. `0.7-0.8`), provided the GPU has the memory to spare
**Summary**: 0.5 does not noticeably slow down individual requests; under high concurrency or long-context workloads, raising the utilization improves overall throughput. The back-of-envelope sketch below shows where the extra headroom goes.
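A rough back-of-envelope view of that headroom. All numbers are illustrative assumptions (a hypothetical 24 GB card, ~8 GB of FP16 weights for a 4B model, ~1.5 GB of overhead), not measurements:
```python
# Illustrative KV-cache budget arithmetic (assumed numbers, not measured).
total_vram_gb = 24.0   # assumed GPU memory
weights_gb = 8.0       # ~4B params * 2 bytes (FP16), assumed
overhead_gb = 1.5      # activations, CUDA context, etc., assumed

def kv_cache_budget(gpu_memory_utilization: float) -> float:
    """Approximate memory left for the KV cache at a given utilization setting."""
    return total_vram_gb * gpu_memory_utilization - weights_gb - overhead_gb

for util in (0.5, 0.7, 0.9):
    print(f"gpu-memory-utilization={util}: ~{kv_cache_budget(util):.1f} GB for KV cache")
# A larger KV cache admits more concurrent sequences and longer contexts,
# which mainly lifts throughput rather than single-request latency.
```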
## Deployment script reference
```python
#!/usr/bin/env python3
"""
vLLM 本地模型部署脚本
功能:启动OpenAI兼容服务器 + 测试
适用于已下载到本地的模型
"""
import os
import sys
import time
import json
import subprocess
import argparse
from pathlib import Path
from typing import Optional
# ============= 配置类 =============
class Config:
"""配置管理"""
def __init__(self, verbose: bool = False):
self.project_dir = Path.home() / "qwen-vllm-server"
# 自动查找 qwen3-4b 目录
possible_paths = [
Path.cwd() / "qwen3-4b", # 当前目录下
Path.home() / "qwen3-4b", # home目录下
Path("/home/yishou.liu/model/qwen3-4b/qwen3-4b"), # 完整路径
]
self.model_path = None
if verbose:
print_info("正在查找模型路径...")
for path in possible_paths:
if verbose:
print_info(f" 尝试: {path} -> 存在: {path.exists()}")
if path.exists() and (path / "config.json").exists():
self.model_path = str(path.resolve()) # 使用绝对路径
if verbose:
print_success(f" 找到模型: {self.model_path}")
break
if self.model_path is None:
self.model_path = str(Path.cwd() / "qwen3-4b") # 默认使用当前目录
if verbose:
print_warning(f" 未找到模型,使用默认路径: {self.model_path}")
self.model_name = "Qwen3-4B" # 服务中显示的模型名
self.port = 8000
self.host = "0.0.0.0"
self.gpu_ids = "0" # 使用的GPU,多卡用逗号分隔 "0,1,2,3"
self.tensor_parallel_size = 1 # 张量并行度
self.gpu_memory_utilization = 0.9
self.max_model_len = 4096
def save(self, filepath: Path):
"""保存配置到文件"""
config_dict = {
'model_path': self.model_path,
'model_name': self.model_name,
'port': self.port,
'host': self.host,
'gpu_ids': self.gpu_ids,
'tensor_parallel_size': self.tensor_parallel_size,
'gpu_memory_utilization': self.gpu_memory_utilization,
'max_model_len': self.max_model_len,
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(config_dict, f, indent=2, ensure_ascii=False)
print(f"✅ 配置已保存: {filepath}")
@classmethod
def load(cls, filepath: Path):
"""从文件加载配置"""
with open(filepath, 'r', encoding='utf-8') as f:
config_dict = json.load(f)
config = cls()
config.model_path = config_dict.get('model_path', config.model_path)
config.model_name = config_dict.get('model_name', config.model_name)
config.port = config_dict.get('port', config.port)
config.host = config_dict.get('host', config.host)
config.gpu_ids = config_dict.get('gpu_ids', config.gpu_ids)
config.tensor_parallel_size = config_dict.get('tensor_parallel_size', config.tensor_parallel_size)
config.gpu_memory_utilization = config_dict.get('gpu_memory_utilization', config.gpu_memory_utilization)
config.max_model_len = config_dict.get('max_model_len', config.max_model_len)
return config
# ============= 工具函数 =============
def print_header(text: str):
"""打印标题"""
print(f"\n{'='*70}")
print(f" {text}")
print(f"{'='*70}")
def print_info(text: str):
"""打印信息"""
print(f"ℹ️ {text}")
def print_success(text: str):
"""打印成功信息"""
print(f"✅ {text}")
def print_error(text: str):
"""打印错误信息"""
print(f"❌ {text}")
def print_warning(text: str):
"""打印警告信息"""
print(f"⚠️ {text}")
# ============= 模型验证 =============
def validate_model_path(model_path: str, verbose: bool = True) -> bool:
"""验证模型路径是否有效"""
path = Path(model_path)
if verbose:
print_info(f"正在验证模型路径: {path}")
print_info(f"路径是否存在: {path.exists()}")
if not path.exists():
print_error(f"模型路径不存在: {model_path}")
return False
# 检查必要的模型文件
required_files = ['config.json'] # 只检查最基本的文件
safetensors_files = list(path.glob('*.safetensors'))
if verbose:
print_info(f"目录内容: {list(path.iterdir())[:5]}...") # 显示前5个文件
for req_file in required_files:
file_path = path / req_file
if verbose:
print_info(f"检查文件 {req_file}: {file_path.exists()}")
if not file_path.exists():
print_error(f"缺少必要文件: {req_file}")
return False
if not safetensors_files:
print_error("未找到 .safetensors 模型文件")
return False
print_success(f"模型验证通过: {model_path}")
print_info(f"找到 {len(safetensors_files)} 个模型分片文件")
return True
# ============= 服务器管理 =============
class VLLMServer:
"""vLLM服务器管理"""
def __init__(self, config: Config):
self.config = config
def start(self):
"""启动vLLM服务器"""
print_header("启动 vLLM OpenAI 兼容服务器")
# 验证模型路径
if not validate_model_path(self.config.model_path):
print_error("模型验证失败,请检查模型路径")
sys.exit(1)
print_info("服务器配置:")
print(f" 模型路径: {self.config.model_path}")
print(f" 模型名称: {self.config.model_name}")
print(f" 服务地址: http://{self.config.host}:{self.config.port}")
print(f" GPU: {self.config.gpu_ids}")
print(f" 并行度: {self.config.tensor_parallel_size}")
print(f" 显存利用率: {self.config.gpu_memory_utilization}")
print(f" 最大上下文: {self.config.max_model_len}")
# 设置环境变量
os.environ['CUDA_VISIBLE_DEVICES'] = self.config.gpu_ids
# 构建启动命令
cmd = [
sys.executable, "-m", "vllm.entrypoints.openai.api_server",
"--model", self.config.model_path,
"--host", self.config.host,
"--port", str(self.config.port),
"--trust-remote-code",
"--tensor-parallel-size", str(self.config.tensor_parallel_size),
"--gpu-memory-utilization", str(self.config.gpu_memory_utilization),
"--max-model-len", str(self.config.max_model_len),
"--dtype", "auto",
"--served-model-name", self.config.model_name,
]
print_info("\n启动命令:")
print(" ".join(cmd))
print("\n" + "="*70)
print_warning("服务器启动中,按 Ctrl+C 停止服务")
print("="*70 + "\n")
try:
# 启动服务器
subprocess.run(cmd, check=True)
except KeyboardInterrupt:
print_info("\n收到停止信号,正在关闭服务器...")
except Exception as e:
print_error(f"服务器启动失败: {e}")
sys.exit(1)
# ============= 测试客户端 =============
class TestClient:
"""测试客户端"""
def __init__(self, config: Config):
self.config = config
self.base_url = f"http://localhost:{config.port}/v1"
def test_all(self):
"""运行所有测试"""
print_header("测试服务器")
if not self._wait_for_server():
print_error("服务器未响应,请先启动服务器")
return False
success = True
success = self._test_connection() and success
success = self._test_chat_completion() and success
success = self._test_streaming() and success
success = self._test_batch() and success
if success:
print_success("\n所有测试通过!🎉")
else:
print_warning("\n部分测试失败")
return success
def _wait_for_server(self, timeout: int = 60):
"""等待服务器启动"""
print_info(f"等待服务器启动(超时{timeout}秒)...")
import requests
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(f"http://localhost:{self.config.port}/health", timeout=2)
if response.status_code == 200:
print_success("服务器已就绪")
return True
except:
pass
time.sleep(2)
return False
def _test_connection(self):
"""测试连接"""
print("\n" + "-"*70)
print("测试1: 连接服务器")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
models = client.models.list()
print_success("连接成功")
print(f"可用模型: {[m.id for m in models.data]}")
return True
except Exception as e:
print_error(f"连接失败: {e}")
return False
def _test_chat_completion(self):
"""测试对话补全"""
print("\n" + "-"*70)
print("测试2: 对话补全")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
print_info("发送请求: 用一句话介绍你自己")
response = client.chat.completions.create(
model=self.config.model_name,
messages=[
{"role": "system", "content": "你是一个有帮助的AI助手"},
{"role": "user", "content": "用一句话介绍你自己"}
],
temperature=0.7,
max_tokens=100,
)
print_success("对话成功")
print(f"回答: {response.choices[0].message.content}")
print(f"Token使用: prompt={response.usage.prompt_tokens}, "
f"completion={response.usage.completion_tokens}, "
f"total={response.usage.total_tokens}")
return True
except Exception as e:
print_error(f"对话失败: {e}")
return False
def _test_streaming(self):
"""测试流式输出"""
print("\n" + "-"*70)
print("测试3: 流式输出")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
print_info("发送请求: 从1数到5")
print("AI: ", end="", flush=True)
stream = client.chat.completions.create(
model=self.config.model_name,
messages=[{"role": "user", "content": "从1数到5,每个数字用逗号分隔"}],
temperature=0.7,
max_tokens=50,
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print("\n" + "✅ 流式输出成功")
return True
except Exception as e:
print(f"\n❌ 流式输出失败: {e}")
return False
def _test_batch(self):
"""测试批量推理"""
print("\n" + "-"*70)
print("测试4: 批量推理")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
questions = [
"1+1等于几?",
"Python是什么?",
"什么是AI?",
]
print_info(f"批量发送 {len(questions)} 个请求")
start_time = time.time()
for i, question in enumerate(questions, 1):
response = client.chat.completions.create(
model=self.config.model_name,
messages=[{"role": "user", "content": question}],
temperature=0.7,
max_tokens=50,
)
print(f" {i}. Q: {question}")
print(f" A: {response.choices[0].message.content[:50]}...")
elapsed = time.time() - start_time
print_success(f"批量推理完成,耗时: {elapsed:.2f}秒")
return True
except Exception as e:
print_error(f"批量推理失败: {e}")
return False
# ============= 命令行接口 =============
def create_parser():
"""创建命令行参数解析器"""
parser = argparse.ArgumentParser(
description="vLLM 本地模型部署脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 启动服务器(使用默认配置)
python %(prog)s serve
# 测试服务器
python %(prog)s test
# 自定义配置启动
python %(prog)s serve --model-path ~/qwen3-4b --port 8001 --gpu-ids 0,1
# 后台启动
nohup python %(prog)s serve > server.log 2>&1 &
"""
)
parser.add_argument(
'command',
choices=['serve', 'test', 'config'],
help='执行的命令'
)
parser.add_argument('--model-path', default=None,
help='本地模型路径 (默认: 自动查找 qwen3-4b)')
parser.add_argument('--model-name', default='Qwen3-4B',
help='服务中显示的模型名称 (默认: Qwen3-4B)')
parser.add_argument('--port', type=int, default=8000,
help='服务器端口 (默认: 8000)')
parser.add_argument('--host', default='0.0.0.0',
help='服务器地址 (默认: 0.0.0.0)')
parser.add_argument('--gpu-ids', default='0',
help='使用的GPU ID,多卡用逗号分隔 (默认: 0)')
parser.add_argument('--tensor-parallel-size', type=int, default=1,
help='张量并行度 (默认: 1)')
parser.add_argument('--gpu-memory-utilization', type=float, default=0.9,
help='GPU显存利用率 (默认: 0.9)')
parser.add_argument('--max-model-len', type=int, default=4096,
help='最大上下文长度 (默认: 4096)')
return parser
# ============= 主函数 =============
def main():
"""主函数"""
parser = create_parser()
args = parser.parse_args()
# 创建配置
config = Config(verbose=True) # 启用调试信息
if args.model_path:
config.model_path = args.model_path
config.model_name = args.model_name
config.port = args.port
config.host = args.host
config.gpu_ids = args.gpu_ids
config.tensor_parallel_size = args.tensor_parallel_size
config.gpu_memory_utilization = args.gpu_memory_utilization
config.max_model_len = args.max_model_len
# 创建项目目录
config.project_dir.mkdir(parents=True, exist_ok=True)
config_file = config.project_dir / "config.json"
# 执行命令
if args.command == 'config':
# 保存配置
config.save(config_file)
print_success("配置文件已创建")
print_info(f"位置: {config_file}")
elif args.command == 'serve':
# 启动服务器
config.save(config_file)
server = VLLMServer(config)
server.start()
elif args.command == 'test':
# 测试服务器
if config_file.exists():
config = Config.load(config_file)
tester = TestClient(config)
success = tester.test_all()
sys.exit(0 if success else 1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print_info("\n用户中断")
sys.exit(0)
except Exception as e:
print_error(f"程序错误: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
```
```python
#!/usr/bin/env python3
"""
vLLM + 魔搭 完整部署脚本
功能:下载模型 + 启动OpenAI兼容服务器 + 测试
"""
import os
import sys
import time
import json
import subprocess
import argparse
from pathlib import Path
from typing import Optional
# ============= 配置类 =============
class Config:
"""配置管理"""
def __init__(self):
self.project_dir = Path.home() / "qwen-vllm-server"
self.model_name = "Qwen/Qwen3-4B"
self.model_cache_dir = self.project_dir / "models"
self.port = 8000
self.host = "0.0.0.0"
self.gpu_ids = "0" # 使用的GPU,多卡用逗号分隔 "0,1,2,3"
self.tensor_parallel_size = 1 # 张量并行度
self.gpu_memory_utilization = 0.9
self.max_model_len = 4096
def save(self, filepath: Path):
"""保存配置到文件"""
config_dict = {
'model_name': self.model_name,
'model_cache_dir': str(self.model_cache_dir),
'port': self.port,
'host': self.host,
'gpu_ids': self.gpu_ids,
'tensor_parallel_size': self.tensor_parallel_size,
'gpu_memory_utilization': self.gpu_memory_utilization,
'max_model_len': self.max_model_len,
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(config_dict, f, indent=2, ensure_ascii=False)
print(f"✅ 配置已保存: {filepath}")
@classmethod
def load(cls, filepath: Path):
"""从文件加载配置"""
with open(filepath, 'r', encoding='utf-8') as f:
config_dict = json.load(f)
config = cls()
config.model_name = config_dict.get('model_name', config.model_name)
config.model_cache_dir = Path(config_dict.get('model_cache_dir', config.model_cache_dir))
config.port = config_dict.get('port', config.port)
config.host = config_dict.get('host', config.host)
config.gpu_ids = config_dict.get('gpu_ids', config.gpu_ids)
config.tensor_parallel_size = config_dict.get('tensor_parallel_size', config.tensor_parallel_size)
config.gpu_memory_utilization = config_dict.get('gpu_memory_utilization', config.gpu_memory_utilization)
config.max_model_len = config_dict.get('max_model_len', config.max_model_len)
return config
# ============= 工具函数 =============
def print_header(text: str):
"""打印标题"""
print(f"\n{'='*70}")
print(f" {text}")
print(f"{'='*70}")
def print_info(text: str):
"""打印信息"""
print(f"ℹ️ {text}")
def print_success(text: str):
"""打印成功信息"""
print(f"✅ {text}")
def print_error(text: str):
"""打印错误信息"""
print(f"❌ {text}")
def print_warning(text: str):
"""打印警告信息"""
print(f"⚠️ {text}")
# ============= 模型下载 =============
class ModelDownloader:
"""模型下载器"""
def __init__(self, config: Config):
self.config = config
self.model_path_file = config.project_dir / "model_path.txt"
def download(self) -> str:
"""从魔搭下载模型"""
print_header("步骤1: 下载模型")
# 创建缓存目录
self.config.model_cache_dir.mkdir(parents=True, exist_ok=True)
# 检查是否已下载
if self.model_path_file.exists():
model_path = self.model_path_file.read_text().strip()
if Path(model_path).exists():
print_info(f"模型已存在: {model_path}")
return model_path
print_info(f"开始下载模型: {self.config.model_name}")
print_info(f"缓存目录: {self.config.model_cache_dir}")
print_warning("首次下载可能需要较长时间,请耐心等待...")
try:
from modelscope import snapshot_download
start_time = time.time()
model_path = snapshot_download(
self.config.model_name,
cache_dir=str(self.config.model_cache_dir),
)
elapsed = time.time() - start_time
# 保存模型路径
self.model_path_file.write_text(model_path)
print_success(f"模型下载完成!耗时: {elapsed:.1f}秒")
print_success(f"模型路径: {model_path}")
return model_path
except Exception as e:
print_error(f"模型下载失败: {e}")
sys.exit(1)
def get_model_path(self) -> Optional[str]:
"""获取已下载的模型路径"""
if self.model_path_file.exists():
return self.model_path_file.read_text().strip()
return None
# ============= 服务器管理 =============
class VLLMServer:
"""vLLM服务器管理"""
def __init__(self, config: Config, model_path: str):
self.config = config
self.model_path = model_path
def start(self):
"""启动vLLM服务器"""
print_header("步骤2: 启动vLLM OpenAI兼容服务器")
print_info("服务器配置:")
print(f" 模型: {self.model_path}")
print(f" 地址: http://{self.config.host}:{self.config.port}")
print(f" GPU: {self.config.gpu_ids}")
print(f" 并行度: {self.config.tensor_parallel_size}")
print(f" 显存利用率: {self.config.gpu_memory_utilization}")
print(f" 最大上下文: {self.config.max_model_len}")
# 设置环境变量
os.environ['CUDA_VISIBLE_DEVICES'] = self.config.gpu_ids
# 构建启动命令
cmd = [
sys.executable, "-m", "vllm.entrypoints.openai.api_server",
"--model", self.model_path,
"--host", self.config.host,
"--port", str(self.config.port),
"--trust-remote-code",
"--tensor-parallel-size", str(self.config.tensor_parallel_size),
"--gpu-memory-utilization", str(self.config.gpu_memory_utilization),
"--max-model-len", str(self.config.max_model_len),
"--dtype", "auto",
"--served-model-name", self.config.model_name,
]
print_info("\n启动命令:")
print(" ".join(cmd))
print("\n" + "="*70)
print_warning("服务器启动中,按 Ctrl+C 停止服务")
print("="*70 + "\n")
try:
# 启动服务器
subprocess.run(cmd, check=True)
except KeyboardInterrupt:
print_info("\n收到停止信号,正在关闭服务器...")
except Exception as e:
print_error(f"服务器启动失败: {e}")
sys.exit(1)
# ============= 测试客户端 =============
class TestClient:
"""测试客户端"""
def __init__(self, config: Config):
self.config = config
self.base_url = f"http://localhost:{config.port}/v1"
def test_all(self):
"""运行所有测试"""
print_header("步骤3: 测试服务器")
if not self._wait_for_server():
print_error("服务器未响应,请先启动服务器")
return False
success = True
success = self._test_connection() and success
success = self._test_chat_completion() and success
success = self._test_streaming() and success
success = self._test_batch() and success
if success:
print_success("\n所有测试通过!🎉")
else:
print_warning("\n部分测试失败")
return success
def _wait_for_server(self, timeout: int = 60):
"""等待服务器启动"""
print_info(f"等待服务器启动(超时{timeout}秒)...")
import requests
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(f"http://localhost:{self.config.port}/health", timeout=2)
if response.status_code == 200:
print_success("服务器已就绪")
return True
except:
pass
time.sleep(2)
return False
def _test_connection(self):
"""测试连接"""
print("\n" + "-"*70)
print("测试1: 连接服务器")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
models = client.models.list()
print_success("连接成功")
print(f"可用模型: {[m.id for m in models.data]}")
return True
except Exception as e:
print_error(f"连接失败: {e}")
return False
def _test_chat_completion(self):
"""测试对话补全"""
print("\n" + "-"*70)
print("测试2: 对话补全")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
print_info("发送请求: 用一句话介绍你自己")
response = client.chat.completions.create(
model=self.config.model_name,
messages=[
{"role": "system", "content": "你是一个有帮助的AI助手"},
{"role": "user", "content": "用一句话介绍你自己"}
],
temperature=0.7,
max_tokens=100,
)
print_success("对话成功")
print(f"回答: {response.choices[0].message.content}")
print(f"Token使用: prompt={response.usage.prompt_tokens}, "
f"completion={response.usage.completion_tokens}, "
f"total={response.usage.total_tokens}")
return True
except Exception as e:
print_error(f"对话失败: {e}")
return False
def _test_streaming(self):
"""测试流式输出"""
print("\n" + "-"*70)
print("测试3: 流式输出")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
print_info("发送请求: 从1数到5")
print("AI: ", end="", flush=True)
stream = client.chat.completions.create(
model=self.config.model_name,
messages=[{"role": "user", "content": "从1数到5,每个数字用逗号分隔"}],
temperature=0.7,
max_tokens=50,
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print("\n" + "✅ 流式输出成功")
return True
except Exception as e:
print(f"\n❌ 流式输出失败: {e}")
return False
def _test_batch(self):
"""测试批量推理"""
print("\n" + "-"*70)
print("测试4: 批量推理")
print("-"*70)
try:
from openai import OpenAI
client = OpenAI(base_url=self.base_url, api_key="EMPTY")
questions = [
"1+1等于几?",
"Python是什么?",
"什么是AI?",
]
print_info(f"批量发送 {len(questions)} 个请求")
start_time = time.time()
for i, question in enumerate(questions, 1):
response = client.chat.completions.create(
model=self.config.model_name,
messages=[{"role": "user", "content": question}],
temperature=0.7,
max_tokens=50,
)
print(f" {i}. Q: {question}")
print(f" A: {response.choices[0].message.content[:50]}...")
elapsed = time.time() - start_time
print_success(f"批量推理完成,耗时: {elapsed:.2f}秒")
return True
except Exception as e:
print_error(f"批量推理失败: {e}")
return False
# ============= 命令行接口 =============
def create_parser():
"""创建命令行参数解析器"""
parser = argparse.ArgumentParser(
description="vLLM + 魔搭 一键部署脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 下载模型
python %(prog)s download
# 启动服务器
python %(prog)s serve
# 测试服务器
python %(prog)s test
# 一键部署(下载+启动)
python %(prog)s deploy
# 自定义配置
python %(prog)s serve --port 8001 --gpu-ids 0,1 --tensor-parallel-size 2
"""
)
parser.add_argument(
'command',
choices=['download', 'serve', 'test', 'deploy', 'config'],
help='执行的命令'
)
parser.add_argument('--model-name', default='Qwen/Qwen3-4B',
help='模型名称 (默认: Qwen/Qwen3-4B)')
parser.add_argument('--port', type=int, default=8000,
help='服务器端口 (默认: 8000)')
parser.add_argument('--host', default='0.0.0.0',
help='服务器地址 (默认: 0.0.0.0)')
parser.add_argument('--gpu-ids', default='0',
help='使用的GPU ID,多卡用逗号分隔 (默认: 0)')
parser.add_argument('--tensor-parallel-size', type=int, default=1,
help='张量并行度 (默认: 1)')
parser.add_argument('--gpu-memory-utilization', type=float, default=0.9,
help='GPU显存利用率 (默认: 0.9)')
parser.add_argument('--max-model-len', type=int, default=4096,
help='最大上下文长度 (默认: 4096)')
return parser
# ============= 主函数 =============
def main():
"""主函数"""
parser = create_parser()
args = parser.parse_args()
# 创建配置
config = Config()
config.model_name = args.model_name
config.port = args.port
config.host = args.host
config.gpu_ids = args.gpu_ids
config.tensor_parallel_size = args.tensor_parallel_size
config.gpu_memory_utilization = args.gpu_memory_utilization
config.max_model_len = args.max_model_len
# 创建项目目录
config.project_dir.mkdir(parents=True, exist_ok=True)
config_file = config.project_dir / "config.json"
# 执行命令
if args.command == 'config':
# 保存配置
config.save(config_file)
print_success("配置文件已创建")
print_info(f"位置: {config_file}")
elif args.command == 'download':
# 只下载模型
downloader = ModelDownloader(config)
downloader.download()
config.save(config_file)
elif args.command == 'serve':
# 启动服务器
downloader = ModelDownloader(config)
model_path = downloader.get_model_path()
if model_path is None:
print_warning("模型未下载,开始下载...")
model_path = downloader.download()
config.save(config_file)
server = VLLMServer(config, model_path)
server.start()
elif args.command == 'test':
# 测试服务器
if config_file.exists():
config = Config.load(config_file)
tester = TestClient(config)
success = tester.test_all()
sys.exit(0 if success else 1)
elif args.command == 'deploy':
# 一键部署
print_header("vLLM + 魔搭 一键部署")
# 1. 下载模型
downloader = ModelDownloader(config)
model_path = downloader.download()
# 2. 保存配置
config.save(config_file)
# 3. 提示后续步骤
print_header("部署完成")
print_info("下一步:")
print(f" 1. 启动服务器: python {sys.argv[0]} serve")
print(f" 2. 新开终端测试: python {sys.argv[0]} test")
print(f"\n或使用后台启动:")
print(f" nohup python {sys.argv[0]} serve > server.log 2>&1 &")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print_info("\n用户中断")
sys.exit(0)
except Exception as e:
print_error(f"程序错误: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
```
- Implementing vLLM from scratch (1.3): how to speed up attention computation: [https://mp.weixin.qq.com/s/BdWG6_ZTaGRknmsbGfFkMQ…](https://t.co/J49S4ccB3g)
- Implementing vLLM from scratch (1.2): how to implement tensor parallelism: [https://mp.weixin.qq.com/s/8BVEVPPqDQhQ2l8L90dMNQ…](https://t.co/wnbHusOQwO)
- **Implementing vLLM from scratch (1.1): parallel word embedding**: https://mp.weixin.qq.com/s/h1cFYDNxcHC30APcarF47A
```python
# Offline inference with the vLLM Python API.
# MODEL_PATH is a placeholder for a local model directory, defined elsewhere.
from vllm import LLM
import torch

llm = LLM(
    model=MODEL_PATH,
    tokenizer=MODEL_PATH,
    trust_remote_code=True,
    dtype="half",
    quantization="gptq",  # quantization method, see [模型量化](模型量化.md)
    max_model_len=1024,
    tensor_parallel_size=max(1, torch.cuda.device_count()),  # number of GPUs, see [GPU 显卡](GPU%20显卡.md); set to 1 for single-GPU use
    gpu_memory_utilization=0.9,
)
```
```python
# A more memory-conservative engine configuration.
llm_config = {
    "model": MODEL_PATH,
    "trust_remote_code": True,
    "dtype": "half",                       # FP16 weights
    "max_model_len": 512,                  # reduced from 2048 to 512
    "gpu_memory_utilization": 0.72,        # reduced from 0.90 to 0.72
    "tensor_parallel_size": tensor_parallel_size,
    "disable_log_stats": True,
    "swap_space": 8,                       # 8 GiB of CPU swap space
    "kv_cache_dtype": "fp8",               # FP8 KV cache (large memory savings)
    "max_num_batched_tokens": 512,         # cap total tokens per batch
    "max_num_seqs": 16,                    # cap concurrent sequences
    "enforce_eager": True,                 # skip CUDA graph capture to save memory
    "distributed_executor_backend": "mp",  # ★ force multiprocessing, bypass Ray
}
```
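A minimal usage sketch for the configuration dict above, assuming `MODEL_PATH` and `tensor_parallel_size` are defined before `llm_config` is built; the dict is simply unpacked into the `LLM` constructor:
```python
from vllm import LLM, SamplingParams

llm = LLM(**llm_config)  # unpack the engine configuration shown above

# Quick smoke test with moderate sampling.
params = SamplingParams(temperature=0.7, max_tokens=64)
outputs = llm.generate(["Introduce yourself in one sentence."], params)
print(outputs[0].outputs[0].text)
```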
vLLM is an open-source Python library for fast, efficient deployment of large language models (LLMs). Its main goal is to dramatically raise LLM inference speed and throughput, making LLMs practical to serve in production.
Key features:
- **Efficient memory management:**
    - vLLM uses a technique called [[PagedAttention]] to manage the memory used by the attention mechanism without waste.
    - PagedAttention lets vLLM handle longer sequences continuously and sustain higher throughput.
- **Fast inference:**
    - With optimized kernels and efficient memory management, vLLM reaches higher inference speeds than traditional LLM serving frameworks.
    - Its batching and caching strategies further improve end-to-end inference efficiency.
- **Easy to use:**
    - vLLM exposes a simple API for deploying LLMs as a service.
    - It supports the popular LLM architectures and integrates with libraries such as Hugging Face Transformers.
- **Multiple decoding algorithms:**
    - Supports greedy decoding, sampling, beam search, and more (see the sketch below).
- **Use cases:**
    - vLLM fits any application that needs high-performance LLM inference, such as chatbots, text generation, and question answering.
In short, vLLM's main advantage is that it makes LLM inference far more efficient, lowering latency and raising throughput, which makes it very valuable when deploying LLMs at scale.
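To make the decoding-algorithm point concrete, a minimal sketch (the model path is a hypothetical placeholder) contrasting greedy decoding with stochastic sampling via `SamplingParams`:
```python
from vllm import LLM, SamplingParams

llm = LLM(model="/path/to/qwen3-4b")  # hypothetical local model path

greedy = SamplingParams(temperature=0.0, max_tokens=32)               # greedy decoding
sampled = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=32)  # top-p sampling

prompt = "What is AI?"
for name, params in (("greedy", greedy), ("sampled", sampled)):
    out = llm.generate([prompt], params)
    print(f"{name}: {out[0].outputs[0].text!r}")
```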