用Python运维网络(3):paramiko windows
我的环境是在windows7 64位sp1环境下,python版本是3.6.6,paramiko的版本是2.4.2,安装方法很简单,这里就不赘述了。
需要强调的是:
在Windows里运行python脚本的方式主要有三种:
a. 左键双击脚本即可执行
b. 右键单击脚本,选择用IDLE编辑脚本,然后点击Run—>Run Module执行脚本。
c. 在CMD命令行里输入"python xxx.py"来执行文件
这里主要讲下第一种方法: 左键双击运行脚本后,你会看到一个“闪退”的CMD窗口(“闪退”很快,从窗口弹出到消失只有0.1-0.2秒的时间,肉眼刚刚能看到),根本看不到运行脚本后的结果,这是因为程序执行完后自动退出了,要让窗口停留,可以在代码最后放一个input()。
本文参考了 弈心:网络工程师的Python之路---进阶篇
案例1
案例背景:
单位有24口的华为3700交换机共1000台,分别分布在5个掩码为/24的C类网络子网下:
- 192.168.11.x /24
- 192.168.12.x /24
- 192.168.13.x /24
- 192.168.14.x /24
- 192.168.15.x /24
在之前网络环境的基础上,先给交换机增加上该段的地址和回程路由:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
@author: antenna
@contact: lilyef2000@gmail.com
@software:
@file: changeswitchmanageip.py
@time: 2019/3/29 9:07
@desc:
import paramiko
import time
import getpass
import socket
import re
from argparse import ArgumentParser
if __name__ == '__main__':
parser = ArgumentParser(description='Excute command in cmd_file to switch in ip_file')
parser.add_argument('ipfile', metavar='ipfile', help='your ip_file')
args = parser.parse_args()
ip_file = args.ipfile
username = input('Username: ')
password = getpass.getpass('password: ')
with open(ip_file, "r") as f:
ip_list = f.readlines()
switch_with_authentication_issue = []
switch_not_reachable = []
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for ip_txt in ip_list:
try:
ip = ip_txt.strip()
ssh_client.connect(hostname=ip, username=username, password=password, look_for_keys=False)
print("You have successfully connect to {}".format(ip))
# 找出ip地址第四位数字
pattern_ip = r"((25[0-5]|2[0-4]\d|[01]?\d?\d)\.){3}(25[0-5]|2[0-4]\d|[01]?\d?\d)" # 精确匹配ip地址的正则
findip = re.compile(pattern_ip)
match = findip.search(ip)
ip4 = match.group(3) if match else None
command = ssh_client.invoke_shell()
command.send("system-view\n")
cmd = "vlan {}\n".format(ip4)
command.send(cmd)
command.send("quit\n")
cmd = "interface Vlanif {}\n".format(ip4)
command.send(cmd)
cmd = "ip address 192.168.{}.253 24\n".format(ip4)
command.send(cmd)
command.send("quit\n")
command.send("ip route-static 192.168.56.0 24 192.168.{}.1\n".format(ip4))
command.send("quit\n")
command.send("save\n")
command.send("Y\n")
command.send("\n")
time.sleep(1)
output = command.recv(65535)
print(output.decode(encoding='utf-8'))
except paramiko.ssh_exception.AuthenticationException:
print("User authentication failed for " + ip + ".")
switch_with_authentication_issue.append(ip)
except socket.error:
print(ip + " is not reachable.")
switch_not_reachable.append(ip)
ssh_client.close()
有关正则的参考资料: https:// zhuanlan.zhihu.com/p/30 833552
修改完后五个段内分别有五台交换机:
- 192.168.11.253
- 192.168.12.253
- 192.168.13.253
- 192.168.14.253
- 192.168.15.253
案例需求:
在不借助任何NMS软件或网络安全工具的帮助的前提下,使用Python脚本依次ping所有交换机的管理IP地址,来确定当前有哪些交换机可达,并且统计当前每个交换机有多少终端物理端口是UP的(级联端口不算),以及1000台交换机所有UP的终端端物理端口的总数,并统计网络里的端口使用率(也就是端口的up率)。
案例思路:
根据需求我们可以写两个脚本,第一个脚本用来ping5个网段下所有交换机的管理IP,因为掩码是/24,IP地址的最后一位我们可以指定python来ping .1到.254,然后将所有可达的交换机IP写入并保存在一个名为reachable_ip.txt的文本文件中。
之后,写第二个脚本来读取该文本文件中所保存的IP地址,依次登录所有这些可达的交换机,输入命令dis int brief | i up命令查看有哪些端口是up的,再配合re这个模块(正则表达式),来匹配我们所要的用户端物理端口号(Enx/x/x),统计它们的总数,即可得到当前一个交换机有多少物理端口是up的。 (注:因为dis int brief | i up的结果里也会出现虚拟端口,比如vlan或者loopback端口,所以这里强调的是用正则表达式来匹配用户端物理端口Enx/x/x)
代码:
1.advance1_1.py # ping几个网段,查找可达的ip地址,记录到reachable_ip.txt中
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
@author: antenna
@contact: lilyef2000@gmail.com
@software:
@file: advance1_1.py
@time: 2019/3/28 20:31
@desc:
import subprocess
import os
class Ping(object):
def __init__(self):
self.third_octect = range(11, 16)
self.last_octect = range(253, 254)
self.f = None
self.ping_result = None
self.ip = None
def ping(self):
self.remove_last_reachable_ip_file_exist()
for ip3 in self.third_octect:
for ip4 in self.last_octect:
self.ip = '192.168.' + str(ip3) + '.' + str(ip4)
self.ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', self.ip])
print("ping {} result:{}".format(self.ip, self.ping_result))
self.open_ip_record_file()
self.check_ping_result()
self.f.close()
def open_ip_record_file(self):
self.f = open('reachable_ip.txt', 'a')
def check_ping_result(self):
if self.ping_result == 0:
self.f.write(self.ip + "\n")
def remove_last_reachable_ip_file_exist(self):
if os.path.exists('reachable_ip.txt'):
os.remove('reachable_ip.txt')
if __name__ == '__main__':
script1_1 = Ping()
script1_1.ping()
input()
代码讲解:
- import subprocess模块,用来ping交换机的管理IP地址。
- import os模块,第一次运行脚本时,通过open()函数的追加模式(也就是参数a),把所有可达的交换机管理IP地址依次写入reachable_ip.txt文件中(脚本中的 open_ip_record_file(self) )。第二次运行脚本 , 所有可达的IP地址又会被继续以追加的形式写入reachable_ip.txt文件中,这样的话显得可能干扰判断结果,所以使用os.path.exists来判断reachable_ip.txt这个文件是否存在,如果存在的话就将它删除,这样可以保证每次运行脚本时,reachabe_ip.txt这个文件里保存本次运行脚本后所有可达的IP地址。由 remove_last_reachable_ip_file_exsit(self) 方法实现。
- 因为我们要依次ping 192.168.11.x, 192.168.12.x, 192.168.13.x, 192.168.14.x, 192.168.15.x这五个网段的IP,它们是有规律可循的,第三字段是从11-15, 所以这里我们用range(11,16)创建一个包含数字11-15的列表,并把它赋值给 third _ octect 这个变量,第四字段我们要从1 ping到254, 所以又用range(1,255)创建第二个列表并把它赋值给 last_octect 这个变量。
- 配合subprocess.call来ping所有这些IP, subprocess.call(['ping','-n','2','-w','2',self.ip]) 中的"-n"和"-w"是Windows里的ping命令的参数,表示每个IP只ping两次,每次最多等待两秒钟。
- 注意subprocess.call会返回命令执行状态,0表示命令执行成功(未必代表该ip存在,后面看实例)。所以下面的 check_ping_result(self) 方法用来做判断,如果返回的值是0 ( if self.ping_result == 0: ),则将它写入reachable_ip.txt文件中reachable_ip.txt( self.f.write(self.ip + "\n") )
一个坑:返回0,但是并未ping成功的例子:
测试环境:
测试代码:
import subprocess
ip = '192.168.11.101'
ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', ip])
print("ping {} result:{}".format(ip, ping_result))
ip = '192.168.12.101'
ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', ip])
print("ping {} result:{}".format(ip, ping_result))
input()
我在上图的路由中删除L3_1交换机中vlan11地址,也就是192.168.11.0所在网段的网关地址:
运行结果:
重新设置L3_1交换机中vlan11地址:
运行结果:
所以说subprocess.call返回的命令执行状态为0只表示命令执行成功,至于目标ip是否可达还要再分析。
2.advance1_2.py # 读取advance1_1.py的运行结果文件reachable_ip.txt,并尝试连接到其对应的交换机上,分析端口使用率。
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
@author: antenna
@contact: lilyef2000@gmail.com
@software:
@file: advance1_2.py
@time: 2019/3/28 20:48
@desc:
import paramiko
import time
import re
import socket
from datetime import datetime
now = datetime.now()
date = "%s-%s-%s" % (now.month, now.day, now.year)
time_now = "%s:%s:%s" % (now.hour, now.minute, now.second)
class PortStatistics(object):
def __init__(self):
self.switch_with_authentication_issue = []
self.switch_not_reachable = []
self.total_number_of_up_port = 0
self.iplist = None
self.number_of_switch = None
self.ssh_client = paramiko.SSHClient()
self.ip = None
self.command = None
self.search_up_port = None
self.number_of_up_port = None
self.total_number_of_ports = None
self.ssh_login()
self.summary()
def ssh_login(self):
with open('reachable_ip.txt', 'r') as file:
ip_list = file.readlines()
self.number_of_switch = len(ip_list)
for line in ip_list:
try:
self.ip = line.strip()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh_client.connect(hostname=self.ip, username='python', password='123', look_for_keys=False)
print("\nYou have successfully connect to {}".format(self.ip))
self.command = self.ssh_client.invoke_shell()
self.check_up_port()
except paramiko.ssh_exception.AuthenticationException:
print("User authentication failed for " + self.ip + ".")
self.switch_with_authentication_issue.append(self.ip)
except socket.error:
print(self.ip + " is not reachable.")
self.switch_not_reachable.append(self.ip)
def check_up_port(self):
self.command.send('display interface brief | include up\n') # or 'dis int b | i up' 显示up状态的接口
# 'display ip interface brief | include up\n' or 'dis ip int b | i up\n' 显示配置了ip地址的up状态的接口
time.sleep(1)
output = self.command.recv(65535)
self.ssh_client.close()
output = output.decode(encoding='utf-8')
print(output)
self.search_up_port = re.findall(r'Ethernet', output)
self.number_of_up_port = len(self.search_up_port)
print(self.ip + " has " + str(self.number_of_up_port) + " ports up.")
self.total_number_of_up_port += self.number_of_up_port
def summary(self):
self.total_number_of_ports = self.number_of_switch * 24
dis_str = "\n"
dis_str += "There are totally " + str(self.total_number_of_ports) + " ports available in the network."
dis_str
+= str(self.total_number_of_up_port) + " ports are currently up."
dis_str += "Port up rate is %.2f%%" % (self.total_number_of_up_port / float(self.total_number_of_ports) * 100)
print(dis_str)
print('\nUser authentication failed for below switches: ')
for i in self.switch_with_authentication_issue:
print(i)
print('\nBelow switches are not reachable: ')
for i in self.switch_not_reachable:
print(i)
with open(date + ".txt", "a+") as f:
f.write('As of ' + date + " " + time_now)
f.write("\n\nThere are totally " + str(self.total_number_of_ports) + " ports available in the network.")
f.write("\n" + str(self.total_number_of_up_port) + " ports are currently up.")
f.write("\nPort up rate is %.2f%%" %
(self.total_number_of_up_port / float(self.total_number_of_ports) * 100))
f.write("\n***************************************************************\n\n")
if __name__ == '__main__':
script1_2 = PortStatistics()
input()
代码讲解:
- import datetime模块用来记录每次运行代码的时间。
- 记录当前时间可以调用datetime.now()方法,将它赋值给now,datetime.now()含.year()(年)、.month()(月)、.day()(日)、.hour()(时)、.minute()(分)、.second()(秒)几个子方法,将“月-日-年”赋值给date变量,将“时:分:秒”赋值给time_now变量。
- total_number_of_up_port = 0, 先将总up端口数设为0,后面再用累加的方法统计。
- self.number_of_up_port = len(self.search_up_port) ,很简单明了,现在 self.number_of_up_port 这个变量代表的就是一个交换机有多少个GigabitEthernet或Ethernet端口是up的。 然后将每个交换机有多少端口是up的打印出来。
- 最后通过 self.total_number_of_up_port += self.number_of_up_port 来累加在整个网路里总共有多少端口是up的。
- summary()方法里,除了将统计信息各种打印出来外,我们还将另外创建一个文件,将运行脚本时的日子作为该脚本的名字,将统计信息写入进去,方便我们调阅查看, 注意写入的内容里面有f.write('As of ' + date + " " + time_now),这样还可以清晰直观的看到我们是哪一天,几时几分几秒运行的脚本。
- 为什么要用日期名作为文件名?这样做的好处是一旦运行脚本时的日期不同,脚本就会自动创建一个新的文件,比如2018年6月16号运行了一次脚本,Python创建了一个名为 6-16-2018.txt 的文件,如果第二天再运行一次脚本,Python又会创建一个名为 6-17-2018.txt 的文件。如果在同一天里数次运行脚本,则多次运行的结果会以追加的形式写进同一个.txt文件,不会创建新文件。可以配合Windows的Task Scheduler或者Linux的Crontab来定期自动执行脚本,每天自动生成当天的端口使用量的统计情况,方便管理员随时观察网络里交换机的端口使用情况。
执行结果:
advance1_1.py执行结果
advance1_2.py执行结果
C:\Users\Administrator\Envs\py3NetworkProgramming\Scripts\python.exe D:/personal/网络工程师的python之路/advance1_2.py
You have successfully connect to 192.168.11.253
Info: The max number of VTY users is 5, and the number
of current VTY users on line is 1.
The current login time is 2019-03-29 11:19:41.
<S3700-1>display interface brief | include up
PHY: Physical
*down: administratively down
(l): loopback
(s): spoofing
(b): BFD down
(e): ETHOAM down
(dl): DLDP down
(d): Dampening Suppressed
InUti/OutUti: input utility/output utility
Interface PHY Protocol InUti OutUti inErrors outErrors
Ethernet0/0/1 up up 0% 0% 0 0
GigabitEthernet0/0/1 up up 0% 0% 0 0
NULL0 up up(s) 0% 0% 0 0
Vlanif1 up down -- -- 0 0
Vlanif10 up up -- -- 0 0
Vlanif11 up up -- -- 0 0
<S3700-1>
192.168.11.253 has 2 ports up.
192.168.12.253 has 2 ports up.
192.168.13.253 has 2 ports up.
192.168.14.253 has 2 ports up.
192.168.15.253 has 3 ports up.
There are totally 120 ports available in the network.11 ports are currently up.Port up rate is 9.17%
User authentication failed for below switches:
Below switches are not reachable:
案例2:
案例背景:
案例1中的交换机型号为3700的交换机配置了多个管理ip地址,尝试确认交换机的型号以后,再删除掉其vlan10上的地址,并确认修改成功。
案例思路:
- 删除管理vlan 10上的管理ip之前要先确认交换机型号为3700,使用display version 查看版本信息,使用正则匹配证实是3700后,进行删除多余ip操作,同时注意及时保存重启。
- 重启后验证的方式有两个,一是直接登录验证,二是查询ip是否可达。
案例代码:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
@author: antenna
@contact: lilyef2000@gmail.com
@software:
@file: advance2_1.py
@time: 2019/3/29 11:45
@desc:
import paramiko
import re
import socket
import time
username = 'python'
password = '123'
with open('ip_list.txt', 'r+') as f:
iplist = f.readlines()
switch_ipremoved = []
switch_not_ipremoved = []
switch_with_authentication_issue = []
switch_not_reachable = []
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for line in iplist:
try:
ip_address = line.strip()
ssh_client.connect(hostname=ip_address, username=username, password=password, timeout=2, look_for_keys=False)
print("Successfully connect to {}".format(ip_address))
command = ssh_client.invoke_shell()
command.send("display version\n")
time.sleep(1)
output = command.recv(
65535)
output = output.decode(encoding='utf-8')
switch_model = re.search(r'(S\d.*?-.*?-[A-Z]+)', output)
vrp_version = re.search(r'\((S.*?)\)', output)
print("switch model:{} vrp version:{}".format(switch_model.group(1), vrp_version.group(1)))
if switch_model.group(1) == 'S3700-26C-HI' and vrp_version.group(1) == 'S3700 V200R001C00': # 型号和软件版本都正确
command.send("display interface vlan10\n")
time.sleep(1)
output = command.recv(65535)
output = output.decode(encoding='utf-8')
ip_vlan10 = re.search(r'Internet Address is ((\d+\.){3}\d+)/(\d+)', output)
if ip_vlan10: # 如果发现配置了vlan 10的管理地址,则将其删除保存并重启
print("vlan 10 ip address:{}".format(ip_vlan10.group()))
command.send("system-view\n")
command.send("interface Vlanif 10\n")
command.send("undo ip address {} {}\n".format(ip_vlan10.group(1), ip_vlan10.group(3)))
command.send("quit\n")
command.send("quit\n")
command.send("save\n")
command.send("Y\n")
command.send("reboot\n")
command.send("Y\n")
time.sleep(1)
output = command.recv(65535)
output = output.decode(encoding='utf-8')
print("find vlan 10 ip address, removing.")
switch_ipremoved.append(ip_address)
else:
print("not find vlan 10 ip address.")
switch_not_ipremoved.append(ip_address)
except paramiko.ssh_exception.AuthenticationException:
print('Error: User authentication failed for below switches: {}'.format(ip_address))
switch_with_authentication_issue.append(ip_address)
except socket.error:
print("Error: {} is not reachable.".format(ip_address))
switch_not_reachable.append(ip_address)
ssh_client.close()
print('\nUser authentication failed for below switches: ')
for i in switch_with_authentication_issue:
print(i)
print('\nBelow switches are not reachable: ')
for i in switch_not_reachable:
print(i)
print('\nBelow switches vlan ip address are removed in this process: ')
for i in switch_ipremoved:
print(i)
print('\nBelow switches vlan ip address are not removed in this process: ')
for i in switch_not_ipremoved:
print(i)
执行结果:
C:\py3NetworkProgramming\python advance2_1.py
Successfully connect to 192.168.11.253
switch model:S3700-26C-HI vrp version:S3700 V200R001C00
not find vlan 10 ip address.
Successfully connect to 192.168.12.253
switch model:S3700-26C-HI vrp version:S3700 V200R001C00
vlan 10 ip address:Internet Address is 192.168.56.12/24
find vlan 10 ip address, removing.
Error: 192.168.13.253 is not reachable.
Successfully connect to 192.168.56.10
switch model:S5700-28C-HI vrp version:S5700 V200R001C00
Error: User authentication failed for below switches: 192.168.15.253
User authentication failed for below switches:
192.168.15.253
Below switches are not reachable:
192.168.13.253
Below switches vlan ip address are removed in this process:
192.168.12.253
Below switches vlan ip address are not removed in this process:
192.168.11.253
Process finished with exit code 0
代码讲解:
从执行结果上可以看到,几种结果都测试到了:
- User authentication failed
- not reachable
- vlan 10 ip 存在
- vlan 10 ip 不存在
实现的关键是使用正则匹配所需要的关键字:
- if switch_model.group(1) == 'S3700-26C-HI' and vrp_version.group(1) == 'S3700 V200R001C00': 保证交换机型号是3700,并且vrp版本符合要求才进行下一步的删除vlan 10 ip的操作
- ip_vlan10 = re.search(r'Internet Address is ((\d+\.){3}\d+)/(\d+)', output) if ip_vlan10: # 如果发现配置了vlan 10的管理地址,则将其删除保存并重启
- switch_ipremoved和switch_not_ipremoved只是表明本次处理是否进行了删除ip操作,并不是实际的状态
- 为了尝试正则匹配的有效性,在第4个ip设置了一台S5700:switch model:S5700-28C-HI vrp version:S5700 V200R001C00
- 192.168.13.253 not reachable的原因是我的win7运行在192.168.56.0/24 ip地址段,与其网关可以正常通信,但该交换机上未配置回程路由: ip route-static 192.168.56.0 255.255.255.0 192.168.13.1
代码优化
这两个案例的代码可以进一步优化,比如加入日志记录模块可以在程序出现问题时及时记录下日志:
import logging
logging.basicConfig( level=logging.INFO, format='%(asctime)s, %(levelname)s - %(message)s')
#logging.basicConfig(filename='myProgramLog.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('toss is %s, guess is %s' % (toss, guess))
还可以加入colorama模块使输出的不同状态用不同颜色表示:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
@author: antenna
@contact: lilyef2000@gmail.com
@software:
@file: color_test.py
@time: 2019/3/9 21:43
@desc:
# 参考 https://blog.csdn.net/qianghaohao/article/details/52117082
# -----------------colorama模块的一些常量---------------------------
# Fore: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
# Back: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
# Style: DIM, NORMAL, BRIGHT, RESET_ALL
from colorama import init, Fore, Back, Style
init(autoreset=True)
class Colored(object):
# 前景色:红色 背景色:默认
def red(self, s):
return Fore.RED + s + Fore.RESET
# 前景色:绿色 背景色:默认
def green(self, s):
return Fore.GREEN + s + Fore.RESET
# 前景色:黄色 背景色:默认
def yellow(self, s):
return Fore.YELLOW + s + Fore.RESET
# 前景色:蓝色 背景色:默认
def blue(self, s):
return Fore.BLUE + s + Fore.RESET
# 前景色:洋红色 背景色:默认
def magenta(self, s):
return Fore.MAGENTA + s + Fore.RESET
# 前景色:青色 背景色:默认
def cyan(self, s):
return Fore.CYAN + s + Fore.RESET
# 前景色:白色 背景色:默认
def white(self, s):
return Fore.WHITE + s + Fore.RESET
# 前景色:黑色 背景色:默认
def black(self, s):
return Fore.BLACK
# 前景色:白色 背景色:绿色
def white_green(self, s):
return Fore.WHITE + Back.GREEN + s + Fore.RESET + Back.RESET
color = Colored()
print(color.red('I am red!'))