Mercurial > hgbook
view en/examples/run-example @ 812:6b7818eb3d8e
Fix bogons
author | Bryan O'Sullivan <bos@serpentine.com> |
---|---|
date | Fri, 24 Apr 2009 00:31:21 -0700 |
parents | 3b640272a966 |
children | 019040fbf5f5 |
line wrap: on
line source
#!/usr/bin/env python # # This program takes something that resembles a shell script and runs # it, spitting input (commands from the script) and output into text # files, for use in examples. import cStringIO import errno import getopt import glob import os import pty import re import select import shutil import signal import stat import sys import tempfile import time xml_subs = { '<': '<', '>': '>', '&': '&', } def gensubs(s): start = 0 for i, c in enumerate(s): sub = xml_subs.get(c) if sub: yield s[start:i] start = i + 1 yield sub yield s[start:] def xml_escape(s): return ''.join(gensubs(s)) def maybe_unlink(name): try: os.unlink(name) return True except OSError, err: if err.errno != errno.ENOENT: raise return False def find_path_to(program): for p in os.environ.get('PATH', os.defpath).split(os.pathsep): name = os.path.join(p, program) if os.access(name, os.X_OK): return p return None def result_name(name): return os.path.normpath(os.path.join('results', name.replace(os.sep, '-'))) def wopen(name): path = os.path.dirname(name) if path: try: os.makedirs(path) except OSError, err: if err.errno != errno.EEXIST: raise return open(name, 'w') class example: entities = dict.fromkeys(l.rstrip() for l in open('auto-snippets.xml')) def __init__(self, name, verbose, keep_change): self.name = os.path.normpath(name) self.verbose = verbose self.keep_change = keep_change def status(self, s): sys.stdout.write(s) if not s.endswith('\n'): sys.stdout.flush() def rename_output(self, base, ignore=[]): mangle_re = re.compile('(?:' + '|'.join(ignore) + ')') def mangle(s): return mangle_re.sub('', s) def matchfp(fp1, fp2): while True: s1 = mangle(fp1.readline()) s2 = mangle(fp2.readline()) if cmp(s1, s2): break if not s1: return True return False oldname = result_name(base + '.out') tmpname = result_name(base + '.tmp') errname = result_name(base + '.err') errfp = open(errname, 'w+') for line in open(tmpname): errfp.write(mangle_re.sub('', line)) os.rename(tmpname, result_name(base + '.lxo')) errfp.seek(0) try: oldfp = open(oldname) except IOError, err: if err.errno != errno.ENOENT: raise os.rename(errname, oldname) return False if matchfp(oldfp, errfp): os.unlink(errname) return False else: print >> sys.stderr, '\nOutput of %s has changed!' % base if self.keep_change: os.rename(errname, oldname) return False else: os.system('diff -u %s %s 1>&2' % (oldname, errname)) return True class static_example(example): def run(self): self.status('running %s\n' % self.name) s = open(self.name).read().rstrip() s = s.replace('&', '&').replace('<', '<').replace('>', '>') ofp = wopen(result_name(self.name + '.tmp')) ofp.write('<!-- BEGIN %s -->\n' % self.name) ofp.write('<programlisting>') ofp.write(s) ofp.write('</programlisting>\n') ofp.write('<!-- END %s -->\n' % self.name) ofp.close() self.rename_output(self.name) norm = self.name.replace(os.sep, '-') example.entities[ '<!ENTITY %s SYSTEM "results/%s.lxo">' % (norm, norm)] = 1 class shell_example(example): shell = '/usr/bin/env bash' ps1 = '__run_example_ps1__ ' ps2 = '__run_example_ps2__ ' pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$') timeout = 10 def __init__(self, name, verbose, keep_change): example.__init__(self, name, verbose, keep_change) self.poll = select.poll() def parse(self): '''yield each hunk of input from the file.''' fp = open(self.name) cfp = cStringIO.StringIO() for line in fp: cfp.write(line) if not line.rstrip().endswith('\\'): yield cfp.getvalue() cfp.seek(0) cfp.truncate() def send(self, s): if self.verbose: print >> sys.stderr, '>', self.debugrepr(s) while s: count = os.write(self.cfd, s) s = s[count:] def debugrepr(self, s): rs = repr(s) limit = 60 if len(rs) > limit: return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))) else: return rs timeout = 5 def read(self, hint): events = self.poll.poll(self.timeout * 1000) if not events: print >> sys.stderr, ('[%stimed out after %d seconds]' % (hint, self.timeout)) os.kill(self.pid, signal.SIGHUP) return '' return os.read(self.cfd, 1024) def receive(self, hint): out = cStringIO.StringIO() while True: try: if self.verbose: sys.stderr.write('< ') s = self.read(hint) except OSError, err: if err.errno == errno.EIO: return '', '' raise if self.verbose: print >> sys.stderr, self.debugrepr(s) out.write(s) s = out.getvalue() if s.endswith(self.ps1): return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)] if s.endswith(self.ps2): return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)] def sendreceive(self, s, hint): self.send(s) ps, r = self.receive(hint) if r.startswith(s): r = r[len(s):] return ps, r def run(self): ofp = None basename = os.path.basename(self.name) self.status('running %s ' % basename) tmpdir = tempfile.mkdtemp(prefix=basename) # remove the marker file that we tell make to use to see if # this run succeeded maybe_unlink(self.name + '.run') rcfile = os.path.join(tmpdir, '.hgrc') rcfp = wopen(rcfile) print >> rcfp, '[ui]' print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>" rcfile = os.path.join(tmpdir, '.bashrc') rcfp = wopen(rcfile) print >> rcfp, 'PS1="%s"' % self.ps1 print >> rcfp, 'PS2="%s"' % self.ps2 print >> rcfp, 'unset HISTFILE' path = ['/usr/bin', '/bin'] hg = find_path_to('hg') if hg and hg not in path: path.append(hg) def re_export(envar): v = os.getenv(envar) if v is not None: print >> rcfp, 'export ' + envar + '=' + v print >> rcfp, 'export PATH=' + ':'.join(path) re_export('PYTHONPATH') print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd() print >> rcfp, 'export HGMERGE=merge' print >> rcfp, 'export LANG=C' print >> rcfp, 'export LC_ALL=C' print >> rcfp, 'export TZ=GMT' print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir print >> rcfp, 'export HGRCPATH=$HGRC' print >> rcfp, 'cd %s' % tmpdir rcfp.close() sys.stdout.flush() sys.stderr.flush() self.pid, self.cfd = pty.fork() if self.pid == 0: cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting', '--noprofile', '--norc'] try: os.execv(cmdline[0], cmdline) except OSError, err: print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror) sys.stderr.flush() os._exit(0) self.poll.register(self.cfd, select.POLLIN | select.POLLERR | select.POLLHUP) prompts = { '': '', self.ps1: '$', self.ps2: '>', } ignore = [ r'\d+:[0-9a-f]{12}', # changeset number:hash r'[0-9a-f]{40}', # long changeset hash r'[0-9a-f]{12}', # short changeset hash r'^(?:---|\+\+\+) .*', # diff header with dates r'^date:.*', # date #r'^diff -r.*', # "diff -r" is followed by hash r'^# Date \d+ \d+', # hg patch header ] err = False read_hint = '' try: try: # eat first prompt string from shell self.read(read_hint) # setup env and prompt ps, output = self.sendreceive('source %s\n' % rcfile, read_hint) for hunk in self.parse(): # is this line a processing instruction? m = self.pi_re.match(hunk) if m: pi, rest = m.groups() if pi == 'name': self.status('.') out = rest if out in ('err', 'lxo', 'out', 'run', 'tmp'): print >> sys.stderr, ('%s: illegal section ' 'name %r' % (self.name, out)) return 1 assert os.sep not in out if ofp is not None: ofp.write('</screen>\n') ofp.write('<!-- END %s -->\n' % ofp_basename) ofp.close() err |= self.rename_output(ofp_basename, ignore) if out: ofp_basename = '%s.%s' % (self.name, out) norm = os.path.normpath(ofp_basename) norm = norm.replace(os.sep, '-') example.entities[ '<!ENTITY interaction.%s ' 'SYSTEM "results/%s.lxo">' % (norm, norm)] = 1 read_hint = ofp_basename + ' ' ofp = wopen(result_name(ofp_basename + '.tmp')) ofp.write('<!-- BEGIN %s -->\n' % ofp_basename) ofp.write('<screen>') else: ofp = None elif pi == 'ignore': ignore.append(rest) elif hunk.strip(): # it's something we should execute newps, output = self.sendreceive(hunk, read_hint) if not ofp: continue # first, print the command we ran if not hunk.startswith('#'): nl = hunk.endswith('\n') hunk = ('<prompt>%s</prompt> ' '<userinput>%s</userinput>' % (prompts[ps], xml_escape(hunk.rstrip('\n')))) if nl: hunk += '\n' ofp.write(hunk) # then its output ofp.write(xml_escape(output)) ps = newps self.status('\n') except: print >> sys.stderr, '(killed)' os.kill(self.pid, signal.SIGKILL) pid, rc = os.wait() raise else: try: ps, output = self.sendreceive('exit\n', read_hint) if ofp is not None: ofp.write(output) ofp.write('</screen>\n') ofp.write('<!-- END %s -->\n' % ofp_basename) ofp.close() err |= self.rename_output(ofp_basename, ignore) os.close(self.cfd) except IOError: pass os.kill(self.pid, signal.SIGTERM) pid, rc = os.wait() err = err or rc if err: if os.WIFEXITED(rc): print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc) elif os.WIFSIGNALED(rc): print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc) else: wopen(result_name(self.name + '.run')) return err finally: shutil.rmtree(tmpdir) def print_help(exit, msg=None): if msg: print >> sys.stderr, 'Error:', msg print >> sys.stderr, 'Usage: run-example [options] [test...]' print >> sys.stderr, 'Options:' print >> sys.stderr, ' -a --all run all examples in this directory' print >> sys.stderr, ' -h --help print this help message' print >> sys.stderr, ' --keep keep new output as desired output' print >> sys.stderr, ' -v --verbose display extra debug output' sys.exit(exit) def main(path='.'): if os.path.realpath(path).split(os.sep)[-1] != 'examples': print >> sys.stderr, 'Not being run from the examples directory!' sys.exit(1) opts, args = getopt.getopt(sys.argv[1:], '?ahv', ['all', 'help', 'keep', 'verbose']) verbose = False run_all = False keep_change = False for o, a in opts: if o in ('-h', '-?', '--help'): print_help(0) if o in ('-a', '--all'): run_all = True if o in ('--keep',): keep_change = True if o in ('-v', '--verbose'): verbose = True errs = 0 if args: for a in args: try: st = os.lstat(a) except OSError, err: print >> sys.stderr, '%s: %s' % (a, err.strerror) errs += 1 continue if stat.S_ISREG(st.st_mode): if st.st_mode & 0111: if shell_example(a, verbose, keep_change).run(): errs += 1 elif a.endswith('.lst'): static_example(a, verbose, keep_change).run() else: print >> sys.stderr, '%s: not a file, or not executable' % a errs += 1 elif run_all: names = glob.glob("*") + glob.glob("app*/*") + glob.glob("ch*/*") names.sort() for name in names: if name == 'run-example' or name.endswith('~'): continue pathname = os.path.join(path, name) try: st = os.lstat(pathname) except OSError, err: # could be an output file that was removed while we ran if err.errno != errno.ENOENT: raise continue if stat.S_ISREG(st.st_mode): if st.st_mode & 0111: if shell_example(pathname, verbose, keep_change).run(): errs += 1 elif pathname.endswith('.lst'): static_example(pathname, verbose, keep_change).run() print >> wopen(os.path.join(path, '.run')), time.asctime() else: print_help(1, msg='no test names given, and --all not provided') fp = wopen('auto-snippets.xml') for key in sorted(example.entities.iterkeys()): print >> fp, key fp.close() return errs if __name__ == '__main__': try: sys.exit(main()) except KeyboardInterrupt: print >> sys.stderr, 'interrupted!' sys.exit(1)