Python下载M3U8加密视频示例[通俗易懂]
大家好,又见面了,我是你们的朋友全栈君。
大家好,我是小小明。
最近看到几个 视频 网站的地址依然是m3u8格式,不禁有了使用 python 进行下载的想法,虽然下载m3u8格式视频的工具很多,但如果我们自行编码就能应对更多的情况。
关于m3u8的基础知识可以参考: Python实时下载B站直播间视频(M3U8视频流)
下面我们将使用Python下载m3u8格式的加密离线视频流。
游览器抓包过滤能够获取该影片的m3u8播放地址:
首先,测试一下该地址:
import m3u8
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
playlist = m3u8.load(
uri='https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', headers=headers)
playlist.data
{'media_sequence': None,
'is_variant': True,
'is_endlist': False,
'is_i_frames_only': False,
'is_independent_segments': False,
'playlist_type': None,
'playlists': [{'uri': '/20210628/g4yNLlI7/1000kb/hls/index.m3u8',
'stream_info': {'program_id': 1,
'bandwidth': 1000000,
'resolution': '1280x720'}}],
'segments': [],
'iframe_playlists': [],
'media': [],
'keys': [],
'rendition_reports': [],
'skip': {},
'part_inf': {},
'session_data': [],
'session_keys': []}
从结果看到,这是一个嵌套的地址。
所以写个方法解析真实地址:
import m3u8
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
def get_real_url(url):
playlist = m3u8.load(uri=url, headers=headers)
return playlist.playlists[0].absolute_uri
real_url = get_real_url(
'https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8')
real_url
'https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/1000kb/hls/index.m3u8'
解析真实地址的加密key:
playlist = m3u8.load(uri=real_url, headers=headers)
key = playlist.keys[-1]
print(key.uri, key.method, key.iv)
https://ts8.hhmm0.com:9999/20210628/g4yNLlI7/1000kb/hls/key.key AES-128 None
可以看到密钥下载地址和加密类型。
使用request下载密钥:
import requests
r = requests.get(playlist.keys[0].uri, headers=headers)
key = r.content
key
b'7ec5143edebbc899'
可以单线程直接下载视频:
import time
n = len(playlist.segments)
size = 0
start = time.time()
for i, seg in enumerate(playlist.segments, 1):
r = requests.get(seg.absolute_uri, headers=headers)
data = r.content
data = AESDecrypt(data, key=key, iv=key)
size += len(data)
with open("reusult.mp4", "ab") as f:
f.write(data)
print(f"\r下载进度({i}/{n}),已下载:{size/1024/1024:.2f}MB,下载已耗时:{time.time()-start:.2f}s", end=" ")
下载进度(1435/1435),已下载:424.69MB,下载已耗时:850s
单线程下载,好处是不会产生多余的文件,缺点是速度太慢了,一个视频下载了10多分钟。
下面我们整理一下完整的代码:
单线程视频下载的完整代码
import time
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
import requests
import m3u8
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
def get_real_url(url):
playlist = m3u8.load(uri=url, headers=headers)
return playlist.playlists[0].absolute_uri
def AESDecrypt(cipher_text, key, iv):
cipher_text = pad(data_to_pad=cipher_text, block_size=AES.block_size)
aes = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
cipher_text = aes.decrypt(cipher_text)
return cipher_text
def download_m3u8_video(url, save_name):
real_url = get_real_url(url)
playlist = m3u8.load(uri=real_url, headers=headers)
key = requests.get(playlist.keys[-1].uri, headers=headers).content
n = len(playlist.segments)
size = 0
start = time.time()
for i, seg in enumerate(playlist.segments, 1):
r = requests.get(seg.absolute_uri, headers=headers)
data = r.content
data = AESDecrypt(data, key=key, iv=key)
size += len(data)
with open(save_name, "ab" if i != 1 else "wb") as f:
f.write(data)
print(
f"\r下载进度({i}/{n}),已下载:{size/1024/1024:.2f}MB,下载已耗时:{time.time()-start:.2f}s", end=" ")
download_m3u8_video('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', '走进家门.mp4')
多线程下载改造
对于多线程,由于下载的文件可能出现间断,所以我们不能直接追加到目标视频中,可以先下载下来,最后统一合并并删除。
先创建ts视频下载的方法:
import os
import requests
def download_ts(url, key, i):
r = requests.get(url, headers=headers)
data = r.content
data = AESDecrypt(data, key=key, iv=key)
with open(f"tmp/{i:0>5d}.ts", "ab") as f:
f.write(data)
print(f"\r{i:0>5d}.ts已下载", end=" ")
if not os.path.exists("tmp"):
os.mkdir('tmp')
任意下载一个片段测试一下:
import requests
import m3u8
def get_real_url(url):
playlist = m3u8.load(uri=url, headers=headers)
return playlist.playlists[0].absolute_uri
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
real_url = get_real_url(
'https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8')
playlist = m3u8.load(uri=real_url, headers=headers)
key = requests.get(playlist.keys[-1].uri, headers=headers).content
download_ts(playlist.segments[0].absolute_uri, key, 1)
00001.ts已下载
检查该片段可以正常播放。
然后执行以下方法即可10个线程同时一起下载:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as pool:
for i, seg in enumerate(playlist.segments):
pool.submit(download_ts, seg.absolute_uri, key, i)
经过一分20秒左右的时间,所有视频流已经全部下载完毕,比单线程的速度快了不止10倍。
最后我们实现文件的合并和ts临时文件清除:
import glob
with open('video.mp4', 'wb') as fw:
files = glob.glob('tmp/*.ts')
for file in files:
with open(file, 'rb') as fr:
fw.write(fr.read())
print(f'\r{file}已合并!总数:{len(files)}', end=" ")
os.remove(file)
执行后,已经在1秒左右时间合并并清除完临时文件。
多线程下载的完整代码
import glob
from concurrent.futures import ThreadPoolExecutor
import m3u8
import os
import requests
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
import requests
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
def download_ts(url, key, i):
r = requests.get(url, headers=headers)
data = r.content
data = AESDecrypt(data, key=key, iv=key)
with open(f"tmp/{i:0>5d}.ts", "ab") as f:
f.write(data)
print(f"\r{i:0>5d}.ts已下载", end=" ")
def get_real_url(url):
playlist = m3u8.load(uri=url, headers=headers)
return playlist.playlists[0].absolute_uri
def AESDecrypt(cipher_text, key, iv):
cipher_text = pad(data_to_pad=cipher_text, block_size=AES.block_size)
aes = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
cipher_text = aes.decrypt(cipher_text)
return cipher_text
def download_m3u8_video(url, save_name, max_workers=10):
if not os.path.exists("tmp"):
os.mkdir('tmp')
real_url = get_real_url(url)
playlist = m3u8.load(uri=real_url, headers=headers)
key = requests.get(playlist.keys[-1].uri, headers=headers).content
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for i, seg in enumerate(playlist.segments):
pool.submit(download_ts, seg.absolute_uri, key, i)
with open(save_name, 'wb') as fw:
files = glob.glob('tmp/*.ts')
for file in files:
with open(file, 'rb') as fr:
fw.write(fr.read())