self.handle =
None
def
ssh_ap(self, host, username, password, port=22, timeout=5, retry_times=3
):
windows ssh AP 服务器;
:param host: AP信息,如hostname 或IP;
:param username: 登录AP使用的用户名;
:param password: 登录AP使用的密码;
:param port: ssh 使用的端口,默认为22;
:param timeout: 登录设置的超时时间,输入用户名密码等待指定信息的时间,默认值为5;
:param retry_times: 登录失败后,重试登录次数,默认值为3次;
:return:登录成功返回True,否则False;
return
self.
__ssh_connect_ap
(host, username, password, port=port, timeout=timeout, retry_times=
retry_times,
flag
=
'
this is ap shell
'
)
def
ssh_ac(self, host, username, password, port=22, timeout=5, retry_times=3
):
windows ssh AP 服务器;
param host: AP信息,如hostname 或IP;
:param username: 登录AP使用的用户名;
:param password: 登录AP使用的密码;
:param port: ssh 使用的端口,默认为22;
:param timeout: 登录设置的超时时间,输入用户名密码等待指定信息的时间,默认值为5;
:param retry_times: 登录失败后,重试登录次数,默认值为3次;
:return:登录成功返回True,否则False;
return
self.
__ssh_connect_ap
(host, username, password, port=port, timeout=timeout, retry_times=
retry_times,
flag
=
'
Current license state .*
'
)
def
__ssh_connect_ap
(self, host, username, password, port=22, timeout=5, retry_times=3, flag=
None):
windows ssh 服务器;
:param host: 服务器信息,如hostname 或IP;
:param username: 登录使用的用户名;
:param password: 登录使用的密码;
:param port: ssh 使用的端口,默认为22;
:param timeout: 登录设置的超时时间,输入用户名密码等待指定信息的时间,默认值为5;
:param retry_times: 登录失败后,重试登录次数,默认值为3次;
:param flag: 判断登录成功的标志;
:return:登录成功返回True,否则False;
cmd
=
'
plink.exe -ssh -P {} {}
'
.format(port, host)
login_status
=
False
if
host
is
None
or
host ==
''
:
logging.error(
'
Host is invalid,please check
'
)
if
username
is
None
or
host.strip() ==
''
:
logging.error(
'
username is invalid,please check
'
)
else
:
logging.debug(
'
ssh execute command:{}
'
.format(cmd))
ssh_handle
= pexpect.popen_spawn.PopenSpawn(cmd, timeout=
timeout)
self.handle
=
ssh_handle
exp
= self.handle.expect([
'
login as:
'
, pexpect.EOF, pexpect.TIMEOUT], timeout=5
)
if
exp ==
0:
logging.info(
'
input username:{}
'
.format(username))
self.handle.sendline(username)
exp1
= self.handle.expect([
'
password:
'
, pexpect.EOF, pexpect.TIMEOUT], timeout=
timeout)
if
exp1 ==
0:
logging.info(self.handle.before
+
self.handle.after)
self.handle.sendline(password)
logging.info(
'
input password:{}
'
.format(password))
exp11
= ssh_handle.expect([flag,
'
Access denied
'
, pexpect.EOF, pexpect.TIMEOUT],
timeout
=
timeout)
if
exp11 ==
0:
login_status
=
True
logging.info(
'
buffer value:{}
'
.format(ssh_handle.before +
ssh_handle.after))
logging.info(
'
login success
'
)
elif
exp11 == 1
:
logging.error(
'
login failed,username or password error
'
)
for
i
in
range(1, retry_times + 1
):
logging.info(
'
retry login,retry times:{}
'
.format(i))
self.handle.sendline(password)
exp12
= self.handle.expect([
'
this is ap shell
'
, pexpect.EOF, pexpect.TIMEOUT],
timeout
=
timeout)
if
exp12 ==
0:
logging.info(
'
buffer value:{}
'
.format(ssh_handle.before +
ssh_handle.after))
logging.info(
'
Login success
'
)
login_status
=
True
break
elif
exp12 == 1
:
self.
__log_eof
()
else
:
self.
__log_timeout
()
elif
exp11 == 2
:
self.
__log_eof
()
else
:
self.
__log_timeout
()
elif
exp == 1
:
self.
__log_eof
()
else
:
self.
__log_timeout
()
elif
exp == 1
:
self.
__log_eof
()
else
:
self.
__log_timeout
()
return
login_status
def
__buf_get
(self):
return
str(self.handle.before) +
str(self.handle.after)
def
__log_eof
(self):
logging.error(
'
Received EOF signal,buf value:{}
'
.format(self.
__buf_get
()))
def
__log_timeout
(self):
logging.error(
'
Received TIMEOUT,buf value:{}
'
.format(self.
__buf_get
()))
def
execute_cmd_by_ssh(self, cmd, timeout=5, prompt=
'
>
'
):
ssh 命令执行,AC 与AP均可使用此方法,并将结果进行返回;
:param cmd: 执行的命令;
:param timeout:设置等待某信息的超时时间,默认为5s;
:param prompt: 等待的提示符,默认为:>;
:return: 返回执行消息的返回值,如果执行出现异常,超时等返回None;
self.handle.sendline(cmd)
exp
= self.handle.expect([prompt, pexpect.EOF, pexpect.TIMEOUT], timeout=
timeout)
res
=
None
if
exp ==
0:
res
= self.handle.before +
self.handle.after
elif
exp == 1
:
self.
__log_eof
()
else
:
self.
__log_timeout
()
return
res