交换机批量在线监控python脚本

批量监控交换机列表的CPU/内存使用率、温度、风扇、电源以及关键端口状态,在线每分钟刷新输出

运行环境

  • python 3.8+、可以ssh目标交换机

如何运行程序

#文件同目录下创建 `switch_list.xlsx` 文件
#python switch_runtime_check_latest.py

交换机清单文件

文件格式

必须包含以下列:

  • IP: 交换机IP地址
  • Username: SSH用户名
  • Password: SSH密码
  • Vendor: 设备厂商(cisco/h3c/huawei)

文件样例

IP Username Password Vendor
192.168.1.1 admin Cisco123 cisco

监控效果

交换机实时监控 - 2025-05-31 14:45:30
------------------------------------------------------------------------------------------
IP地址           厂商      状态        CPU   内存   温度   风扇     电源     接口状态
-----------------------------------------------------------------------------------------
192.168.1.1     cisco    ✓ 在线      15%   45%   42℃   正常     正常     正常
-----------------------------------------------------------------------------------------
设备总数: 1 | 在线设备: 1 | 离线设备: 0
按 Ctrl+C 退出监控
=========================================================================================

代码

import pandas as pd  # 数据处理核心库,用于读取Excel设备清单和生成检查报告
import paramiko  # SSH客户端库,用于连接网络设备并执行命令
import re  # 正则表达式库,用于解析设备返回的命令输出
from datetime import datetime  # 日期时间处理,用于生成时间戳和报告文件名
import time  # 时间处理,用于SSH连接中的等待操作
import os
import sys

# ==========================================
# 监控系统配置参数
# ==========================================
MONITOR_INTERVAL = 60  # 监控间隔(秒) - 控制刷新频率
MAX_DEVICES = 20       # 最大显示设备数 - 限制屏幕显示数量
STATUS_WIDTH = 90      # 状态显示宽度 - 控制台输出宽度

# ==========================================
# 不同厂商设备的监控命令模板
# ==========================================
COMMANDS = {
    'cisco': {  # Cisco设备命令
        'cpu': 'show processes cpu | include CPU',  # 获取CPU使用率
        'mem': 'show memory statistics',            # 获取内存使用率
        'temp': 'show environment temperature',     # 获取温度信息
        'fans': 'show environment fans',            # 获取风扇状态
        'power': 'show environment power',          # 获取电源状态
        'interfaces': 'show interfaces | include line protocol|input rate|output rate'  # 获取接口状态
    },
    'h3c': {    # H3C设备命令
        'cpu': 'display cpu-usage',
        'mem': 'display memory',
        'temp': 'display environment',
        'fans': 'display fan',
        'power': 'display power',
        'interfaces': 'display interface brief'
    },
    'huawei': {  # 华为设备命令
        'cpu': 'display cpu-usage',
        'mem': 'display memory-usage',
        'temp': 'display temperature all',
        'fans': 'display fan',
        'power': 'display power',
        'interfaces': 'display interface brief'
    }
}

def clear_screen():
    """
    清屏函数 - 清除控制台输出,提供干净的显示界面
    根据操作系统类型自动选择清屏命令
    """
    os.system('cls' if os.name == 'nt' else 'clear')

def print_header():
    """
    打印监控头部信息 - 显示监控标题和状态表头
    包含时间戳和状态列标题
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # 当前时间戳
    print(f"交换机实时监控 - {timestamp}")
    print("-" * STATUS_WIDTH)  # 分隔线

    # 状态表头
    print(f"{'IP地址':<16}{'厂商':<8}{'状态':<6}{'CPU':<6}{'内存':<6}{'温度':<6}{'风扇':<6}{'电源':<6}{'接口状态'}")

    print("-" * STATUS_WIDTH)  # 分隔线

def get_device_status(ssh, vendor):
    """
    获取设备状态信息 - 通过SSH连接执行命令并解析设备状态

    参数:
        ssh: 已建立的SSH连接对象
        vendor: 设备厂商类型 (cisco/h3c/huawei)

    返回:
        包含设备状态的字典:
        - cpu: CPU使用率百分比
        - mem: 内存使用率百分比
        - temp: 设备最高温度
        - fans: 风扇状态 (正常/故障)
        - power: 电源状态 (正常/故障)
        - interfaces: 接口状态 (正常/警告/高负载)
    """
    # 初始化默认状态值
    status = {
        'cpu': '?',    # CPU使用率 (未知)
        'mem': '?',     # 内存使用率 (未知)
        'temp': '?',    # 温度 (未知)
        'fans': '?',    # 风扇状态 (未知)
        'power': '?',   # 电源状态 (未知)
        'interfaces': '?'  # 接口状态 (未知)
    }

    try:
        # ==========================================
        # 获取并解析CPU使用率
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['cpu'], timeout=10)
        output = stdout.read().decode('utf-8', errors='ignore')  # 读取命令输出

        # 根据不同厂商解析CPU使用率
        if vendor == 'cisco':
            # Cisco设备CPU解析正则表达式
            match = re.search(r'CPU utilization for five seconds: (\d+)%', output)
        else:
            # H3C/华为设备CPU解析正则表达式
            match = re.search(r'[Cc][Pp][Uu] usage: (\d+)%', output)

        if match:
            status['cpu'] = f"{match.group(1)}%"  # 设置CPU使用率

        # ==========================================
        # 获取并解析内存使用率
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['mem'], timeout=10)
        output = stdout.read().decode('utf-8', errors='ignore')

        # 根据不同厂商解析内存使用率
        if vendor == 'cisco':
            match = re.search(r'Total:\s*\d+\s*(\d+)', output)
        else:
            match = re.search(r'Memory Using Percentage: (\d+)%', output)

        if match:
            status['mem'] = f"{match.group(1)}%"  # 设置内存使用率

        # ==========================================
        # 获取并解析温度信息
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['temp'], timeout=10)
        output = stdout.read().decode('utf-8', errors='ignore')

        # 提取所有温度值
        matches = re.findall(r'\b(\d{2,3})[°cC]?\b', output)
        if matches:
            # 获取最高温度值
            max_temp = max(map(int, matches))
            status['temp'] = f"{max_temp}℃"  # 设置最高温度

        # ==========================================
        # 获取并解析风扇状态
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['fans'], timeout=10)
        output = stdout.read().decode('utf-8', errors='ignore')

        # 检查风扇状态关键词
        if any(keyword in output for keyword in ['OK', 'Normal', 'Present', '正常']):
            status['fans'] = '正常'  # 风扇工作正常
        elif any(keyword in output for keyword in ['Fail', 'Absent', 'Error', 'Abnormal']):
            status['fans'] = '故障'  # 风扇故障

        # ==========================================
        # 获取并解析电源状态
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['power'], timeout=10)
        output = stdout.read().decode('utf-8', errors='ignore')

        # 检查电源状态关键词
        if any(keyword in output for keyword in ['OK', 'Normal', 'Present', '正常']):
            status['power'] = '正常'  # 电源工作正常
        elif any(keyword in output for keyword in ['Fail', 'Absent', 'Error', 'Abnormal']):
            status['power'] = '故障'  # 电源故障

        # ==========================================
        # 获取并解析接口状态
        # ==========================================
        _, stdout, _ = ssh.exec_command(COMMANDS[vendor]['interfaces'], timeout=15)
        output = stdout.read().decode('utf-8', errors='ignore')
        status['interfaces'] = "正常"  # 默认设为正常

        # 检查接口问题关键词
        if re.search(r'down|err|discard|error', output, re.I):
            status['interfaces'] = "警告"  # 检测到接口问题

        # 检查高速率接口关键词
        if re.search(r'100%|\b10G|\b40G|\b100G', output, re.I):
            status['interfaces'] = "高负载"  # 检测到高负载接口

    except Exception as e:
        # 忽略所有异常,保持监控连续性
        pass

    return status

def color_text(text, color_code):
    """
    为文本添加ANSI颜色代码 - 增强控制台显示可读性

    参数:
        text: 需要着色的文本
        color_code: ANSI颜色代码 (字符串)
          92: 绿色 (正常)
          91: 红色 (故障/离线)
          93: 黄色 (警告)
          90: 灰色 (未知)

    返回:
        带颜色代码的文本字符串
    """
    return f"\033[{color_code}m{text}\033[0m"

def monitor_devices(devices):
    """
    设备监控主循环 - 持续监控设备状态并刷新显示

    参数:
        devices: 设备列表 (字典列表)
    """
    # 设备状态存储字典
    device_status = {}

    # 主监控循环
    while True:
        clear_screen()  # 清屏
        print_header()  # 打印头部信息

        # 重置所有设备状态为离线
        for ip in device_status:
            device_status[ip]['status'] = '离线'

        # ==========================================
        # 遍历所有设备并获取状态
        # ==========================================
        for device in devices:
            ip = device['IP']
            vendor = device['Vendor'].lower()  # 统一为小写

            # 初始化新设备状态记录
            if ip not in device_status:
                device_status[ip] = {
                    'vendor': vendor,
                    'status': '离线',  # 初始状态设为离线
                    'cpu': '?',       # CPU状态未知
                    'mem': '?',        # 内存状态未知
                    'temp': '?',       # 温度状态未知
                    'fans': '?',       # 风扇状态未知
                    'power': '?',      # 电源状态未知
                    'interfaces': '?'  # 接口状态未知
                }

            try:
                # ==========================================
                # 建立SSH连接
                # ==========================================
                ssh = paramiko.SSHClient()
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # 自动添加主机密钥
                ssh.connect(
                    ip,
                    username=device['Username'],
                    password=device['Password'],
                    timeout=8,           # 连接超时时间(秒)
                    banner_timeout=15,   # Banner等待时间(秒)
                    look_for_keys=False  # 不查找密钥文件
                )

                # ==========================================
                # 获取设备状态并更新
                # ==========================================
                status = get_device_status(ssh, vendor)
                device_status[ip].update(status)          # 更新状态信息
                device_status[ip]['status'] = '在线'      # 标记设备在线

                # 关闭SSH连接
                ssh.close()

            except:
                # 连接失败,标记设备离线
                device_status[ip]['status'] = '离线'

        # ==========================================
        # 显示设备状态
        # ==========================================
        count = 0
        for ip, status in device_status.items():
            # 限制显示设备数量
            if count >= MAX_DEVICES:
                break

            # ==========================================
            # 设备连接状态显示处理
            # ==========================================
            if status['status'] == '在线':
                status_icon = '✓'
                status_display = color_text(f"{status_icon} 在线", '92')  # 绿色显示
            else:
                status_icon = '✗'
                status_display = color_text(f"{status_icon} 离线", '91')  # 红色显示

            # ==========================================
            # 风扇状态显示处理
            # ==========================================
            if status['fans'] == '正常':
                fans_display = color_text('正常', '92')  # 绿色
            elif status['fans'] == '故障':
                fans_display = color_text('故障', '91')  # 红色
            else:
                fans_display = color_text('?', '90')     # 灰色

            # ==========================================
            # 电源状态显示处理
            # ==========================================
            if status['power'] == '正常':
                power_display = color_text('正常', '92')  # 绿色
            elif status['power'] == '故障':
                power_display = color_text('故障', '91')  # 红色
            else:
                power_display = color_text('?', '90')    # 灰色

            # ==========================================
            # 接口状态显示处理
            # ==========================================
            if status['interfaces'] == '正常':
                iface_display = color_text('正常', '92')  # 绿色
            elif status['interfaces'] == '警告':
                iface_display = color_text('警告', '93')  # 黄色
            elif status['interfaces'] == '高负载':
                iface_display = color_text('高负载', '93')  # 黄色
            else:
                iface_display = color_text('?', '90')     # 灰色

            # 打印设备状态行
            print(f"{ip:<16}{status['vendor']:<8}{status_display:<12}{status['cpu']:<6}{status['mem']:<6}{status['temp']:<6}{fans_display:<8}{power_display:<8}{iface_display}")
            count += 1

        # ==========================================
        # 显示统计信息
        # ==========================================
        online_count = sum(1 for s in device_status.values() if s['status'] == '在线')
        print("-" * STATUS_WIDTH)  # 分隔线
        print(f"设备总数: {len(device_status)} | 在线设备: {online_count} | 离线设备: {len(device_status) - online_count}")
        print("按 Ctrl+C 退出监控")
        print("=" * STATUS_WIDTH)  # 分隔线

        # 等待下一个监控周期
        time.sleep(MONITOR_INTERVAL)

def main():
    """
    主函数 - 程序入口点
    加载设备清单并启动监控
    """
    try:
        # ==========================================
        # 加载设备清单
        # ==========================================
        device_file = 'switch_list.xlsx'

        # 检查设备清单文件是否存在
        if not os.path.exists(device_file):
            print(f"错误: 设备清单文件不存在 - {device_file}")
            return

        # 读取Excel文件
        df_devices = pd.read_excel(device_file)

        # 检查必要列是否存在
        required_columns = ['IP', 'Username', 'Password', 'Vendor']
        if not all(col in df_devices.columns for col in required_columns):
            # 找出缺失的列
            missing = set(required_columns) - set(df_devices.columns)
            print(f"错误: 缺少必要列 - {', '.join(missing)}")
            return

        # 转换为字典列表格式
        devices = df_devices.to_dict('records')
        print(f"开始监控 {len(devices)} 台设备,刷新间隔 {MONITOR_INTERVAL} 秒...")
        time.sleep(2)  # 等待2秒,让用户看到提示

        # 启动监控主循环
        monitor_devices(devices)

    except KeyboardInterrupt:
        # 捕获Ctrl+C中断
        print("\n监控已停止")
    except Exception as e:
        # 捕获其他异常
        print(f"监控错误: {str(e)}")

if __name__ == "__main__":
    # 程序入口
    main()

注意事项

  • 确保交换机已启用SSH服务
  • 网络可达目标交换机
  • 使用的SSH账号有足够的权限执行检查命令

venv环境

可使用虚拟运行环境隔离版本和依赖

#cd path\to\your\project                #进入项目文件夹
#python -m venv venv                    #生成venv虚拟环境
#.\venv\Scripts\activate                #cmd命令行下激活venv环境
#pip install pandas paramiko openpyxl   #为临时环境安装依赖包
#python switch_runtime_check_latest.py  #虚拟VENV环境下运行
#deactivate                             #退出虚拟环境