forked from OSchip/llvm-project
312 lines
14 KiB
Python
312 lines
14 KiB
Python
|
"""This class extends pexpect.spawn to specialize setting up SSH connections.
|
||
|
This adds methods for login, logout, and expecting the shell prompt.
|
||
|
|
||
|
$Id: pxssh.py 513 2008-02-09 18:26:13Z noah $
|
||
|
"""
|
||
|
|
||
|
from pexpect import *
|
||
|
import pexpect
|
||
|
import time
|
||
|
|
||
|
__all__ = ['ExceptionPxssh', 'pxssh']
|
||
|
|
||
|
# Exception classes used by this module.
|
||
|
class ExceptionPxssh(ExceptionPexpect):
|
||
|
"""Raised for pxssh exceptions.
|
||
|
"""
|
||
|
|
||
|
class pxssh (spawn):
|
||
|
|
||
|
"""This class extends pexpect.spawn to specialize setting up SSH
|
||
|
connections. This adds methods for login, logout, and expecting the shell
|
||
|
prompt. It does various tricky things to handle many situations in the SSH
|
||
|
login process. For example, if the session is your first login, then pxssh
|
||
|
automatically accepts the remote certificate; or if you have public key
|
||
|
authentication setup then pxssh won't wait for the password prompt.
|
||
|
|
||
|
pxssh uses the shell prompt to synchronize output from the remote host. In
|
||
|
order to make this more robust it sets the shell prompt to something more
|
||
|
unique than just $ or #. This should work on most Borne/Bash or Csh style
|
||
|
shells.
|
||
|
|
||
|
Example that runs a few commands on a remote server and prints the result::
|
||
|
|
||
|
import pxssh
|
||
|
import getpass
|
||
|
try:
|
||
|
s = pxssh.pxssh()
|
||
|
hostname = raw_input('hostname: ')
|
||
|
username = raw_input('username: ')
|
||
|
password = getpass.getpass('password: ')
|
||
|
s.login (hostname, username, password)
|
||
|
s.sendline ('uptime') # run a command
|
||
|
s.prompt() # match the prompt
|
||
|
print s.before # print everything before the prompt.
|
||
|
s.sendline ('ls -l')
|
||
|
s.prompt()
|
||
|
print s.before
|
||
|
s.sendline ('df')
|
||
|
s.prompt()
|
||
|
print s.before
|
||
|
s.logout()
|
||
|
except pxssh.ExceptionPxssh, e:
|
||
|
print "pxssh failed on login."
|
||
|
print str(e)
|
||
|
|
||
|
Note that if you have ssh-agent running while doing development with pxssh
|
||
|
then this can lead to a lot of confusion. Many X display managers (xdm,
|
||
|
gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
|
||
|
dialog box popup asking for a password during development. You should turn
|
||
|
off any key agents during testing. The 'force_password' attribute will turn
|
||
|
off public key authentication. This will only work if the remote SSH server
|
||
|
is configured to allow password logins. Example of using 'force_password'
|
||
|
attribute::
|
||
|
|
||
|
s = pxssh.pxssh()
|
||
|
s.force_password = True
|
||
|
hostname = raw_input('hostname: ')
|
||
|
username = raw_input('username: ')
|
||
|
password = getpass.getpass('password: ')
|
||
|
s.login (hostname, username, password)
|
||
|
"""
|
||
|
|
||
|
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
|
||
|
spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
|
||
|
|
||
|
self.name = '<pxssh>'
|
||
|
|
||
|
#SUBTLE HACK ALERT! Note that the command to set the prompt uses a
|
||
|
#slightly different string than the regular expression to match it. This
|
||
|
#is because when you set the prompt the command will echo back, but we
|
||
|
#don't want to match the echoed command. So if we make the set command
|
||
|
#slightly different than the regex we eliminate the problem. To make the
|
||
|
#set command different we add a backslash in front of $. The $ doesn't
|
||
|
#need to be escaped, but it doesn't hurt and serves to make the set
|
||
|
#prompt command different than the regex.
|
||
|
|
||
|
# used to match the command-line prompt
|
||
|
self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
|
||
|
self.PROMPT = self.UNIQUE_PROMPT
|
||
|
|
||
|
# used to set shell command-line prompt to UNIQUE_PROMPT.
|
||
|
self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
|
||
|
self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
|
||
|
self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
|
||
|
# Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
|
||
|
# displaying a GUI password dialog. I have not figured out how to
|
||
|
# disable only SSH_ASKPASS without also disabling X11 forwarding.
|
||
|
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
|
||
|
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
|
||
|
self.force_password = False
|
||
|
self.auto_prompt_reset = True
|
||
|
|
||
|
def levenshtein_distance(self, a,b):
|
||
|
|
||
|
"""This calculates the Levenshtein distance between a and b.
|
||
|
"""
|
||
|
|
||
|
n, m = len(a), len(b)
|
||
|
if n > m:
|
||
|
a,b = b,a
|
||
|
n,m = m,n
|
||
|
current = range(n+1)
|
||
|
for i in range(1,m+1):
|
||
|
previous, current = current, [i]+[0]*n
|
||
|
for j in range(1,n+1):
|
||
|
add, delete = previous[j]+1, current[j-1]+1
|
||
|
change = previous[j-1]
|
||
|
if a[j-1] != b[i-1]:
|
||
|
change = change + 1
|
||
|
current[j] = min(add, delete, change)
|
||
|
return current[n]
|
||
|
|
||
|
def sync_original_prompt (self):
|
||
|
|
||
|
"""This attempts to find the prompt. Basically, press enter and record
|
||
|
the response; press enter again and record the response; if the two
|
||
|
responses are similar then assume we are at the original prompt. This
|
||
|
is a slow function. It can take over 10 seconds. """
|
||
|
|
||
|
# All of these timing pace values are magic.
|
||
|
# I came up with these based on what seemed reliable for
|
||
|
# connecting to a heavily loaded machine I have.
|
||
|
# If latency is worse than these values then this will fail.
|
||
|
|
||
|
try:
|
||
|
self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
|
||
|
except TIMEOUT:
|
||
|
pass
|
||
|
time.sleep(0.1)
|
||
|
self.sendline()
|
||
|
time.sleep(0.5)
|
||
|
x = self.read_nonblocking(size=1000,timeout=1)
|
||
|
time.sleep(0.1)
|
||
|
self.sendline()
|
||
|
time.sleep(0.5)
|
||
|
a = self.read_nonblocking(size=1000,timeout=1)
|
||
|
time.sleep(0.1)
|
||
|
self.sendline()
|
||
|
time.sleep(0.5)
|
||
|
b = self.read_nonblocking(size=1000,timeout=1)
|
||
|
ld = self.levenshtein_distance(a,b)
|
||
|
len_a = len(a)
|
||
|
if len_a == 0:
|
||
|
return False
|
||
|
if float(ld)/len_a < 0.4:
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
|
||
|
### TODO: I need to draw a flow chart for this.
|
||
|
def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True):
|
||
|
|
||
|
"""This logs the user into the given server. It uses the
|
||
|
'original_prompt' to try to find the prompt right after login. When it
|
||
|
finds the prompt it immediately tries to reset the prompt to something
|
||
|
more easily matched. The default 'original_prompt' is very optimistic
|
||
|
and is easily fooled. It's more reliable to try to match the original
|
||
|
prompt as exactly as possible to prevent false matches by server
|
||
|
strings such as the "Message Of The Day". On many systems you can
|
||
|
disable the MOTD on the remote server by creating a zero-length file
|
||
|
called "~/.hushlogin" on the remote server. If a prompt cannot be found
|
||
|
then this will not necessarily cause the login to fail. In the case of
|
||
|
a timeout when looking for the prompt we assume that the original
|
||
|
prompt was so weird that we could not match it, so we use a few tricks
|
||
|
to guess when we have reached the prompt. Then we hope for the best and
|
||
|
blindly try to reset the prompt to something more unique. If that fails
|
||
|
then login() raises an ExceptionPxssh exception.
|
||
|
|
||
|
In some situations it is not possible or desirable to reset the
|
||
|
original prompt. In this case, set 'auto_prompt_reset' to False to
|
||
|
inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
|
||
|
uses a unique prompt in the prompt() method. If the original prompt is
|
||
|
not reset then this will disable the prompt() method unless you
|
||
|
manually set the PROMPT attribute. """
|
||
|
|
||
|
ssh_options = '-q'
|
||
|
if self.force_password:
|
||
|
ssh_options = ssh_options + ' ' + self.SSH_OPTS
|
||
|
if port is not None:
|
||
|
ssh_options = ssh_options + ' -p %s'%(str(port))
|
||
|
cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
|
||
|
|
||
|
# This does not distinguish between a remote server 'password' prompt
|
||
|
# and a local ssh 'passphrase' prompt (for unlocking a private key).
|
||
|
spawn._spawn(self, cmd)
|
||
|
i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
|
||
|
|
||
|
# First phase
|
||
|
if i==0:
|
||
|
# New certificate -- always accept it.
|
||
|
# This is what you get if SSH does not have the remote host's
|
||
|
# public key stored in the 'known_hosts' cache.
|
||
|
self.sendline("yes")
|
||
|
i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
|
||
|
if i==2: # password or passphrase
|
||
|
self.sendline(password)
|
||
|
i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
|
||
|
if i==4:
|
||
|
self.sendline(terminal_type)
|
||
|
i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
|
||
|
|
||
|
# Second phase
|
||
|
if i==0:
|
||
|
# This is weird. This should not happen twice in a row.
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
|
||
|
elif i==1: # can occur if you have a public key pair set to authenticate.
|
||
|
### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
|
||
|
pass
|
||
|
elif i==2: # password prompt again
|
||
|
# For incorrect passwords, some ssh servers will
|
||
|
# ask for the password again, others return 'denied' right away.
|
||
|
# If we get the password prompt again then this means
|
||
|
# we didn't get the password right the first time.
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('password refused')
|
||
|
elif i==3: # permission denied -- password was bad.
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('permission denied')
|
||
|
elif i==4: # terminal type again? WTF?
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
|
||
|
elif i==5: # Timeout
|
||
|
#This is tricky... I presume that we are at the command-line prompt.
|
||
|
#It may be that the shell prompt was so weird that we couldn't match
|
||
|
#it. Or it may be that we couldn't log in for some other reason. I
|
||
|
#can't be sure, but it's safe to guess that we did login because if
|
||
|
#I presume wrong and we are not logged in then this should be caught
|
||
|
#later when I try to set the shell prompt.
|
||
|
pass
|
||
|
elif i==6: # Connection closed by remote host
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('connection closed')
|
||
|
else: # Unexpected
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('unexpected login response')
|
||
|
if not self.sync_original_prompt():
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('could not synchronize with original prompt')
|
||
|
# We appear to be in.
|
||
|
# set shell prompt to something unique.
|
||
|
if auto_prompt_reset:
|
||
|
if not self.set_unique_prompt():
|
||
|
self.close()
|
||
|
raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
|
||
|
return True
|
||
|
|
||
|
def logout (self):
|
||
|
|
||
|
"""This sends exit to the remote shell. If there are stopped jobs then
|
||
|
this automatically sends exit twice. """
|
||
|
|
||
|
self.sendline("exit")
|
||
|
index = self.expect([EOF, "(?i)there are stopped jobs"])
|
||
|
if index==1:
|
||
|
self.sendline("exit")
|
||
|
self.expect(EOF)
|
||
|
self.close()
|
||
|
|
||
|
def prompt (self, timeout=20):
|
||
|
|
||
|
"""This matches the shell prompt. This is little more than a short-cut
|
||
|
to the expect() method. This returns True if the shell prompt was
|
||
|
matched. This returns False if there was a timeout. Note that if you
|
||
|
called login() with auto_prompt_reset set to False then you should have
|
||
|
manually set the PROMPT attribute to a regex pattern for matching the
|
||
|
prompt. """
|
||
|
|
||
|
i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
|
||
|
if i==1:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def set_unique_prompt (self):
|
||
|
|
||
|
"""This sets the remote prompt to something more unique than # or $.
|
||
|
This makes it easier for the prompt() method to match the shell prompt
|
||
|
unambiguously. This method is called automatically by the login()
|
||
|
method, but you may want to call it manually if you somehow reset the
|
||
|
shell prompt. For example, if you 'su' to a different user then you
|
||
|
will need to manually reset the prompt. This sends shell commands to
|
||
|
the remote host to set the prompt, so this assumes the remote host is
|
||
|
ready to receive commands.
|
||
|
|
||
|
Alternatively, you may use your own prompt pattern. Just set the PROMPT
|
||
|
attribute to a regular expression that matches it. In this case you
|
||
|
should call login() with auto_prompt_reset=False; then set the PROMPT
|
||
|
attribute. After that the prompt() method will try to match your prompt
|
||
|
pattern."""
|
||
|
|
||
|
self.sendline ("unset PROMPT_COMMAND")
|
||
|
self.sendline (self.PROMPT_SET_SH) # sh-style
|
||
|
i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
|
||
|
if i == 0: # csh-style
|
||
|
self.sendline (self.PROMPT_SET_CSH)
|
||
|
i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
|
||
|
if i == 0:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
# vi:ts=4:sw=4:expandtab:ft=python:
|