相关文章推荐
听话的棒棒糖  ·  Spring ...·  2 年前    · 

用Python运维网络(3):paramiko windows

我的环境是在windows7 64位sp1环境下,python版本是3.6.6,paramiko的版本是2.4.2,安装方法很简单,这里就不赘述了。

需要强调的是:

在Windows里运行python脚本的方式主要有三种:

a. 左键双击脚本即可执行

b. 右键单击脚本,选择用IDLE编辑脚本,然后点击Run—>Run Module执行脚本。

c. 在CMD命令行里输入"python xxx.py"来执行文件

这里主要讲下第一种方法: 左键双击运行脚本后,你会看到一个“闪退”的CMD窗口(“闪退”很快,从窗口弹出到消失只有0.1-0.2秒的时间,肉眼刚刚能看到),根本看不到运行脚本后的结果,这是因为程序执行完后自动退出了,要让窗口停留,可以在代码最后放一个input()。

本文参考了 弈心:网络工程师的Python之路---进阶篇

案例1

案例背景:

单位有24口的华为3700交换机共1000台,分别分布在5个掩码为/24的C类网络子网下:

  • 192.168.11.x /24
  • 192.168.12.x /24
  • 192.168.13.x /24
  • 192.168.14.x /24
  • 192.168.15.x /24

在之前网络环境的基础上,先给交换机增加上该段的地址和回程路由:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
 @author: antenna
 @contact: lilyef2000@gmail.com
 @software: 
 @file: changeswitchmanageip.py
 @time: 2019/3/29 9:07
 @desc:
import paramiko
import time
import getpass
import socket
import re
from argparse import ArgumentParser
if __name__ == '__main__':
    parser = ArgumentParser(description='Excute command in cmd_file to switch in ip_file')
    parser.add_argument('ipfile', metavar='ipfile', help='your ip_file')
    args = parser.parse_args()
    ip_file = args.ipfile
    username = input('Username: ')
    password = getpass.getpass('password: ')
    with open(ip_file, "r") as f:
        ip_list = f.readlines()
    switch_with_authentication_issue = []
    switch_not_reachable = []
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    for ip_txt in ip_list:
        try:
            ip = ip_txt.strip()
            ssh_client.connect(hostname=ip, username=username, password=password, look_for_keys=False)
            print("You have successfully connect to {}".format(ip))
            # 找出ip地址第四位数字
            pattern_ip = r"((25[0-5]|2[0-4]\d|[01]?\d?\d)\.){3}(25[0-5]|2[0-4]\d|[01]?\d?\d)"  # 精确匹配ip地址的正则
            findip = re.compile(pattern_ip)
            match = findip.search(ip)
            ip4 = match.group(3) if match else None
            command = ssh_client.invoke_shell()
            command.send("system-view\n")
            cmd = "vlan {}\n".format(ip4)
            command.send(cmd)
            command.send("quit\n")
            cmd = "interface Vlanif {}\n".format(ip4)
            command.send(cmd)




    

            cmd = "ip address 192.168.{}.253 24\n".format(ip4)
            command.send(cmd)
            command.send("quit\n")
            command.send("ip route-static 192.168.56.0 24 192.168.{}.1\n".format(ip4))
            command.send("quit\n")
            command.send("save\n")
            command.send("Y\n")
            command.send("\n")
            time.sleep(1)
            output = command.recv(65535)
            print(output.decode(encoding='utf-8'))
        except paramiko.ssh_exception.AuthenticationException:
            print("User authentication failed for " + ip + ".")
            switch_with_authentication_issue.append(ip)
        except socket.error:
            print(ip + " is not reachable.")
            switch_not_reachable.append(ip)
    ssh_client.close()  

有关正则的参考资料: zhuanlan.zhihu.com/p/30

修改完后五个段内分别有五台交换机:

  • 192.168.11.253
  • 192.168.12.253
  • 192.168.13.253
  • 192.168.14.253
  • 192.168.15.253

案例需求:

在不借助任何NMS软件或网络安全工具的帮助的前提下,使用Python脚本依次ping所有交换机的管理IP地址,来确定当前有哪些交换机可达,并且统计当前每个交换机有多少终端物理端口是UP的(级联端口不算),以及1000台交换机所有UP的终端端物理端口的总数,并统计网络里的端口使用率(也就是端口的up率)。

案例思路:

根据需求我们可以写两个脚本,第一个脚本用来ping5个网段下所有交换机的管理IP,因为掩码是/24,IP地址的最后一位我们可以指定python来ping .1到.254,然后将所有可达的交换机IP写入并保存在一个名为reachable_ip.txt的文本文件中。

之后,写第二个脚本来读取该文本文件中所保存的IP地址,依次登录所有这些可达的交换机,输入命令dis int brief | i up命令查看有哪些端口是up的,再配合re这个模块(正则表达式),来匹配我们所要的用户端物理端口号(Enx/x/x),统计它们的总数,即可得到当前一个交换机有多少物理端口是up的。 (注:因为dis int brief | i up的结果里也会出现虚拟端口,比如vlan或者loopback端口,所以这里强调的是用正则表达式来匹配用户端物理端口Enx/x/x)

代码:

1.advance1_1.py # ping几个网段,查找可达的ip地址,记录到reachable_ip.txt中

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
 @author: antenna
 @contact: lilyef2000@gmail.com
 @software: 
 @file: advance1_1.py
 @time: 2019/3/28 20:31
 @desc:
import subprocess
import os
class Ping(object):
    def __init__(self):
        self.third_octect = range(11, 16)
        self.last_octect = range(253, 254)
        self.f = None
        self.ping_result = None
        self.ip = None
    def ping(self):
        self.remove_last_reachable_ip_file_exist()
        for ip3 in self.third_octect:
            for ip4 in self.last_octect:
                self.ip = '192.168.' + str(ip3) + '.' + str(ip4)
                self.ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', self.ip])
                print("ping {} result:{}".format(self.ip, self.ping_result))
                self.open_ip_record_file()
                self.check_ping_result()
        self.f.close()
    def open_ip_record_file(self):
        self.f = open('reachable_ip.txt', 'a')
    def check_ping_result(self):
        if self.ping_result == 0:
            self.f.write(self.ip + "\n")
    def remove_last_reachable_ip_file_exist(self):
        if os.path.exists('reachable_ip.txt'):
            os.remove('reachable_ip.txt')
if __name__ == '__main__':
    script1_1 = Ping()
    script1_1.ping()
    input()

代码讲解:

  • import subprocess模块,用来ping交换机的管理IP地址。
  • import os模块,第一次运行脚本时,通过open()函数的追加模式(也就是参数a),把所有可达的交换机管理IP地址依次写入reachable_ip.txt文件中(脚本中的 open_ip_record_file(self) )。第二次运行脚本 所有可达的IP地址又会被继续以追加的形式写入reachable_ip.txt文件中,这样的话显得可能干扰判断结果,所以使用os.path.exists来判断reachable_ip.txt这个文件是否存在,如果存在的话就将它删除,这样可以保证每次运行脚本时,reachabe_ip.txt这个文件里保存本次运行脚本后所有可达的IP地址。由 remove_last_reachable_ip_file_exsit(self) 方法实现。
  • 因为我们要依次ping 192.168.11.x, 192.168.12.x, 192.168.13.x, 192.168.14.x, 192.168.15.x这五个网段的IP,它们是有规律可循的,第三字段是从11-15, 所以这里我们用range(11,16)创建一个包含数字11-15的列表,并把它赋值给 third _ octect 这个变量,第四字段我们要从1 ping到254, 所以又用range(1,255)创建第二个列表并把它赋值给 last_octect 这个变量。
  • 配合subprocess.call来ping所有这些IP, subprocess.call(['ping','-n','2','-w','2',self.ip]) 中的"-n"和"-w"是Windows里的ping命令的参数,表示每个IP只ping两次,每次最多等待两秒钟。
  • 注意subprocess.call会返回命令执行状态,0表示命令执行成功(未必代表该ip存在,后面看实例)。所以下面的 check_ping_result(self) 方法用来做判断,如果返回的值是0 ( if self.ping_result == 0: ),则将它写入reachable_ip.txt文件中reachable_ip.txt( self.f.write(self.ip + "\n")

一个坑:返回0,但是并未ping成功的例子:

测试环境:

测试代码:

import subprocess
ip = '192.168.11.101'
ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', ip])
print("ping {} result:{}".format(ip, ping_result))
ip = '192.168.12.101'
ping_result = subprocess.call(['ping', '-n', '2', '-w', '2', ip])
print("ping {} result:{}".format(ip, ping_result))
input()

我在上图的路由中删除L3_1交换机中vlan11地址,也就是192.168.11.0所在网段的网关地址:

运行结果:

重新设置L3_1交换机中vlan11地址:

运行结果:

所以说subprocess.call返回的命令执行状态为0只表示命令执行成功,至于目标ip是否可达还要再分析。

2.advance1_2.py # 读取advance1_1.py的运行结果文件reachable_ip.txt,并尝试连接到其对应的交换机上,分析端口使用率。

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
 @author: antenna
 @contact: lilyef2000@gmail.com
 @software: 
 @file: advance1_2.py
 @time: 2019/3/28 20:48
 @desc:
import paramiko
import time
import re
import socket
from datetime import datetime
now = datetime.now()
date = "%s-%s-%s" % (now.month, now.day, now.year)
time_now = "%s:%s:%s" % (now.hour, now.minute, now.second)
class PortStatistics(object):
    def __init__(self):
        self.switch_with_authentication_issue = []
        self.switch_not_reachable = []
        self.total_number_of_up_port = 0
        self.iplist = None
        self.number_of_switch = None
        self.ssh_client = paramiko.SSHClient()
        self.ip = None
        self.command = None
        self.search_up_port = None
        self.number_of_up_port = None
        self.total_number_of_ports = None
        self.ssh_login()
        self.summary()
    def ssh_login(self):
        with open('reachable_ip.txt', 'r') as file:
            ip_list = file.readlines()
        self.number_of_switch = len(ip_list)
        for line in ip_list:
            try:
                self.ip = line.strip()
                self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                self.ssh_client.connect(hostname=self.ip, username='python', password='123', look_for_keys=False)
                print("\nYou have successfully connect to {}".format(self.ip))
                self.command = self.ssh_client.invoke_shell()
                self.check_up_port()
            except paramiko.ssh_exception.AuthenticationException:
                print("User authentication failed for " + self.ip + ".")
                self.switch_with_authentication_issue.append(self.ip)
            except socket.error:
                print(self.ip + " is not reachable.")
                self.switch_not_reachable.append(self.ip)
    def check_up_port(self):
        self.command.send('display interface brief | include up\n')  # or 'dis int b | i up' 显示up状态的接口
        # 'display ip interface brief | include up\n' or 'dis ip int b | i up\n' 显示配置了ip地址的up状态的接口
        time.sleep(1)
        output = self.command.recv(65535)
        self.ssh_client.close()
        output = output.decode(encoding='utf-8')
        print(output)
        self.search_up_port = re.findall(r'Ethernet', output)
        self.number_of_up_port = len(self.search_up_port)
        print(self.ip + " has " + str(self.number_of_up_port) + " ports up.")
        self.total_number_of_up_port += self.number_of_up_port
    def summary(self):
        self.total_number_of_ports = self.number_of_switch * 24
        dis_str = "\n"
        dis_str += "There are totally " + str(self.total_number_of_ports) + " ports available in the network."
        dis_str




    
 += str(self.total_number_of_up_port) + " ports are currently up."
        dis_str += "Port up rate is %.2f%%" % (self.total_number_of_up_port / float(self.total_number_of_ports) * 100)
        print(dis_str)
        print('\nUser authentication failed for below switches: ')
        for i in self.switch_with_authentication_issue:
            print(i)
        print('\nBelow switches are not reachable: ')
        for i in self.switch_not_reachable:
            print(i)
        with open(date + ".txt", "a+") as f:
            f.write('As of ' + date + " " + time_now)
            f.write("\n\nThere are totally " + str(self.total_number_of_ports) + " ports available in the network.")
            f.write("\n" + str(self.total_number_of_up_port) + " ports are currently up.")
            f.write("\nPort up rate is %.2f%%" % 
                    (self.total_number_of_up_port / float(self.total_number_of_ports) * 100))
            f.write("\n***************************************************************\n\n")
if __name__ == '__main__':
    script1_2 = PortStatistics()
    input()

代码讲解:

  • import datetime模块用来记录每次运行代码的时间。
  • 记录当前时间可以调用datetime.now()方法,将它赋值给now,datetime.now()含.year()(年)、.month()(月)、.day()(日)、.hour()(时)、.minute()(分)、.second()(秒)几个子方法,将“月-日-年”赋值给date变量,将“时:分:秒”赋值给time_now变量。
  • total_number_of_up_port = 0, 先将总up端口数设为0,后面再用累加的方法统计。
  • self.number_of_up_port = len(self.search_up_port) ,很简单明了,现在 self.number_of_up_port 这个变量代表的就是一个交换机有多少个GigabitEthernet或Ethernet端口是up的。 然后将每个交换机有多少端口是up的打印出来。
  • 最后通过 self.total_number_of_up_port += self.number_of_up_port 来累加在整个网路里总共有多少端口是up的。
  • summary()方法里,除了将统计信息各种打印出来外,我们还将另外创建一个文件,将运行脚本时的日子作为该脚本的名字,将统计信息写入进去,方便我们调阅查看, 注意写入的内容里面有f.write('As of ' + date + " " + time_now),这样还可以清晰直观的看到我们是哪一天,几时几分几秒运行的脚本。
  • 为什么要用日期名作为文件名?这样做的好处是一旦运行脚本时的日期不同,脚本就会自动创建一个新的文件,比如2018年6月16号运行了一次脚本,Python创建了一个名为 6-16-2018.txt 的文件,如果第二天再运行一次脚本,Python又会创建一个名为 6-17-2018.txt 的文件。如果在同一天里数次运行脚本,则多次运行的结果会以追加的形式写进同一个.txt文件,不会创建新文件。可以配合Windows的Task Scheduler或者Linux的Crontab来定期自动执行脚本,每天自动生成当天的端口使用量的统计情况,方便管理员随时观察网络里交换机的端口使用情况。

执行结果:

advance1_1.py执行结果

advance1_2.py执行结果

C:\Users\Administrator\Envs\py3NetworkProgramming\Scripts\python.exe D:/personal/网络工程师的python之路/advance1_2.py
You have successfully connect to 192.168.11.253
Info: The max number of VTY users is 5, and the number
      of current VTY users on line is 1.
      The current login time is 2019-03-29 11:19:41.
<S3700-1>display interface brief | include up
PHY: Physical
*down: administratively down
(l): loopback
(s): spoofing
(b): BFD down
(e): ETHOAM down
(dl): DLDP down
(d): Dampening Suppressed
InUti/OutUti: input utility/output utility
Interface                   PHY   Protocol InUti OutUti   inErrors  outErrors
Ethernet0/0/1               up    up          0%     0%          0          0
GigabitEthernet0/0/1        up    up          0%     0%          0          0
NULL0                       up    up(s)       0%     0%          0          0
Vlanif1                     up    down        --     --          0          0
Vlanif10                    up    up          --     --          0          0
Vlanif11                    up    up          --     --          0          0
<S3700-1>
192.168.11.253 has 2 ports up.
192.168.12.253 has 2 ports up.
192.168.13.253 has 2 ports up.
192.168.14.253 has 2 ports up.
192.168.15.253 has 3 ports up.
There are totally 120 ports available in the network.11 ports are currently up.Port up rate is 9.17%
User authentication failed for below switches: 
Below switches are not reachable: 


案例2:

案例背景:

案例1中的交换机型号为3700的交换机配置了多个管理ip地址,尝试确认交换机的型号以后,再删除掉其vlan10上的地址,并确认修改成功。

案例思路:

  1. 删除管理vlan 10上的管理ip之前要先确认交换机型号为3700,使用display version 查看版本信息,使用正则匹配证实是3700后,进行删除多余ip操作,同时注意及时保存重启。
  2. 重启后验证的方式有两个,一是直接登录验证,二是查询ip是否可达。

案例代码:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
 @author: antenna
 @contact: lilyef2000@gmail.com
 @software: 
 @file: advance2_1.py
 @time: 2019/3/29 11:45
 @desc:
import paramiko
import re
import socket
import time
username = 'python' 
password = '123' 
with open('ip_list.txt', 'r+') as f:
    iplist = f.readlines()
switch_ipremoved = []
switch_not_ipremoved = []
switch_with_authentication_issue = []
switch_not_reachable = []
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for line in iplist:
    try:
        ip_address = line.strip()
        ssh_client.connect(hostname=ip_address, username=username, password=password, timeout=2, look_for_keys=False)
        print("Successfully connect to {}".format(ip_address))
        command = ssh_client.invoke_shell()
        command.send("display version\n")
        time.sleep(1)
        output = command.recv(




    
65535)
        output = output.decode(encoding='utf-8')
        switch_model = re.search(r'(S\d.*?-.*?-[A-Z]+)', output)
        vrp_version = re.search(r'\((S.*?)\)', output)
        print("switch model:{}  vrp version:{}".format(switch_model.group(1), vrp_version.group(1)))
        if switch_model.group(1) == 'S3700-26C-HI' and vrp_version.group(1) == 'S3700 V200R001C00':  # 型号和软件版本都正确
            command.send("display interface vlan10\n")
            time.sleep(1)
            output = command.recv(65535)
            output = output.decode(encoding='utf-8')
            ip_vlan10 = re.search(r'Internet Address is ((\d+\.){3}\d+)/(\d+)', output)
            if ip_vlan10:  # 如果发现配置了vlan 10的管理地址,则将其删除保存并重启
                print("vlan 10 ip address:{}".format(ip_vlan10.group()))
                command.send("system-view\n")
                command.send("interface Vlanif 10\n")
                command.send("undo ip address {} {}\n".format(ip_vlan10.group(1), ip_vlan10.group(3)))
                command.send("quit\n")
                command.send("quit\n")
                command.send("save\n")
                command.send("Y\n")
                command.send("reboot\n")
                command.send("Y\n")
                time.sleep(1)
                output = command.recv(65535)
                output = output.decode(encoding='utf-8')
                print("find vlan 10 ip address, removing.")
                switch_ipremoved.append(ip_address)
            else:
                print("not find vlan 10 ip address.")
                switch_not_ipremoved.append(ip_address)
    except paramiko.ssh_exception.AuthenticationException:
        print('Error: User authentication failed for below switches: {}'.format(ip_address))
        switch_with_authentication_issue.append(ip_address)
    except socket.error:
        print("Error: {} is not reachable.".format(ip_address))
        switch_not_reachable.append(ip_address)
ssh_client.close()
print('\nUser authentication failed for below switches: ')
for i in switch_with_authentication_issue:
    print(i)
print('\nBelow switches are not reachable: ')
for i in switch_not_reachable:
    print(i)
print('\nBelow switches vlan ip address are removed in this process: ')
for i in switch_ipremoved:
    print(i)
print('\nBelow switches vlan ip address are not removed in this process: ')
for i in switch_not_ipremoved:
    print(i)

执行结果:

C:\py3NetworkProgramming\python advance2_1.py
Successfully connect to 192.168.11.253
switch model:S3700-26C-HI  vrp version:S3700 V200R001C00
not find vlan 10 ip address.
Successfully connect to 192.168.12.253
switch model:S3700-26C-HI  vrp version:S3700 V200R001C00
vlan 10 ip address:Internet Address is 192.168.56.12/24
find vlan 10 ip address, removing.
Error: 192.168.13.253 is not reachable.
Successfully connect to 192.168.56.10
switch model:S5700-28C-HI  vrp version:S5700 V200R001C00
Error: User authentication failed for below switches: 192.168.15.253
User authentication failed for below switches: 
192.168.15.253
Below switches are not reachable: 
192.168.13.253
Below switches vlan ip address are removed  in this process:
192.168.12.253
Below switches vlan ip address are not removed  in this process: 
192.168.11.253
Process finished with exit code 0

代码讲解:

从执行结果上可以看到,几种结果都测试到了:

  1. User authentication failed
  2. not reachable
  3. vlan 10 ip 存在
  4. vlan 10 ip 不存在

实现的关键是使用正则匹配所需要的关键字:

  • if switch_model.group(1) == 'S3700-26C-HI' and vrp_version.group(1) == 'S3700 V200R001C00': 保证交换机型号是3700,并且vrp版本符合要求才进行下一步的删除vlan 10 ip的操作
  • ip_vlan10 = re.search(r'Internet Address is ((\d+\.){3}\d+)/(\d+)', output) if ip_vlan10: # 如果发现配置了vlan 10的管理地址,则将其删除保存并重启
  • switch_ipremoved和switch_not_ipremoved只是表明本次处理是否进行了删除ip操作,并不是实际的状态
  • 为了尝试正则匹配的有效性,在第4个ip设置了一台S5700:switch model:S5700-28C-HI vrp version:S5700 V200R001C00
  • 192.168.13.253 not reachable的原因是我的win7运行在192.168.56.0/24 ip地址段,与其网关可以正常通信,但该交换机上未配置回程路由: ip route-static 192.168.56.0 255.255.255.0 192.168.13.1

代码优化

这两个案例的代码可以进一步优化,比如加入日志记录模块可以在程序出现问题时及时记录下日志:

import logging
logging.basicConfig(                             level=logging.INFO, format='%(asctime)s, %(levelname)s - %(message)s')
#logging.basicConfig(filename='myProgramLog.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('toss is %s, guess is %s' % (toss, guess))

还可以加入colorama模块使输出的不同状态用不同颜色表示:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
 @author: antenna
 @contact: lilyef2000@gmail.com
 @software: 
 @file: color_test.py
 @time: 2019/3/9 21:43
 @desc:
# 参考 https://blog.csdn.net/qianghaohao/article/details/52117082
# -----------------colorama模块的一些常量---------------------------
# Fore: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
# Back: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET.
# Style: DIM, NORMAL, BRIGHT, RESET_ALL
from colorama import init, Fore, Back, Style
init(autoreset=True)
class Colored(object):
    #  前景色:红色  背景色:默认
    def red(self, s):
        return Fore.RED + s + Fore.RESET
    #  前景色:绿色  背景色:默认
    def green(self, s):
        return Fore.GREEN + s + Fore.RESET
    #  前景色:黄色  背景色:默认
    def yellow(self, s):
        return Fore.YELLOW + s + Fore.RESET
    #  前景色:蓝色  背景色:默认
    def blue(self, s):
        return Fore.BLUE + s + Fore.RESET
    #  前景色:洋红色  背景色:默认
    def magenta(self, s):
        return Fore.MAGENTA + s + Fore.RESET
    #  前景色:青色  背景色:默认
    def cyan(self, s):
        return Fore.CYAN + s + Fore.RESET
    #  前景色:白色  背景色:默认
    def white(self, s):
        return Fore.WHITE + s + Fore.RESET
    #  前景色:黑色  背景色:默认
    def black(self, s):
        return Fore.BLACK
    #  前景色:白色  背景色:绿色
    def white_green(self, s):
        return Fore.WHITE + Back.GREEN + s + Fore.RESET + Back.RESET
color = Colored()
print(color.red('I am red!'))