交换机批量健康检查python脚本

批量自动化检查网络交换机的健康状态,包括运行时间、CPU/内存使用率、温度、风扇、电源状态以及关键端口状态,记录python检查日志和输出检查结果

运行环境

  • python 3.8+、可以ssh目标交换机

如何运行程序

#文件同目录下创建 `switch_list.xlsx` 文件
#python switch_health_check_latest.py

交换机清单文件

文件格式

必须包含以下列:

  • IP: 交换机IP地址
  • Username: SSH用户名
  • Password: SSH密码
  • Vendor: 设备厂商(cisco/h3c/huawei)
  • CriticalPorts (可选): 关键端口列表,逗号分隔

文件样例

IP Username Password Vendor CriticalPorts
192.168.1.1 admin Cisco123 cisco Gig1/0/1, Gig1/0/24

输出文件

日志文件

  • 位置:当前目录下的 switch_inspection.log
  • 内容
    • 程序运行详细日志
    • 设备连接状态
    • 错误和警告信息

检查报告

  • 位置:当前目录下
  • 文件名格式switch_inspection_report_YYYYMMDD_HHMMSS.xlsx
  • 内容
    • 所有检查项的详细结果
    • 状态标记(🟢/🟠/🔴)
    • 时间戳信息

代码

import pandas as pd  # 数据处理核心库,用于读取Excel设备清单和生成检查报告
import paramiko  # SSH客户端库,用于连接网络设备并执行命令
import re  # 正则表达式库,用于解析设备返回的命令输出
from datetime import datetime  # 日期时间处理,用于生成时间戳和报告文件名
import time  # 时间处理,用于SSH连接中的等待操作
import warnings  # 警告处理,用于忽略paramiko的特定警告
import logging  # 日志记录,用于创建运行日志文件和控制台输出
from typing import Dict, List, Optional  # 类型注解,用于函数参数和返回值的类型提示

# 配置日志系统
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('switch_inspection.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# 忽略paramiko的特定警告
warnings.filterwarnings("ignore", category=UserWarning, module="paramiko")

"""
交换机命令模板
针对不同厂商的设备定义对应的CLI命令
"""
COMMAND_TEMPLATES = {
    'cisco': {
        'uptime': 'show version | include uptime',
        'cpu': 'show processes cpu sorted | exclude 0.0',
        'mem': 'show memory statistics',
        'temp': 'show environment temperature',
        'fans': 'show environment fans',
        'power': 'show environment power',
        'interfaces': 'show interfaces {port}'
    },
    'h3c': {
        'uptime': 'display version | include uptime',
        'cpu': 'display cpu-usage',
        'mem': 'display memory',
        'temp': 'display environment',
        'fans': 'display fan',
        'power': 'display power',
        'interfaces': 'display interface {port}'
    },
    'huawei': {
        'uptime': 'display version | include uptime',
        'cpu': 'display cpu-usage',
        'mem': 'display memory-usage',
        'temp': 'display temperature all',
        'fans': 'display fan',
        'power': 'display power',
        'interfaces': 'display interface {port}'
    }
}

"""
关键阈值配置
定义设备健康状态的警戒线
"""
THRESHOLDS = {
    'cpu_warning': 70,    # CPU使用率警告阈值(%)
    'cpu_critical': 90,   # CPU使用率严重阈值(%)
    'mem_warning': 75,    # 内存使用率警告阈值(%)
    'temp_warning': 60,   # 温度警告阈值(℃)
    'min_uptime': 30      # 最小正常运行时间(天)
}

def parse_switch_output(vendor: str, output: str, command_type: str) -> str:
    """
    解析交换机返回的原始数据
    根据不同厂商和命令类型提取关键信息

    参数:
        vendor: 设备厂商 (cisco/h3c/huawei)
        output: CLI命令的原始输出
        command_type: 命令类型 (uptime/cpu/mem等)

    返回:
        解析后的关键信息字符串
    """
    result = "N/A"

    try:
        if command_type == 'uptime':
            # 匹配不同格式的运行时间信息
            patterns = [
                r'(\d+)\s*year[^\d]*(\d+)\s*week[^\d]*(\d+)\s*day',  # 年/周/天格式
                r'(\d+)\s*day[s]?,\s*(\d+):(\d+)',  # 天:小时:分钟格式
                r'uptime\s+is\s+(\d+)\s+days'  # 直接天数格式
            ]

            for pattern in patterns:
                match = re.search(pattern, output, re.I)
                if match:
                    groups = match.groups()
                    if len(groups) == 3:  # 年/周/天格式
                        years, weeks, days = map(int, groups)
                        total_days = years*365 + weeks*7 + days
                    elif len(groups) == 2:  # 天:小时格式
                        days, hours = map(int, groups)
                        total_days = days
                    elif len(groups) == 1:  # 直接天数
                        total_days = int(groups[0])
                    else:
                        continue

                    # 检查是否达到最小运行时间要求
                    status = "⚠️ Low" if total_days < THRESHOLDS['min_uptime'] else "✔️ Normal"
                    result = f"{status} ({total_days} days)"
                    break

        elif command_type == 'cpu':
            # 提取CPU使用率
            patterns = {
                'cisco': r'CPU utilization for five seconds: (\d+)%',
                'default': r'[Cc][Pp][Uu] usage: (\d+)%'
            }

            pattern = patterns.get(vendor, patterns['default'])
            match = re.search(pattern, output)
            if match:
                cpu_usage = int(match.group(1))
                # 根据阈值标记状态
                if cpu_usage > THRESHOLDS['cpu_critical']:
                    status = "🔴 Critical"
                elif cpu_usage > THRESHOLDS['cpu_warning']:
                    status = "🟠 Warning"
                else:
                    status = "🟢 Normal"
                result = f"{status} ({cpu_usage}%)"

        elif command_type == 'mem':
            # 提取内存使用率
            patterns = {
                'cisco': r'Total:\s*\d+\s*(\d+)',
                'default': r'Memory Using Percentage: (\d+)%'
            }

            pattern = patterns.get(vendor, patterns['default'])
            match = re.search(pattern, output)
            if match:
                mem_usage = int(match.group(1))
                # 根据阈值标记状态
                status = "🔴 Critical" if mem_usage > THRESHOLDS['mem_warning'] else "🟢 Normal"
                result = f"{status} ({mem_usage}%)"

        elif command_type == 'temp':
            # 检查温度状态
            if any(keyword in output for keyword in ['OK', 'Normal', '正常']):
                result = "🟢 Normal"
            else:
                # 提取所有温度值
                matches = re.findall(r'\b(\d{2,3})[°cC]?\b', output)
                if matches:
                    max_temp = max(map(int, matches))
                    # 根据阈值标记状态
                    status = "🟠 Warning" if max_temp > THRESHOLDS['temp_warning'] else "🟢 Normal"
                    result = f"{status} ({max_temp}°C)"

        elif command_type in ('fans', 'power'):
            # 检查风扇/电源状态
            if any(keyword in output for keyword in ['OK', 'Normal', 'Present', '正常']):
                result = "🟢 Normal"
            else:
                # 检测故障单元
                failed = re.findall(r'(Fail|Absent|Error|Abnormal|Failed)', output, re.I)
                if failed:
                    result = f"🔴 Failed ({len(failed)} units)"

    except Exception as e:
        logger.error(f"解析错误: {str(e)}")

    return result

def check_critical_ports(ssh: paramiko.SSHClient, vendor: str, ports: str) -> str:
    """
    检查关键端口状态
    包括端口链路状态和错误计数

    参数:
        ssh: 已建立的SSH连接
        vendor: 设备厂商
        ports: 逗号分隔的端口列表

    返回:
        格式化后的端口状态报告
    """
    results = []
    port_list = [p.strip() for p in ports.split(',') if p.strip()]

    if not port_list:
        return "N/A"

    for port in port_list:
        try:
            # 执行端口检查命令
            cmd = COMMAND_TEMPLATES[vendor]['interfaces'].format(port=port)
            _, stdout, _ = ssh.exec_command(cmd)
            output = stdout.read().decode(errors='ignore')

            # 判断端口状态
            if re.search(r'(up|connected)', output, re.I) and not re.search(r'down|disable', output, re.I):
                status = "🟢 Up"
            elif re.search(r'not connect|disable|admin down', output, re.I):
                status = "🟠 AdminDown"
            else:
                status = "🔴 Down"

            # 提取错误计数
            input_errors = re.search(r'(\d+)\s*input errors', output, re.I)
            output_errors = re.search(r'(\d+)\s*output errors', output, re.I)
            error_count = (int(input_errors.group(1)) if input_errors else 0) + \
                          (int(output_errors.group(1)) if output_errors else 0)

            # 组装结果
            port_result = f"{port}: {status}"
            if error_count > 0:
                port_result += f" | 🔴 Errors: {error_count}"
            results.append(port_result)

        except Exception as e:
            logger.error(f"端口检查失败 {port}: {str(e)}")
            results.append(f"{port}: ❌ Check Failed")

    return "\n".join(results)

def inspect_switch(
    ip: str, 
    username: str, 
    password: str, 
    vendor: str, 
    critical_ports: Optional[str]
) -> Dict[str, str]:
    """
    执行单台交换机健康检查
    连接设备并收集各项健康指标

    参数:
        ip: 交换机IP地址
        username: SSH用户名
        password: SSH密码
        vendor: 设备厂商
        critical_ports: 关键端口列表(逗号分隔)

    返回:
        包含检查结果的字典
    """
    # 初始化结果字典
    results = {
        'IP': ip,
        'Vendor': vendor.upper(),
        'Uptime': '❌ Connection Failed',
        'CPU Usage': 'N/A',
        'Memory Usage': 'N/A',
        'Temperature': 'N/A',
        'Fans': 'N/A',
        'Power': 'N/A',
        'Critical Ports': 'N/A' if not critical_ports else 'Not Configured',
        'Check Time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    ssh = None
    try:
        # 建立SSH连接
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 
                   username=username, 
                   password=password, 
                   timeout=15, 
                   banner_timeout=30,
                   look_for_keys=False)

        # 获取shell并禁用分页
        shell = ssh.invoke_shell()
        page_cmd = 'terminal length 0\n' if vendor == 'cisco' else 'screen-length disable\n'
        shell.send(page_cmd)
        time.sleep(1)  # 等待命令执行

        # 依次执行健康检查命令
        for cmd_type in ['uptime', 'cpu', 'mem', 'temp', 'fans', 'power']:
            cmd = COMMAND_TEMPLATES[vendor][cmd_type]
            _, stdout, stderr = ssh.exec_command(cmd, timeout=20)
            output = stdout.read().decode(errors='ignore')
            error = stderr.read().decode(errors='ignore')

            if error:
                logger.warning(f"{ip} {cmd_type} command error: {error.strip()}")

            # 解析并存储结果
            results[cmd_type.title()] = parse_switch_output(vendor, output, cmd_type)

        # 检查关键端口
        if critical_ports:
            results['Critical Ports'] = check_critical_ports(ssh, vendor, critical_ports)

        # 更新连接状态
        results['Uptime'] = results['Uptime'].replace('❌ Connection Failed', '🟢 Connected')

    except paramiko.AuthenticationException:
        logger.error(f"{ip} 认证失败,请检查用户名/密码")
        results['Uptime'] = "❌ Authentication Failed"
    except paramiko.SSHException as e:
        logger.error(f"{ip} SSH连接错误: {str(e)}")
        results['Uptime'] = f"❌ SSH Error: {str(e)}"
    except Exception as e:
        logger.error(f"{ip} 未知错误: {str(e)}")
        results['Uptime'] = f"❌ Error: {str(e)}"
    finally:
        # 确保关闭SSH连接
        if ssh:
            ssh.close()

    return results

def main():
    """主函数:读取设备清单,执行批量检查,生成报告"""
    try:
        # 读取设备清单
        df_switches = pd.read_excel('switch_list.xlsx')
        logger.info(f"成功读取 {len(df_switches)} 台交换机信息")

        # 检查必要列是否存在
        required_columns = ['IP', 'Username', 'Password', 'Vendor']
        if not all(col in df_switches.columns for col in required_columns):
            missing = set(required_columns) - set(df_switches.columns)
            logger.error(f"缺少必要列: {', '.join(missing)}")
            return

        # 执行批量检查
        inspection_results = []
        for _, row in df_switches.iterrows():
            logger.info(f"正在检查 {row['IP']} ({row['Vendor']})...")
            result = inspect_switch(
                ip=row['IP'],
                username=row['Username'],
                password=row['Password'],
                vendor=row['Vendor'].lower(),
                critical_ports=row.get('CriticalPorts', '')
            )
            inspection_results.append(result)

        # 生成报告
        df_results = pd.DataFrame(inspection_results)

        # 重排序列顺序
        column_order = [
            'IP', 'Vendor', 'Check Time', 'Uptime', 
            'CPU Usage', 'Memory Usage', 'Temperature',
            'Fans', 'Power', 'Critical Ports'
        ]
        df_results = df_results[column_order]

        # 保存结果
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f'switch_inspection_report_{timestamp}.xlsx'
        df_results.to_excel(output_file, index=False)

        logger.info(f"\n巡检完成! 结果已保存至: {output_file}")

        # 生成统计摘要
        success_count = len([r for r in inspection_results if '🟢 Connected' in r['Uptime']])
        failed_count = len(inspection_results) - success_count

        logger.info("统计摘要:")
        logger.info(f"- 成功检查: {success_count} 台")
        logger.info(f"- 失败设备: {failed_count} 台")

        # 输出关键问题设备
        problem_devices = []
        for device in inspection_results:
            if any(('🔴' in str(device[k]) or '⚠️' in str(device[k])) for k in device):
                problem_devices.append(device['IP'])

        if problem_devices:
            logger.warning(f"⚠️ 发现异常设备: {', '.join(problem_devices)}")

    except FileNotFoundError:
        logger.error("错误: switch_list.xlsx 文件不存在")
    except Exception as e:
        logger.error(f"主程序异常: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

注意事项

  1. 交换机配置要求

    • 确保交换机已启用SSH服务
    • 网络可达目标交换机
    • 使用的SSH账号有足够的权限执行检查命令
  2. 首次运行建议

# 测试单台交换机连接
python -c "import paramiko; ssh = paramiko.SSHClient(); ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()); ssh.connect('192.168.1.1', username='admin', password='password'); print('连接成功')"
  1. 定时任务

可配置为每日自动执行(Linux使用cron,Windows使用任务计划程序)

# 每天凌晨2点执行
0 2 * * * /usr/bin/python3 /path/to/switch_health_check.py
  1. venv环境

可使用虚拟运行环境隔离版本和依赖

#cd path\to\your\project                #进入项目文件夹
#python -m venv venv                    #生成venv虚拟环境
#.\venv\Scripts\activate                #cmd命令行下激活venv环境
#pip install pandas paramiko openpyxl   #为临时环境安装依赖包
#python switch_health_check_latest.py   #虚拟VENV环境下运行
#deactivate                             #退出虚拟环境