1.利用tornado提供的websocket功能与浏览器建立长连接,读取实时日志并输出到浏览器
2.写一个实时读取日志的脚本,利用saltstack远程执行,并把实时日志发往redis中。
3.tornado读取redis中的信息,发往浏览器。
此过程用到了redis的发布和订阅功能。
先看一下tornado中是如何处理的:
import os
import sys
import tornado.websocket
import tornado.web
import tornado.ioloop
import redis
import salt.client
from tornado import gen
from tornado.escape import to_unicode
from logs.utility import get_last_lines
from logs import settings
class SubWebSocket(tornado.websocket.WebSocketHandler):
    """Relay a remote log file to the browser through Salt and Redis.

    The client sends one message, ``"hostname||log_path||cmd"``. The handler
    subscribes to the Redis channel built from ``settings.LOG_KEY``, starts
    ``cmd`` on ``hostname`` asynchronously with Salt's ``cmd.run``, and
    forwards every message published on that channel, formatted by
    ``format_line``, until the websocket is closed.
    """

    def open(self, *args, **kwargs):
        # Set by on_close(); the polling loop in on_message() checks it so it
        # stops even when nothing more is published after the browser leaves.
        self._closed = False
        print("opened")

    @gen.coroutine
    def on_message(self, message):
        # maxsplit=2 keeps any "||" inside the command itself intact.
        parts = message.split("||", 2)
        if len(parts) != 3:
            # Malformed request: there is nothing to subscribe to.
            self.close()
            return
        hostname, log_path, cmd = parts
        # SECURITY(review): ``cmd`` comes from the browser and is executed
        # verbatim on ``hostname`` via Salt's cmd.run, so any client that can
        # reach this endpoint can run arbitrary commands on any minion. The
        # command should be built server-side from validated parameters.
        local = salt.client.LocalClient()
        r = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                              password=settings.REDIS_PASSWD, db=5)
        key = settings.LOG_KEY.format(server=hostname.strip(), log_path=log_path.strip())
        channel = r.pubsub()
        channel.subscribe(key)
        local.cmd_async(hostname, "cmd.run", [cmd])
        try:
            while not getattr(self, "_closed", False):
                data = channel.get_message()
                if not data:
                    yield gen.sleep(0.05)
                    continue
                if data["type"] == "message":
                    self.write_message(format_line(data["data"]))
        except tornado.websocket.WebSocketClosedError:
            self.close()
        finally:
            # Drop the subscription and release its connection; otherwise
            # every closed page would keep a Redis connection open.
            channel.reset()

    def on_close(self):
        # Tell the polling loop in on_message() to stop.
        self._closed = True
        print("closed")
def format_line(line):
    """Wrap one log line in a ``<span>`` coloured by its log level.

    The line is converted with ``to_unicode`` and HTML-escaped, because the
    browser inserts the result as markup: a raw ``<`` or ``&`` in a log line
    would otherwise be interpreted as HTML.

    Levels are checked in the order INFO, WARN, ERROR, CRITICAL; the first
    substring found decides the colour, and unmatched lines are white.

    :param line: one log line (bytes or text).
    :returns: the HTML fragment as unicode text.
    """
    from tornado.escape import xhtml_escape

    line = to_unicode(line)
    level_colors = (
        ("INFO", "#46A3FF"),
        ("WARN", "#FFFF37"),
        ("ERROR", "red"),
        ("CRITICAL", "red"),
    )
    color = next((c for level, c in level_colors if level in line), "#FFFFFF")
    # A unicode template is required on Python 2: str.format() with a unicode
    # argument tries to encode it as ASCII and fails on non-ASCII log lines.
    return u"<span style='color:{}'>{}</span>".format(color, xhtml_escape(line))
class EchoWebSocket(tornado.websocket.WebSocketHandler):
    """Stream a log file on the Tornado host itself to the browser.

    The client sends the file path as its message. The handler sends the
    file's last lines (from ``get_last_lines``) and then follows the file,
    sending each new complete line through ``format_line`` until the
    websocket closes.

    SECURITY(review): the path comes straight from the client, so any file
    readable by the Tornado process can be streamed; restrict it to an
    allow-listed log directory before exposing this endpoint.
    """

    def open(self):
        # Set by on_close(); the follow loop checks it so it stops even when
        # the file is idle and no write ever raises WebSocketClosedError.
        self._closed = False
        print("WebSocket opened")

    @gen.coroutine
    def on_message(self, message):
        log = message
        print("log file: %s" % log)
        try:
            with open(log, 'r') as f:
                for line in get_last_lines(f):
                    self.write_message(format_line(line.rstrip("\r\n")))
                pending = ""
                while not getattr(self, "_closed", False):
                    chunk = f.readline()
                    if not chunk:
                        yield gen.sleep(0.05)
                        continue
                    pending += chunk
                    # readline() returns a partial line when the writer has
                    # not finished it yet; wait for the newline before sending.
                    if not pending.endswith("\n"):
                        continue
                    # Remove only the terminator: leading whitespace (e.g.
                    # indented stack-trace frames) is meaningful in the <pre>.
                    self.write_message(format_line(pending.rstrip("\r\n")))
                    pending = ""
        except (IOError, tornado.websocket.WebSocketClosedError) as e:
            # Unreadable file or browser gone: report and drop the socket.
            print(e)
            self.close()

    def on_close(self):
        # Tell the follow loop in on_message() to stop.
        self._closed = True
        print("WebSocket closed")
class Application(tornado.web.Application):
    """Tornado application serving the log viewer page and its websockets.

    Routes:
      /log/        -> MainHandler (renders index.html)
      /log/local   -> EchoWebSocket (follows a file on this host)
      /log/remote  -> SubWebSocket (relays a remote log via Salt + Redis)
    """

    def __init__(self):
        handlers = [
            (r'/log/', MainHandler),
            (r'/log/local', EchoWebSocket),
            (r'/log/remote', SubWebSocket),
        ]
        # Named app_settings so it does not shadow the imported ``settings``
        # module inside this method.
        app_settings = {
            "debug": True,
            "template_path": os.path.join(os.path.dirname(__file__), "templates"),
            "static_path": os.path.join(os.path.dirname(__file__), "static"),
        }
        super(Application, self).__init__(handlers, **app_settings)
class MainHandler(tornado.web.RequestHandler):
    """Render index.html with the query parameters its JavaScript reads.

    Query parameters: ``log`` (file path), ``hostname`` (Salt minion id),
    ``type`` ("local" by default; selects the /log/<type> websocket) and
    ``cmd`` (command forwarded to the remote websocket).
    """

    def get(self):
        # Local name avoids shadowing the builtin ``type``; the template key
        # stays "type".
        log_type = self.get_argument("type", "local")
        context = {
            "log": self.get_argument("log", None),
            "hostname": self.get_argument("hostname", None),
            "type": log_type,
            "cmd": self.get_argument("cmd", ""),
        }
        self.render("index.html", **context)
配置文件中主要记录了redis服务器的地址等信息
# Redis pub/sub channel for one host's log file; formatted with
# ``server`` and ``log_path``.
LOG_KEY = "logs:{server}:{log_path}"
LOG_NAME = "catalina.out"
TAIL_LINE_NUM = 20
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_PASSWD = None
REDIS_EXPIRE = 300

# Optional per-deployment overrides: any name defined in an importable
# ``local_settings`` module replaces the defaults above.
try:
    from local_settings import *  # noqa: F401,F403
except ImportError:
    pass
index.html的内容如下:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<link href="{{ static_url('public/css/public.css') }}" rel="stylesheet" />
<link href="{{ static_url('kylin/css/style.css') }}" rel="stylesheet" />
</head>
<body style="background:#000000">
<div style="margin-left:10px;">
    <!-- Log lines received over the websocket are appended here. -->
    <pre id="id-content"></pre>
    <!-- Scroll anchor kept in view so the newest line stays visible. -->
    <div id="id-bottom"></div>
    <!-- Values rendered from the request by MainHandler, read by the script below. -->
    <input type="hidden" id="id-log" value="{{ log }}" />
    <input type="hidden" id="id-type" value="{{ type }}" />
    <input type="hidden" id="id-hostname" value="{{ hostname }}" />
    <input type="hidden" id="id-cmd" value="{{ cmd }}" />
    <div class="btns btns_big">
        <button type="button" class="query_btn cancle" id="id-stop">Stop</button>
        <button type="button" class="query_btn commit" id="id-start">Start</button>
    </div>
</div>
<script type="text/javascript" src="{{ static_url('js/jquery-1.11.3.min.js') }}"></script>
<script type="text/javascript">
    var log_name = $("#id-log").val();
    var type = $("#id-type").val();
    var hostname = $("#id-hostname").val();
    var cmd = $("#id-cmd").val();
    // Follow the page's scheme so the socket also works when served over https.
    var scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
    var ws = new WebSocket(scheme + "{{ request.host }}/log/" + type);

    // "local" sends only the file path; "remote" sends
    // "hostname||log_path||cmd" for the server to split.
    ws.onopen = function () {
        if (type === "local") {
            ws.send(log_name);
        } else {
            ws.send(hostname + "||" + log_name + "||" + cmd);
        }
    };

    // Append each HTML fragment from the server and keep the newest line in view.
    var get_message = function (evt) {
        $("#id-content").append(evt.data + "\n");
        document.getElementById("id-bottom").scrollIntoView();
    };
    ws.onmessage = get_message;

    // Stop only ignores incoming messages (the socket stays open);
    // Start resumes displaying whatever arrives afterwards.
    $("#id-stop").click(function () {
        ws.onmessage = function () {};
    });
    $("#id-start").click(function () {
        ws.onmessage = get_message;
    });
</script>
</body>
</html>
这个tornado服务仅负责推送实时日志，实际项目使用的是django。django中要做的其实很简单：提供log、hostname、type、cmd这四个参数即可。下面看一个实例：
class LogView(KylinView):
    """Django view that prepares a remote log-following session.

    GET renders the form page. POST validates the requested minion and log
    file, kills any logtail.py on that minion already following the same
    path, and returns the Tornado viewer URL whose ``cmd`` parameter is the
    logtail.py command line to run there.
    """

    # Location of logtail.py on the minion (presumably deployed there
    # beforehand -- not done by this view).
    client_path = "/tmp/logtail.py"

    def get(self, request):
        minion_id = request.GET.get("minion_id")
        context = {
            "minion_id": minion_id,
            "tail_log_url": settings.TAIL_LOG_URL,
        }
        return render(request, "cmdb/log_view.html", context)

    def post(self, request):
        try:
            from shlex import quote  # Python 3
        except ImportError:
            from pipes import quote  # Python 2
        from django.utils.http import urlencode

        minion_id = request.POST.get("minion_id")
        log_path = request.POST.get("log_path")
        if not log_path:
            return JsonResponse({"success": False, "message": "请填写日志路径"})
        try:
            # logtail.py does int(sys.argv[3]); validate here instead of
            # letting the remote script crash.
            line_count = int(request.POST.get("line_count"))
            if line_count < 0:
                raise ValueError(line_count)
        except (TypeError, ValueError):
            return JsonResponse({"success": False, "message": "请输入正确的行数"})
        local = salt.client.LocalClient()
        ret = local.cmd(minion_id, "file.file_exists", [log_path])
        if minion_id not in ret:
            return JsonResponse({"success": False, "message": "服务器无法连通"})
        if not ret[minion_id]:
            return JsonResponse({"success": False, "message": "日志文件不存在"})
        # The command is later run through a shell (Salt cmd.run), so every
        # request- or config-derived argument is shell-quoted.
        script_args = (minion_id, log_path, line_count, settings.REDIS_HOST,
                       settings.REDIS_PORT, settings.REDIS_PASSWD)
        cmd = u"{} {} {}".format(
            settings.PYTHON_BIN, self.client_path,
            u" ".join(quote(u"{}".format(arg)) for arg in script_args))
        # Values contain spaces, quotes and possibly "&", so URL-encode them.
        query = urlencode([
            ("type", "remote"),
            ("log", log_path),
            ("hostname", minion_id),
            ("cmd", cmd),
        ])
        url = u"{}?{}".format(settings.TAIL_LOG_URL, query)
        local.cmd(minion_id, "cmd.run",
                  ["kill `ps aux|grep logtail.py|grep %s|grep -v grep|awk '{print $2}'`" % (quote(log_path),)])
        return JsonResponse({"success": True, "url": url})
下面来看看logtail.py的实现:
from __future__ import unicode_literals, division
import math
import time
import sys
import socket
import signal
import redis
# Run flag polled by tail()'s follow loop (`while FLAG:`); sig_handler()
# sets it to False on SIGTERM/SIGALRM so tail() stops following the file.
FLAG = True
def get_last_lines(f, num=10):
    """Return the last ``num`` lines of the open file ``f``.

    Reads a window from the end of the file (1000 characters/bytes at
    first, grown as needed) instead of loading the whole file. Lines are
    returned without line terminators; ``num <= 0`` returns an empty list.
    If the end-relative seek raises IOError (the window reaches before the
    start of the file, or the stream cannot seek that way), the whole file
    is read from the start instead.
    """
    if num <= 0:
        # lines[-0:] would be the *whole* list, so handle this explicitly.
        return []
    size = 1000
    while True:
        try:
            f.seek(-size, 2)
        except IOError:
            f.seek(0)
            return f.read().splitlines()[-num:]
        # Some streams clamp an out-of-range seek to offset 0 instead of
        # raising; then the window already covers the whole file.
        at_start = f.tell() == 0
        lines = f.read().splitlines()
        if at_start:
            return lines[-num:]
        # The window almost always starts in the middle of a line, so its
        # first entry is discarded instead of being returned truncated.
        complete = lines[1:]
        if len(complete) >= num:
            return complete[-num:]
        # Grow the window roughly in proportion to the lines still missing
        # (at least doubling, so the loop always makes progress).
        size *= max(2, int(math.ceil(float(num + 1) / max(len(lines), 1))))
def process_line(r, channel, line):
    """Publish one log line on the Redis pub/sub ``channel`` via ``r``.

    Only the trailing line terminator is removed: leading whitespace (for
    example the indentation of stack-trace frames) is part of the log text.
    """
    # str() keeps the strip set a native string: under unicode_literals a
    # unicode argument would make Python 2 ASCII-decode the byte-string line.
    r.publish(channel, line.rstrip(str("\r\n")))
def sig_handler(signum, frame):
    """Clear the module-level FLAG so the follow loop in tail() ends."""
    global FLAG
    FLAG = False


# Both an external SIGTERM and our own alarm stop the tail; the alarm limits
# every run of this script to 300 seconds.
for _signum in (signal.SIGTERM, signal.SIGALRM):
    signal.signal(_signum, sig_handler)
del _signum
signal.alarm(300)
def get_hostname():
    """Return the host name of the machine running this script."""
    name = socket.gethostname()
    return name
def force_str(s):
    """Return ``s`` as a native ``str``.

    On Python 2, unicode text is encoded as UTF-8; every other value is
    returned unchanged. On Python 3, text already is the native ``str``
    type, so ``s`` is returned as-is.
    """
    try:
        text_type = unicode  # noqa: F821 -- only defined on Python 2
    except NameError:
        return s
    if isinstance(s, text_type):
        s = s.encode("utf-8")
    return s
def tail():
    """Publish the end of a log file on Redis, then keep following the file.

    Command-line arguments (``sys.argv``):
      1 minion_id   -- only used to build the channel name
      2 log_path    -- file to follow
      3 line_count  -- number of existing lines to publish first
      4 redis_host, 5 redis_port, 6 redis_pass ("None" means no password)

    Lines are published on ``logs:<minion_id>:<log_path>`` until the module
    flag ``FLAG`` is cleared (by ``sig_handler``) or Ctrl-C is pressed.
    """
    password = sys.argv[6]
    if password == "None":
        password = None
    r = redis.StrictRedis(host=sys.argv[4], port=sys.argv[5], password=password, db=5)
    log_path = sys.argv[2]
    line_count = int(sys.argv[3])
    channel = "logs:{hostname}:{log_path}".format(hostname=sys.argv[1], log_path=log_path)
    # Native str values: with unicode_literals, "" / "\n" are unicode on
    # Python 2, and mixing them with the byte strings read from the file
    # forces an ASCII decode that fails on non-ASCII log lines.
    newline = str("\n")
    with open(log_path, 'r') as f:
        for line in get_last_lines(f, line_count):
            process_line(r, channel, force_str(line))
        pending = str()
        try:
            while FLAG:
                chunk = f.readline()
                if not chunk:
                    time.sleep(0.05)
                    continue
                pending += chunk
                # readline() returns a partial line when the writer has not
                # finished it yet; publish only complete lines so one log
                # record is not split across two messages.
                if not pending.endswith(newline):
                    continue
                process_line(r, channel, force_str(pending))
                pending = str()
        except KeyboardInterrupt:
            print("Exiting...")
if __name__ == "__main__":
    # tail() reads sys.argv[1] .. sys.argv[6], so six arguments are required.
    if len(sys.argv) < 7:
        print("Usage: %s minion_id log_path line_count redis_host redis_port redis_pass"
              % sys.argv[0])
        sys.exit(1)
    tail()
到此为止,整个实时读取远程日志的流程就讲完了。
github: https://github.com/tuxinhang1989/logs
大致思路：1. 利用tornado提供的websocket功能与浏览器建立长连接，读取实时日志并输出到浏览器；2. 写一个实时读取日志的脚本，利用saltstack远程执行，并把实时日志发往redis中；3. tornado读取redis中的信息，发往浏览器。此过程用到了redis的发布和订阅功能。先看一下tornado中是如何处理的：import os、import sys……
需求:正在开发一个监控系统,要求将多台日志信息实时采集出来,然后保存到Kafka中,后期对日志数据进行spark运算、大数据处理分析,日志按大小,时间切分。
运用的技术:RandomAccessFile类中seek方法可以从指定位置读取文件,可以用来实现文件实时读取,JDK文档有对RandomAccessFile的介绍。
思想：在每一次读取后都close（关闭）一下文件，这样就不会影响重命名操作。
public static String getLogContent(String url) {
StringBuffer sb = new StringBuffer();
URL HttpUrl;
HttpURLConnection con = ...
二 简单的tornado的‘Hello World’实现
pip install tornado安装就可以。我安装的环境是win10,python 2.10.1, tornado 5.1。稍微提一下,由于一些原因我是离线安装的,这时候需要下载三个依赖(找到路径 pip install **.whl)。
主要需求:连接远程服务器(HBase写入端),监听HBase写入日志,截取ROWKEY,写入Kafka。
实现方式:通过ch.ethz.ganymed包ganymed-ssh2远程连接linux服务器,执行tail命令,生成实时数据流,写入kafka
主要代码:
pom.xml
<dependency>
<groupId>org.apache.kafka...
古怪的需求
在实习的公司碰到一个古怪的需求:在一台服务器上写日志文件,每当日志文件写到一定大小时,比如是1G,会将这个日志文件改名成另一个名字,并新建一个与原文件名相同的日志文件,再往这个新建的日志文件里写数据;要求写一个程序能实时地读取日志文件中的内容,并且不能影响写操作与重命名操作。
RandomAccessFile类中seek方法可以从指定位置读取文件,可以用来实现文件实时读取。JDK文档对...
二、使用Django创建工程
注意:我的项目名用helloword代替,APP名用helloWeb代替,通过访问MySQL数据库实现一个登陆的功能
1、新建一个Django项目
django-admin.py startproject helloword
class Meta(type):
    """Metaclass that prints a trace of class and instance creation.

    ``Meta.__new__`` runs once, when a class using this metaclass is defined.
    ``Meta.__call__`` runs on every instantiation and drives the class's own
    ``__new__`` and ``__init__``, which shows the call order.
    """

    def __new__(cls, name, bases, dct):
        print("calling Meta's __new__", cls)
        return type.__new__(cls, name, bases, dct)

    def __call__(cls, *args, **kwargs):
        print("calling Meta's __call__", cls)
        # Mirror type.__call__: forward the constructor arguments to __new__,
        # and run __init__ only if __new__ returned an instance of cls.
        instance = cls.__new__(cls, *args, **kwargs)
        if isinstance(instance, cls):
            instance.__init__(*args, **kwargs)
        return instance
class A(metaclass=Meta):
    """Example class whose creation and instantiation are traced by Meta.

    The ``metaclass=`` keyword alone selects the metaclass; the Python 2
    ``__metaclass__`` attribute has no effect here and is omitted.
    """

    def __new__(cls, *args, **kwargs):
        print("calling A's __new__")
        return object.__new__(cls)

    def __init__(self, *args, **kwargs):
        print("calling A's __init__")


a = A()
print("a is", a)
[/code]
django+tornado实现实时查看远程日志
xiaok_007: