I am using Paramiko to monitor logs on remote machines during a test run.
The monitor happens in a daemon thread which pretty much does this:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.host)  # connect() elided above; without it, get_transport() returns None
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('sudo tail -f ' + self.logfile)
last_partial = ''
while not self.stopped.isSet():
    line = None
    try:
        if select is None or channel is None:
            break
        rl, wl, xl = select.select([channel], [], [], 1.0)
        if rl is None:
            break
        if len(rl) > 0:
            # Must be stdout, how can I check?
            line = channel.recv(1024)
        else:
            time.sleep(1.0)
            continue
    except:
        break
    if line:
        # handle saving the line... lines are 'merged' so that
        # one log is made from all the sources
        pass
ssh.close()
I had problems with blocking reads so I started doing things this way and most of the time it works nicely. I think that I run into problems when there is network slowness.
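For reference, the select-with-timeout pattern the loop above relies on can be demonstrated without Paramiko at all, using a stdlib socketpair in place of the channel (a channel is selectable for the same reason: it exposes fileno()):

```python
import select
import socket

# A connected pair of sockets stands in for the Paramiko channel.
a, b = socket.socketpair()

# Nothing written yet: select() times out and returns three EMPTY lists.
rl, wl, xl = select.select([a], [], [], 0.1)
assert rl == []

# Once the peer writes, the socket becomes readable within the timeout.
b.send(b'log line\n')
rl, wl, xl = select.select([a], [], [], 1.0)
assert rl == [a]
assert a.recv(1024) == b'log line\n'

a.close()
b.close()
```

Note that select.select() never returns None; a timeout shows up as three empty lists, so a `rl is None` check can never fire.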
Sometimes I see this error at the end of a run (after self.stopped above is set). I've tried sleeping after setting stopped and joining all the monitor threads but the hang can still happen.
Exception in thread Thread-9 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
<type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'
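That AttributeError is characteristic of daemon threads still running while the interpreter tears down module globals at shutdown. One way to avoid the race is to make the main thread stop and join every monitor, and close each SSHClient, before it returns, so no Paramiko code executes during shutdown. A minimal sketch of that stop/join shape, with the Paramiko calls replaced by comments (no real SSH here):

```python
import threading

class LogMonitor(threading.Thread):
    """Sketch of a monitor that is stopped and joined cleanly.
    A real version would own the SSHClient and close it in run()'s
    finally block, so cleanup happens inside the thread itself."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.stopped = threading.Event()
        self.lines = []

    def run(self):
        try:
            while not self.stopped.is_set():
                # real code: select on the channel, recv, buffer lines
                self.stopped.wait(0.1)
        finally:
            # real code: channel.close(); ssh.close()
            self.lines.append('closed')

    def stop(self, timeout=5.0):
        self.stopped.set()
        self.join(timeout)

mon = LogMonitor()
mon.start()
mon.stop()
assert not mon.is_alive()
assert mon.lines[-1] == 'closed'
```

The key point is that stop() blocks until the thread has actually finished its cleanup, rather than relying on daemon-thread teardown.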
In transport.py of Paramiko, I think this is where the error is. Look for #<<<<<<<<<<<<<<<<<<<<<<<<<<< below.
                self._channel_handler_table[ptype](chan, m)
            elif chanid in self.channels_seen:
                self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
            else:
                self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
                self.active = False
                self.packetizer.close()
        elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
            self.auth_handler._handler_table[ptype](self.auth_handler, m)
        else:
            self._log(WARNING, 'Oops, unhandled type %d' % ptype)
            msg = Message()
            msg.add_byte(cMSG_UNIMPLEMENTED)
            msg.add_int(m.seqno)
            self._send_message(msg)
except SSHException as e:
    self._log(ERROR, 'Exception: ' + str(e))
    self._log(ERROR, util.tb_strings())  #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
    self.saved_exception = e
except EOFError as e:
    self._log(DEBUG, 'EOF in transport thread')
    #self._log(DEBUG, util.tb_strings())
    self.saved_exception = e
except socket.error as e:
    if type(e.args) is tuple:
        if e.args:
            emsg = '%s (%d)' % (e.args[1], e.args[0])
        else:  # empty tuple, e.g. socket.timeout
            emsg = str(e) or repr(e)
    else:
        emsg = e.args
    self._log(ERROR, 'Socket exception: ' + emsg)
    self.saved_exception = e
except Exception as e:
    self._log(ERROR, 'Unknown exception: ' + str(e))
    self._log(ERROR, util.tb_strings())
When a run is stuck, I can run
sudo lsof -i -n | egrep '\'
to see that there are indeed stuck ssh connections (stuck indefinitely). My main test process is PID 15010.
sshd 6478 root 3u IPv4 46405 0t0 TCP *:ssh (LISTEN)
sshd 6478 root 4u IPv6 46407 0t0 TCP *:ssh (LISTEN)
sshd 14559 root 3r IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
sshd 14563 cmead 3u IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
python 15010 root 12u IPv4 3291525 0t0 TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 15u IPv4 3291542 0t0 TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 16u IPv4 3291784 0t0 TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
python 15010 root 17u IPv4 3291779 0t0 TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 20u IPv4 3291789 0t0 TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 65u IPv4 3292014 0t0 TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
sshd 15106 root 3r IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
sshd 15110 cmead 3u IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
So, I really just want my process not to hang. Oh, and I don't want to update Paramiko if it will require updating Python past 2.6.6, because I am on CentOS and from what I have read, going past 2.6.6 can be 'complicated'.
Thanks for any ideas.
Comment to shavenwarthog that is too long for comments:
Hi, thanks for the answer. I have a couple of quick questions. 1) What if I need to stop the threads at an unknown time? In other words, the tail -f blah.log threads will run for maybe 3 minutes, and I want to check the accumulated data maybe 10 times in those three minutes. 2) Kind of the same, I guess: when I tried this with some actual remote machines, it wouldn't exit (since tail -f never exits). I had forgotten about this, but I think the non-blocking read was meant to solve it. Do you think the other thread you commented on plus this one is enough to make this work? Basically, use my non-blocking read to gather data locally in each runner thread; then I would only need to lock when the main thread wants data from a runner, which would distribute my one lock across, say, 10 locks, and that would help. Does that make sense?
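The "one buffer per runner thread, lock only at hand-off" idea from the comment above can be sketched with plain threading (no Paramiko; a counting loop stands in for each tail -f reader):

```python
import threading

class Runner(threading.Thread):
    """Each runner accumulates lines in its own list; the main thread
    takes a snapshot under that runner's own lock when it wants data."""

    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
        self.daemon = True
        self.lock = threading.Lock()
        self.buf = []

    def run(self):
        for n in range(5):  # stand-in for reading 5 log lines
            with self.lock:              # held only long enough to append
                self.buf.append('%s line %d' % (self.name, n))

    def drain(self):
        with self.lock:                  # held only long enough to swap
            lines, self.buf = self.buf, []
        return lines

runners = [Runner('r1'), Runner('r2')]
for r in runners:
    r.start()
for r in runners:
    r.join()

collected = []
for r in runners:
    collected.extend(r.drain())
assert len(collected) == 10
```

Because each runner has its own lock and drain() just swaps the list out, no thread ever blocks on a read while holding a shared lock, which is exactly the starvation the comment is trying to avoid.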
The following code runs a command on multiple hosts. When each command has some data waiting, it gets printed to the screen.
The overall form is adapted from Alex Martelli's code. This version has more logging, including a human-readable version of each connection's host.
The original code was written for commands that run then exit. I changed it to print data incrementally, when it's available. Previously, the first thread that grabbed the lock would block on the read(), and all the threads would starve. The new solution bypasses that.
EDIT, some notes:
To stop the program at a later time, we run into a rather sticky situation. Threads are uninterruptible -- we can't just set up a signal handler to sys.exit() the program. The updated code is set up to exit safely after 3 seconds, by using a while loop to join() each thread. For real code, if the parent exits then the threads should also exit correctly. Note carefully the two WARNINGs in the code, because the signal/exit/thread interaction is rather squirrely.
The code processes data as it comes in -- right now the data is just printed to the console. It doesn't use non-blocking reads because 1) non-blocking code is much more complex, and 2) the original program didn't process the child threads' data in the parent. For threads, it's easier to do everything in the child thread, which can write to a file, database, or service. For anything more complicated, use multiprocessing, which is much easier and has nice facilities for running lots of jobs and restarting them if they die. That library also lets you distribute load across multiple CPUs, which threading does not allow.
Have fun!
EDIT #2
Note it's possible, and probably preferred, to run multiple processes without using threading or multiprocessing. TL;DR: use Popen and a select() loop to process batches of output. See sample code in Pastebin: run multiple commands without subprocess/multiprocessing
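That Popen-plus-select() shape looks roughly like this (a sketch using two local echo commands in place of ssh invocations):

```python
import select
import subprocess

cmds = [['echo', 'alpha'], ['echo', 'beta']]
procs = [subprocess.Popen(c, stdout=subprocess.PIPE) for c in cmds]
pipes = dict((p.stdout, p) for p in procs)   # readable pipe -> process

output = []
while pipes:
    # wait up to 1s for any child to have output ready
    rl, _, _ = select.select(list(pipes), [], [], 1.0)
    for fd in rl:
        line = fd.readline()
        if line:                   # a batch of output from one child
            output.append(line.strip())
        else:                      # EOF: that child has finished
            fd.close()
            del pipes[fd]

for p in procs:
    p.wait()
assert sorted(output) == [b'alpha', b'beta']
```

No threads at all: one loop multiplexes every child's stdout, and a child that hangs never blocks the others.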
source
# adapted from https://stackoverflow.com/questions/3485428/creating-multiple-ssh-connections-at-a-time-using-paramiko
import signal, sys, threading
import paramiko

CMD = 'tail -f /var/log/syslog'

def signal_cleanup(_signum, _frame):
    print '\nCLEANUP\n'
    sys.exit(0)

def workon(host):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host)
    _stdin, stdout, _stderr = ssh.exec_command(CMD)
    for line in stdout:
        print threading.current_thread().name, line,

def main():
    hosts = ['localhost', 'localhost']

    # exit after a few seconds (see WARNINGs)
    signal.signal(signal.SIGALRM, signal_cleanup)
    signal.alarm(3)

    threads = [
        threading.Thread(
            target=workon,
            args=(host,),
            name='host #{}'.format(num+1),
        )
        for num, host in enumerate(hosts)
    ]

    print 'starting'
    for t in threads:
        # WARNING: daemon=True allows program to exit when main proc
        # does; otherwise we'll wait until all threads complete.
        t.daemon = True
        t.start()

    print 'joining'
    for t in threads:
        # WARNING: t.join() is uninterruptible; this while loop allows
        # signals
        # see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html
        while t.is_alive():
            t.join(timeout=0.1)

    print 'done!'

if __name__ == '__main__':
    main()
output
starting
joining
host #2 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #2 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:36 palabras kernel: [159020.809748] ideapad_laptop: Unknown event: 1