Zabbix 网络质量监控:自定义 Python 模块,集成 ICMP/TCP/UDP 探测,批量监控线路质量,按自定义阈值联动 mtr,保存线路故障日志并发送至 NOC 邮箱
互联网故障一般表现为丢包和时延增大,持续性故障不难排查,难的是间歇性或凌晨故障,后者往往来不及等我们测试就已经恢复正常,得不到异常时的mtr无法判断故障点在哪里
故此有了根据丢包率和时延变化联动 mtr 的需求
前段时间使用 MySQL 实现了这个功能,优点是数据可复用,可做多种形式的展示;缺点是占用太多系统资源,且脚本繁重
后续改用 socket+deque 实现,低能耗、轻量,也可以通过开放互联网 API 来做分布式监控;缺点是历史数据不留存,用完即丢
Ubuntu 18.04.5 LTS+Python 3.6.9
python库
主程序仅使用 Python 自带的标准库,考虑到系统权限问题没有使用第三方库(例外:mtr.py 查询 IP 归属地时用到了第三方库 requests)
IP 归属地查询使用 http://ip-api.com 免费版,限制频率 45 次/分钟;国外归属地准确率较高,国内查询准确率很差,国内推荐使用 ipip
1 #!/usr/bin/env python3
2 #-*-coding:utf-8-*-
3 from collections import deque
4 import itertools,time
5 import queue,json
6 import argparse,sys,re,os,subprocess
7 import time,socket,random,string
8 import threading
9 from functools import reduce
10 import logging
# Module-level deque intended for queued probe keys.
ipqli = deque()
# Absolute, symlink-resolved path of this script; used to relaunch it as
# "<script> -S server" and to look for that process in `ps` output.
filename = os.path.realpath(sys.argv[0])


def logger():
    """Return the root logger, configured to write DEBUG+ records to a file.

    The file is named ``log`` and lives in the directory of the running
    script (``sys.argv[0]``). The file handler is attached only once, so
    calling this function again does not duplicate every log line.
    """
    log_name = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), 'log')
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    target = os.path.abspath(log_name)
    for handler in root.handlers:
        # FileHandler stores the absolute path it writes to in baseFilename.
        if isinstance(handler, logging.FileHandler) and handler.baseFilename == target:
            return root
    fh = logging.FileHandler(log_name)
    fh.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root.addHandler(fh)
    return root
# Shared root logger for the whole script; records go to the file `log`
# next to the running script.
log = logger()
# Probe helper. ping3 is not used, to avoid its system-privilege requirements;
# ICMP probes run the system `ping` command instead.
class Ping:
    """Probe one target over ICMP, TCP or UDP and summarise latency and loss.

    *ip* is the 5-item probe key ``(source ip, target ip, type, port,
    interval)`` sent by the client. The source ip may be None or empty, in
    which case the OS chooses the source address. The server must already
    have created the module globals ``restime_deque<key>``,
    ``pkloss_deque<key>`` and ``event<key>`` for this key. Their length
    decides between fast and slow probing, and the event is set when slow
    probing starts.
    """

    def __init__(self, ip, count=20, udp_length=64):
        ip = tuple(ip)
        self.sip, self.tip, self.type, self.port, self.inver = ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count = count              # probes per ping_value() round
        self.inver = float(self.inver)  # seconds between probes in slow mode
        self.udp_length = udp_length    # UDP payload size in bytes
        # Global-name suffix shared with the server (str() tolerates a None sip).
        key = ''.join(map(str, ip)).replace('.', '')
        # Per-round scratch buffers: every probe appends exactly one entry to
        # each, and ping_value() pops this round's entries back off.
        self.restime_deque = deque(maxlen=60)
        self.pkloss_deque = deque(maxlen=60)
        # Server-owned result queues and "data ready" event for this key.
        self.ret_restime_deque = globals()['restime_deque' + key]
        self.ret_pkloss_deque = globals()['pkloss_deque' + key]
        self.ipqevent = globals()['event' + key]
        # RTT in a ping reply line, e.g. "time=12.3 ms" or "time=5 ms".
        self.compile = r'(?<=time=)\d+(?:\.\d+)?(?= ms)'

    def _record(self, rtt):
        """Store one probe result: *rtt* in milliseconds, or None if lost."""
        if rtt is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
        else:
            self.restime_deque.append(rtt)
            self.pkloss_deque.append(0)

    def _sleep_time(self, start_time):
        """Return the pause slow_ping() takes after a probe begun at *start_time*."""
        usetime = time.time() - start_time
        return self.inver - usetime if usetime < self.inver else self.inver

    def _tcp(self):
        """Run one TCP-connect probe; return ``(sleep_time, replied)``."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(1)
        start_time = time.time()
        rtt = None
        try:
            if self.sip:
                s.bind((self.sip, 0))
            s.connect((self.tip, self.port))
            s.shutdown(socket.SHUT_RD)
            rtt = (time.time() - start_time) * 1000
        except (socket.timeout, ConnectionError):
            pass  # no handshake within 1 s, or refused/reset: a lost probe
        except OSError as e:
            # e.g. unusable source address or unreachable network. A loss is
            # still recorded so ping_value() pops exactly what was probed.
            log.debug(e)
        finally:
            s.close()
        self._record(rtt)
        return self._sleep_time(start_time), int(rtt is not None)

    def _udp(self):
        """Run one UDP probe that expects the target to echo the datagram back."""
        payload = ''.join(random.choice(string.ascii_letters + string.digits)
                          for _ in range(self.udp_length)).encode('utf-8')
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(1)
        start_time = time.time()
        rtt = None
        try:
            if self.sip:
                s.bind((self.sip, 0))
            s.sendto(payload, (self.tip, self.port))
            s.recv(1024)
            rtt = (time.time() - start_time) * 1000
        except socket.timeout:
            pass  # no echo within 1 s: a lost probe
        except OSError as e:
            # e.g. ICMP port-unreachable surfacing as ConnectionRefusedError.
            log.debug(e)
        finally:
            s.close()
        self._record(rtt)
        return self._sleep_time(start_time), int(rtt is not None)

    def _icmp(self):
        """Run one ICMP echo through the system ping command.

        The command is passed as an argument list, without a shell, because
        the probe key arrives over a socket.
        """
        start_time = time.time()
        cmd = ['ping', '-i', str(self.inver), '-c', '1', '-W', '1']
        if self.sip:
            cmd += ['-I', self.sip]
        cmd.append(self.tip)
        try:
            out = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                                   ).communicate()[0].decode('utf8', 'replace')
        except OSError as e:
            log.debug(e)
            out = ''
        match = re.search(self.compile, out)
        self._record(float(match.group()) if match else None)
        return self._sleep_time(start_time), int(match is not None)

    def fastping(self):
        """Run a single probe of the configured type (used as a thread target)."""
        getattr(self, '_' + self.type)()

    def slow_ping(self):
        """Probe up to ``count`` times, pausing between probes.

        The server event is set first, to signal waiting sendvalue() callers.
        Probing stops early once either server queue holds fewer than two
        samples, so ping_value() can switch back to fast mode.
        Returns ``(probes_sent, replies)``.
        """
        self.ipqevent.set()
        sent = replies = 0
        while sent < self.count:
            sleep_time, replied = getattr(self, '_' + self.type)()
            sent += 1
            replies += replied
            if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
                break
            time.sleep(sleep_time)
        return sent, replies

    def ping_value(self):
        """Run one round of probes and return ``(restime_ms, pkloss_pct, use_time_s)``.

        While the server holds fewer than two samples, all ``count`` probes
        fire at once on separate threads (fast mode). Otherwise slow_ping()
        paces them. restime is the mean RTT of the answered probes, or 0 when
        none answered. pkloss is the percentage of probes lost. Returns
        ``(0, 0, 0)`` when no probe was made.
        """
        start_time = time.time()
        count = self.count
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            threads = [threading.Thread(target=self.fastping) for _ in range(self.count)]
            for th in threads:
                th.start()
            for th in threads:
                th.join()
        else:
            count, _replies = self.slow_ping()
        use_time = round(time.time() - start_time, 4)
        # Never pop more samples than this round actually produced.
        n = min(count, len(self.restime_deque), len(self.pkloss_deque))
        if n == 0:
            return 0, 0, 0
        times = [float(self.restime_deque.pop()) for _ in range(n)]
        losses = [int(self.pkloss_deque.pop()) for _ in range(n)]
        lost = sum(losses)
        replies = n - lost
        # Lost probes contribute 0 to `times`, so divide by replies only.
        restime = sum(times) / replies if replies else 0
        return round(restime, 2), round(lost / n * 100, 2), use_time
# Server side: a local TCP service on 127.0.0.1:6590. It runs one background
# probe worker per distinct target and answers client queries from its queues.
class Server():
    """Local query service backing the command-line client.

    A request is the JSON document ``[ip, item]``. ``ip`` is the 5-item probe
    key ``[source ip, target ip, type, port, interval]`` and ``item`` is
    ``'restime'`` or ``'pkloss'``. For every new key the server creates three
    module-level globals: ``restime_deque<key>``, ``pkloss_deque<key>``
    (30 samples each) and ``event<key>``. The Ping worker for that key looks
    them up by the same names.
    """

    def __init__(self, sock):
        self.ipqli = queue.Queue()   # probe keys waiting for a worker thread
        self.thli = []               # long-running threads joined by start()
        self.ipli = []               # probe keys whose result queues are live
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        # Guards ipli so concurrent request handlers and finishing workers
        # cannot create or drop the same key twice.
        self.lock = threading.Lock()

    @staticmethod
    def _key(ip):
        """Return the global-name suffix for probe key *ip* (dots removed)."""
        return ''.join(map(str, ip)).replace('.', '')

    @classmethod
    def start(cls):
        """Bind 127.0.0.1:6590, start the accept and spawner threads, and join them."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        address = ('127.0.0.1', 6590)
        s.bind(address)
        obj = cls(s)
        for target in (obj.server, obj.create):
            t = threading.Thread(target=target)
            t.start()
            obj.thli.append(t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept connections forever, serving each request on its own thread.

        Serving inline would let one slow target stall every other client,
        because sendvalue() may wait for fresh samples.
        """
        self.sock.listen(100)
        while True:
            try:
                conn, _addr = self.sock.accept()
            except OSError as e:
                log.debug(str(e))
                time.sleep(0.1)  # avoid a busy loop on persistent errors (e.g. EMFILE)
                continue
            try:
                threading.Thread(target=self._handle, args=(conn,), daemon=True).start()
            except RuntimeError as e:  # e.g. "can't start new thread"
                log.debug(str(e))
                conn.close()

    def _handle(self, conn):
        """Serve one request on *conn*: register the probe key if new, then reply."""
        try:
            ip, item = json.loads(conn.recv(1024).decode('utf-8'))
            key = self._key(ip)
            restime_ipq = 'restime_deque' + key
            pkloss_ipq = 'pkloss_deque' + key
            with self.lock:
                if ip not in self.ipli:
                    globals()[restime_ipq] = deque(maxlen=30)
                    globals()[pkloss_ipq] = deque(maxlen=30)
                    globals()['event' + key] = threading.Event()
                    self.ipli.append(ip)
                    self.ipqli.put(ip)
                    log.debug('create ipdeque %s %s' % (restime_ipq, pkloss_ipq))
            self.sendvalue(conn, ip, item)
        except Exception as e:
            log.debug(str(e))
        finally:
            conn.close()

    def _forget(self, ip):
        """Drop *ip* from the live-key list (no-op if it is already gone)."""
        with self.lock:
            if ip in self.ipli:
                self.ipli.remove(ip)

    def create(self):
        """Start a makevalue() worker thread for every probe key put on ipqli."""
        while True:
            ip = self.ipqli.get()
            log.debug('create %s' % (ip,))
            try:
                threading.Thread(target=self.makevalue, args=(ip,), daemon=True).start()
            except RuntimeError as a:
                log.debug(str(a))
                self._forget(ip)  # let the next request retry

    def makevalue(self, ip):
        """Worker loop: probe *ip* and append results until nobody reads them.

        Each Ping.ping_value() round appends ``(restime, use_time)`` and
        ``(pkloss, use_time)``. The loop ends once both queues are full,
        meaning clients stopped consuming samples. The key is then forgotten,
        so the next request for it re-creates the queues and a new worker.
        """
        key = self._key(ip)
        restime_name = 'restime_deque' + key
        pkloss_name = 'pkloss_deque' + key
        restime_ipq = globals()[restime_name]
        pkloss_ipq = globals()[pkloss_name]
        ipqevent = globals()['event' + key]
        try:
            obj = Ping(ip)
            while len(restime_ipq) < restime_ipq.maxlen or len(pkloss_ipq) < pkloss_ipq.maxlen:
                restime, pkloss, use_time = obj.ping_value()
                restime_ipq.append((restime, use_time))
                pkloss_ipq.append((pkloss, use_time))
        except Exception as e:
            # e.g. an unusable probe key; without this the thread would die
            # silently and the key would stay registered with no worker.
            log.debug(str(e))
        finally:
            self._forget(ip)
            # Wake any handler still waiting on this key so it does not block.
            ipqevent.set()
            log.debug('delete ipdeque %s %s' % (restime_name, pkloss_name))

    @staticmethod
    def _run_mtr(cmd):
        """Launch mtr.py in the background; failures are logged, never raised.

        The argument list is passed without a shell because the probe key
        comes from the socket and must not be interpreted by /bin/sh.
        """
        try:
            subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug(str(e))

    def sendvalue(self, conn, ip, item, timeout=5.0):
        """Reply on *conn* with the newest *item* sample for probe key *ip*.

        If fewer than two samples of that item are buffered, this waits at
        most *timeout* seconds for the worker's event. mtr.py is launched in
        the background to write ``mtr_log/<target>-<date>.log`` when either:
        - the latency exceeds the previous sample by more than 20, or
        - the loss rate lies strictly between 20 and 100.
        Errors raised while reading the queue are sent back as text instead
        of a number.
        """
        key = self._key(ip)
        tip = ip[1]
        restime_ipq = globals()['restime_deque' + key]
        pkloss_ipq = globals()['pkloss_deque' + key]
        ipqevent = globals()['event' + key]
        mtr_log = os.path.join(self.basedir, 'mtr_log',
                               tip + '-' + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        mtr_cmd = [os.path.join(self.basedir, 'mtr.py'), tip, mtr_log]
        queues = {'restime': restime_ipq, 'pkloss': pkloss_ipq}
        try:
            if item not in queues:
                raise ValueError('unknown item %r' % (item,))
            # Check the queue that is about to be read, not just restime.
            if len(queues[item]) < 2:
                ipqevent.clear()
            ipqevent.wait(timeout)
            if item == 'restime':
                ret, _use_time = restime_ipq.pop()
                hisret, _ = restime_ipq[-1]
                if ret - hisret > 20:
                    self._run_mtr(mtr_cmd)
            else:
                ret, _use_time = pkloss_ipq.pop()
                if 100 > ret > 20:
                    self._run_mtr(mtr_cmd)
        except Exception as a:
            ret = a
            log.debug(str(ret))
        conn.sendall(str(ret).encode())
# Validation of user-supplied probe parameters (addresses, item, type, interval).
class Ipcheck():
    """Validate command-line probe parameters before they reach the server."""

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            self.inver = None  # unparsable interval; check() rejects it

    def check(self):
        """Return True when item, probe type, interval and addresses are all valid."""
        if self.item not in ('restime', 'pkloss'):
            return False
        if self.type not in ('icmp', 'tcp', 'udp'):
            return False
        # `not >=` (rather than `<`) also rejects NaN.
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True if the decimal octet string *ip* is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True if tip (and sip, when given) are dotted-quad IPv4 strings."""
        addresses = [self.tip, self.sip] if self.sip else [self.tip]
        octets = []
        for addr in addresses:
            if not isinstance(addr, str) or not re.fullmatch(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+', addr):
                return False
            octets.extend(addr.split('.'))
        return self.checkiplength(octets)

    def checkiplength(self, check_ipli):
        """Return True when every octet in *check_ipli* passes check_fun()."""
        return all(self.check_fun(octet) for octet in check_ipli)
def run():
    """Start this script in the background as ``<script> -S server``.

    The command is an argument list, so paths with spaces are safe. Output
    goes to /dev/null: the long-lived server would otherwise write into
    pipes nobody reads once this client exits. A new session detaches the
    server from the caller's process group.
    """
    cmd = [sys.executable or 'python3', filename, '-S', 'server']
    subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                     stderr=subprocess.DEVNULL, start_new_session=True)
# Client side: ask the local server for a value and print it for the caller.
def socket_client(ip, item):
    """Query the server on 127.0.0.1:6590 for *item* of probe key *ip*.

    The reply is printed to stdout. Any failure ends the process with exit
    status 0. A timeout is only logged; any other error (e.g. the server not
    listening yet) first prints 'server will start'.
    """
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.settimeout(3)
        client.connect(('127.0.0.1', 6590))
        request = json.dumps([ip, item]).encode()
        client.sendall(request)
        reply = client.recv(1024)
        client.close()
        print(reply.decode())
    except socket.timeout as err:
        log.debug(str(err))
        client.close()
        sys.exit(0)
    except Exception as err:
        print('server will start')
        log.debug(str(err))
        sys.exit(0)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-S', action='store', dest='server')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    parser.add_argument('-i', action='store', dest='inver', default='1')
    parser.add_argument('-T', action='store', dest='ping_type', default='icmp')
    parser.add_argument('-p', action='store', dest='port', default='0')
    args = parser.parse_args()

    # Launch the background server unless a "<this script> -S server" process exists.
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16" % filename
    server_status = subprocess.Popen(server_status_cmd, shell=True, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)

    try:
        # Resolve a host name to IPv4; fails when -t is missing or unresolvable.
        tip = socket.gethostbyname(args.tip)
    except (TypeError, ValueError, OSError):
        print('format error')
        sys.exit(0)
    sip = args.sip
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    ip = (sip, tip, ping_type, port, inver)
    try:
        valid = Ipcheck(sip, tip, item, ping_type, inver).check()
    except ValueError:
        valid = False  # e.g. a non-numeric -i value
    # The server converts the port with int(); reject anything else up front.
    valid = valid and str(port).isdigit() and int(port) < 65536
    if not valid:
        print('''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
''')
        sys.exit(0)
    socket_client(ip, item)
mtr.py
1 #!/usr/bin/env python3
2 #-*-coding:utf-8-*-
3 import sys,logging,os,subprocess,requests
4 import email
5 import smtplib
6 from email.header import Header
7 from email.utils import formataddr
8 from email.mime.text import MIMEText
def logger(ip, log_name):
    """Attach a DEBUG-level file handler writing to *log_name* to the root logger.

    *ip* is accepted but unused. Returns the root logger.
    """
    handler = logging.FileHandler(log_name)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    return root
def ip_search(ip):
    """Return "<region> <city>" for *ip* from ip-api.com, or 'unknown'.

    The lookup is best-effort. A hung request, an HTTP or JSON error, or a
    reply without regionName/city yields 'unknown' instead of raising. That
    keeps a failed lookup from aborting the caller. Presumably private
    addresses and the free tier's rate limit produce such replies — confirm
    against the ip-api.com docs.
    """
    try:
        r = requests.get('http://ip-api.com/json/%s?lang=zh-CN' % ip, timeout=5)
        ret = r.json()
        return ret['regionName'] + ' ' + ret['city']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return 'unknown'
class sendemail:
    """Plain-text e-mail notifier that sends through smtp.qq.com."""

    def __init__(self, email_list, content, subject):
        self.email_list = email_list  # recipient addresses
        self.content = content        # message body (plain text)
        self.subject = subject

    def sendemail(self):
        """Send the message to every address in ``email_list``.

        The connection is upgraded with STARTTLS when the server offers it,
        so the login password is not sent in clear text. The connection is
        always closed, even when login or sending fails; the error still
        propagates to the caller.
        """
        msg = MIMEText(self.content, 'plain', 'utf-8')
        msg['from'] = formataddr(['dark', '976584601@qq.com'])
        msg['to'] = ','.join(self.email_list)
        msg['subject'] = self.subject
        # NOTE(review): hard-coded account and password; move them to a
        # config file or environment variable.
        with smtplib.SMTP('smtp.qq.com') as service:
            service.ehlo()
            if service.has_extn('starttls'):
                service.starttls()
                service.ehlo()
            service.login('976584601@qq.com', 'password')
            service.sendmail('976584601@qq.com', self.email_list, msg.as_string())
def mtr(ip, log_name):
    """Trace the path to *ip* with one mtr cycle and append the report to *log_name*.

    When *log_name* does not exist yet, the report is also mailed to the NOC
    list. The caller in pingd passes ``mtr_log/<ip>-<date>.log``, which gives
    at most one mail per target per day. The report is logged before the mail
    is attempted, so a failed lookup or SMTP error cannot lose it.
    """
    # The mtr_log/ directory may not exist on the first run.
    os.makedirs(os.path.dirname(os.path.abspath(log_name)), exist_ok=True)
    first_report = not os.path.exists(log_name)
    # Argument list, no shell: *ip* comes from the command line.
    cmd = ['mtr', '-r', '-n', '-c', '1', '-w', '-b', ip]
    try:
        data = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                                ).communicate()[0].decode('utf8')
    except OSError as e:  # e.g. mtr is not installed
        data = 'mtr failed: %s' % e
    log = logger(ip, log_name)
    log.debug(data)
    if first_report:
        try:
            ip_city = ip_search(ip)
            title = '德国腾讯到 %s %s 线路异常' % (ip_city, ip)
            mail_list = ['cs11241991@163.com']
            mail = sendemail(mail_list, data, title)
            mail.sendemail()
        except Exception:
            log.exception('failed to send alert mail for %s', ip)
if __name__ == '__main__':
    # Usage: mtr.py <target_ip> <log_file>  (extra arguments are ignored)
    if len(sys.argv) < 3:
        sys.exit('usage: %s <target_ip> <log_file>' % sys.argv[0])
    ip = sys.argv[1]
    log_name = sys.argv[2]
    mtr(ip, log_name)
# UDP echo responder for `-T udp` probes; run it on the target host.
# 'ipaddress' is a placeholder for the listen address. `port` is not defined
# in this fragment; presumably it is set in the snippet's lines 1-2, which are
# not shown.
# The socket is created and bound once, outside the loop. Re-creating it for
# every datagram adds per-packet work to the measured latency, and it drops
# datagrams that arrive between closing the old socket and binding the new one.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('ipaddress', port))
while True:
    data, addr = sock.recvfrom(65535)
    sock.sendto(data, addr)
也可以使用 socat 作为 UDP 回显服务,但实际测试中 socat 会引入额外开销,导致测得的时延不准确:
socat -v UDP-LISTEN:4000,fork PIPE