# -*- coding:utf-8 -*-
# pyftpdlib 搭建ftp服务端
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers  import FTPHandler
from pyftpdlib.servers import FTPServer
class ftp_tools:
    port = 2121
    user = "admin"
    pwd = "12345"
    # 创建ftp服务端
    def creat_ftp_server(self):
        # 实例化DummyAuthorizer来创建ftp用户
        authorizer = DummyAuthorizer()
        # 参数:用户名,密码,目录,权限[见拓展部分服务权限]
        authorizer.add_user(self.user, self.pwd, r'D:\FTP', perm='elradfmwMT')
        handler = FTPHandler
        handler.authorizer = authorizer
        # 参数:IP,端口,handler
        server = FTPServer(('0.0.0.0', self.port), handler)  # 设置为0.0.0.0为本机的IP地址
        server.serve_forever()
if __name__ == "__main__":
    ftp_tools().creat_ftp_server()

运行结果:

2、连接FTP

连接FTP

    # 连接ftp
    def conn_ftp(self):
        ftp = FTP()
        # 连接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登录ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 关闭ftp
        ftp.close()

3、上传和下载

从本地上传文件到FTP

    def uploadfile(ftppath, localpath):
        ftp = FTP()
        # 连接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登录ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 设置的缓冲区大小
        bufsize = 1024
        fp = open(localpath, 'rb')
        # 二进制存储器
        ftp.storbinary('STOR ' + ftppath, fp, bufsize)
        # 参数为0,关闭调试模式
        ftp.set_debuglevel(0)
        # 关闭文件并退出ftp
        fp.close()
        ftp.quit()    

从FTP下载文件到本地

    def downloadfile(ftppath, localpath):
        ftp = FTP()
        # 连接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登录ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 设置的缓冲区大小
        bufsize = 1024
        fp = open(localpath, 'wb')
        # 二进制文件
        ftp.retrbinary('RETR ' + ftppath, fp.write, bufsize)
        # 参数为0,关闭调试模式
        ftp.set_debuglevel(0)
      # 关闭文件并退出ftp
        fp.close()
     ftp.quit()
# coding: utf-8
from ftplib import FTP
from loguru import logger as logs
class FTP_tools:
    """FTP 操作工具"""
    def __init__(self, pathType, section='ftp'):
        :param pathType:路径类型
        :param section:配置文件section名
        self.section = section
        self.host = config.get(self.section, "ftp_host")
        self.port = config.get(self.section, "ftp_port")
        self.user = config.get(self.section, "ftp_user")
        self.password = config.get(self.section, "ftp_password")
        self.pathType = pathType
        # 根据功能判断服务器及本地文件路径
        if self.pathType == "path1":
            self.server_path = config.get(self.section, "ftp_cloudpath_path1")
            self.local_path = config.get(self.section, "ftp_localpath_path1")
        elif self.pathType == "path2":
            self.server_path = config.get(self.section, "ftp_cloudpath_path2")
            self.local_path = config.get(self.section, "ftp_localpath_path2")
        else:
            logs.error("pathType传值错误,暂且仅支持image、loanfile文件的上传下载")
            sys.exit()
        logs.info("======== FTP连接开始 ========")
        logs.debug("FTP连接服务器地址 - {}:{}".format(self.host, self.port))
        logs.debug("FTP连接服务器用户 - {}/{}".format(self.user, self.password))
        # 实例化
        self.ftp = FTP()
    def __del__(self):
        """析构函数 对象资源被释放时触发"""
        # 关闭并退出ftp
        self.ftp.quit()
        logs.info("======= FTP 操作结束,连接关闭 =======")
    def ftp_upload_file(self, file, timeout: int = 10, bufsize: int = 1024):
        FTP上传文件,注意:不支持文件夹
        :param file:需要上传的文件名,如:test.txt
        :param timeout: 超时时间(默认),必须是int类型
        :param bufsize: 缓冲区大小
        :return: 
        serveFilePath = os.path.join(self.server_path, file)
        localFilePath = os.path.join(self.local_path, file)
        logs.debug("FTP服务器路径:{}".format(serveFilePath))
        logs.debug("FTP本地路径:{}".format(localFilePath))
        try:
            # 连接ftp
            self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
            # 登录ftp
            self.ftp.login(user=self.user, passwd=self.password)
            with open(localFilePath, 'wb') as fp:
                # fp = open(localFilePath, 'rb')
                # 二进制存储器
                self.ftp.storbinary('STOR ' + serveFilePath, fp, bufsize)
                # 参数为0,关闭调试模式
                self.ftp.set_debuglevel(0)
        except Exception as e:
            logs.error("FTP上传操作异常,异常原因:{}".format(e))
        # finally:
        #     # 关闭文件
        #     fp.close()
    def ftp_download_file(self, file, timeout: int = 10, bufsize: int = 1024):
        下载文件,注意:不支持文件夹
        :param file:需要下载的文件名,如:test.txt
        :param timeout: 超时时间(默认),必须是int类型
        :param bufsize: 设置的缓冲区大小
        :return: 
        serverFilePath = os.path.join(self.server_path, file)
        localFilePath = os.path.join(self.local_path, "download/"+file)
        logs.debug("FTP服务器路径:{}".format(serverFilePath))
        logs.debug("FTP本地路径:{}".format(localFilePath))
        # 判断下载路径是否存在
        # logs.debug(os.path.dirname(localFilePath))
        if not os.path.exists(os.path.dirname(localFilePath)):
            os.makedirs(os.path.dirname(localFilePath))
        try:
            # 连接ftp
            self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
            # 登录ftp
            self.ftp.login(user=self.user, passwd=self.password)
            with open(localFilePath, 'wb') as fp:
                # fp = open(localFilePath, 'wb')
                # 二进制文件
                self.ftp.retrbinary('RETR ' + serverFilePath, fp.write, bufsize)
                # 参数为0,关闭调试模式
                self.ftp.set_debuglevel(0)
        except Exception as e:
            logs.error("FTP下载操作异常,异常原因:{}".format(e))
        # finally:
        #     # 关闭文件
        #     fp.close()
if __name__ == '__main__':
    """run"""
    file = "test01.txt"
    FTP_tools("path1").ftp_download_file(file)
    # FTP_tools("path1").ftp_download_file(file)
ftp_tools.py

perm 权限参数解释

读取权限:
  "e" =更改目录(CWD,CDUP命令)
  "l" =列表文件(LIST,NLST,STAT,MLSD,MLST,SIZE命令)
  "r" =从服务器检索文件(RETR命令)
写入权限:
  "a" =将数据追加到现有文件(APPE命令)
  "d" =删除文件或目录(DELE,RMD命令)
  "f" =重命名文件或目录(RNFR,RNTO命令)
  "m" =创建目录(MKD命令)
  "w" =将文件存储到服务器(STOR,STOU命令)
  "M" =更改文件模式/权限(SITE CHMOD命令)
  "T" =更改文件修改时间(SITE MFMT命令)

ftp常用函数释义

from ftplib import FTP                          #加载ftp模块
ftp=FTP()               #实例化对象 ftp.set_debuglevel(2)               #打开调试级别2,显示详细信息 ftp.connect("IP","port")               #连接ftp的ip和端口 ftp.login("user","password")               #连接ftp的用户名和密码 print(ftp.getwelcome())               #打印出欢迎信息 ftp.cmd("xxx/xxx")               #进入远程目录 bufsize=1024                #设置的缓冲区大小 filename="xxxx.txt"                  #需要下载的文件 file_handle=open(filename,"wb").write             #以写模式在本地打开文件 ftp.set_debuglevel(0)               #关闭调试模式 ftp.quit()                #退出ftp (发送命令并关闭连接,出现错误会抛异常) ftp.close()                            #关闭ftp (单方面强制关闭ftp)
ftp.cwd(pathname)               #设置FTP当前操作的路径 ftp.dir()               #显示目录下所有目录信息 ftp.nlst()                #获取目录下的文件 ftp.mkd(pathname)                #新建远程目录 ftp.pwd()               #返回当前所在位置 ftp.rmd(dirname)                #删除远程目录 ftp.delete(filename)                #删除远程文件 ftp.rename(fromname, toname)                  #将fromname修改名称为toname。 ftp.storbinaly("STOR filename.txt",file_handel,bufsize) #上传目标文件 ftp.retrbinary("RETR filename.txt",file_handel,bufsize) #下载FTP文件
-------------------------------------------------------------------------------------

如果万事开头难 那请结局一定圆满 @ Phoenixy

-------------------------------------------------------------------------------------