# -*- coding:utf-8 -*-
# pyftpdlib 搭建ftp服务端
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
class ftp_tools:
port = 2121
user = "admin"
pwd = "12345"
# 创建ftp服务端
def creat_ftp_server(self):
# 实例化DummyAuthorizer来创建ftp用户
authorizer = DummyAuthorizer()
# 参数:用户名,密码,目录,权限[见拓展部分服务权限]
authorizer.add_user(self.user, self.pwd, r'D:\FTP', perm='elradfmwMT')
handler = FTPHandler
handler.authorizer = authorizer
# 参数:IP,端口,handler
server = FTPServer(('0.0.0.0', self.port), handler) # 设置为0.0.0.0为本机的IP地址
server.serve_forever()
if __name__ == "__main__":
ftp_tools().creat_ftp_server()
运行结果:
2、连接FTP
连接FTP
# 连接ftp
def conn_ftp(self):
ftp = FTP()
# 连接ftp
ftp.connect(host=self.host, port=self.port, timeout=10)
# 登录ftp
ftp.login(user=self.user, passwd=self.pwd)
# 关闭ftp
ftp.close()
3、上传和下载
从本地上传文件到FTP
def uploadfile(ftppath, localpath):
ftp = FTP()
# 连接ftp
ftp.connect(host=self.host, port=self.port, timeout=10)
# 登录ftp
ftp.login(user=self.user, passwd=self.pwd)
# 设置的缓冲区大小
bufsize = 1024
fp = open(localpath, 'rb')
# 二进制存储器
ftp.storbinary('STOR ' + ftppath, fp, bufsize)
# 参数为0,关闭调试模式
ftp.set_debuglevel(0)
# 关闭文件并退出ftp
fp.close()
ftp.quit()
从FTP下载文件到本地
def downloadfile(ftppath, localpath):
ftp = FTP()
# 连接ftp
ftp.connect(host=self.host, port=self.port, timeout=10)
# 登录ftp
ftp.login(user=self.user, passwd=self.pwd)
# 设置的缓冲区大小
bufsize = 1024
fp = open(localpath, 'wb')
# 二进制文件
ftp.retrbinary('RETR ' + ftppath, fp.write, bufsize)
# 参数为0,关闭调试模式
ftp.set_debuglevel(0)
# 关闭文件并退出ftp
fp.close()
ftp.quit()
# coding: utf-8
from ftplib import FTP
from loguru import logger as logs
class FTP_tools:
"""FTP 操作工具"""
def __init__(self, pathType, section='ftp'):
:param pathType:路径类型
:param section:配置文件section名
self.section = section
self.host = config.get(self.section, "ftp_host")
self.port = config.get(self.section, "ftp_port")
self.user = config.get(self.section, "ftp_user")
self.password = config.get(self.section, "ftp_password")
self.pathType = pathType
# 根据功能判断服务器及本地文件路径
if self.pathType == "path1":
self.server_path = config.get(self.section, "ftp_cloudpath_path1")
self.local_path = config.get(self.section, "ftp_localpath_path1")
elif self.pathType == "path2":
self.server_path = config.get(self.section, "ftp_cloudpath_path2")
self.local_path = config.get(self.section, "ftp_localpath_path2")
else:
logs.error("pathType传值错误,暂且仅支持image、loanfile文件的上传下载")
sys.exit()
logs.info("======== FTP连接开始 ========")
logs.debug("FTP连接服务器地址 - {}:{}".format(self.host, self.port))
logs.debug("FTP连接服务器用户 - {}/{}".format(self.user, self.password))
# 实例化
self.ftp = FTP()
def __del__(self):
"""析构函数 对象资源被释放时触发"""
# 关闭并退出ftp
self.ftp.quit()
logs.info("======= FTP 操作结束,连接关闭 =======")
def ftp_upload_file(self, file, timeout: int = 10, bufsize: int = 1024):
FTP上传文件,注意:不支持文件夹
:param file:需要上传的文件名,如:test.txt
:param timeout: 超时时间(默认),必须是int类型
:param bufsize: 缓冲区大小
:return:
serveFilePath = os.path.join(self.server_path, file)
localFilePath = os.path.join(self.local_path, file)
logs.debug("FTP服务器路径:{}".format(serveFilePath))
logs.debug("FTP本地路径:{}".format(localFilePath))
try:
# 连接ftp
self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
# 登录ftp
self.ftp.login(user=self.user, passwd=self.password)
with open(localFilePath, 'wb') as fp:
# fp = open(localFilePath, 'rb')
# 二进制存储器
self.ftp.storbinary('STOR ' + serveFilePath, fp, bufsize)
# 参数为0,关闭调试模式
self.ftp.set_debuglevel(0)
except Exception as e:
logs.error("FTP上传操作异常,异常原因:{}".format(e))
# finally:
# # 关闭文件
# fp.close()
def ftp_download_file(self, file, timeout: int = 10, bufsize: int = 1024):
下载文件,注意:不支持文件夹
:param file:需要下载的文件名,如:test.txt
:param timeout: 超时时间(默认),必须是int类型
:param bufsize: 设置的缓冲区大小
:return:
serverFilePath = os.path.join(self.server_path, file)
localFilePath = os.path.join(self.local_path, "download/"+file)
logs.debug("FTP服务器路径:{}".format(serverFilePath))
logs.debug("FTP本地路径:{}".format(localFilePath))
# 判断下载路径是否存在
# logs.debug(os.path.dirname(localFilePath))
if not os.path.exists(os.path.dirname(localFilePath)):
os.makedirs(os.path.dirname(localFilePath))
try:
# 连接ftp
self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
# 登录ftp
self.ftp.login(user=self.user, passwd=self.password)
with open(localFilePath, 'wb') as fp:
# fp = open(localFilePath, 'wb')
# 二进制文件
self.ftp.retrbinary('RETR ' + serverFilePath, fp.write, bufsize)
# 参数为0,关闭调试模式
self.ftp.set_debuglevel(0)
except Exception as e:
logs.error("FTP下载操作异常,异常原因:{}".format(e))
# finally:
# # 关闭文件
# fp.close()
if __name__ == '__main__':
"""run"""
file = "test01.txt"
FTP_tools("path1").ftp_download_file(file)
# FTP_tools("path1").ftp_download_file(file)
ftp_tools.py
perm 权限参数解释
读取权限:
"e" =更改目录(CWD,CDUP命令)
"l" =列表文件(LIST,NLST,STAT,MLSD,MLST,SIZE命令)
"r" =从服务器检索文件(RETR命令)
写入权限:
"a" =将数据追加到现有文件(APPE命令)
"d" =删除文件或目录(DELE,RMD命令)
"f" =重命名文件或目录(RNFR,RNTO命令)
"m" =创建目录(MKD命令)
"w" =将文件存储到服务器(STOR,STOU命令)
"M" =更改文件模式/权限(SITE CHMOD命令)
"T" =更改文件修改时间(SITE MFMT命令)
ftp常用函数释义
from ftplib import FTP #加载ftp模块
ftp=FTP() #实例化对象
ftp.set_debuglevel(2) #打开调试级别2,显示详细信息
ftp.connect("IP","port") #连接ftp的ip和端口
ftp.login("user","password") #连接ftp的用户名和密码
print(ftp.getwelcome()) #打印出欢迎信息
ftp.cmd("xxx/xxx") #进入远程目录
bufsize=1024 #设置的缓冲区大小
filename="xxxx.txt" #需要下载的文件
file_handle=open(filename,"wb").write #以写模式在本地打开文件
ftp.set_debuglevel(0) #关闭调试模式
ftp.quit() #退出ftp (发送命令并关闭连接,出现错误会抛异常)
ftp.close() #关闭ftp (单方面强制关闭ftp)
ftp.cwd(pathname) #设置FTP当前操作的路径
ftp.dir() #显示目录下所有目录信息
ftp.nlst() #获取目录下的文件
ftp.mkd(pathname) #新建远程目录
ftp.pwd() #返回当前所在位置
ftp.rmd(dirname) #删除远程目录
ftp.delete(filename) #删除远程文件
ftp.rename(fromname, toname) #将fromname修改名称为toname。
ftp.storbinaly("STOR filename.txt",file_handel,bufsize) #上传目标文件
ftp.retrbinary("RETR filename.txt",file_handel,bufsize) #下载FTP文件
-------------------------------------------------------------------------------------
如果万事开头难 那请结局一定圆满 @
Phoenixy
-------------------------------------------------------------------------------------