调试技巧

本文档介绍ComfyUI的调试技巧和工具,帮助开发者快速定位和解决问题,包括日志分析、性能分析、断点调试等方法。

简介

调试方法分类

graph TD
    A[调试方法] --> B[日志调试]
    A --> C[性能分析]
    A --> D[断点调试]
    A --> E[可视化调试]
    A --> F[远程调试]

    B --> B1[日志输出]
    B --> B2[日志分析]
    B --> B3[日志过滤]

    C --> C1[性能监控]
    C --> C2[瓶颈分析]
    C --> C3[优化建议]

    D --> D1[断点设置]
    D --> D2[变量检查]
    D --> D3[单步执行]

    E --> E1[数据可视化]
    E --> E2[流程可视化]
    E --> E3[状态可视化]

    F --> F1[远程连接]
    F --> F2[远程日志]
    F --> F3[远程控制]

    style A fill:#e1f5ff
    style B fill:#e1ffe1
    style C fill:#e1ffe1
    style D fill:#e1ffe1
    style E fill:#e1ffe1
    style F fill:#e1ffe1

日志调试

1. 日志配置

import logging

# 配置日志级别
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('comfyui_debug.log'),
        logging.StreamHandler()
    ]
)

# ComfyUI特定日志
logger = logging.getLogger('comfyui')
logger.setLevel(logging.DEBUG)
# 设置日志级别
export COMFYUI_LOG_LEVEL=DEBUG

# 启用性能日志
export COMFYUI_PERFORMANCE_LOG=true

# 启用详细输出
export COMFYUI_VERBOSE=true

2. 日志分析

graph TD
    A[日志条目] --> B[时间戳]
    A --> C[日志级别]
    A --> D[模块名称]
    A --> E[消息内容]
    A --> F[堆栈跟踪]

    B --> B1[2024-01-27 10:30:45]
    C --> C1[DEBUG/INFO/WARNING/ERROR]
    D --> D1[comfyui.nodes]
    E --> E1[具体消息]
    F --> F1[错误堆栈]

    style A fill:#e1f5ff
    style B fill:#e1ffe1
    style C fill:#ffe1e1
    style D fill:#e1ffe1
    style E fill:#e1ffe1
    style F fill:#ffe1e1
级别 用途 示例
DEBUG 详细调试信息 变量值、函数调用
INFO 一般信息 节点执行、状态更新
WARNING 警告信息 非致命错误、性能问题
ERROR 错误信息 节点失败、异常情况
CRITICAL 严重错误 系统崩溃、致命错误
import re

def filter_logs(log_file, pattern):
    """过滤日志文件"""
    with open(log_file, 'r') as f:
        for line in f:
            if re.search(pattern, line):
                print(line)

# 使用示例
filter_logs('comfyui_debug.log', r'ERROR|CRITICAL')
filter_logs('comfyui_debug.log', r'KSampler')
filter_logs('comfyui_debug.log', r'CUDA out of memory')

3. 自定义日志

import logging

logger = logging.getLogger('custom_debug')

def debug_node_execution(node_name, inputs, outputs):
    """记录节点执行信息"""
    logger.debug(f"Node: {node_name}")
    logger.debug(f"Inputs: {inputs}")
    logger.debug(f"Outputs: {outputs}")
    logger.debug("-" * 50)

# 使用示例
debug_node_execution(
    "KSampler",
    {"steps": 30, "cfg": 7.0},
    {"latent": "tensor(1,4,64,64)"}
)
import time
import logging

logger = logging.getLogger('performance')

class PerformanceTimer:
    def __init__(self, name):
        self.name = name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, *args):
        elapsed = time.time() - self.start_time
        logger.info(f"{self.name}: {elapsed:.3f}s")

# 使用示例
with PerformanceTimer("KSampler execution"):
    # 执行KSampler
    pass

性能分析

4. 性能监控

import torch
import time

def monitor_gpu(interval=1.0):
    """监控GPU使用情况"""
    if not torch.cuda.is_available():
        print("CUDA not available")
        return

    while True:
        allocated = torch.cuda.memory_allocated() / 1024**3
        reserved = torch.cuda.memory_reserved() / 1024**3

        print(f"GPU Memory:")
        print(f"  Allocated: {allocated:.2f} GB")
        print(f"  Reserved: {reserved:.2f} GB")
        print(f"  Free: {torch.cuda.get_device_properties(0).total_memory / 1024**3 - reserved:.2f} GB")

        time.sleep(interval)

# 使用示例
# monitor_gpu(interval=5.0)
import psutil
import time

def monitor_cpu(interval=1.0):
    """监控CPU使用情况"""
    while True:
        cpu_percent = psutil.cpu_percent(interval=interval)
        memory = psutil.virtual_memory()

        print(f"CPU: {cpu_percent}%")
        print(f"Memory: {memory.percent}%")
        print(f"Memory Used: {memory.used / 1024**3:.2f} GB")

        time.sleep(interval)

# 使用示例
# monitor_cpu(interval=5.0)

5. 瓶颈分析

import time
from collections import defaultdict

class NodeProfiler:
    def __init__(self):
        self.node_times = defaultdict(list)
        self.current_node = None

    def start_node(self, node_name):
        self.current_node = node_name
        self.start_time = time.time()

    def end_node(self):
        if self.current_node:
            elapsed = time.time() - self.start_time
            self.node_times[self.current_node].append(elapsed)
            self.current_node = None

    def get_stats(self):
        stats = {}
        for node, times in self.node_times.items():
            stats[node] = {
                'count': len(times),
                'total': sum(times),
                'avg': sum(times) / len(times),
                'min': min(times),
                'max': max(times)
            }
        return stats

    def print_report(self):
        stats = self.get_stats()
        print("\nNode Performance Report:")
        print("-" * 60)
        for node, data in sorted(stats.items(), key=lambda x: x[1]['total'], reverse=True):
            print(f"{node}:")
            print(f"  Count: {data['count']}")
            print(f"  Total: {data['total']:.3f}s")
            print(f"  Average: {data['avg']:.3f}s")
            print(f"  Min: {data['min']:.3f}s")
            print(f"  Max: {data['max']:.3f}s")
            print()

# 使用示例
profiler = NodeProfiler()

profiler.start_node("KSampler")
# 执行KSampler
profiler.end_node()

profiler.start_node("VAEDecode")
# 执行VAEDecode
profiler.end_node()

profiler.print_report()
import torch
import tracemalloc

def analyze_memory():
    """分析内存使用情况"""
    # GPU内存
    if torch.cuda.is_available():
        gpu_allocated = torch.cuda.memory_allocated() / 1024**3
        gpu_reserved = torch.cuda.memory_reserved() / 1024**3
        print(f"GPU Memory:")
        print(f"  Allocated: {gpu_allocated:.2f} GB")
        print(f"  Reserved: {gpu_reserved:.2f} GB")

    # CPU内存
    tracemalloc.start()
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    print("\nTop 10 CPU Memory Allocations:")
    for stat in top_stats[:10]:
        print(stat)

# 使用示例
# analyze_memory()

6. 优化建议

def get_optimization_suggestions(profiler_stats):
    """根据性能分析结果提供优化建议"""
    suggestions = []

    # 分析节点性能
    for node, data in profiler_stats.items():
        if data['avg'] > 5.0:
            suggestions.append(f"{node} 平均执行时间过长 ({data['avg']:.2f}s),建议优化")

        if data['count'] > 100 and data['total'] > 300:
            suggestions.append(f"{node} 执行次数过多 ({data['count']}次),考虑缓存结果")

    # 分析内存使用
    if torch.cuda.is_available():
        gpu_usage = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory
        if gpu_usage > 0.9:
            suggestions.append("GPU显存使用率过高,建议启用模型卸载或降低分辨率")

    return suggestions

# 使用示例
# suggestions = get_optimization_suggestions(profiler.get_stats())
# for suggestion in suggestions:
#     print(suggestion)

断点调试

7. 断点设置

import pdb

def debug_function():
    # 设置断点
    pdb.set_trace()

    # 调试代码
    x = 10
    y = 20
    result = x + y

    return result

# 调试命令:
# n - 下一行
# s - 步入函数
# c - 继续执行
# p variable - 打印变量
# l - 显示代码
from IPython import embed

def debug_with_ipython():
    x = 10
    y = 20

    # 启动IPython调试器
    embed()

    result = x + y
    return result

8. 变量检查

class VariableWatcher:
    def __init__(self):
        self.variables = {}

    def watch(self, name, value):
        """监视变量"""
        self.variables[name] = {
            'value': value,
            'type': type(value).__name__,
            'shape': getattr(value, 'shape', None),
            'size': getattr(value, 'size', len(value) if hasattr(value, '__len__') else None)
        }

    def print_status(self):
        """打印变量状态"""
        print("\nVariable Status:")
        print("-" * 60)
        for name, info in self.variables.items():
            print(f"{name}:")
            print(f"  Type: {info['type']}")
            if info['shape']:
                print(f"  Shape: {info['shape']}")
            if info['size']:
                print(f"  Size: {info['size']}")
            print(f"  Value: {info['value']}")
            print()

# 使用示例
watcher = VariableWatcher()
watcher.watch('x', 10)
watcher.watch('tensor', torch.randn(2, 3))
watcher.print_status()
def check_tensor(tensor, name="Tensor"):
    """检查Tensor属性"""
    print(f"\n{name}:")
    print(f"  Shape: {tensor.shape}")
    print(f"  Dtype: {tensor.dtype}")
    print(f"  Device: {tensor.device}")
    print(f"  Min: {tensor.min():.4f}")
    print(f"  Max: {tensor.max():.4f}")
    print(f"  Mean: {tensor.mean():.4f}")
    print(f"  Std: {tensor.std():.4f}")
    print(f"  NaN: {torch.isnan(tensor).any().item()}")
    print(f"  Inf: {torch.isinf(tensor).any().item()}")

# 使用示例
# tensor = torch.randn(1, 4, 64, 64)
# check_tensor(tensor, "Latent")

9. 单步执行

def execute_workflow_step_by_step(workflow):
    """单步执行工作流"""
    for node_id, node_data in workflow.items():
        print(f"\nExecuting node: {node_id}")
        print(f"Class type: {node_data['class_type']}")
        print(f"Inputs: {node_data['inputs']}")

        try:
            # 执行节点
            result = execute_node(node_data)
            print(f"Output: {result}")

        except Exception as e:
            print(f"Error: {e}")
            import traceback
            traceback.print_exc()

            # 询问是否继续
            response = input("Continue? (y/n): ")
            if response.lower() != 'y':
                break

可视化调试

10. 数据可视化

import matplotlib.pyplot as plt

def visualize_tensor(tensor, title="Tensor"):
    """可视化Tensor"""
    if tensor.dim() == 2:
        # 2D Tensor
        plt.figure(figsize=(10, 8))
        plt.imshow(tensor.cpu().numpy(), cmap='viridis')
        plt.colorbar()
        plt.title(title)
        plt.show()

    elif tensor.dim() == 3:
        # 3D Tensor (channels, height, width)
        fig, axes = plt.subplots(1, tensor.shape[0], figsize=(15, 5))
        for i, channel in enumerate(tensor):
            axes[i].imshow(channel.cpu().numpy(), cmap='viridis')
            axes[i].set_title(f'Channel {i}')
        plt.suptitle(title)
        plt.show()

# 使用示例
# visualize_tensor(latent[0], "Latent Channels")
def visualize_workflow(workflow):
    """可视化工作流结构"""
    import networkx as nx
    import matplotlib.pyplot as plt

    G = nx.DiGraph()

    # 添加节点
    for node_id, node_data in workflow.items():
        G.add_node(node_id, label=f"{node_id}\n{node_data['class_type']}")

    # 添加边
    for node_id, node_data in workflow.items():
        for input_name, input_value in node_data['inputs'].items():
            if isinstance(input_value, list) and len(input_value) == 2:
                source_node = input_value[0]
                G.add_edge(source_node, node_id, label=input_name)

    # 绘制图形
    plt.figure(figsize=(20, 15))
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color='lightblue', 
            font_size=8, arrows=True)
    edge_labels = nx.get_edge_attributes(G, 'label')
    nx.draw_networkx_edge_labels(G, pos, edge_labels)
    plt.title("Workflow Structure")
    plt.show()

# 使用示例
# visualize_workflow(workflow)

11. 流程可视化

class ExecutionTracker:
    def __init__(self):
        self.execution_order = []
        self.execution_times = {}

    def track_execution(self, node_id, node_data):
        """跟踪节点执行"""
        start_time = time.time()

        # 执行节点
        result = execute_node(node_data)

        elapsed = time.time() - start_time

        self.execution_order.append(node_id)
        self.execution_times[node_id] = elapsed

        return result

    def visualize_execution(self):
        """可视化执行流程"""
        import matplotlib.pyplot as plt

        # 执行顺序
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.plot(range(len(self.execution_order)), self.execution_order, 'o-')
        plt.xlabel('Step')
        plt.ylabel('Node ID')
        plt.title('Execution Order')

        # 执行时间
        plt.subplot(1, 2, 2)
        nodes = list(self.execution_times.keys())
        times = list(self.execution_times.values())
        plt.bar(nodes, times)
        plt.xlabel('Node ID')
        plt.ylabel('Time (s)')
        plt.title('Execution Time')

        plt.tight_layout()
        plt.show()

# 使用示例
# tracker = ExecutionTracker()
# tracker.track_execution(node_id, node_data)
# tracker.visualize_execution()

12. 状态可视化

def visualize_system_status():
    """可视化系统状态"""
    import psutil
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # CPU使用率
    axes[0, 0].bar(['CPU'], [psutil.cpu_percent()])
    axes[0, 0].set_ylim(0, 100)
    axes[0, 0].set_title('CPU Usage')

    # 内存使用率
    memory = psutil.virtual_memory()
    axes[0, 1].bar(['Memory'], [memory.percent])
    axes[0, 1].set_ylim(0, 100)
    axes[0, 1].set_title('Memory Usage')

    # GPU使用率
    if torch.cuda.is_available():
        gpu_allocated = torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory * 100
        axes[1, 0].bar(['GPU'], [gpu_allocated])
        axes[1, 0].set_ylim(0, 100)
        axes[1, 0].set_title('GPU Usage')

    # 磁盘使用率
    disk = psutil.disk_usage('/')
    axes[1, 1].bar(['Disk'], [disk.percent])
    axes[1, 1].set_ylim(0, 100)
    axes[1, 1].set_title('Disk Usage')

    plt.tight_layout()
    plt.show()

# 使用示例
# visualize_system_status()

远程调试

13. 远程连接

# 建立SSH连接
ssh user@remote-server

# 端口转发
ssh -L 8188:localhost:8188 user@remote-server

# 远程运行ComfyUI
cd /path/to/comfyui
python main.py --listen 0.0.0.0 --port 8188
import paramiko

def view_remote_logs(host, username, password, log_file):
    """查看远程日志"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username=username, password=password)

    # 读取日志文件
    sftp = ssh.open_sftp()
    with sftp.file(log_file, 'r') as f:
        for line in f:
            print(line.strip())

    ssh.close()

# 使用示例
# view_remote_logs('remote-server', 'user', 'password', '/path/to/comfyui_debug.log')

14. 远程控制

import requests

def execute_remote_workflow(url, workflow):
    """远程执行工作流"""
    response = requests.post(f"{url}/prompt", json={
        "prompt": workflow
    })

    if response.status_code == 200:
        result = response.json()
        prompt_id = result['prompt_id']
        print(f"Workflow submitted with ID: {prompt_id}")
        return prompt_id
    else:
        print(f"Error: {response.status_code}")
        return None

# 使用示例
# prompt_id = execute_remote_workflow('http://localhost:8188', workflow)
def monitor_remote_execution(url, prompt_id):
    """监控远程执行状态"""
    import time

    while True:
        response = requests.get(f"{url}/history/{prompt_id}")

        if response.status_code == 200:
            history = response.json()

            if prompt_id in history:
                status = history[prompt_id]['status']
                print(f"Status: {status}")

                if status.get('completed', False):
                    print("Execution completed!")
                    break

        time.sleep(1)

# 使用示例
# monitor_remote_execution('http://localhost:8188', prompt_id)

调试工具集成

15. 综合调试工具

class ComfyUIDebugger:
    def __init__(self):
        self.profiler = NodeProfiler()
        self.watcher = VariableWatcher()
        self.tracker = ExecutionTracker()

    def start_debugging(self):
        """开始调试"""
        import logging
        logging.basicConfig(level=logging.DEBUG)

    def stop_debugging(self):
        """停止调试并生成报告"""
        print("\n" + "="*60)
        print("DEBUGGING REPORT")
        print("="*60)

        print("\n1. Node Performance:")
        self.profiler.print_report()

        print("\n2. Variable Status:")
        self.watcher.print_status()

        print("\n3. Execution Order:")
        print(" -> ".join(self.tracker.execution_order))

        print("\n" + "="*60)

# 使用示例
# debugger = ComfyUIDebugger()
# debugger.start_debugging()
# 执行工作流...
# debugger.stop_debugging()

调试最佳实践

调试流程

graph TD
    A[发现问题] --> B[启用调试日志]
    B --> C[重现问题]
    C --> D[分析日志]
    D --> E{找到原因?}
    E -->|否| F[添加更多日志]
    E -->|是| G[定位问题位置]
    F --> C
    G --> H[修复问题]
    H --> I[验证修复]
    I --> J{问题解决?}
    J -->|否| D
    J -->|是| K[清理调试代码]

    style A fill:#ffe1e1
    style G fill:#e1ffe1
    style K fill:#e1ffe1

调试技巧

  1. 逐步调试: 从简单到复杂逐步调试
  2. 日志分级: 使用不同日志级别
  3. 性能监控: 持续监控性能指标
  4. 可视化辅助: 使用可视化工具辅助调试

调试注意事项

  1. 不要过度调试: 只在必要时启用详细日志
  2. 清理调试代码: 调试完成后清理调试代码
  3. 记录调试过程: 记录调试过程和发现
  4. 分享调试经验: 分享调试经验和技巧

总结

掌握调试技巧可以大大提高问题解决效率。关键要点:

  1. 系统化调试: 使用系统化的调试方法
  2. 工具辅助: 充分利用调试工具
  3. 可视化辅助: 使用可视化工具辅助分析
  4. 经验积累: 积累调试经验和技巧

通过不断练习和实践,可以成为ComfyUI调试专家。