view es/examples/run-example @ 719:2ff0a43f1152

Update ch03
author Bryan O'Sullivan <bos@serpentine.com>
date Mon, 06 Apr 2009 23:13:53 -0700
parents 04c08ad7e92e
children
line wrap: on
line source

#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.

import cStringIO
import errno
import getopt
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

tex_subs = {
    '\\': '\\textbackslash{}',
    '{': '\\{',
    '}': '\\}',
    }

def gensubs(s):
    start = 0
    for i, c in enumerate(s):
        sub = tex_subs.get(c)
        if sub:
            yield s[start:i]
            start = i + 1
            yield sub
    yield s[start:]

def tex_escape(s):
    return ''.join(gensubs(s))
        
def maybe_unlink(name):
    try:
        os.unlink(name)
        return True
    except OSError, err:
        if err.errno != errno.ENOENT:
            raise
    return False

def find_path_to(program):
    for p in os.environ.get('PATH', os.defpath).split(os.pathsep):
        name = os.path.join(p, program)
        if os.access(name, os.X_OK):
            return p
    return None
        
class example:
    shell = '/usr/bin/env bash'
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$')
    
    timeout = 10

    def __init__(self, name, verbose):
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.'''
        fp = open(self.name)
        cfp = cStringIO.StringIO()
        for line in fp:
            cfp.write(line)
            if not line.rstrip().endswith('\\'):
                yield cfp.getvalue()
                cfp.seek(0)
                cfp.truncate()
        
    def status(self, s):
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        if self.verbose:
            print >> sys.stderr, '>', self.debugrepr(s)
        while s:
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs
            
    timeout = 5

    def read(self, hint):
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            print >> sys.stderr, ('[%stimed out after %d seconds]' %
                                  (hint, self.timeout))
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)
        
    def receive(self, hint):
        out = cStringIO.StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read(hint)
            except OSError, err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                print >> sys.stderr, self.debugrepr(s)
            out.write(s)
            s = out.getvalue()
            if s.endswith(self.ps1):
                return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)]
            if s.endswith(self.ps2):
                return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)]
        
    def sendreceive(self, s, hint):
        self.send(s)
        ps, r = self.receive(hint)
        if r.startswith(s):
            r = r[len(s):]
        return ps, r
    
    def run(self):
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)

        # remove the marker file that we tell make to use to see if
        # this run succeeded
        maybe_unlink(self.name + '.run')

        rcfile = os.path.join(tmpdir, '.hgrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, '[ui]'
        print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>"
        
        rcfile = os.path.join(tmpdir, '.bashrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, 'PS1="%s"' % self.ps1
        print >> rcfp, 'PS2="%s"' % self.ps2
        print >> rcfp, 'unset HISTFILE'
        path = ['/usr/bin', '/bin']
        hg = find_path_to('hg')
        if hg and hg not in path:
            path.append(hg)
        def re_export(envar):
            v = os.getenv(envar)
            if v is not None:
                print >> rcfp, 'export ' + envar + '=' + v
        print >> rcfp, 'export PATH=' + ':'.join(path)
        re_export('PYTHONPATH')
        print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd()
        print >> rcfp, 'export HGMERGE=merge'
        print >> rcfp, 'export LANG=C'
        print >> rcfp, 'export LC_ALL=C'
        print >> rcfp, 'export TZ=GMT'
        print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir
        print >> rcfp, 'export HGRCPATH=$HGRC'
        print >> rcfp, 'cd %s' % tmpdir
        rcfp.close()
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror)
                sys.stderr.flush()
                os._exit(0)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
            }

        ignore = [
            r'\d+:[0-9a-f]{12}', # changeset number:hash
            r'[0-9a-f]{40}', # long changeset hash
            r'[0-9a-f]{12}', # short changeset hash
            r'^(?:---|\+\+\+) .*', # diff header with dates
            r'^date:.*', # date
            #r'^diff -r.*', # "diff -r" is followed by hash
            r'^# Date \d+ \d+', # hg patch header
            ]

        err = False
        read_hint = ''

        try:
            try:
                # eat first prompt string from shell
                self.read(read_hint)
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile,
                                              read_hint)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            if out in ('err', 'lxo', 'out', 'run', 'tmp'):
                                print >> sys.stderr, ('%s: illegal section '
                                                      'name %r' %
                                                      (self.name, out))
                                return 1
                            assert os.sep not in out
                            if ofp is not None:
                                ofp.close()
                                err |= self.rename_output(ofp_basename, ignore)
                            if out:
                                ofp_basename = '%s.%s' % (self.name, out)
                                read_hint = ofp_basename + ' '
                                ofp = open(ofp_basename + '.tmp', 'w')
                            else:
                                ofp = None
                        elif pi == 'ignore':
                            ignore.append(rest)
                    elif hunk.strip():
                        # it's something we should execute
                        newps, output = self.sendreceive(hunk, read_hint)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('%s \\textbf{%s}' %
                                    (prompts[ps],
                                     tex_escape(hunk.rstrip('\n'))))
                            if nl: hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(tex_escape(output))
                    ps = newps
                self.status('\n')
            except:
                print >> sys.stderr, '(killed)'
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n', read_hint)
                    if ofp is not None:
                        ofp.write(output)
                        ofp.close()
                        err |= self.rename_output(ofp_basename, ignore)
                    os.close(self.cfd)
                except IOError:
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                err = err or rc
                if err:
                    if os.WIFEXITED(rc):
                        print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc)
                    elif os.WIFSIGNALED(rc):
                        print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc)
                else:
                    open(self.name + '.run', 'w')
                return err
        finally:
            shutil.rmtree(tmpdir)

    def rename_output(self, base, ignore):
        mangle_re = re.compile('(?:' + '|'.join(ignore) + ')')
        def mangle(s):
            return mangle_re.sub('', s)
        def matchfp(fp1, fp2):
            while True:
                s1 = mangle(fp1.readline())
                s2 = mangle(fp2.readline())
                if cmp(s1, s2):
                    break
                if not s1:
                    return True
            return False

        oldname = base + '.out'
        tmpname = base + '.tmp'
        errname = base + '.err'
        errfp = open(errname, 'w+')
        for line in open(tmpname):
            errfp.write(mangle_re.sub('', line))
        os.rename(tmpname, base + '.lxo')
        errfp.seek(0)
        try:
            oldfp = open(oldname)
        except IOError, err:
            if err.errno != errno.ENOENT:
                raise
            os.rename(errname, oldname)
            return False
        if matchfp(oldfp, errfp):
            os.unlink(errname)
            return False
        else:
            print >> sys.stderr, '\nOutput of %s has changed!' % base
            os.system('diff -u %s %s 1>&2' % (oldname, errname))
            return True

def print_help(exit, msg=None):
    if msg:
        print >> sys.stderr, 'Error:', msg
    print >> sys.stderr, 'Usage: run-example [options] [test...]'
    print >> sys.stderr, 'Options:'
    print >> sys.stderr, '  -a --all       run all tests in this directory'
    print >> sys.stderr, '  -h --help      print this help message'
    print >> sys.stderr, '  -v --verbose   display extra debug output'
    sys.exit(exit)

def main(path='.'):
    opts, args = getopt.getopt(sys.argv[1:], '?ahv',
                               ['all', 'help', 'verbose'])
    verbose = False
    run_all = False
    for o, a in opts:
        if o in ('-h', '-?', '--help'):
            print_help(0)
        if o in ('-a', '--all'):
            run_all = True
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (a, err.strerror)
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
                if example(a, verbose).run():
                    errs += 1
            else:
                print >> sys.stderr, '%s: not a file, or not executable' % a
                errs += 1
    elif run_all:
        names = os.listdir(path)
        names.sort()
        for name in names:
            if name == 'run-example' or name.startswith('.'): continue
            if name.endswith('.out') or name.endswith('~'): continue
            if name.endswith('.run'): continue
            pathname = os.path.join(path, name)
            try:
                st = os.lstat(pathname)
            except OSError, err:
                # could be an output file that was removed while we ran
                if err.errno != errno.ENOENT:
                    raise
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
                if example(pathname, verbose).run():
                    errs += 1
        print >> open(os.path.join(path, '.run'), 'w'), time.asctime()
    else:
        print_help(1, msg='no test names given, and --all not provided')
    return errs

if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print >> sys.stderr, 'interrupted!'
        sys.exit(1)