简介
调试方法分类
graph TD
A[调试方法] --> B[日志调试]
A --> C[性能分析]
A --> D[断点调试]
A --> E[可视化调试]
A --> F[远程调试]
B --> B1[日志输出]
B --> B2[日志分析]
B --> B3[日志过滤]
C --> C1[性能监控]
C --> C2[瓶颈分析]
C --> C3[优化建议]
D --> D1[断点设置]
D --> D2[变量检查]
D --> D3[单步执行]
E --> E1[数据可视化]
E --> E2[流程可视化]
E --> E3[状态可视化]
F --> F1[远程连接]
F --> F2[远程日志]
F --> F3[远程控制]
style A fill:#e1f5ff
style B fill:#e1ffe1
style C fill:#e1ffe1
style D fill:#e1ffe1
style E fill:#e1ffe1
style F fill:#e1ffe1
日志调试
1. 日志配置
import logging

# Root logger configuration: DEBUG level, timestamped entries,
# mirrored to both a file and the console.
_log_handlers = [
    logging.FileHandler('comfyui_debug.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)

# Dedicated ComfyUI logger, also at DEBUG.
logger = logging.getLogger('comfyui')
logger.setLevel(logging.DEBUG)
# Set the log level
export COMFYUI_LOG_LEVEL=DEBUG
# Enable performance logging
export COMFYUI_PERFORMANCE_LOG=true
# Enable verbose output
export COMFYUI_VERBOSE=true
2. 日志分析
graph TD
A[日志条目] --> B[时间戳]
A --> C[日志级别]
A --> D[模块名称]
A --> E[消息内容]
A --> F[堆栈跟踪]
B --> B1[2024-01-27 10:30:45]
C --> C1[DEBUG/INFO/WARNING/ERROR]
D --> D1[comfyui.nodes]
E --> E1[具体消息]
F --> F1[错误堆栈]
style A fill:#e1f5ff
style B fill:#e1ffe1
style C fill:#ffe1e1
style D fill:#e1ffe1
style E fill:#e1ffe1
style F fill:#ffe1e1
| 级别 | 用途 | 示例 |
|---|---|---|
| DEBUG | 详细调试信息 | 变量值、函数调用 |
| INFO | 一般信息 | 节点执行、状态更新 |
| WARNING | 警告信息 | 非致命错误、性能问题 |
| ERROR | 错误信息 | 节点失败、异常情况 |
| CRITICAL | 严重错误 | 系统崩溃、致命错误 |
import re


def filter_logs(log_file, pattern):
    """Print every line of *log_file* that matches the regex *pattern*.

    Args:
        log_file: Path to the log file to scan.
        pattern: Regular expression; a line is printed if it matches anywhere.
    """
    # Compile once instead of re-resolving the pattern per line.
    regex = re.compile(pattern)
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            if regex.search(line):
                # Strip the trailing newline so print() does not double-space
                # the output (the original printed line + its own newline).
                print(line.rstrip('\n'))


# Usage examples — guarded so importing this module does not crash when
# the log file does not exist.
if __name__ == "__main__":
    filter_logs('comfyui_debug.log', r'ERROR|CRITICAL')
    filter_logs('comfyui_debug.log', r'KSampler')
    filter_logs('comfyui_debug.log', r'CUDA out of memory')
3. 自定义日志
import logging

logger = logging.getLogger('custom_debug')


def debug_node_execution(node_name, inputs, outputs):
    """Log one node execution (name, inputs, outputs) at DEBUG level."""
    for label, payload in (("Node", node_name), ("Inputs", inputs), ("Outputs", outputs)):
        logger.debug(f"{label}: {payload}")
    # Visual separator between node records.
    logger.debug("-" * 50)


# Usage example
debug_node_execution(
    "KSampler",
    {"steps": 30, "cfg": 7.0},
    {"latent": "tensor(1,4,64,64)"},
)
import time
import logging

logger = logging.getLogger('performance')


class PerformanceTimer:
    """Context manager that logs the wall-clock duration of its body."""

    def __init__(self, name):
        self.name = name
        self.start_time = None  # set when the `with` block is entered

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, *args):
        # Log elapsed seconds; exceptions (if any) propagate unchanged.
        logger.info(f"{self.name}: {time.time() - self.start_time:.3f}s")


# Usage example
with PerformanceTimer("KSampler execution"):
    pass  # run the KSampler here
性能分析
4. 性能监控
import torch
import time


def monitor_gpu(interval=1.0, iterations=None):
    """Periodically print GPU memory usage.

    Args:
        interval: Seconds to sleep between reports.
        iterations: Number of reports to print. None (the default, matching
            the original behavior) loops forever; pass an int to make the
            monitor stoppable.
    """
    if not torch.cuda.is_available():
        print("CUDA not available")
        return
    # Total device memory is constant — fetch it once, not per iteration.
    total = torch.cuda.get_device_properties(0).total_memory / 1024**3
    count = 0
    while iterations is None or count < iterations:
        allocated = torch.cuda.memory_allocated() / 1024**3
        reserved = torch.cuda.memory_reserved() / 1024**3
        print(f"GPU Memory:")
        print(f" Allocated: {allocated:.2f} GB")
        print(f" Reserved: {reserved:.2f} GB")
        print(f" Free: {total - reserved:.2f} GB")
        count += 1
        time.sleep(interval)


# Usage example
# monitor_gpu(interval=5.0)
import psutil


def monitor_cpu(interval=1.0, iterations=None):
    """Periodically print CPU and RAM usage.

    Args:
        interval: Seconds between samples. ``psutil.cpu_percent(interval=...)``
            already blocks for this long, so no extra ``time.sleep`` is needed
            (the original slept on top of it, doubling the period).
        iterations: Number of samples to print; None (default, the original
            behavior) loops forever.
    """
    count = 0
    while iterations is None or count < iterations:
        # Blocks for `interval` seconds while measuring.
        cpu_percent = psutil.cpu_percent(interval=interval)
        memory = psutil.virtual_memory()
        print(f"CPU: {cpu_percent}%")
        print(f"Memory: {memory.percent}%")
        print(f"Memory Used: {memory.used / 1024**3:.2f} GB")
        count += 1


# Usage example
# monitor_cpu(interval=5.0)
5. 瓶颈分析
import time
from collections import defaultdict


class NodeProfiler:
    """Accumulates per-node wall-clock timings and reports statistics."""

    def __init__(self):
        self.node_times = defaultdict(list)  # node name -> list of durations
        self.current_node = None             # node currently being timed

    def start_node(self, node_name):
        """Begin timing `node_name`."""
        self.current_node = node_name
        self.start_time = time.time()

    def end_node(self):
        """Stop timing the current node and record its duration (no-op if
        no node is active)."""
        if self.current_node:
            self.node_times[self.current_node].append(time.time() - self.start_time)
            self.current_node = None

    def get_stats(self):
        """Return {node: {'count','total','avg','min','max'}} for all nodes."""
        return {
            node: {
                'count': len(durations),
                'total': sum(durations),
                'avg': sum(durations) / len(durations),
                'min': min(durations),
                'max': max(durations),
            }
            for node, durations in self.node_times.items()
        }

    def print_report(self):
        """Print per-node stats, slowest total time first."""
        stats = self.get_stats()
        print("\nNode Performance Report:")
        print("-" * 60)
        slowest_first = sorted(stats.items(), key=lambda item: item[1]['total'], reverse=True)
        for node, data in slowest_first:
            print(f"{node}:")
            print(f" Count: {data['count']}")
            print(f" Total: {data['total']:.3f}s")
            print(f" Average: {data['avg']:.3f}s")
            print(f" Min: {data['min']:.3f}s")
            print(f" Max: {data['max']:.3f}s")
            print()


# Usage example
profiler = NodeProfiler()
profiler.start_node("KSampler")
# run the KSampler here
profiler.end_node()
profiler.start_node("VAEDecode")
# run VAEDecode here
profiler.end_node()
profiler.print_report()
import torch
import tracemalloc


def analyze_memory():
    """Print a snapshot of current GPU and CPU memory usage.

    For the CPU section to be meaningful, ``tracemalloc.start()`` must be
    called early in the program, before the allocations of interest.
    """
    # GPU memory
    if torch.cuda.is_available():
        gpu_allocated = torch.cuda.memory_allocated() / 1024**3
        gpu_reserved = torch.cuda.memory_reserved() / 1024**3
        print(f"GPU Memory:")
        print(f" Allocated: {gpu_allocated:.2f} GB")
        print(f" Reserved: {gpu_reserved:.2f} GB")
    # CPU memory. Bug fix: the original unconditionally called
    # tracemalloc.start() immediately before take_snapshot(), so the
    # snapshot only covered the handful of allocations made in between —
    # i.e. essentially nothing. Only start tracing if it is not already
    # running, and keep an existing trace intact.
    if not tracemalloc.is_tracing():
        tracemalloc.start()
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')
    print("\nTop 10 CPU Memory Allocations:")
    for stat in top_stats[:10]:
        print(stat)


# Usage example
# analyze_memory()
6. 优化建议
def get_optimization_suggestions(profiler_stats):
    """Derive optimization hints from NodeProfiler-style statistics.

    Args:
        profiler_stats: Mapping of node name -> dict with at least the
            'avg', 'count' and 'total' keys (as produced by
            ``NodeProfiler.get_stats``).

    Returns:
        A list of human-readable suggestion strings (possibly empty).
    """
    suggestions = []
    # Per-node timing heuristics.
    for node, data in profiler_stats.items():
        avg, count, total = data['avg'], data['count'], data['total']
        if avg > 5.0:
            suggestions.append(f"{node} 平均执行时间过长 ({avg:.2f}s),建议优化")
        if count > 100 and total > 300:
            suggestions.append(f"{node} 执行次数过多 ({count}次),考虑缓存结果")
    # GPU memory pressure heuristic (fraction of total device memory in use).
    if torch.cuda.is_available():
        gpu_usage = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory
        if gpu_usage > 0.9:
            suggestions.append("GPU显存使用率过高,建议启用模型卸载或降低分辨率")
    return suggestions


# Usage example
# suggestions = get_optimization_suggestions(profiler.get_stats())
# for suggestion in suggestions:
#     print(suggestion)
断点调试
7. 断点设置
import pdb


def debug_function():
    """Minimal example of breakpoint debugging with pdb."""
    # Set a breakpoint; execution pauses here in the interactive debugger.
    pdb.set_trace()
    # Code under debug
    x = 10
    y = 20
    result = x + y
    return result


# Debugger commands:
# n - next line
# s - step into a function
# c - continue execution
# p variable - print a variable
# l - list source code
from IPython import embed


def debug_with_ipython():
    """Drop into an interactive IPython shell mid-function."""
    x = 10
    y = 20
    # Start an IPython shell with the local variables (x, y) in scope;
    # execution resumes here when the shell is exited.
    embed()
    result = x + y
    return result
8. 变量检查
class VariableWatcher:
    """Records a snapshot (value, type, shape, size) of watched variables."""

    def __init__(self):
        self.variables = {}  # name -> info dict built by watch()

    def watch(self, name, value):
        """Record *value* under *name*.

        Bug fix: the original used ``getattr(value, 'size', ...)``, which for
        torch tensors returns the bound ``Tensor.size`` *method*, not a
        number. Prefer ``numel()``/``len()`` and only fall back to a plain
        (non-callable) ``size`` attribute.
        """
        if hasattr(value, 'numel'):           # torch tensors
            size = value.numel()
        elif hasattr(value, '__len__'):       # sequences, mappings, arrays
            size = len(value)
        else:
            size = getattr(value, 'size', None)
            if callable(size):                # never report a bound method
                size = None
        self.variables[name] = {
            'value': value,
            'type': type(value).__name__,
            'shape': getattr(value, 'shape', None),
            'size': size,
        }

    def print_status(self):
        """Print every watched variable's recorded info."""
        print("\nVariable Status:")
        print("-" * 60)
        for name, info in self.variables.items():
            print(f"{name}:")
            print(f" Type: {info['type']}")
            # Explicit None checks so empty shapes / size 0 are still shown.
            if info['shape'] is not None:
                print(f" Shape: {info['shape']}")
            if info['size'] is not None:
                print(f" Size: {info['size']}")
            print(f" Value: {info['value']}")
            print()


# Usage example
watcher = VariableWatcher()
watcher.watch('x', 10)
watcher.watch('tensor', torch.randn(2, 3))
watcher.print_status()
def check_tensor(tensor, name="Tensor"):
    """Print diagnostic statistics for a tensor: shape, dtype, device,
    value range, mean/std, and NaN/Inf presence."""
    has_nan = torch.isnan(tensor).any().item()
    has_inf = torch.isinf(tensor).any().item()
    report = [
        f"\n{name}:",
        f" Shape: {tensor.shape}",
        f" Dtype: {tensor.dtype}",
        f" Device: {tensor.device}",
        f" Min: {tensor.min():.4f}",
        f" Max: {tensor.max():.4f}",
        f" Mean: {tensor.mean():.4f}",
        f" Std: {tensor.std():.4f}",
        f" NaN: {has_nan}",
        f" Inf: {has_inf}",
    ]
    for entry in report:
        print(entry)


# Usage example
# tensor = torch.randn(1, 4, 64, 64)
# check_tensor(tensor, "Latent")
9. 单步执行
def execute_workflow_step_by_step(workflow):
    """Execute a workflow one node at a time, pausing for user confirmation.

    Args:
        workflow: Mapping of node id -> node data dict with at least
            'class_type' and 'inputs' keys.

    NOTE(review): relies on an ``execute_node`` helper that is not defined
    in this document — supply your own implementation before use.
    """
    for node_id, node_data in workflow.items():
        print(f"\nExecuting node: {node_id}")
        print(f"Class type: {node_data['class_type']}")
        print(f"Inputs: {node_data['inputs']}")
        try:
            # Execute the node
            result = execute_node(node_data)
            print(f"Output: {result}")
        except Exception as e:
            print(f"Error: {e}")
            import traceback
            traceback.print_exc()
        # Ask whether to continue with the next node
        response = input("Continue? (y/n): ")
        if response.lower() != 'y':
            break
可视化调试
10. 数据可视化
import matplotlib.pyplot as plt


def visualize_tensor(tensor, title="Tensor"):
    """Render a 2D tensor as an image, or a 3D tensor channel-by-channel.

    Args:
        tensor: A 2D (H, W) or 3D (C, H, W) tensor; other ranks are
            reported rather than silently ignored.
        title: Figure title.
    """
    if tensor.dim() == 2:
        # Single 2D image
        plt.figure(figsize=(10, 8))
        plt.imshow(tensor.cpu().numpy(), cmap='viridis')
        plt.colorbar()
        plt.title(title)
        plt.show()
    elif tensor.dim() == 3:
        # One subplot per channel. Bug fix: for a single channel,
        # plt.subplots(1, 1) returns a bare Axes (not an array), so the
        # original's axes[i] raised TypeError — normalize to a list.
        n_channels = tensor.shape[0]
        fig, axes = plt.subplots(1, n_channels, figsize=(15, 5))
        if n_channels == 1:
            axes = [axes]
        for i, channel in enumerate(tensor):
            axes[i].imshow(channel.cpu().numpy(), cmap='viridis')
            axes[i].set_title(f'Channel {i}')
        plt.suptitle(title)
        plt.show()
    else:
        # Previously unsupported ranks did nothing silently; say so.
        print(f"visualize_tensor: unsupported tensor rank {tensor.dim()}")


# Usage example
# visualize_tensor(latent[0], "Latent Channels")
def visualize_workflow(workflow):
    """Draw the workflow's node graph with networkx + matplotlib.

    An input value that is a two-element list is treated as a link
    (element 0 being the source node id — presumably ComfyUI's
    ``[source_id, output_slot]`` API format; verify against your workflow).
    """
    import networkx as nx
    import matplotlib.pyplot as plt
    G = nx.DiGraph()
    # Add one graph node per workflow node, labeled with id + class type
    for node_id, node_data in workflow.items():
        G.add_node(node_id, label=f"{node_id}\n{node_data['class_type']}")
    # Add edges: linked inputs become source -> consumer edges
    for node_id, node_data in workflow.items():
        for input_name, input_value in node_data['inputs'].items():
            if isinstance(input_value, list) and len(input_value) == 2:
                source_node = input_value[0]
                G.add_edge(source_node, node_id, label=input_name)
    # Draw the graph
    plt.figure(figsize=(20, 15))
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color='lightblue',
            font_size=8, arrows=True)
    edge_labels = nx.get_edge_attributes(G, 'label')
    nx.draw_networkx_edge_labels(G, pos, edge_labels)
    plt.title("Workflow Structure")
    plt.show()


# Usage example
# visualize_workflow(workflow)
11. 流程可视化
class ExecutionTracker:
    """Tracks node execution order and per-node wall-clock time.

    NOTE(review): ``track_execution`` relies on an ``execute_node`` helper
    and the ``time`` module from the surrounding scope — neither is
    defined/imported in this snippet.
    """

    def __init__(self):
        self.execution_order = []  # node ids in the order they ran
        self.execution_times = {}  # node id -> elapsed seconds (last run)

    def track_execution(self, node_id, node_data):
        """Run one node, record its id and duration, and return its result."""
        start_time = time.time()
        # Execute the node
        result = execute_node(node_data)
        elapsed = time.time() - start_time
        self.execution_order.append(node_id)
        self.execution_times[node_id] = elapsed
        return result

    def visualize_execution(self):
        """Plot execution order (left) and per-node time (right)."""
        import matplotlib.pyplot as plt
        # Execution order
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.plot(range(len(self.execution_order)), self.execution_order, 'o-')
        plt.xlabel('Step')
        plt.ylabel('Node ID')
        plt.title('Execution Order')
        # Execution time
        plt.subplot(1, 2, 2)
        nodes = list(self.execution_times.keys())
        times = list(self.execution_times.values())
        plt.bar(nodes, times)
        plt.xlabel('Node ID')
        plt.ylabel('Time (s)')
        plt.title('Execution Time')
        plt.tight_layout()
        plt.show()


# Usage example
# tracker = ExecutionTracker()
# tracker.track_execution(node_id, node_data)
# tracker.visualize_execution()
12. 状态可视化
def visualize_system_status():
    """Plot CPU, RAM, GPU-memory and disk usage as four bar charts.

    NOTE(review): references ``torch`` without importing it here — an
    earlier snippet in this document imports it; confirm it is in scope.
    """
    import psutil
    import matplotlib.pyplot as plt
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    # CPU utilization
    axes[0, 0].bar(['CPU'], [psutil.cpu_percent()])
    axes[0, 0].set_ylim(0, 100)
    axes[0, 0].set_title('CPU Usage')
    # RAM utilization
    memory = psutil.virtual_memory()
    axes[0, 1].bar(['Memory'], [memory.percent])
    axes[0, 1].set_ylim(0, 100)
    axes[0, 1].set_title('Memory Usage')
    # GPU memory utilization (allocated as a percent of total device memory)
    if torch.cuda.is_available():
        gpu_allocated = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory * 100
        axes[1, 0].bar(['GPU'], [gpu_allocated])
        axes[1, 0].set_ylim(0, 100)
        axes[1, 0].set_title('GPU Usage')
    # Disk utilization of the root filesystem
    disk = psutil.disk_usage('/')
    axes[1, 1].bar(['Disk'], [disk.percent])
    axes[1, 1].set_ylim(0, 100)
    axes[1, 1].set_title('Disk Usage')
    plt.tight_layout()
    plt.show()


# Usage example
# visualize_system_status()
远程调试
13. 远程连接
# Establish an SSH connection
ssh user@remote-server
# Forward the local port to the remote ComfyUI port
ssh -L 8188:localhost:8188 user@remote-server
# Run ComfyUI on the remote machine
cd /path/to/comfyui
python main.py --listen 0.0.0.0 --port 8188
import paramiko


def view_remote_logs(host, username, password, log_file):
    """Stream a remote log file to stdout over SSH/SFTP.

    Args:
        host: Remote host name or IP address.
        username: SSH user name.
        password: SSH password. NOTE(review): plain-text passwords in code
            are a security risk — prefer key-based authentication.
        log_file: Path of the log file on the remote machine.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy blindly trusts unknown host keys —
    # convenient for a debugging session, but vulnerable to MITM.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username=username, password=password)
    # Read the log file over SFTP
    sftp = ssh.open_sftp()
    with sftp.file(log_file, 'r') as f:
        for line in f:
            print(line.strip())
    ssh.close()


# Usage example
# view_remote_logs('remote-server', 'user', 'password', '/path/to/comfyui_debug.log')
14. 远程控制
import requests


def execute_remote_workflow(url, workflow, timeout=30):
    """Submit a workflow to a remote ComfyUI server via its /prompt endpoint.

    Args:
        url: Base URL of the server, e.g. 'http://localhost:8188'.
        workflow: API-format workflow dict.
        timeout: Seconds to wait for the HTTP response. New parameter
            (default 30): the original call had no timeout and could hang
            forever on an unresponsive server.

    Returns:
        The server-assigned prompt id, or None on a non-200 response.
    """
    response = requests.post(f"{url}/prompt", json={
        "prompt": workflow
    }, timeout=timeout)
    if response.status_code == 200:
        result = response.json()
        prompt_id = result['prompt_id']
        print(f"Workflow submitted with ID: {prompt_id}")
        return prompt_id
    else:
        print(f"Error: {response.status_code}")
        return None


# Usage example
# prompt_id = execute_remote_workflow('http://localhost:8188', workflow)
def monitor_remote_execution(url, prompt_id, poll_interval=1.0, timeout=None):
    """Poll a remote ComfyUI server until the given prompt completes.

    Args:
        url: Base URL of the server.
        prompt_id: Id returned by ``execute_remote_workflow``.
        poll_interval: Seconds between /history polls (was hard-coded to 1).
        timeout: Optional overall wait limit in seconds; None (default,
            the original behavior) waits indefinitely.
    """
    import time
    deadline = None if timeout is None else time.time() + timeout
    while deadline is None or time.time() < deadline:
        # Per-request timeout so a dead server cannot hang a single poll.
        response = requests.get(f"{url}/history/{prompt_id}", timeout=10)
        if response.status_code == 200:
            history = response.json()
            if prompt_id in history:
                status = history[prompt_id]['status']
                print(f"Status: {status}")
                if status.get('completed', False):
                    print("Execution completed!")
                    break
        time.sleep(poll_interval)


# Usage example
# monitor_remote_execution('http://localhost:8188', prompt_id)
调试工具集成
15. 综合调试工具
class ComfyUIDebugger:
    """Facade combining the profiler, variable watcher and execution
    tracker defined earlier in this guide into a single debugging session."""

    def __init__(self):
        self.profiler = NodeProfiler()
        self.watcher = VariableWatcher()
        self.tracker = ExecutionTracker()

    def start_debugging(self):
        """Begin a debugging session by enabling DEBUG-level logging."""
        import logging
        logging.basicConfig(level=logging.DEBUG)

    def stop_debugging(self):
        """End the session and print a combined report."""
        banner = "=" * 60
        print("\n" + banner)
        print("DEBUGGING REPORT")
        print(banner)
        print("\n1. Node Performance:")
        self.profiler.print_report()
        print("\n2. Variable Status:")
        self.watcher.print_status()
        print("\n3. Execution Order:")
        print(" -> ".join(self.tracker.execution_order))
        print("\n" + banner)


# Usage example
# debugger = ComfyUIDebugger()
# debugger.start_debugging()
# ... run the workflow ...
# debugger.stop_debugging()
调试最佳实践
调试流程
graph TD
A[发现问题] --> B[启用调试日志]
B --> C[重现问题]
C --> D[分析日志]
D --> E{找到原因?}
E -->|否| F[添加更多日志]
E -->|是| G[定位问题位置]
F --> C
G --> H[修复问题]
H --> I[验证修复]
I --> J{问题解决?}
J -->|否| D
J -->|是| K[清理调试代码]
style A fill:#ffe1e1
style G fill:#e1ffe1
style K fill:#e1ffe1
调试技巧
- 逐步调试: 从简单到复杂逐步调试
- 日志分级: 使用不同日志级别
- 性能监控: 持续监控性能指标
- 可视化辅助: 使用可视化工具辅助调试
调试注意事项
- 不要过度调试: 只在必要时启用详细日志
- 清理调试代码: 调试完成后清理调试代码
- 记录调试过程: 记录调试过程和发现
- 分享调试经验: 分享调试经验和技巧
总结
掌握调试技巧可以大大提高问题解决效率。关键要点:
- 系统化调试: 使用系统化的调试方法
- 工具辅助: 充分利用调试工具
- 可视化辅助: 使用可视化工具辅助分析
- 经验积累: 积累调试经验和技巧
通过不断练习和实践,可以成为ComfyUI调试专家。