zabbix 网络质量监控,自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱

互联网故障一般表现为丢包和时延增大,持续性故障不难排查,难的是间歇性或凌晨故障,后者往往来不及等我们测试就已经恢复正常,得不到异常时的mtr无法判断故障点在哪里

因此有了根据丢包率和时延变化联动mtr的需求

前段时间使用Mysql实现了这个功能,缺点是占用太多系统资源,且脚本繁重;优点是数据可复用,可做多种形式的展示

后续使用socket+deque实现低能耗与轻量,也可以通过开放互联网API来做分布式监控,缺点是历史数据不留存,用完即丢

Ubuntu 18.04.5 LTS+Python 3.6.9

python库

自带基本库,考虑到系统权限问题没有使用第三方库

http://ip-api.com,免费版,限制频率45次/分钟,国外归属地准确率较高,国内查询一塌糊涂,国内推荐使用ipip

  1 #!/usr/bin/env python3
  2 #-*-coding:utf-8-*-
  3 from collections import deque
  4 import itertools,time
  5 import queue,json
  6 import argparse,sys,re,os,subprocess
  7 import time,socket,random,string
  8 import threading
  9 from functools import reduce
 10 import logging
 12 ipqli=deque()
 13 filename = os.path.realpath(sys.argv[0])
 14 def logger():
 15     dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 16     log_name = dir+'/log'
 17     logger = logging.getLogger()
 18     fh = logging.FileHandler(log_name)
 19     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
 20     fh.setFormatter(formater)
 21     logger.setLevel(logging.DEBUG)
 22     logger.addHandler(fh)
 23     return logger
 24 log = logger()
 25 #ping程序,避免系统权限问题未使用ping3
 26 class Ping:
 27     def __init__(self,ip,count=20,udp_length=64):
 28         ip = tuple(ip)
 29         self.sip,self.tip,self.type,self.port,self.inver=ip
 30         self.type = self.type.lower()
 31         self.port = int(self.port)
 32         self.count=count
 33         self.inver = float(self.inver)
 34         self.udp_length=udp_length
 35         restime_name = 'restime_deque'+''.join(ip).replace('.','')
 36         pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','')
 37         ipqevent = 'event'+''.join(ip).replace('.','')
 38         locals()[restime_name] = deque(maxlen=60)
 39         locals()[pkloss_name] = deque(maxlen=60)
 40         self.restime_deque = locals()[restime_name]
 41         self.pkloss_deque = locals()[pkloss_name]
 42         self.ret_restime_deque = globals()[restime_name]
 43         self.ret_pkloss_deque = globals()[pkloss_name]
 44         self.ipqevent = globals()[ipqevent]
 45         self.compile= r'(?<=time=)\d+\.?\d+(?= ms)'
 46     def _tcp(self):
 47             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 48             s.settimeout(1)
 49             start_time = time.time()
 50             res_count=0
 51             try:
 52                 s.bind((self.sip,0))
 53                 s.connect((self.tip, self.port))
 54                 s.shutdown(socket.SHUT_RD)
 55                 value = (time.time() - start_time)*1000  
 56                 self.restime_deque.append(value)
 57                 self.pkloss_deque.append(0)
 58                 res_count=1
 59             except (socket.timeout,ConnectionError):
 60                 self.restime_deque.append(0)
 61                 self.pkloss_deque.append(1)
 62             except OSError as e:
 63                 log.debug(e)
 64                 return 0,0
 65             usetime = time.time()-start_time
 66             sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 67             return sleep_time,res_count
 68     def _udp(self):
 69         res_count=0
 70         s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
 71         s.settimeout(1)
 72         start_time = time.time()
 73         data=''.join(random.choice(string.ascii_letters+ string.digits) for x in range(self.udp_length))
 74         try:
 75             s.sendto(data.encode('utf-8'),(self.tip,self.port))
 76             s.recv(1024)
 77             value = (time.time() - start_time)*1000
 78  
79
self.restime_deque.append(value) 80 self.pkloss_deque.append(0) 81 res_count=1 82 except socket.timeout: 83 self.restime_deque.append(0) 84 self.pkloss_deque.append(1) 85 except OSError as e: 86 log.debug(e) 87 return 0,0 88 usetime = time.time()-start_time 89 sleep_time = self.inver - usetime if usetime<self.inver else self.inver 90 return sleep_time,res_count 91 def _icmp(self): 92 res_count=0 93 start_time = time.time() 94 cmd = 'ping -i %s -c 1 -W 1 -I %s %s'%(self.inver,self.sip,self.tip) 95 ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8') 96 try: 97 value=re.findall(self.compile, ret,re.S)[0] 98 self.restime_deque.append(value) 99 self.pkloss_deque.append(0) 100 res_count=1 101 except: 102 self.pkloss_deque.append(1) 103 self.restime_deque.append(0) 104 usetime = time.time()-start_time 105 sleep_time = self.inver - usetime if usetime<self.inver else self.inver 106 return sleep_time,res_count 107 def fastping(self): 108 getattr(self, '_'+self.type)() 109 def slow_ping(self): 110 index = 0 111 res_count=0 112 self.ipqevent.set() 113 while index<self.count: 114 sleep_time,count=getattr(self, '_'+self.type)() 115 index+=1 116 res_count+=count 117 if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2 : 118 break 119 time.sleep(sleep_time) 120 return index,res_count 121 def ping_value(self): 122 start_time = time.time() 123 count = self.count 124 rescount = self.count 125 if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2: 126 fastli=[] 127 for x in range(self.count): 128 t = threading.Thread(target=self.fastping) 129 t.start() 130 fastli.append(t) 131 for th in fastli: 132 th.join() 133 else: 134 count,rescount = self.slow_ping() 135 rescount=count if rescount==0 else rescount 136 use_time = round(time.time()-start_time,4) 137 li = [self.restime_deque.pop() for x in range(count)] 138 pkli = [self.pkloss_deque.pop() for x in range(count)] 139 try: 140 restime = 
reduce(lambda x ,y :round(float(x)+float(y),2), li)/rescount if len(li) >1 else round(float(li[0]),2) 141 pkloss= reduce(lambda x ,y :int(x)+int(y), pkli)/count*100 142 return (round(restime,2),round(pkloss,2),use_time) 143 except Exception as e: 144 log.debug(e) 145 return 0,0,0 146 #server端代码 147 class Server(): 148 def __init__(self,sock): 149 global ipqli 150 self.ipqli=ipqli 151 self.thli=[] 152 self.ipli = [] 153 self.sock=sock 154 self.basedir = os.path.dirname(os.path.realpath(sys.argv[0])) 155 self.env = threading.Event() 156 @classmethod 157 def start(cls): 158 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 159 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 160 address = ('127.0.0.1',6590) 161 s.bind(address) 162 obj = cls(s) 163 ping_server=threading.Thread(target=obj.server) 164 ping_server.start() 165 obj.thli.append(ping_server) 166 create_t = threading.Thread(target=obj.create) 167 create_t.start() 168 obj.thli.append(create_t) 169 for t in obj.thli: 170 t.join() 171 def server(self): 172 while True: 173 try: 174 self.sock.listen(100) 175 conn,addr = self.sock.accept() 176 data=conn.recv(1024) 177 data = data.decode('utf-8') 178 data = json.loads(data) 179 ip,item = data 180 restime_ipq = 'restime_deque'+''.join(ip).replace('.','') 181 pkloss_ipq = 'pkloss_deque'+''.join(ip).replace('.','') 182 ipqevent = 'event'+''.join(ip).replace('.','') 183 if ip not in self.ipli: 184 globals()[restime_ipq] = deque(maxlen=30) 185 globals()[pkloss_ipq] = deque(maxlen=30) 186 globals()[ipqevent] = threading.Event() 187 self.ipqli.append(ip) 188 self.ipli.append(ip) 189 log.debug('create ipdeque %s %s'%(restime_ipq,pkloss_ipq)) 190 self.env.set() 191 self.sendvalue(conn,ip,item) 192 conn.close() 193 except Exception as e: 194 log.debug(str(e)) 195 conn.close() 196 def create(self): 197 while True: 198 self.env.wait() 199 try: 200 ip = self.ipqli.pop() 201 log.debug('create %s'%ip) 202 t=threading.Thread(target=self.makevalue,args=(ip,)) 203 t.start() 
204 except Exception as a: 205 log.debug(str(a)) 206 if not self.ipqli: 207 self.env.clear() 209 def makevalue(self,ip): 210 restime_name = 'restime_deque'+''.join(ip).replace('.','') 211 pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','') 212 ipqevent_name = 'event'+''.join(ip).replace('.','') 213 restime_ipq = globals()[restime_name] 214 pkloss_ipq = globals()[pkloss_name] 215 ipqevent = globals()[ipqevent_name] 216 obj = Ping(ip) 217 while len(restime_ipq) < 30 or len(pkloss_ipq) <30: 218 restime,pkloss,use_time=obj.ping_value() 219 restime_ipq.append((restime,use_time)) 220 pkloss_ipq.append((pkloss,use_time)) 221 else: 222 del restime_ipq 223 del pkloss_ipq 224 del ipqevent 225 self.ipli.remove(ip) 226 log.debug('delete ipdeque %s %s'%(restime_name,pkloss_name)) 227 def sendvalue(self,conn,ip,item): 228 fromat_ip=''.join(ip).replace('.','') 229 _,tip,*arg=ip 230 restime_name = 'restime_deque'+fromat_ip 231 pkloss_name = 'pkloss_deque'+fromat_ip 232 ipqevent_name = 'event'+fromat_ip 233 restime_ipq = globals()[restime_name] 234 pkloss_ipq = globals()[pkloss_name] 235 ipqevent = globals()[ipqevent_name] 236 mtr_dir = self.basedir+'/mtr_log/'+tip+'-'+time.strftime('%Y-%m-%d',time.localtime()) + '.log' 237 mtr_cmd = self.basedir + '/mtr.py'+' '+tip+' '+mtr_dir 238 if len(restime_ipq) < 2 and len(restime_ipq) <2: 239 ipqevent.clear() 240 try: 241 ipqevent.wait() 242 if item =='restime': 243 ret,use_time = restime_ipq.pop() 244 hisret,_=restime_ipq[-1] 245 if ret - hisret >20: 246 subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 247 elif item =='pkloss': 248 ret,use_time = pkloss_ipq.pop() 249 if 100> ret >20: 250 subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 251 except Exception as a: 252 ret = a 253 log.debug(str(ret)) 254 conn.sendall(str(ret).encode()) 256 #用户输入IP格式检查 257 class Ipcheck(): 258 def __init__(self,sip,tip,item,ping_type,inver): 259 self.sip =sip 260 self.tip=tip 261 
self.item=item 262 self.type = ping_type.lower() 263 self.inver=float(inver) 264 def check(self): 265 if self.item not in ['restime','pkloss'] or self.type not in ['icmp','tcp','udp'] or self.inver<0.2: 266 return False 267 elif not self.checkipformat(): 268 return False 269 else: 270 return True 271 def check_fun(self,ip): 272 return int(ip)<256 273 def checkipformat(self): 274 try: 275 tiplist = self.tip.split('.') 276 tipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', self.tip) 277 if self.sip: 278 siplist = self.sip.split('.') 279 sipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', self.sip) 280 else: 281 siplist=[1,1,1,1] 282 sipformat=True 283 if not tipformat or not sipformat: 284 raise 285 check_ipli = tiplist+siplist 286 return self.checkiplength(check_ipli) 287 except: 288 return False 289 def checkiplength(self,check_ipli): 290 if list(itertools.filterfalse(self.check_fun, check_ipli)): 291 return False 292 else: 293 return True 294 def run(): 296 cmd = 'python3 %s -S server'%filename 297 subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 298 #socket_client端,向server请求数据并返回给用户 299 def socket_client(ip,item): 300 try: 301 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 302 s.settimeout(3) 303 s.connect(('127.0.0.1',6590)) 304 data = [ip,item] 305 data = json.dumps(data) 306 s.sendall(data.encode()) 307 ret = s.recv(1024) 308 s.close() 309 print(ret.decode()) 310 except socket.timeout as t: 311 log.debug(str(t)) 312 s.close() 313 sys.exit(0) 314 except Exception as e: 315 print('server will start') 316 log.debug(str(e)) 317 sys.exit(0) 319 if __name__ == '__main__': 320 parser = argparse.ArgumentParser(description='icmp for monitor') 321 parser.add_argument('-S',action = 'store',dest='server') 322 parser.add_argument('-t',action = 'store',dest='tip') 323 parser.add_argument('-s',action = 'store',dest='sip') 324 parser.add_argument('-I',action='store',dest='item') 325 parser.add_argument('-i',action='store',dest='inver',default='1') 
326 parser.add_argument('-T',action='store',dest='ping_type',default='icmp') 327 parser.add_argument('-p',action='store',dest='port',default='0') 328 args= parser.parse_args() 329 server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16"%filename 330 server_status = subprocess.Popen(server_status_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0] 331 if not server_status: 332 run() 333 if args.server: 334 Server.start() 335 sys.exit(0) 336 try: 337 tip = socket.gethostbyname(args.tip) 338 sip = args.sip 339 item = args.item 340 ping_type = args.ping_type 341 port = args.port 342 inver=args.inver 343 ip=(sip,tip,ping_type,port,inver) 344 except: 345 print('format error') 346 check = Ipcheck(sip, tip, item,ping_type,inver) 347 if not check.check(): 348 print('''---------------------------Options----------------------------------- 349 -s --source ip address 350 -t --destination ip address 351 -I --item(restime/pkloss) 352 -T --type(icmp/tcp/udp default icmp) 353 -p --port(default 0) 354 -i --inver(default 1/min 0.2) 355 ---------------------------Example----------------------------------- 356 ------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80------- 357 ''') 358 sys.exit(0) 359 socket_client(ip,item)

mtr.py

 1 #!/usr/bin/env python3
 2 #-*-coding:utf-8-*-
 3 import sys,logging,os,subprocess,requests
 4 import email
 5 import smtplib
 6 from email.header import Header
 7 from email.utils import formataddr
 8 from email.mime.text import MIMEText
 9 def logger(ip,log_name):
10     logger = logging.getLogger()
11     fh = logging.FileHandler(log_name)
12     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
13     fh.setFormatter(formater)
14     logger.setLevel(logging.DEBUG)
15     logger.addHandler(fh)
16     return logger
def ip_search(ip):
    """Return '<region> <city>' for ``ip`` as reported by ip-api.com.

    The lookup only decorates the alert mail, so it must never abort the
    report: on a network error, a timeout or an unusable answer an empty
    string is returned instead of raising.
    """
    try:
        r = requests.get('http://ip-api.com/json/%s?lang=zh-CN' % ip, timeout=5)
        ret = r.json()
    except (requests.RequestException, ValueError) as e:
        logging.getLogger().debug('ip lookup failed for %s: %s', ip, e)
        return ''
    if not isinstance(ret, dict):
        return ''
    # Fields may be absent in the answer (for example when the lookup is
    # rejected), so read them defensively.
    return ('%s %s' % (ret.get('regionName', ''), ret.get('city', ''))).strip()
class sendemail:
    """Plain-text alert mail sent through the QQ SMTP service.

    ``email_list`` is the list of recipient addresses, ``content`` the mail
    body and ``subject`` the subject line.
    """

    # NOTE(review): the account is hard-coded; 'password' is a placeholder
    # that has to be replaced before mail can actually be sent.
    SMTP_HOST = 'smtp.qq.com'
    SENDER = '976584601@qq.com'
    PASSWORD = 'password'

    def __init__(self, email_list, content, subject):
        self.email_list = email_list
        self.content = content
        self.subject = subject

    def _build_message(self):
        """Return the MIME message for this mail."""
        msg = MIMEText(self.content, 'plain', 'utf-8')
        msg['from'] = formataddr(('dark', self.SENDER))
        msg['to'] = ','.join(self.email_list)
        msg['subject'] = self.subject
        return msg

    def sendemail(self):
        """Log in and send the mail; SMTP errors propagate to the caller."""
        msg = self._build_message()
        # The context manager closes the connection even when login or
        # delivery fails; the timeout keeps a dead server from hanging us.
        with smtplib.SMTP(self.SMTP_HOST, timeout=30) as service:
            service.login(self.SENDER, self.PASSWORD)
            service.sendmail(self.SENDER, self.email_list, msg.as_string())
def mtr(ip, log_name):
    """Trace the route to ``ip`` once and append the report to ``log_name``.

    An alert mail is sent only when no file with the same name exists in the
    ``mtr_log`` directory next to this script, i.e. for the first report
    written to that log file.  A failing mail never prevents the trace from
    being logged.
    """
    mtr_log_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + '/mtr_log'
    # The directory may not exist on the first run.
    os.makedirs(mtr_log_dir, exist_ok=True)
    # Must be decided before the logger below creates the file.
    first_report = log_name.split('/')[-1] not in os.listdir(mtr_log_dir)

    # Argument list instead of a shell string: ``ip`` comes from outside.
    cmd = ['mtr', '-r', '-n', '-c', '1', '-w', '-b', ip]
    try:
        done = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        data = done.stdout.decode('utf8', 'replace')
    except OSError as e:
        data = 'mtr could not be started: %s' % e

    mail_error = None
    if first_report:
        try:
            ip_city = ip_search(ip)
            title = '德国腾讯到 %s %s 线路异常' % (ip_city, ip)
            mail_list = ['cs11241991@163.com']
            sendemail(mail_list, data, title).sendemail()
        except Exception as e:
            mail_error = e

    log = logger(ip, log_name)
    if mail_error is not None:
        log.debug('alert mail not sent: %s', mail_error)
    log.debug(data)
if __name__ == '__main__':
    # Expected call: mtr.py <target ip> <log file>
    if len(sys.argv) < 3:
        sys.exit('usage: mtr.py <ip> <log_file>')
    ip = sys.argv[1]
    log_name = sys.argv[2]
    mtr(ip, log_name)
# UDP echo server for the UDP probe: every datagram is sent back to its
# sender.  The first lines of this snippet were lost in extraction, so the
# import is restored here.
import socket

# 'ipaddress' and port are placeholders: replace them with the local address
# and the UDP port to listen on.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind once, outside the loop, instead of creating a socket per datagram.
sock.bind(('ipaddress', port))
while True:
    data, addr = sock.recvfrom(65535)
    sock.sendto(data, addr)

也可以使用socat,实际测试使用socat会引入额外开销,时延不准确

socat -v UDP-LISTEN:4000,fork PIPE