@property
def config(self):
return self._config
@config.setter
def config(self, value):
self._set(_config=value)
class DataProxy
shorthand = self.derive_shorthand(host)
host = shorthand["host"]
err = (
"You supplied the {} via both shorthand and kwarg! Please pick one." # noqa
if shorthand["user"] is not None:
if user is not None:
raise ValueError(err.format("user"))
user = shorthand["user"]
if shorthand["port"] is not None:
if port is not None:
raise ValueError(err.format("port"))
port = shorthand["port"]
Connection.__init__()
这里是处理host参数的, host可以有一下几种传参形式
user@host:port # 例如: root@10.10.10.10:6666
user@host # 例如: root@10.10.10.10
host:port # 例如: 10.10.10.10:6666
host # 例如: 10.10.10.10
前三种会调用
self.derive_shorthand(host)
分别解析出
self.host
,
self.user
和
self.port
,最后一种需单独传入
user
,
port
。
如果用前三种传入方式的话,不能再重复传入
user
或
port
了,会抛出异常
以上分析
定位报错位置:
kwargs = dict(
self.connect_kwargs,
username=self.user,
hostname=self.host,
port=self.port,
if self.gateway:
kwargs["sock"] = self.open_gateway()
if self.connect_timeout:
kwargs["timeout"] = self.connect_timeout
# Strip out empty defaults for less noisy debugging
if "key_filename" in kwargs and not kwargs["key_filename"]:
del kwargs["key_filename"]
# Actually connect!
self.client.connect(**kwargs)
/usr/python/lib/python3.7/site-packages/paramiko/client.py in connect(self, hostname, port, username, password, pkey, key_filename, timeout, allow_agent, look_for_keys, compress, sock, gss_auth, gss_kex, gss_deleg_creds, gss_host, banner_timeout, auth_timeout, gss_trust_dns, passphrase)
435 gss_deleg_creds,
436 t.gss_host,
--> 437 passphrase,
438 )
可以看到,在执行connect方法的时候解析参数错误,这里我们没有传递passphrase参数,导致ssh连接报错
传参的时候是将kwargs传了过去,刚才我们的参数里面缺少self.connect_kwargs这个参数
connect的定义为:
def connect(
self,
hostname,
port=SSH_PORT,
username=None,
password=None, # 你
pkey=None, # 你
key_filename=None, # 还有你
timeout=None,
allow_agent=True,
look_for_keys=True,
compress=False,
sock=None,
gss_auth=False,
gss_kex=False,
gss_deleg_creds=True,
gss_host=None,
banner_timeout=None,
auth_timeout=None,
gss_trust_dns=True,
passphrase=None,
)
使用password方式:
In [27]: c = Connection('47.104.148.179',user='root', connect_kwargs={'password':'your password'})
In [28]: result = c.run('uname -s')
Linux
In [29]: result.stdout.strip() == "Linux"
Out[29]: True
In [30]: result.exited
Out[30]: 0
In [31]: result.ok
Out[31]: True
In [32]: result.command
Out[32]: 'uname -s'
In [33]: result.connection
Out[33]: <Connection host=47.104.148.179>
In [39]: result.connection.host
Out[39]: '47.104.148.179
使用key_filename方式:
In [11]: c = Connection('47.104.148.179', user='root', connect_kwargs={'key_filename':'/root/.ssh/authorized_keys'}
...: )
In [12]: c.run("uname -s")
Linux
Out[12]: <Result cmd='uname -s' exited=0>
In [13]: c.run("ls")
coding_time
comment_tree
python_document_manage
python_linux_automation
python_linux_manage
python_linux_monitor
python_linux_network_manage
sys_back
sys_manager
Out[13]: <Result cmd='ls' exited=0>
通过run命令使用sudo提权执行命令
>>> from fabric import Connection
>>> c = Connection('db1')
>>> c.run('sudo useradd mydbuser', pty=True)
[sudo] password:
<Result cmd='sudo useradd mydbuser' exited=0>
>>> c.run('id -u mydbuser')
<Result cmd='id -u mydbuser' exited=0>
auto-response
自动响应:
当用户是普通用户的时候,可以使用run里面的watchers用法,进行自动响应
添加用户
In [21]: c.run('useradd mydbuser', pty=True)
Out[21]: <Result cmd='useradd mydbuser' exited=0>
In [23]: c.run('id mydbuser')
uid=1003(mydbuser) gid=1003(mydbuser) groups=1003(mydbuser)
Out[23]: <Result cmd='id mydbuser' exited=0>
执行命令
In [21]: from invoke import Responder
In [22]: from fabric import Connection
In [23]: c = Connection('47.104.148.179', user='ykyk', connect_kwargs={'password':'123456'})
In [30]: sudopass = Responder(
...: pattern=r'\[sudo\] password for ykyk:',
...: response='xxxxxxx\n',
...:
In [29]: c.run('sudo whoami', pty=True, watchers=[sudopass])
[sudo] password for ykyk:
Out[29]: <Result cmd='sudo whoami' exited=0>
高级用法:
watchers/responders 在上一步很有效,但是每次使用使用时都要设置一次模板,在实际环境中不够便利,
Invoke提供 Context.sudo 方法,这个方法能够处理大部分常用情况,而不会越权
使用这个方法之前必须保证用户密码已经存储在环境变量中,剩余的就可以交给Connection.sudo来解决
示例如下:
>>> import getpass
>>> from fabric import Connection, Config
>>> sudo_pass = getpass.getpass("What's your sudo password?")
What's your sudo password?
>>> config = Config(overrides={'sudo': {'password': sudo_pass}})
>>> c = Connection('db1', config=config)
>>> c.sudo('whoami', hide='stderr')
<Result cmd="...whoami" exited=0>
>>> c.sudo('useradd mydbuser')
<Result cmd="...useradd mydbuser" exited=0>
>>> c.run('id -u mydbuser')
<Result cmd='id -u mydbuser' exited=0>
传输文件
In [1]: ls
coding_time python_document_manage/ python_linux_manage/ python_linux_network_manage/ sys_manager/
comment_tree/ python_linux_automation/ python_linux_monitor/ sys_back/
In [2]: from fabric import Connection
In [3]: result = Connection('own').put('coding_time', remote='/tmp/')
In [4]: print('Upload {0.local} to {0.remote}'.format(result))
Upload /root/coding_time to /tmp/coding_time
多任务整合
示例:
当我们需要上传某个文件到服务器并解压到特定目录时,可以这样写:
In [1]: ls
binlog2sql-master/ paramiko-master.zip vim81/
cclang/ Pydiction-master/ vim-8.1.tar.bz2
c_study/ Pydiction-master.zip vim-master/
master.zip pyenv-master.zip vim-master.zip
mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz pyenv-virtualenv-master.zip vim-snipmate/
paramiko-master/ rabbitmq-server-3.6.6-1.el7.noarch.rpm
In [2]: from fabric import Connection
In [3]: c = Connection('own')
In [4]: c.put('mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz','/tmp')
Out[4]: <fabric.transfer.Result at 0x7fedf9e36518>
In [6]: c.run('tar xf /tmp/mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz -C /tmp')
Out[6]: <Result cmd='tar xf /tmp/mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz -C /tmp' exited=0>
这里我们可以直接封装成一个方法:
In [7]: def upload_file(c):
...: c.put('mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz','/tmp')
...: c.run('tar xf /tmp/mysql-8.0.13-linux-glibc2.12-x86_64.tar.xz -C /tmp')
在多个服务器上执行命令
In [3]: for host in ('own', 'redis','mysql_test'):
...: result = Connection(host).run('uname -s')
...: print("{}: {}".format(host, result.stdout.strip()))
Linux
own: Linux
Linux
redis: Linux
Linux
mysql_test: Linux
还可以使用fabric中的SerialGroup方法:
In [4]: from fabric import SerialGroup as Group
In [5]: results = Group('own', 'redis', 'mysql_test').run('uname -s')
Linux
Linux
Linux
In [8]: for connection, result in results.items():
...: print("{0.host}: {1.stdout}".format(connection, result))
47.104.148.xx: Linux
116.62.195.xx: Linux
47.99.123.xx: Linux
集成到一起:
from fabric import SerialGroup as Group
def upload_and_unpack(c):
if c.run('test -f /opt/mydata/myfile', warn=True).failed:
c.put('myfiles.tgz', '/opt/mydata')
c.run('tar -C /opt/mydata -xzvf /opt/mydata/myfiles.tgz')
for connection in Group('web1', 'web2', 'web3'):
upload_and_unpack(connection)
fabric 命令行工具
fabric提供了一个类似Shell终端的工具:fab
fab执行命令时,默认引用一个名称为fabfile.py的文件,这个文件包含一个到多个函数,使用fab命令可以调用这些函数, 函数在fabric中成为task
下面给出fabfile.py的样例文件:
from fabric import task
@task
def hostname(c):
c.run('hostname')
@task
def ls(path='.'):
c.run('ls {}'.format(path))
def tail(path='/etc/passwd', line=10):
sudo('tail -n {0}, {1}'.format(line, path))
注意: 新版本的fab取消了api,所以相应的方法较之旧版本使用起来更加简洁,许多方法较之以前变化较大
[root@ykyk python_linux_automation]# fab3 --list
Available tasks:
hostname
获取服务器信息需要在命令行指定:
[root@ykyk python_linux_automation]# fab3 -H mysql_test hostname
izbp1cmbkj49ynx81cezu3z
[root@ykyk python_linux_automation]# fab3 -H mysql_test,own,redis hostname
izbp1cmbkj49ynx81cezu3z
izbp1a43b9q4zlsifma7muz
fab命令行参数:
[root@ykyk python_linux_automation]# fab3 --help
Usage: fab3 [--core-opts] task1 [--task1-opts] ... taskN [--taskN-opts]
Core options:
--complete Print tab-completion candidates for given parse remainder.
--hide=STRING Set default value of run()'s 'hide' kwarg.
--no-dedupe Disable task deduplication.
--print-completion-script=STRING Print the tab-completion script for your preferred shell (bash|zsh|fish).
--prompt-for-login-password Request an upfront SSH-auth password prompt.
--prompt-for-passphrase Request an upfront SSH key passphrase prompt.
--prompt-for-sudo-password Prompt user at start of session for the sudo.password config value.
--write-pyc Enable creation of .pyc files.
-c STRING, --collection=STRING Specify collection name to load.
-d, --debug Enable debug output.
-D INT, --list-depth=INT When listing tasks, only show the first INT levels.
-e, --echo Echo executed commands before running.
-f STRING, --config=STRING Runtime configuration file to use.
-F STRING, --list-format=STRING Change the display format used when listing tasks. Should be one of: flat
(default), nested, json.
-h [STRING], --help[=STRING] Show core or per-task help and exit.
-H STRING, --hosts=STRING Comma-separated host name(s) to execute tasks against.
-i, --identity Path to runtime SSH identity (key) file. May be given multiple times.
-l [STRING], --list[=STRING] List available tasks, optionally limited to a namespace.
-p, --pty Use a pty when executing shell commands.
-r STRING, --search-root=STRING Change root directory used for finding task modules.
-S STRING, --ssh-config=STRING Path to runtime SSH config file.
-V, --version Show version and exit.
-w, --warn-only Warn, instead of failing, when shell commands fail.
pty用于设置伪终端,如果执行命令后需要一个常驻的服务进程,需要设置为pty=False,避免因fabric退出而导致程序退出
fabric装饰器
-
fabric中的task
-
task是fabric需要在远程服务器执行的任务,
-
默认情况下,fabfile中的所有可调用对象都是task,python中的函数是一个可调用对象
-
继承fabric的task类,不推荐
-
使用fabric装饰器,注意:如果fabfile中定义了多个task,只有其中一个使用了task,那么其他notask函数不是task
role新版本取消了。使用SerialGroup
runs_once 只运行一次
以上就是fabric的一些方法
-------------------------------------------------------------------------------------------------------------
案例:使用fabric源码安装redis
from fabric import task
from fabric import connection
from invoke import Exit
from invocations.console import confirm
hosts = ['own']
@task#(hosts='own')
def test(c):
with c.prefix('cd /root/python_linux_automation/redis-4.0.9'):
result = c.run('make && make test', warn=True, pty=False)
if result.failed and not confirm('Tests failed, continue anyway?'):
raise SystemExit("Aborting at user requeset")
else:
print('All tests passed without errors')
c.run('make clean', warn=True, pty=False, hide=True)
with c.prefix("cd /root/python_linux_automation/"):
c.run('tar -czf redis-4.0.9.tar.gz redis-4.0.9')
@task
def deploy(c):
c.put('redis-4.0.9.tar.gz', '/tmp/redis-4.0.9.tar.gz')
with c.cd('/tmp'):
c.run('tar xzf redis-4.0.9.tar.gz')
with c.cd('redis-4.0.9'):
c.run('make')
with c.cd('src'):
c.run('make install')
@task
def clean_file(c):
with c.cd('/tmp'):
c.run('rm -rf redis-4.0.9.tar.gz')
c.run('rm -rf redis-4.0.9')
@task
def clean_local_file(c):
with c.prefix('cd /root/python_linux_automation/'):
c.run('rm -rf redis-4.0.9.tar.gz')
@task
def install(c):
for host in hosts:
c = connection.Connection('own')
test(c)
deploy(c)
clean_file(c)
clean_local_file(c)
Nothing is true,Nothing is fake.