view en/examples/run-example @ 83:b476081a9c04

Much progress in template chapter.
author Bryan O'Sullivan <bos@serpentine.com>
date Tue, 03 Oct 2006 13:03:42 -0700
parents 53427f786a0f
children 5b80c922ebdd
line wrap: on
line source

#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.

import cStringIO
import errno
import getopt
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

# Characters that need escaping in the generated TeX, each mapped to the
# text that replaces it.
tex_subs = dict([
    ('\\', '\\textbackslash{}'),
    ('{', '\\{'),
    ('}', '\\}'),
])

def gensubs(s):
    '''Yield the pieces of s, with each character found in tex_subs
    replaced by its substitution.

    Untouched runs of text and substitutions are yielded alternately, in
    order; the text that follows the last substitution (possibly empty)
    is always yielded last.'''
    tail = 0
    for pos, ch in enumerate(s):
        replacement = tex_subs.get(ch)
        if not replacement:
            continue
        # text between the previous substitution and this character
        yield s[tail:pos]
        yield replacement
        tail = pos + 1
    yield s[tail:]

def tex_escape(s):
    '''Return s with every character listed in tex_subs replaced by its
    substitution.'''
    pieces = gensubs(s)
    return ''.join(pieces)
        
class example:
    '''Run one example script through a bash session on a pty.

    The script named by self.name is split into hunks (see parse).  Each
    hunk is sent to the shell, and the command plus the output it
    produced are written, TeX-escaped, to "<script>.<name>.out" files.
    The file currently being written is chosen by "#$ name: <name>"
    lines in the script.'''

    shell = '/usr/bin/env bash'
    # prompt installed as both PS1 and PS2; receive() treats it as the
    # end-of-output marker
    prompt = '__run_example_prompt__ '
    # processing instruction, e.g. "#$ name: foo"
    pi_re = re.compile(r'#\$\s*(name):\s*(.*)$')

    # seconds to wait for output from the shell before giving up on it
    timeout = 5

    def __init__(self, name, verbose):
        '''name is the path of the script to run; if verbose is true,
        traffic to and from the shell is echoed to stderr.'''
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.

        A hunk is one line, or a run of lines in which every line but
        the last ends with a backslash.  A continuation that is still
        open when the file ends is not yielded.'''
        fp = open(self.name)
        try:
            lines = fp.readlines()
        finally:
            fp.close()
        pending = []
        for line in lines:
            pending.append(line)
            if not line.rstrip().endswith('\\'):
                yield ''.join(pending)
                pending = []

    def status(self, s):
        '''Write progress text to stdout, flushing when s does not end
        a line.'''
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        '''Write all of s to the shell.'''
        if self.verbose:
            sys.stderr.write('> %s\n' % self.debugrepr(s))
        while s:
            # os.write may accept only part of the data
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        '''Return repr(s), cut to about 60 characters with a note of the
        full length when it is longer than that.'''
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            # rs[0] is the opening quote; reuse it to close the cut text
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs

    def read(self):
        '''Return the next chunk of shell output.

        If nothing arrives within self.timeout seconds, the shell is
        sent SIGHUP and an empty string is returned.'''
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            sys.stderr.write('[timed out after %d seconds]\n' % self.timeout)
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)

    def receive(self):
        '''Collect shell output up to the next prompt.

        Returns the output with "\\r\\n" turned into "\\n" and the prompt
        removed.  Returns an empty string if reading fails with EIO.'''
        received = ''
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                chunk = self.read()
            except OSError:
                # sys.exc_info() instead of "except OSError, err", which
                # only parses on Python 2
                if sys.exc_info()[1].errno == errno.EIO:
                    return ''
                raise
            if self.verbose:
                sys.stderr.write('%s\n' % self.debugrepr(chunk))
            received += chunk
            if received.endswith(self.prompt):
                return received.replace('\r\n', '\n')[:-len(self.prompt)]

    def sendreceive(self, s):
        '''Send s to the shell and return the output that follows, with
        a leading echo of s removed.'''
        self.send(s)
        r = self.receive()
        if r.startswith(s):
            r = r[len(s):]
        return r

    def run(self):
        '''Run the script and write its output files.

        Returns the wait status of the shell (zero on success).  If
        anything fails while the script is being fed to the shell, the
        shell is killed and the exception is re-raised.  The temporary
        working directory is removed in either case.'''
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)

        rcfp = open(os.path.join(tmpdir, '.hgrc'), 'w')
        try:
            rcfp.write('[ui]\n')
            rcfp.write("username = Bryan O'Sullivan <bos@serpentine.com>\n")
        finally:
            rcfp.close()

        rcfile = os.path.join(tmpdir, '.bashrc')
        rclines = [
            'PS1="%s"' % self.prompt,
            'PS2="%s"' % self.prompt,
            'unset HISTFILE',
            'export EXAMPLE_DIR="%s"' % os.getcwd(),
            'export LANG=C',
            'export LC_ALL=C',
            'export TZ=GMT',
            'export HGRC="%s/.hgrc"' % tmpdir,
            'export HGRCPATH=$HGRC',
            'cd %s' % tmpdir,
            ]
        rcfp = open(rcfile, 'w')
        try:
            rcfp.write('\n'.join(rclines) + '\n')
        finally:
            rcfp.close()

        # flush before forking so buffered text is not written twice
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', 'bash', '--noediting', '--noprofile',
                       '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError:
                err = sys.exc_info()[1]
                sys.stderr.write('%s: %s\n' % (cmdline[0], err.strerror))
                sys.stderr.flush()
                # non-zero, so the parent's wait status shows the failure
                os._exit(127)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)
        try:
            try:
                # eat first prompt string from shell
                self.read()
                # setup env and prompt
                self.sendreceive('source %s\n' % rcfile)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            # explicit check: an assert would vanish
                            # under "python -O"
                            if os.sep in out:
                                raise ValueError(
                                    'output name %r must not contain %r'
                                    % (out, os.sep))
                            if ofp:
                                ofp.close()
                            if out:
                                ofp = open('%s.%s.out' % (self.name, out), 'w')
                            else:
                                ofp = None
                    elif hunk.strip():
                        # it's something we should execute
                        output = self.sendreceive(hunk)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('$ \\textbf{%s}' %
                                    tex_escape(hunk.rstrip('\n')))
                            if nl: hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(tex_escape(output))
                self.status('\n')
                # leave an empty "<script>.run" marker file behind
                open(self.name + '.run', 'w').close()
            except:
                sys.stderr.write('(killed)\n')
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    try:
                        output = self.sendreceive('exit\n')
                        if ofp:
                            ofp.write(output)
                    finally:
                        os.close(self.cfd)
                except (IOError, OSError):
                    # best effort: os.write/os.close report a shell that
                    # is already gone as OSError, file writes as IOError
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                if rc:
                    if os.WIFEXITED(rc):
                        sys.stderr.write('(exit %s)\n' % os.WEXITSTATUS(rc))
                    elif os.WIFSIGNALED(rc):
                        sys.stderr.write('(signal %s)\n' % os.WTERMSIG(rc))
                return rc
        finally:
            if ofp:
                ofp.close()
            shutil.rmtree(tmpdir)

def main(path='.'):
    '''Run example scripts and return the number of failures.

    Options are read from sys.argv: -v/--verbose echoes the shell
    traffic to stderr.  If script names are given on the command line,
    only those are run.  Otherwise every executable regular file in
    path is run, skipping this program, dot files, editor backups and
    "*.out"/"*.run" files, and a timestamp is then written to
    "<path>/.run".'''
    # any execute permission bit (the value 0111 in octal)
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    def runnable(st):
        # a regular file with at least one execute bit set
        return stat.S_ISREG(st.st_mode) and st.st_mode & exec_bits

    opts, args = getopt.getopt(sys.argv[1:], 'v', ['verbose'])
    verbose = False
    for o, a in opts:
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError:
                # sys.exc_info() instead of "except OSError, err", which
                # only parses on Python 2
                err = sys.exc_info()[1]
                sys.stderr.write('%s: %s\n' % (a, err.strerror))
                errs += 1
                continue
            if runnable(st):
                if example(a, verbose).run():
                    errs += 1
            else:
                sys.stderr.write('%s: not a file, or not executable\n' % a)
                errs += 1
        return errs
    for name in os.listdir(path):
        if name == 'run-example' or name.startswith('.'): continue
        if name.endswith('.out') or name.endswith('~'): continue
        if name.endswith('.run'): continue
        pathname = os.path.join(path, name)
        if runnable(os.lstat(pathname)):
            if example(pathname, verbose).run():
                errs += 1
    # record when the whole directory was last run
    stamp = open(os.path.join(path, '.run'), 'w')
    try:
        stamp.write(time.asctime() + '\n')
    finally:
        stamp.close()
    return errs

if __name__ == '__main__':
    # the process exit status is whatever main() returns
    exit_status = main()
    sys.exit(exit_status)