view en/examples/run-example @ 130:26b7a4e943aa

Describe the bisect extension.
author Bryan O'Sullivan <bos@serpentine.com>
date Thu, 28 Dec 2006 14:06:15 -0800
parents c9aad709bd3a
children 7b5894fffc37
line wrap: on
line source

#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.

import cStringIO
import errno
import getopt
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

tex_subs = {
    '\\': '\\textbackslash{}',
    '{': '\\{',
    '}': '\\}',
    }

def gensubs(s):
    start = 0
    for i, c in enumerate(s):
        sub = tex_subs.get(c)
        if sub:
            yield s[start:i]
            start = i + 1
            yield sub
    yield s[start:]

def tex_escape(s):
    return ''.join(gensubs(s))
        
class example:
    shell = '/usr/bin/env bash'
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    pi_re = re.compile(r'#\$\s*(name):\s*(.*)$')
    
    timeout = 5

    def __init__(self, name, verbose):
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.'''
        fp = open(self.name)
        cfp = cStringIO.StringIO()
        for line in fp:
            cfp.write(line)
            if not line.rstrip().endswith('\\'):
                yield cfp.getvalue()
                cfp.seek(0)
                cfp.truncate()
        
    def status(self, s):
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        if self.verbose:
            print >> sys.stderr, '>', self.debugrepr(s)
        while s:
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs
            
    timeout = 5

    def read(self):
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            print >> sys.stderr, '[timed out after %d seconds]' % self.timeout
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)
        
    def receive(self):
        out = cStringIO.StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read()
            except OSError, err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                print >> sys.stderr, self.debugrepr(s)
            out.write(s)
            s = out.getvalue()
            if s.endswith(self.ps1):
                return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)]
            if s.endswith(self.ps2):
                return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)]
        
    def sendreceive(self, s):
        self.send(s)
        ps, r = self.receive()
        if r.startswith(s):
            r = r[len(s):]
        return ps, r
    
    def run(self):
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)

        rcfile = os.path.join(tmpdir, '.hgrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, '[ui]'
        print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>"
        
        rcfile = os.path.join(tmpdir, '.bashrc')
        rcfp = open(rcfile, 'w')
        print >> rcfp, 'PS1="%s"' % self.ps1
        print >> rcfp, 'PS2="%s"' % self.ps2
        print >> rcfp, 'unset HISTFILE'
        print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd()
        print >> rcfp, 'export HGMERGE=merge'
        print >> rcfp, 'export LANG=C'
        print >> rcfp, 'export LC_ALL=C'
        print >> rcfp, 'export TZ=GMT'
        print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir
        print >> rcfp, 'export HGRCPATH=$HGRC'
        print >> rcfp, 'cd %s' % tmpdir
        rcfp.close()
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', 'bash', '--noediting', '--noprofile',
                       '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror)
                sys.stderr.flush()
                os._exit(0)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
            }

        try:
            try:
                # eat first prompt string from shell
                self.read()
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            assert os.sep not in out
                            if out:
                                ofp = open('%s.%s.out' % (self.name, out), 'w')
                            else:
                                ofp = None
                    elif hunk.strip():
                        # it's something we should execute
                        newps, output = self.sendreceive(hunk)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('%s \\textbf{%s}' %
                                    (prompts[ps],
                                     tex_escape(hunk.rstrip('\n'))))
                            if nl: hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(tex_escape(output))
                    ps = newps
                self.status('\n')
                open(self.name + '.run', 'w')
            except:
                print >> sys.stderr, '(killed)'
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n')
                    if ofp:
                        ofp.write(output)
                    os.close(self.cfd)
                except IOError:
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                if rc:
                    if os.WIFEXITED(rc):
                        print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc)
                    elif os.WIFSIGNALED(rc):
                        print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc)
                return rc
        finally:
            shutil.rmtree(tmpdir)

def main(path='.'):
    opts, args = getopt.getopt(sys.argv[1:], 'v', ['verbose'])
    verbose = False
    for o, a in opts:
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError, err:
                print >> sys.stderr, '%s: %s' % (a, err.strerror)
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
                if example(a, verbose).run():
                    errs += 1
            else:
                print >> sys.stderr, '%s: not a file, or not executable' % a
                errs += 1
        return errs
    for name in os.listdir(path):
        if name == 'run-example' or name.startswith('.'): continue
        if name.endswith('.out') or name.endswith('~'): continue
        if name.endswith('.run'): continue
        pathname = os.path.join(path, name)
        st = os.lstat(pathname)
        if stat.S_ISREG(st.st_mode) and st.st_mode & 0111:
            if example(pathname, verbose).run():
                errs += 1
    print >> open(os.path.join(path, '.run'), 'w'), time.asctime()
    return errs

if __name__ == '__main__':
    sys.exit(main())