view en/examples/run-example @ 773:3b640272a966

Progress on resolve
author Bryan O'Sullivan <bos@serpentine.com>
date Sun, 12 Apr 2009 00:05:30 -0700
parents c82ff69f0935
children 019040fbf5f5
line wrap: on
line source

#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.

import cStringIO
import errno
import getopt
import glob
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

# Characters that may not appear literally in XML text, and the entity
# that stands in for each of them.
xml_subs = {
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    }

def gensubs(s):
    '''Yield the pieces of s, in order, with special characters replaced.

    Each run of ordinary text (possibly empty) is yielded unchanged and
    each character found in xml_subs is yielded as its entity, so that
    joining everything yielded gives the escaped form of s.
    '''
    pending = 0
    for pos, ch in enumerate(s):
        if ch not in xml_subs:
            continue
        # flush the literal text that precedes this special character
        yield s[pending:pos]
        pending = pos + 1
        yield xml_subs[ch]
    # whatever follows the last special character (maybe nothing)
    yield s[pending:]

def xml_escape(s):
    '''Return s with '<', '>' and '&' replaced by XML entities.'''
    pieces = list(gensubs(s))
    return ''.join(pieces)
        
def maybe_unlink(name):
    try:
        os.unlink(name)
        return True
    except OSError, err:
        if err.errno != errno.ENOENT:
            raise
    return False

def find_path_to(program):
    '''Return the first directory on PATH that holds an executable program.

    The search uses $PATH, falling back to os.defpath when PATH is not
    set.  The directory is returned, not the full path to the program.
    Returns None if no directory contains a matching executable file.
    '''
    for p in os.environ.get('PATH', os.defpath).split(os.pathsep):
        name = os.path.join(p, program)
        # os.access(X_OK) is also true for a searchable directory, so
        # insist on a regular file (symlinks are followed) as well.
        if os.path.isfile(name) and os.access(name, os.X_OK):
            return p
    return None
        
def result_name(name):
    '''Map an example name to the path of its file under results/.

    Path separators in name are turned into dashes, so every result
    lives directly inside the results directory.
    '''
    flat = name.replace(os.sep, '-')
    return os.path.normpath(os.path.join('results', flat))

def wopen(name):
    path = os.path.dirname(name)
    if path:
        try:
            os.makedirs(path)
        except OSError, err:
            if err.errno != errno.EEXIST:
                raise
    return open(name, 'w')

class example:
    entities = dict.fromkeys(l.rstrip() for l in open('auto-snippets.xml'))

    def __init__(self, name, verbose, keep_change):
        self.name = os.path.normpath(name)
        self.verbose = verbose
        self.keep_change = keep_change
        
    def status(self, s):
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def rename_output(self, base, ignore=[]):
        mangle_re = re.compile('(?:' + '|'.join(ignore) + ')')
        def mangle(s):
            return mangle_re.sub('', s)
        def matchfp(fp1, fp2):
            while True:
                s1 = mangle(fp1.readline())
                s2 = mangle(fp2.readline())
                if cmp(s1, s2):
                    break
                if not s1:
                    return True
            return False

        oldname = result_name(base + '.out')
        tmpname = result_name(base + '.tmp')
        errname = result_name(base + '.err')
        errfp = open(errname, 'w+')
        for line in open(tmpname):
            errfp.write(mangle_re.sub('', line))
        os.rename(tmpname, result_name(base + '.lxo'))
        errfp.seek(0)
        try:
            oldfp = open(oldname)
        except IOError, err:
            if err.errno != errno.ENOENT:
                raise
            os.rename(errname, oldname)
            return False
        if matchfp(oldfp, errfp):
            os.unlink(errname)
            return False
        else:
            print >> sys.stderr, '\nOutput of %s has changed!' % base
            if self.keep_change:
                os.rename(errname, oldname)
                return False
            else:
                os.system('diff -u %s %s 1>&2' % (oldname, errname))
            return True

class static_example(example):
    '''An example whose file is published verbatim as a program listing.'''

    def run(self):
        '''Write the XML-escaped file contents as a <programlisting>
        result and record the entity that refers to it.'''
        self.status('running %s\n' % self.name)
        listing = xml_escape(open(self.name).read().rstrip())
        ofp = wopen(result_name(self.name + '.tmp'))
        ofp.writelines([
            '<!-- BEGIN %s -->\n' % self.name,
            '<programlisting>',
            listing,
            '</programlisting>\n',
            '<!-- END %s -->\n' % self.name,
            ])
        ofp.close()
        self.rename_output(self.name)
        norm = self.name.replace(os.sep, '-')
        decl = '<!ENTITY %s SYSTEM "results/%s.lxo">' % (norm, norm)
        example.entities[decl] = 1


class shell_example(example):
    shell = '/usr/bin/env bash'
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$')
    
    timeout = 10

    def __init__(self, name, verbose, keep_change):
        example.__init__(self, name, verbose, keep_change)
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.'''
        fp = open(self.name)
        cfp = cStringIO.StringIO()
        for line in fp:
            cfp.write(line)
            if not line.rstrip().endswith('\\'):
                yield cfp.getvalue()
                cfp.seek(0)
                cfp.truncate()

    def send(self, s):
        if self.verbose:
            print >> sys.stderr, '>', self.debugrepr(s)
        while s:
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs
            
    timeout = 5

    def read(self, hint):
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            print >> sys.stderr, ('[%stimed out after %d seconds]' %
                                  (hint, self.timeout))
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)
        
    def receive(self, hint):
        out = cStringIO.StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read(hint)
            except OSError, err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                print >> sys.stderr, self.debugrepr(s)
            out.write(s)
            s = out.getvalue()
            if s.endswith(self.ps1):
                return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)]
            if s.endswith(self.ps2):
                return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)]
        
    def sendreceive(self, s, hint):
        self.send(s)
        ps, r = self.receive(hint)
        if r.startswith(s):
            r = r[len(s):]
        return ps, r
    
    def run(self):
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)

        # remove the marker file that we tell make to use to see if
        # this run succeeded
        maybe_unlink(self.name + '.run')

        rcfile = os.path.join(tmpdir, '.hgrc')
        rcfp = wopen(rcfile)
        print >> rcfp, '[ui]'
        print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>"
        
        rcfile = os.path.join(tmpdir, '.bashrc')
        rcfp = wopen(rcfile)
        print >> rcfp, 'PS1="%s"' % self.ps1
        print >> rcfp, 'PS2="%s"' % self.ps2
        print >> rcfp, 'unset HISTFILE'
        path = ['/usr/bin', '/bin']
        hg = find_path_to('hg')
        if hg and hg not in path:
            path.append(hg)
        def re_export(envar):
            v = os.getenv(envar)
            if v is not None:
                print >> rcfp, 'export ' + envar + '=' + v
        print >> rcfp, 'export PATH=' + ':'.join(path)
        re_export('PYTHONPATH')
        print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd()
        print >> rcfp, 'export HGMERGE=merge'
        print >> rcfp, 'export LANG=C'
        print >> rcfp, 'export LC_ALL=C'
        print >> rcfp, 'export TZ=GMT'
        print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir
        print >> rcfp, 'export HGRCPATH=$HGRC'
        print >> rcfp, 'cd %s' % tmpdir
        rcfp.close()
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror)
                sys.stderr.flush()
                os._exit(0)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
            }

        ignore = [
            r'\d+:[0-9a-f]{12}', # changeset number:hash
            r'[0-9a-f]{40}', # long changeset hash
            r'[0-9a-f]{12}', # short changeset hash
            r'^(?:---|\+\+\+) .*', # diff header with dates
            r'^date:.*', # date
            #r'^diff -r.*', # "diff -r" is followed by hash
            r'^# Date \d+ \d+', # hg patch header
            ]

        err = False
        read_hint = ''

        try:
            try:
                # eat first prompt string from shell
                self.read(read_hint)
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile,
                                              read_hint)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            if out in ('err', 'lxo', 'out', 'run', 'tmp'):
                                print >> sys.stderr, ('%s: illegal section '
                                                      'name %r' %
                                                      (self.name, out))
                                return 1
                            assert os.sep not in out
                            if ofp is not None:
                                ofp.write('</screen>\n')
                                ofp.write('<!-- END %s -->\n' % ofp_basename)
                                ofp.close()
                                err |= self.rename_output(ofp_basename, ignore)
                            if out:
                                ofp_basename = '%s.%s' % (self.name, out)
                                norm = os.path.normpath(ofp_basename)
                                norm = norm.replace(os.sep, '-')
                                example.entities[
                                    '<!ENTITY interaction.%s '
                                    'SYSTEM "results/%s.lxo">'
                                    % (norm, norm)] = 1
                                read_hint = ofp_basename + ' '
                                ofp = wopen(result_name(ofp_basename + '.tmp'))
                                ofp.write('<!-- BEGIN %s -->\n' % ofp_basename)
                                ofp.write('<screen>')
                            else:
                                ofp = None
                        elif pi == 'ignore':
                            ignore.append(rest)
                    elif hunk.strip():
                        # it's something we should execute
                        newps, output = self.sendreceive(hunk, read_hint)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('<prompt>%s</prompt> '
                                    '<userinput>%s</userinput>' %
                                    (prompts[ps],
                                     xml_escape(hunk.rstrip('\n'))))
                            if nl: hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(xml_escape(output))
                        ps = newps
                self.status('\n')
            except:
                print >> sys.stderr, '(killed)'
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n', read_hint)
                    if ofp is not None:
                        ofp.write(output)
                        ofp.write('</screen>\n')
                        ofp.write('<!-- END %s -->\n' % ofp_basename)
                        ofp.close()
                        err |= self.rename_output(ofp_basename, ignore)
                    os.close(self.cfd)
                except IOError:
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                err = err or rc
                if err:
                    if os.WIFEXITED(rc):
                        print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc)
                    elif os.WIFSIGNALED(rc):
                        print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc)
                else:
                    wopen(result_name(self.name + '.run'))
                return err
        finally:
            shutil.rmtree(tmpdir)

def print_help(exit, msg=None):
    '''Print the usage text to stderr and exit with status exit.

    If msg is given (and not empty) it is shown first, as an error.
    '''
    usage = (
        'Usage: run-example [options] [test...]',
        'Options:',
        '  -a --all       run all examples in this directory',
        '  -h --help      print this help message',
        '     --keep      keep new output as desired output',
        '  -v --verbose   display extra debug output',
        )
    if msg:
        print >> sys.stderr, 'Error:', msg
    for text in usage:
        print >> sys.stderr, text
    sys.exit(exit)

def main(path='.'):
    if os.path.realpath(path).split(os.sep)[-1] != 'examples':
        print >> sys.stderr, 'Not being run from the examples directory!'
        sys.exit(1)

    opts, args = getopt.getopt(sys.argv[1:], '?ahv',
                               ['all', 'help', 'keep', 'verbose'])
    verbose = False
    run_all = False
    keep_change = False

    for o, a in opts:
        if o in ('-h', '-?', '--help'):
            print_help(0)
        if o in ('-a', '--all'):
            run_all = True
        if o in ('--keep',):
            keep_change = True
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (a, err.strerror)
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode):
                if st.st_mode & 0111:
                    if shell_example(a, verbose, keep_change).run():
                        errs += 1
                elif a.endswith('.lst'):
                    static_example(a, verbose, keep_change).run()
            else:
                print >> sys.stderr, '%s: not a file, or not executable' % a
                errs += 1
    elif run_all:
        names = glob.glob("*") + glob.glob("app*/*") + glob.glob("ch*/*")
        names.sort()
        for name in names:
            if name == 'run-example' or name.endswith('~'): continue
            pathname = os.path.join(path, name)
            try:
                st = os.lstat(pathname)
            except OSError, err:
                # could be an output file that was removed while we ran
                if err.errno != errno.ENOENT:
                    raise
                continue
            if stat.S_ISREG(st.st_mode):
                if st.st_mode & 0111:
                    if shell_example(pathname, verbose, keep_change).run():
                        errs += 1
                elif pathname.endswith('.lst'):
                    static_example(pathname, verbose, keep_change).run()
        print >> wopen(os.path.join(path, '.run')), time.asctime()
    else:
        print_help(1, msg='no test names given, and --all not provided')

    fp = wopen('auto-snippets.xml')
    for key in sorted(example.entities.iterkeys()):
        print >> fp, key
    fp.close()
    return errs

if __name__ == '__main__':
    # Exit with main()'s failure count; an interrupt from the keyboard
    # becomes a short message and exit status 1 instead of a traceback.
    try:
        status = main()
    except KeyboardInterrupt:
        print >> sys.stderr, 'interrupted!'
        status = 1
    sys.exit(status)