交换机批量健康检查python脚本
批量自动化检查网络交换机的健康状态,包括运行时间、CPU/内存使用率、温度、风扇、电源状态以及关键端口状态,记录python检查日志和输出检查结果
运行环境
- Python 3.8+,并已安装依赖包 pandas、paramiko、openpyxl(pip install pandas paramiko openpyxl)
- 运行脚本的主机可以通过 SSH 访问目标交换机
如何运行程序
#文件同目录下创建 `switch_list.xlsx` 文件
#python switch_health_check_latest.py
交换机清单文件
文件格式
必须包含以下列:
- IP: 交换机IP地址
- Username: SSH用户名
- Password: SSH密码
- Vendor: 设备厂商(cisco/h3c/huawei)
- CriticalPorts(可选): 关键端口列表,逗号分隔
文件样例
| IP | Username | Password | Vendor | CriticalPorts |
|---|---|---|---|---|
| 192.168.1.1 | admin | Cisco123 | cisco | Gig1/0/1, Gig1/0/24 |
输出文件
日志文件
- 位置:当前目录下的 switch_inspection.log
- 内容:
- 程序运行详细日志
- 设备连接状态
- 错误和警告信息
检查报告
- 位置:当前目录下
- 文件名格式:switch_inspection_report_YYYYMMDD_HHMMSS.xlsx
- 内容:
- 所有检查项的详细结果
- 状态标记(🟢/🟠/🔴)
- 时间戳信息
代码
import pandas as pd # 数据处理核心库,用于读取Excel设备清单和生成检查报告
import paramiko # SSH客户端库,用于连接网络设备并执行命令
import re # 正则表达式库,用于解析设备返回的命令输出
from datetime import datetime # 日期时间处理,用于生成时间戳和报告文件名
import time # 时间处理,用于SSH连接中的等待操作
import warnings # 警告处理,用于忽略paramiko的特定警告
import logging # 日志记录,用于创建运行日志文件和控制台输出
from typing import Dict, List, Optional # 类型注解,用于函数参数和返回值的类型提示
# Configure logging: every record goes both to a log file in the current
# working directory and to the console.
LOG_FILE = 'switch_inspection.log'

# Log messages contain Chinese text and emoji status markers. Without an
# explicit encoding the file handler uses the platform default, which cannot
# represent those characters on some systems, so the file is pinned to UTF-8.
log_file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[log_file_handler, logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Suppress UserWarning warnings attributed to the paramiko module.
warnings.filterwarnings("ignore", category=UserWarning, module="paramiko")
"""
交换机命令模板
针对不同厂商的设备定义对应的CLI命令
"""
# CLI command templates, keyed first by vendor and then by check type.
# The 'interfaces' entry carries a {port} placeholder that is filled in with
# the interface name before the command is sent.
COMMAND_TEMPLATES = dict(
    cisco=dict(
        uptime='show version | include uptime',
        cpu='show processes cpu sorted | exclude 0.0',
        mem='show memory statistics',
        temp='show environment temperature',
        fans='show environment fans',
        power='show environment power',
        interfaces='show interfaces {port}',
    ),
    h3c=dict(
        uptime='display version | include uptime',
        cpu='display cpu-usage',
        mem='display memory',
        temp='display environment',
        fans='display fan',
        power='display power',
        interfaces='display interface {port}',
    ),
    huawei=dict(
        uptime='display version | include uptime',
        cpu='display cpu-usage',
        mem='display memory-usage',
        temp='display temperature all',
        fans='display fan',
        power='display power',
        interfaces='display interface {port}',
    ),
)
"""
关键阈值配置
定义设备健康状态的警戒线
"""
# Alert limits used when grading the parsed health metrics.
THRESHOLDS = dict(
    cpu_warning=70,    # CPU utilisation warning level (%)
    cpu_critical=90,   # CPU utilisation critical level (%)
    mem_warning=75,    # memory utilisation warning level (%)
    temp_warning=60,   # temperature warning level (degrees Celsius)
    min_uptime=30,     # minimum uptime regarded as normal (days)
)
# Days contributed by each unit that may appear in an uptime line. Hours and
# minutes weigh zero; they are recognised only so that a device that rebooted
# less than a day ago parses as 0 days instead of "N/A".
_UPTIME_UNIT_DAYS = {'year': 365, 'week': 7, 'day': 1, 'hour': 0, 'minute': 0}
_UPTIME_COMPONENT_RE = re.compile(r'(\d+)\s*(year|week|day|hour|minute)s?\b', re.I)

_CPU_PATTERNS = {
    'cisco': re.compile(r'CPU utilization for five seconds:\s*(\d+)%'),
    # Tolerates "CPU Usage   : 12%" as well as a value printed on the next line.
    'default': re.compile(r'cpu\s+usage\s*:\s*(\d+)\s*%', re.I),
}

# Output that already states a percentage.
_MEM_PERCENT_RE = re.compile(r'Memory Using Percentage(?:\s+Is)?\s*:\s*(\d+)\s*%', re.I)
# Output that only states (total, used) counters; the percentage is derived.
_MEM_TOTAL_USED_PATTERNS = (
    re.compile(r'Total:\s*(\d+)\s+(?:Used:\s*)?(\d+)'),
    re.compile(r'^\s*Processor\s+[0-9A-Fa-f]+\s+(\d+)\s+(\d+)', re.M),
    re.compile(r'^\s*Mem:\s+(\d+)\s+(\d+)', re.M),
)

# Word boundaries keep "Abnormal" from being read as the healthy "Normal".
_HEALTHY_TEMP_RE = re.compile(r'\b(?:OK|Normal)\b|正常', re.I)
_HEALTHY_UNIT_RE = re.compile(r'\b(?:OK|Normal|Present)\b|正常', re.I)
_UNIT_FAULT_RE = re.compile(r'Fail|Absent|Error|Abnormal', re.I)
_TEMPERATURE_RE = re.compile(r'\b(\d{2,3})[°cC]?\b')


def _parse_uptime_days(output: str) -> Optional[int]:
    """Return the uptime in whole days from the first line that states one."""
    for line in output.splitlines():
        components = _UPTIME_COMPONENT_RE.findall(line)
        if components:
            return sum(
                int(amount) * _UPTIME_UNIT_DAYS[unit.lower()]
                for amount, unit in components
            )
    return None


def _parse_memory_percent(output: str) -> Optional[int]:
    """Return memory utilisation in percent, deriving it from counters if needed."""
    match = _MEM_PERCENT_RE.search(output)
    if match:
        return int(match.group(1))
    for pattern in _MEM_TOTAL_USED_PATTERNS:
        match = pattern.search(output)
        if match:
            total, used = map(int, match.groups())
            if total > 0:
                return round(used * 100 / total)
    return None


def parse_switch_output(
    vendor: str,
    output: str,
    command_type: str,
    thresholds: Optional[Dict[str, int]] = None,
) -> str:
    """
    Turn the raw output of one health-check command into a short status string.

    Args:
        vendor: Device vendor (cisco/h3c/huawei); selects the CPU pattern.
        output: Raw text returned by the CLI command.
        command_type: One of uptime/cpu/mem/temp/fans/power.
        thresholds: Optional limits with the same keys as the module-level
            THRESHOLDS; when omitted, THRESHOLDS is used.

    Returns:
        A status string such as "🟢 Normal (12%)", or "N/A" when nothing
        recognisable was found or parsing raised an error (which is logged).
    """
    result = "N/A"
    try:
        limits = THRESHOLDS if thresholds is None else thresholds

        if command_type == 'uptime':
            total_days = _parse_uptime_days(output)
            if total_days is not None:
                status = "⚠️ Low" if total_days < limits['min_uptime'] else "✔️ Normal"
                result = f"{status} ({total_days} days)"

        elif command_type == 'cpu':
            pattern = _CPU_PATTERNS.get(vendor, _CPU_PATTERNS['default'])
            match = pattern.search(output)
            if match:
                cpu_usage = int(match.group(1))
                if cpu_usage > limits['cpu_critical']:
                    status = "🔴 Critical"
                elif cpu_usage > limits['cpu_warning']:
                    status = "🟠 Warning"
                else:
                    status = "🟢 Normal"
                result = f"{status} ({cpu_usage}%)"

        elif command_type == 'mem':
            mem_usage = _parse_memory_percent(output)
            if mem_usage is not None:
                # NOTE(review): the limit is named 'mem_warning' but the label
                # is "Critical"; kept as-is because the run summary in main()
                # only flags the red marker.
                status = "🔴 Critical" if mem_usage > limits['mem_warning'] else "🟢 Normal"
                result = f"{status} ({mem_usage}%)"

        elif command_type == 'temp':
            if _HEALTHY_TEMP_RE.search(output):
                result = "🟢 Normal"
            else:
                # No explicit healthy state: grade the highest 2-3 digit
                # reading found in the output.
                readings = _TEMPERATURE_RE.findall(output)
                if readings:
                    max_temp = max(map(int, readings))
                    status = "🟠 Warning" if max_temp > limits['temp_warning'] else "🟢 Normal"
                    result = f"{status} ({max_temp}°C)"

        elif command_type in ('fans', 'power'):
            faults = _UNIT_FAULT_RE.findall(output)
            healthy = _HEALTHY_UNIT_RE.search(output)
            if healthy:
                # An empty bay ("Absent") next to a working unit is not a
                # fault, but a failed unit must not be hidden by a healthy one.
                faults = [fault for fault in faults if fault.lower() != 'absent']
            if faults:
                result = f"🔴 Failed ({len(faults)} units)"
            elif healthy:
                result = "🟢 Normal"
    except Exception as e:
        # Parsing is best effort: one odd output must not abort the inspection.
        logger.error(f"解析错误: {str(e)}")
    return result
# Explicit link-state phrases, e.g. "... is up, line protocol is up" or
# "... current state : UP".
_PORT_STATE_RE = re.compile(
    r'(?:\bis\s+|\bcurrent\s+state\s*:\s*)(administratively\s+down|up|down)\b',
    re.I,
)
_PORT_ADMIN_DOWN_RE = re.compile(r'administratively\s+down', re.I)


def _classify_port_status(output: str) -> str:
    """Grade one interface from the output of its show/display command."""
    if _PORT_ADMIN_DOWN_RE.search(output):
        return "🟠 AdminDown"

    # Prefer the explicit status lines. Only the first two matches (physical
    # state and line protocol) are used so later text cannot skew the result.
    states = [state.lower() for state in _PORT_STATE_RE.findall(output)[:2]]
    if states:
        return "🟢 Up" if all(state == 'up' for state in states) else "🔴 Down"

    # No recognisable status line: fall back to the keyword heuristic.
    if re.search(r'(up|connected)', output, re.I) and not re.search(r'down|disable', output, re.I):
        return "🟢 Up"
    if re.search(r'not connect|disable|admin down', output, re.I):
        return "🟠 AdminDown"
    return "🔴 Down"


def check_critical_ports(ssh: paramiko.SSHClient, vendor: str, ports: str) -> str:
    """
    Check link state and error counters of the listed critical ports.

    Args:
        ssh: An already connected SSH client.
        vendor: Device vendor; selects the interface command template.
        ports: Comma-separated list of interface names.

    Returns:
        One "<port>: <status>" line per port joined with newlines, or "N/A"
        when no port is given. A port whose check raised an error is reported
        as "❌ Check Failed" and the error is logged.
    """
    # An empty spreadsheet cell may arrive as a non-string value (e.g. NaN).
    if not isinstance(ports, str):
        return "N/A"
    port_list = [p.strip() for p in ports.split(',') if p.strip()]
    if not port_list:
        return "N/A"

    results = []
    for port in port_list:
        try:
            cmd = COMMAND_TEMPLATES[vendor]['interfaces'].format(port=port)
            # Same timeout as the other health-check commands, so an
            # unresponsive device cannot block the run indefinitely.
            _, stdout, _ = ssh.exec_command(cmd, timeout=20)
            output = stdout.read().decode(errors='ignore')

            status = _classify_port_status(output)

            # Sum the first reported input and output error counters.
            input_errors = re.search(r'(\d+)\s*input errors', output, re.I)
            output_errors = re.search(r'(\d+)\s*output errors', output, re.I)
            error_count = (int(input_errors.group(1)) if input_errors else 0) + \
                          (int(output_errors.group(1)) if output_errors else 0)

            port_result = f"{port}: {status}"
            if error_count > 0:
                port_result += f" | 🔴 Errors: {error_count}"
            results.append(port_result)
        except Exception as e:
            # Best effort per port: one failing port must not hide the others.
            logger.error(f"端口检查失败 {port}: {str(e)}")
            results.append(f"{port}: ❌ Check Failed")
    return "\n".join(results)
# Maps each health-check command type to the report column holding its result.
_RESULT_COLUMNS = {
    'uptime': 'Uptime',
    'cpu': 'CPU Usage',
    'mem': 'Memory Usage',
    'temp': 'Temperature',
    'fans': 'Fans',
    'power': 'Power',
}


def inspect_switch(
    ip: str,
    username: str,
    password: str,
    vendor: str,
    critical_ports: Optional[str]
) -> Dict[str, str]:
    """
    Run the health check against a single switch.

    Args:
        ip: Switch IP address.
        username: SSH user name.
        password: SSH password.
        vendor: Device vendor; must be a key of COMMAND_TEMPLATES.
        critical_ports: Comma-separated critical port list; None, an empty
            string or a non-string value means no ports are configured.

    Returns:
        A dict with one entry per report column. On success 'Uptime' starts
        with "🟢 Connected" (followed by the parsed uptime when available);
        on any failure it starts with "❌" and describes the error.
    """
    # An empty spreadsheet cell may arrive as NaN, which is truthy; only a
    # real, non-blank string counts as a configured port list.
    ports_spec = critical_ports.strip() if isinstance(critical_ports, str) else ''

    results = {
        'IP': ip,
        'Vendor': vendor.upper(),
        'Uptime': '❌ Connection Failed',
        'CPU Usage': 'N/A',
        'Memory Usage': 'N/A',
        'Temperature': 'N/A',
        'Fans': 'N/A',
        'Power': 'N/A',
        'Critical Ports': 'N/A' if ports_spec else 'Not Configured',
        'Check Time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    # Fail fast with a clear message instead of a KeyError after connecting.
    if vendor not in COMMAND_TEMPLATES:
        logger.error(f"{ip} 不支持的设备厂商: {vendor}")
        results['Uptime'] = f"❌ Unsupported Vendor: {vendor}"
        return results

    ssh = None
    try:
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; acceptable only on a trusted management network.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip,
                    username=username,
                    password=password,
                    timeout=15,
                    banner_timeout=30,
                    look_for_keys=False)

        # Disable paging in an interactive shell.
        # NOTE(review): each exec_command below opens its own channel, so this
        # setting may not apply to those commands - confirm per platform.
        shell = ssh.invoke_shell()
        page_cmd = 'terminal length 0\n' if vendor == 'cisco' else 'screen-length disable\n'
        shell.send(page_cmd)
        time.sleep(1)  # give the device time to process the command

        for cmd_type, column in _RESULT_COLUMNS.items():
            cmd = COMMAND_TEMPLATES[vendor][cmd_type]
            _, stdout, stderr = ssh.exec_command(cmd, timeout=20)
            output = stdout.read().decode(errors='ignore')
            error = stderr.read().decode(errors='ignore')
            if error:
                logger.warning(f"{ip} {cmd_type} command error: {error.strip()}")
            # Store under the report column name so the value reaches the report.
            results[column] = parse_switch_output(vendor, output, cmd_type)

        if ports_spec:
            results['Critical Ports'] = check_critical_ports(ssh, vendor, ports_spec)

        # Mark the device as reachable while keeping the parsed uptime.
        uptime = results['Uptime']
        results['Uptime'] = '🟢 Connected' if uptime == 'N/A' else f'🟢 Connected | {uptime}'
    except paramiko.AuthenticationException:
        logger.error(f"{ip} 认证失败,请检查用户名/密码")
        results['Uptime'] = "❌ Authentication Failed"
    except paramiko.SSHException as e:
        logger.error(f"{ip} SSH连接错误: {str(e)}")
        results['Uptime'] = f"❌ SSH Error: {str(e)}"
    except Exception as e:
        logger.error(f"{ip} 未知错误: {str(e)}")
        results['Uptime'] = f"❌ Error: {str(e)}"
    finally:
        # Always release the connection, including the shell channel.
        if ssh:
            ssh.close()
    return results
def _cell_text(value) -> str:
    """Convert one spreadsheet cell to a stripped string ('' for empty cells)."""
    if value is None or (not isinstance(value, str) and pd.isna(value)):
        return ''
    # A numeric column containing blanks is read as float; 123456.0 must be
    # passed on as "123456", not "123456.0".
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


def main():
    """Read the device list, inspect every switch and write the Excel report."""
    try:
        df_switches = pd.read_excel('switch_list.xlsx')
        logger.info(f"成功读取 {len(df_switches)} 台交换机信息")

        required_columns = ['IP', 'Username', 'Password', 'Vendor']
        missing = [col for col in required_columns if col not in df_switches.columns]
        if missing:
            logger.error(f"缺少必要列: {', '.join(missing)}")
            return

        inspection_results = []
        for _, row in df_switches.iterrows():
            # Normalise every cell so that empty or numeric cells cannot crash
            # the run or reach the SSH layer as non-string values.
            ip = _cell_text(row['IP'])
            vendor = _cell_text(row['Vendor']).lower()
            if not ip:
                logger.warning("跳过缺少IP的行")
                continue
            logger.info(f"正在检查 {ip} ({vendor})...")
            result = inspect_switch(
                ip=ip,
                username=_cell_text(row['Username']),
                password=_cell_text(row['Password']),
                vendor=vendor,
                critical_ports=_cell_text(row.get('CriticalPorts', ''))
            )
            inspection_results.append(result)

        column_order = [
            'IP', 'Vendor', 'Check Time', 'Uptime',
            'CPU Usage', 'Memory Usage', 'Temperature',
            'Fans', 'Power', 'Critical Ports'
        ]
        # Passing columns= fixes the order and also works for an empty list.
        df_results = pd.DataFrame(inspection_results, columns=column_order)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f'switch_inspection_report_{timestamp}.xlsx'
        df_results.to_excel(output_file, index=False)
        logger.info(f"\n巡检完成! 结果已保存至: {output_file}")

        # Every failure path of inspect_switch writes an 'Uptime' value that
        # starts with the cross mark, so anything else counts as a success.
        success_count = sum(
            1 for r in inspection_results
            if not str(r['Uptime']).startswith('❌')
        )
        failed_count = len(inspection_results) - success_count
        logger.info("统计摘要:")
        logger.info(f"- 成功检查: {success_count} 台")
        logger.info(f"- 失败设备: {failed_count} 台")

        # List devices with at least one critical or low-uptime marker.
        problem_devices = [
            device['IP'] for device in inspection_results
            if any('🔴' in str(value) or '⚠️' in str(value) for value in device.values())
        ]
        if problem_devices:
            logger.warning(f"⚠️ 发现异常设备: {', '.join(problem_devices)}")
    except FileNotFoundError:
        logger.error("错误: switch_list.xlsx 文件不存在")
    except Exception as e:
        # Top-level boundary: log with traceback instead of crashing.
        logger.error(f"主程序异常: {str(e)}", exc_info=True)


if __name__ == "__main__":
    main()
注意事项
- 交换机配置要求:
- 确保交换机已启用SSH服务
- 网络可达目标交换机
- 使用的SSH账号有足够的权限执行检查命令
- 首次运行建议:
# 测试单台交换机连接
python -c "import paramiko; ssh = paramiko.SSHClient(); ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()); ssh.connect('192.168.1.1', username='admin', password='password'); print('连接成功')"
- 定时任务:
可配置为每日自动执行(Linux使用cron,Windows使用任务计划程序)
# 每天凌晨2点执行
# 脚本从当前目录读取 switch_list.xlsx,因此需先切换到脚本所在目录
0 2 * * * cd /path/to && /usr/bin/python3 switch_health_check_latest.py
- venv环境:
可使用虚拟运行环境隔离版本和依赖
#cd path\to\your\project #进入项目文件夹
#python -m venv venv #生成venv虚拟环境
#.\venv\Scripts\activate #cmd命令行下激活venv环境
#pip install pandas paramiko openpyxl #为临时环境安装依赖包
#python switch_health_check_latest.py #虚拟VENV环境下运行
#deactivate #退出虚拟环境