view en/examples/run-example @ 744:1114da00d30e

Fix '<programlisting>' in ch02-tour-basic.xml
author Dongsheng Song <dongsheng.song@gmail.com>
date Wed, 18 Mar 2009 19:43:46 +0800
parents 60ee738fdc0e
children 80928ea6e7ae
line wrap: on
line source

#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.

import cStringIO
import errno
import getopt
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

# Characters that are markup in XML, mapped to the entity reference that
# is written out in their place.
xml_subs = dict([
    ('&', '&amp;'),
    ('<', '&lt;'),
    ('>', '&gt;'),
    ])

def gensubs(s):
    '''Generate the pieces of s with XML special characters replaced.

    Each character that has a (non-empty) entry in xml_subs produces two
    items: the literal text since the previous special character, then
    the replacement.  The text after the last special character is
    yielded at the end.  Joining the pieces gives the escaped string.
    '''
    tail = 0
    for pos, ch in enumerate(s):
        replacement = xml_subs.get(ch)
        if not replacement:
            continue
        # literal run that precedes the special character
        yield s[tail:pos]
        yield replacement
        tail = pos + 1
    yield s[tail:]

def xml_escape(s):
    '''Return s with the characters listed in xml_subs escaped.'''
    pieces = gensubs(s)
    return ''.join(pieces)
        
def maybe_unlink(name):
    try:
        os.unlink(name)
        return True
    except OSError, err:
        if err.errno != errno.ENOENT:
            raise
    return False

def find_path_to(program):
    '''Return the first directory on $PATH that contains program.

    The search uses the PATH environment variable, falling back to
    os.defpath when it is unset.  A candidate only counts if it is a
    regular file that is executable; os.access() alone also succeeds
    for a searchable directory that happens to share the program's
    name, which would make us report the wrong directory.

    Returns the directory (not the full path to the program), or None
    if the program was not found.
    '''
    for p in os.environ.get('PATH', os.defpath).split(os.pathsep):
        name = os.path.join(p, program)
        if os.path.isfile(name) and os.access(name, os.X_OK):
            return p
    return None
        
def result_name(name):
    '''Map name to the same file name inside a sibling "results" directory.

    For example "dir/foo.out" becomes "dir/results/foo.out".
    '''
    parent = os.path.dirname(name)
    leaf = os.path.basename(name)
    return os.path.join(parent, 'results', leaf)

class example:
    shell = '/usr/bin/env bash'
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$')
    
    timeout = 10

    entities = dict.fromkeys(l.rstrip() for l in open('auto-snippets.xml'))

    def __init__(self, name, verbose, keep_change):
        self.name = name
        self.verbose = verbose
        self.keep_change = keep_change
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.'''
        fp = open(self.name)
        cfp = cStringIO.StringIO()
        for line in fp:
            cfp.write(line)
            if not line.rstrip().endswith('\\'):
                yield cfp.getvalue()
                cfp.seek(0)
                cfp.truncate()
        
    def status(self, s):
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        if self.verbose:
            print >> sys.stderr, '>', self.debugrepr(s)
        while s:
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs
            
    timeout = 5

    def read(self, hint):
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            print >> sys.stderr, ('[%stimed out after %d seconds]' %
                                  (hint, self.timeout))
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)
        
    def receive(self, hint):
        out = cStringIO.StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read(hint)
            except OSError, err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                print >> sys.stderr, self.debugrepr(s)
            out.write(s)
            s = out.getvalue()
            if s.endswith(self.ps1):
                return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)]
            if s.endswith(self.ps2):
                return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)]
        
    def sendreceive(self, s, hint):
        self.send(s)
        ps, r = self.receive(hint)
        if r.startswith(s):
            r = r[len(s):]
        return ps, r
    
    def run(self):
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)

        # remove the marker file that we tell make to use to see if
        # this run succeeded
        maybe_unlink(self.name + '.run')

        rcfile = os.path.join(tmpdir, '.hgrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, '[ui]'
        print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>"
        
        rcfile = os.path.join(tmpdir, '.bashrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, 'PS1="%s"' % self.ps1
        print >> rcfp, 'PS2="%s"' % self.ps2
        print >> rcfp, 'unset HISTFILE'
        path = ['/usr/bin', '/bin']
        hg = find_path_to('hg')
        if hg and hg not in path:
            path.append(hg)
        def re_export(envar):
            v = os.getenv(envar)
            if v is not None:
                print >> rcfp, 'export ' + envar + '=' + v
        print >> rcfp, 'export PATH=' + ':'.join(path)
        re_export('PYTHONPATH')
        print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd()
        print >> rcfp, 'export HGMERGE=merge'
        print >> rcfp, 'export LANG=C'
        print >> rcfp, 'export LC_ALL=C'
        print >> rcfp, 'export TZ=GMT'
        print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir
        print >> rcfp, 'export HGRCPATH=$HGRC'
        print >> rcfp, 'cd %s' % tmpdir
        rcfp.close()
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror)
                sys.stderr.flush()
                os._exit(0)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
            }

        ignore = [
            r'\d+:[0-9a-f]{12}', # changeset number:hash
            r'[0-9a-f]{40}', # long changeset hash
            r'[0-9a-f]{12}', # short changeset hash
            r'^(?:---|\+\+\+) .*', # diff header with dates
            r'^date:.*', # date
            #r'^diff -r.*', # "diff -r" is followed by hash
            r'^# Date \d+ \d+', # hg patch header
            ]

        err = False
        read_hint = ''

        try:
            try:
                # eat first prompt string from shell
                self.read(read_hint)
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile,
                                              read_hint)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            if out in ('err', 'lxo', 'out', 'run', 'tmp'):
                                print >> sys.stderr, ('%s: illegal section '
                                                      'name %r' %
                                                      (self.name, out))
                                return 1
                            assert os.sep not in out
                            if ofp is not None:
                                ofp.write('</screen>\n')
                                ofp.close()
                                err |= self.rename_output(ofp_basename, ignore)
                            if out:
                                ofp_basename = '%s.%s' % (self.name, out)
                                norm = os.path.normpath(ofp_basename)
                                example.entities[
                                    '<!ENTITY interaction.%s '
                                    'SYSTEM "results/%s.out">'
                                    % (norm, norm)] = 1
                                read_hint = ofp_basename + ' '
                                ofp = open(result_name(ofp_basename + '.tmp'),
                                           'w')
                                ofp.write('<screen>')
                            else:
                                ofp = None
                        elif pi == 'ignore':
                            ignore.append(rest)
                    elif hunk.strip():
                        # it's something we should execute
                        newps, output = self.sendreceive(hunk, read_hint)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('<prompt>%s</prompt> <userinput>%s</userinput>' %
                                    (prompts[ps],
                                     xml_escape(hunk.rstrip('\n'))))
                            if nl: hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(xml_escape(output))
                    ps = newps
                self.status('\n')
            except:
                print >> sys.stderr, '(killed)'
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n', read_hint)
                    if ofp is not None:
                        ofp.write(output)
                        ofp.write('</screen>\n')
                        ofp.close()
                        err |= self.rename_output(ofp_basename, ignore)
                    os.close(self.cfd)
                except IOError:
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                err = err or rc
                if err:
                    if os.WIFEXITED(rc):
                        print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc)
                    elif os.WIFSIGNALED(rc):
                        print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc)
                else:
                    open(result_name(self.name + '.run'), 'w')
                return err
        finally:
            shutil.rmtree(tmpdir)

    def rename_output(self, base, ignore):
        mangle_re = re.compile('(?:' + '|'.join(ignore) + ')')
        def mangle(s):
            return mangle_re.sub('', s)
        def matchfp(fp1, fp2):
            while True:
                s1 = mangle(fp1.readline())
                s2 = mangle(fp2.readline())
                if cmp(s1, s2):
                    break
                if not s1:
                    return True
            return False

        oldname = result_name(base + '.out')
        tmpname = result_name(base + '.tmp')
        errname = result_name(base + '.err')
        errfp = open(errname, 'w+')
        for line in open(tmpname):
            errfp.write(mangle_re.sub('', line))
        os.rename(tmpname, result_name(base + '.lxo'))
        errfp.seek(0)
        try:
            oldfp = open(oldname)
        except IOError, err:
            if err.errno != errno.ENOENT:
                raise
            os.rename(errname, oldname)
            return False
        if matchfp(oldfp, errfp):
            os.unlink(errname)
            return False
        else:
            print >> sys.stderr, '\nOutput of %s has changed!' % base
            if self.keep_change:
                os.rename(errname, oldname)
                return False
            else:
                os.system('diff -u %s %s 1>&2' % (oldname, errname))
            return True

def print_help(exit, msg=None):
    '''Print the usage message to stderr and exit.

    exit is the process exit status.  If msg is given, it is printed
    first on a line of its own, prefixed with "Error:".  This function
    does not return; it ends by calling sys.exit(exit).
    '''
    lines = []
    if msg:
        lines.append('Error: %s' % (msg,))
    lines.extend([
        'Usage: run-example [options] [test...]',
        'Options:',
        '  -a --all       run all examples in this directory',
        '  -h --help      print this help message',
        # this entry used to be labelled --help a second time
        '     --keep      keep new output as desired output',
        '  -v --verbose   display extra debug output',
        ])
    sys.stderr.write('\n'.join(lines) + '\n')
    sys.exit(exit)

def main(path='.'):
    if os.path.realpath(path).split(os.sep)[-1] != 'examples':
        print >> sys.stderr, 'Not being run from the examples directory!'
        sys.exit(1)

    opts, args = getopt.getopt(sys.argv[1:], '?ahv',
                               ['all', 'help', 'keep', 'verbose'])
    verbose = False
    run_all = False
    keep_change = False

    for o, a in opts:
        if o in ('-h', '-?', '--help'):
            print_help(0)
        if o in ('-a', '--all'):
            run_all = True
        if o in ('--keep',):
            keep_change = True
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (a, err.strerror)
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
                if example(a, verbose, keep_change).run():
                    errs += 1
            else:
                print >> sys.stderr, '%s: not a file, or not executable' % a
                errs += 1
    elif run_all:
        names = os.listdir(path)
        names.sort()
        for name in names:
            if name == 'run-example' or name.startswith('.'): continue
            if name.endswith('~'): continue
            pathname = os.path.join(path, name)
            try:
                st = os.lstat(pathname)
            except OSError, err:
                # could be an output file that was removed while we ran
                if err.errno != errno.ENOENT:
                    raise
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
                if example(pathname, verbose, keep_change).run():
                    errs += 1
        print >> open(os.path.join(path, '.run'), 'w'), time.asctime()
    else:
        print_help(1, msg='no test names given, and --all not provided')

    fp = open('auto-snippets.xml', 'w')
    for key in sorted(example.entities.iterkeys()):
        print >> fp, key
    fp.close()
    return errs

if __name__ == '__main__':
    # The exit status is the number of failed examples, or 1 if the
    # user interrupted the run.
    try:
        status = main()
    except KeyboardInterrupt:
        print >> sys.stderr, 'interrupted!'
        status = 1
    sys.exit(status)