I am using Paramiko to monitor logs on remote machines during a test run.

The monitoring happens in a daemon thread which pretty much does this:

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self.host)   # self.host assumed; get_transport() needs a connected client
        transport = ssh.get_transport()
        channel = transport.open_session()
        channel.exec_command('sudo tail -f ' + self.logfile)
        last_partial = ''
        while not self.stopped.isSet():
            try:
                if select is None or channel is None:
                    # module globals can become None during interpreter shutdown
                    break
                rl, wl, xl = select.select([channel], [], [], 1.0)
                if rl is None:
                    break
                if len(rl) > 0:
                    # Must be stdout, how can I check?
                    line = channel.recv(1024)
                else:
                    time.sleep(1.0)
                    continue
            except:
                break
            if line:
                # handle saving the line... lines are 'merged' so that one log
                # is made from all the sources
                pass
        ssh.close()

I had problems with blocking reads, so I started doing things this way, and most of the time it works nicely. I think I run into problems when there is network slowness.

Sometimes I see this error at the end of a run (after self.stopped above is set). I've tried sleeping after setting stopped and joining all the monitor threads, but the hang can still happen.
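For reference, here is a rough sketch (not the code I am actually running) of how the same loop could poll with Paramiko's Channel.recv_ready()/recv_stderr_ready()/exit_status_ready() helpers instead of select(); the tail_log function and its plain host/logfile/stopped arguments are only illustrative stand-ins for the self.* values above:

import time
import paramiko

def tail_log(host, logfile, stopped):
    # Sketch only: host/logfile/stopped mirror the self.* attributes above.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host)
    channel = ssh.get_transport().open_session()
    channel.exec_command('sudo tail -f ' + logfile)
    try:
        while not stopped.isSet():
            if channel.recv_ready():            # bytes waiting on stdout
                line = channel.recv(1024)
                # handle/merge the line here
            elif channel.recv_stderr_ready():   # bytes waiting on stderr
                err = channel.recv_stderr(1024)
                # log or discard stderr (e.g. sudo warnings)
            elif channel.exit_status_ready():   # the remote tail exited
                break
            else:
                time.sleep(0.2)                 # nothing yet; don't busy-loop
    finally:
        channel.close()                         # close before the interpreter shuts down
        ssh.close()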

Exception in thread Thread-9 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
  File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
<type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'

In transport.py of Paramiko, I think this is where the error is. Look for #<<<<<<<<<<<<<<<<<<<<<<<<<<< below.

                        self._channel_handler_table[ptype](chan, m)
                    elif chanid in self.channels_seen:
                        self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
                    else:
                        self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
                        self.active = False
                        self.packetizer.close()
                elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
                    self.auth_handler._handler_table[ptype](self.auth_handler, m)
                else:
                    self._log(WARNING, 'Oops, unhandled type %d' % ptype)
                    msg = Message()
                    msg.add_byte(cMSG_UNIMPLEMENTED)
                    msg.add_int(m.seqno)
                    self._send_message(msg)
        except SSHException as e:
            self._log(ERROR, 'Exception: ' + str(e))
            self._log(ERROR, util.tb_strings())   #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
            self.saved_exception = e
        except EOFError as e:
            self._log(DEBUG, 'EOF in transport thread')
            #self._log(DEBUG, util.tb_strings())
            self.saved_exception = e
        except socket.error as e:
            if type(e.args) is tuple:
                if e.args:
                    emsg = '%s (%d)' % (e.args[1], e.args[0])
                else:  # empty tuple, e.g. socket.timeout
                    emsg = str(e) or repr(e)
            else:
                emsg = e.args
            self._log(ERROR, 'Socket exception: ' + emsg)
            self.saved_exception = e
        except Exception as e:
            self._log(ERROR, 'Unknown exception: ' + str(e))
            self._log(ERROR, util.tb_strings())

When a run is stuck, I can run sudo lsof -i -n | egrep '\' to see that there are indeed stuck ssh connections (indefinitely stuck). My main test process is PID 15010.

sshd       6478          root    3u  IPv4   46405      0t0  TCP *:ssh (LISTEN)
sshd       6478          root    4u  IPv6   46407      0t0  TCP *:ssh (LISTEN)
sshd      14559          root    3r  IPv4 3287615      0t0  TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
sshd      14563         cmead    3u  IPv4 3287615      0t0  TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
python    15010          root   12u  IPv4 3291525      0t0  TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
python    15010          root   15u  IPv4 3291542      0t0  TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
python    15010          root   16u  IPv4 3291784      0t0  TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
python    15010          root   17u  IPv4 3291779      0t0  TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
python    15010          root   20u  IPv4 3291789      0t0  TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
python    15010          root   65u  IPv4 3292014      0t0  TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
sshd      15106          root    3r  IPv4 3292962      0t0  TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
sshd      15110         cmead    3u  IPv4 3292962      0t0  TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)

So, I really just want my process not to hang. Oh, and I don't want to update Paramiko if it will require updating Python past 2.6.6, because I am on CentOS and from what I have read, going past 2.6.6 can be 'complicated'.
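For illustration, a rough sketch of the kind of explicit shutdown ordering that could be tried here (set stopped, close the Paramiko objects, then join with a bounded timeout); the monitors list and its stopped/channel/ssh/thread attributes are illustrative names only, not my actual code:

def stop_monitors(monitors, timeout=5.0):
    # Illustrative only: each monitor is assumed to expose the stopped event
    # from the loop above, plus its channel, ssh client and thread handle.
    for m in monitors:
        m.stopped.set()             # let each polling loop fall through
    for m in monitors:
        try:
            m.channel.close()       # unblocks any recv()/select() in progress
            m.ssh.close()           # stops Paramiko's transport thread
        except Exception:
            pass
    for m in monitors:
        m.thread.join(timeout)      # bounded wait so the run itself can't hang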

Thanks for any ideas.

Comment to shavenwarthog that is too long for comments:

Hi, thanks for the answer. I have a couple of quick questions. 1) What if I need to stop the threads at an unknown time? In other words, the tail -f blah.log threads will run for maybe 3 minutes, and I want to check the accumulated data maybe 10 times in those three minutes. 2) Kind of the same, I guess: when I tried this with some actual remote machines it wouldn't exit (since tail -f never exits). I had forgotten about this, but I think the non-blocking read was meant to solve it. Do you think the other thread you commented on plus this one is enough to make this work? Basically, use my non-blocking read to gather data local to each runner thread. Then I would only need to lock when the main thread wants data from each runner, which seems like it would distribute my one lock across, say, 10 locks, and that would help. Does that make sense?
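Roughly, what I mean by a per-runner buffer with its own lock is something like this (LineBuffer and its methods are just illustrative, not code I'm running):

import threading

class LineBuffer(object):
    # Illustrative sketch of "one lock per runner": the runner thread appends,
    # the main thread drains whenever it wants a snapshot.
    def __init__(self):
        self._lock = threading.Lock()
        self._lines = []

    def append(self, line):         # called from the runner thread
        with self._lock:
            self._lines.append(line)

    def drain(self):                # called from the main thread, e.g. 10x per run
        with self._lock:
            lines, self._lines = self._lines, []
        return lines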

I have commented out my time.sleep(1.0) and am possibly seeing a big improvement so far. I see more of the 'most likely raised during interpreter shutdown', which I guess is just because daemon threads are not great. Hopefully that sleep was what was making the program get stuck. – chrismead Jun 16, 2014 at 21:30

UPDATE: Removing the sleep decreased the frequency of hangs significantly, but didn't fix it completely. – chrismead Jun 24, 2014 at 18:01

Does anyone have another recommendation for the non-blocking read code (my first code block)? Something to experiment with? – chrismead Jun 24, 2014 at 18:02

The following code runs a command on multiple hosts. When each command has some data waiting, it gets printed to the screen.

The overall form is adapted from Alex Martelli's code. This version has more logging, including showing a human-readable version of each connection's host.

The original code was written for commands that run then exit. I changed it to print data incrementally, when it's available. Previously, the first thread that grabbed the lock would block on the read(), and all the threads would starve. The new solution bypasses that.

EDIT, some notes:

To stop the program at a later time, we run into a rather sticky situation. Threads are uninterruptible -- we can't just set up a signal handler to sys.exit() the program. The updated code is set up to exit safely after 3 seconds by using a while loop to join() each thread. For real code, if the parent exits then the threads should also exit cleanly. Note carefully the two WARNINGs in the code, because the signal/exit/thread interaction is rather squirrely.

Code processes data as it comes in -- right now data is just printed to the console. It doesn't use non-blocking reads because 1) non-blocking code is much more complex, and 2) the original program didn't process the child threads' data in the parent. For threads, it's easier to do everything in the child thread, which writes to a file, database, or service. For anything more complicated, use multiprocessing, which is much easier and has nice facilities for doing lots of jobs and restarting them if they die. That library also lets you distribute load across multiple CPUs, which threading does not allow.
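For comparison only (not part of the threaded example below), a rough multiprocessing sketch of the same idea; it uses a command that exits, since Pool.map() waits for every worker to return:

import multiprocessing
import paramiko

CMD = 'tail -n 20 /var/log/syslog'          # a command that exits, unlike 'tail -f'

def workon(host):
    # Each worker process makes its own connection; only the result is
    # pickled back to the parent.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host)
    _stdin, stdout, _stderr = ssh.exec_command(CMD)
    data = stdout.read()
    ssh.close()
    return host, data

if __name__ == '__main__':
    hosts = ['localhost', 'localhost']
    pool = multiprocessing.Pool(processes=len(hosts))
    for host, data in pool.map(workon, hosts):
        print host, '->', len(data), 'bytes'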

Have fun!

EDIT #2

Note it's possible, and probably preferable, to run multiple processes without using threading or multiprocessing. TLDR: use Popen and a select() loop to process batches of output. See sample code in Pastebin: run multiple commands without subprocess/multiprocessing
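The Pastebin code isn't reproduced here; as a rough sketch of that idea (my reading of the description above, not the linked code), the Popen-plus-select() loop might look like this. It runs until interrupted, because tail -f never exits:

# Rough sketch of the Popen + select() approach, not the Pastebin code.
import os
import select
import subprocess

CMD = 'tail -f /var/log/syslog'
hosts = ['localhost', 'localhost']

# one ssh child per host; map each stdout pipe back to its host
pipes = {}
for host in hosts:
    child = subprocess.Popen(['ssh', host, CMD], stdout=subprocess.PIPE)
    pipes[child.stdout] = host

while pipes:
    # wait up to one second for any pipe to become readable
    readable, _, _ = select.select(list(pipes), [], [], 1.0)
    for pipe in readable:
        chunk = os.read(pipe.fileno(), 4096)
        if not chunk:               # EOF: that ssh child exited
            del pipes[pipe]
            continue
        print pipes[pipe], chunk,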

source

# adapted from https://stackoverflow.com/questions/3485428/creating-multiple-ssh-connections-at-a-time-using-paramiko
import signal, sys, threading
import paramiko
CMD = 'tail -f /var/log/syslog'
def signal_cleanup(_signum, _frame):
    print '\nCLEANUP\n'
    sys.exit(0)
def workon(host):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host)
    _stdin, stdout, _stderr = ssh.exec_command(CMD)
    for line in stdout:
        print threading.current_thread().name, line,
def main():
    hosts = ['localhost', 'localhost']
    # exit after a few seconds (see WARNINGs)
    signal.signal(signal.SIGALRM, signal_cleanup)
    signal.alarm(3)
    threads = [
        threading.Thread(
            target=workon,
            args=(host,),
            name='host #{}'.format(num + 1),
        )
        for num, host in enumerate(hosts)
    ]
    print 'starting'
    for t in threads:
        # WARNING: daemon=True allows program to exit when main proc
        # does; otherwise we'll wait until all threads complete.
        t.daemon = True    
        t.start()
    print 'joining'
    for t in threads:
        # WARNING: t.join() is uninterruptible; this while loop allows
        # signals
        # see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html
        while t.is_alive():
            t.join(timeout=0.1)
    print 'done!'
if __name__=='__main__':
    main()

output

starting
joining
host #2 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #2 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:36 palabras kernel: [159020.809748] ideapad_laptop: Unknown event: 1
@chrismead thanks for the reply, I've updated the code and added some notes that will help – johntellsall Jun 27, 2014 at 20:34
