Mercurial > hgbook
diff ja/examples/run-example @ 290:b0db5adf11c1 ja_root
fork Japanese translation.
author | Yoshiki Yazawa <yaz@cc.rim.or.jp> |
---|---|
date | Wed, 06 Feb 2008 17:43:11 +0900 |
parents | en/examples/run-example@1a55ba6ceca1 |
children | 32d33b238b7e |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ja/examples/run-example Wed Feb 06 17:43:11 2008 +0900 @@ -0,0 +1,391 @@ +#!/usr/bin/env python +# +# This program takes something that resembles a shell script and runs +# it, spitting input (commands from the script) and output into text +# files, for use in examples. + +import cStringIO +import errno +import getopt +import os +import pty +import re +import select +import shutil +import signal +import stat +import sys +import tempfile +import time + +tex_subs = { + '\\': '\\textbackslash{}', + '{': '\\{', + '}': '\\}', + } + +def gensubs(s): + start = 0 + for i, c in enumerate(s): + sub = tex_subs.get(c) + if sub: + yield s[start:i] + start = i + 1 + yield sub + yield s[start:] + +def tex_escape(s): + return ''.join(gensubs(s)) + +def maybe_unlink(name): + try: + os.unlink(name) + return True + except OSError, err: + if err.errno != errno.ENOENT: + raise + return False + +def find_path_to(program): + for p in os.environ.get('PATH', os.defpath).split(os.pathsep): + name = os.path.join(p, program) + if os.access(name, os.X_OK): + return p + return None + +class example: + shell = '/usr/bin/env bash' + ps1 = '__run_example_ps1__ ' + ps2 = '__run_example_ps2__ ' + pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$') + + timeout = 10 + + def __init__(self, name, verbose): + self.name = name + self.verbose = verbose + self.poll = select.poll() + + def parse(self): + '''yield each hunk of input from the file.''' + fp = open(self.name) + cfp = cStringIO.StringIO() + for line in fp: + cfp.write(line) + if not line.rstrip().endswith('\\'): + yield cfp.getvalue() + cfp.seek(0) + cfp.truncate() + + def status(self, s): + sys.stdout.write(s) + if not s.endswith('\n'): + sys.stdout.flush() + + def send(self, s): + if self.verbose: + print >> sys.stderr, '>', self.debugrepr(s) + while s: + count = os.write(self.cfd, s) + s = s[count:] + + def debugrepr(self, s): + rs = repr(s) + limit = 60 + if len(rs) > limit: + return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))) + else: + return rs + + timeout = 5 + + def read(self, hint): + events = self.poll.poll(self.timeout * 1000) + if not events: + print >> sys.stderr, ('[%stimed out after %d seconds]' % + (hint, self.timeout)) + os.kill(self.pid, signal.SIGHUP) + return '' + return os.read(self.cfd, 1024) + + def receive(self, hint): + out = cStringIO.StringIO() + while True: + try: + if self.verbose: + sys.stderr.write('< ') + s = self.read(hint) + except OSError, err: + if err.errno == errno.EIO: + return '', '' + raise + if self.verbose: + print >> sys.stderr, self.debugrepr(s) + out.write(s) + s = out.getvalue() + if s.endswith(self.ps1): + return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)] + if s.endswith(self.ps2): + return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)] + + def sendreceive(self, s, hint): + self.send(s) + ps, r = self.receive(hint) + if r.startswith(s): + r = r[len(s):] + return ps, r + + def run(self): + ofp = None + basename = os.path.basename(self.name) + self.status('running %s ' % basename) + tmpdir = tempfile.mkdtemp(prefix=basename) + + # remove the marker file that we tell make to use to see if + # this run succeeded + maybe_unlink(self.name + '.run') + + rcfile = os.path.join(tmpdir, '.hgrc') + rcfp = open(rcfile, 'w') + print >> rcfp, '[ui]' + print >> rcfp, "username = Bryan O'Sullivan <bos@serpentine.com>" + + rcfile = os.path.join(tmpdir, '.bashrc') + rcfp = open(rcfile, 'w') + print >> rcfp, 'PS1="%s"' % self.ps1 + print >> rcfp, 'PS2="%s"' % self.ps2 + print >> rcfp, 'unset HISTFILE' + path = ['/usr/bin', '/bin'] + hg = find_path_to('hg') + if hg and hg not in path: + path.append(hg) + def re_export(envar): + v = os.getenv(envar) + if v is not None: + print >> rcfp, 'export ' + envar + '=' + v + print >> rcfp, 'export PATH=' + ':'.join(path) + re_export('PYTHONPATH') + print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd() + print >> rcfp, 'export HGMERGE=merge' + print >> rcfp, 'export LANG=C' + print >> rcfp, 'export LC_ALL=C' + print >> rcfp, 'export TZ=GMT' + print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir + print >> rcfp, 'export HGRCPATH=$HGRC' + print >> rcfp, 'cd %s' % tmpdir + rcfp.close() + sys.stdout.flush() + sys.stderr.flush() + self.pid, self.cfd = pty.fork() + if self.pid == 0: + cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting', + '--noprofile', '--norc'] + try: + os.execv(cmdline[0], cmdline) + except OSError, err: + print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror) + sys.stderr.flush() + os._exit(0) + self.poll.register(self.cfd, select.POLLIN | select.POLLERR | + select.POLLHUP) + + prompts = { + '': '', + self.ps1: '$', + self.ps2: '>', + } + + ignore = [ + r'\d+:[0-9a-f]{12}', # changeset number:hash + r'[0-9a-f]{40}', # long changeset hash + r'[0-9a-f]{12}', # short changeset hash + r'^(?:---|\+\+\+) .*', # diff header with dates + r'^date:.*', # date + #r'^diff -r.*', # "diff -r" is followed by hash + r'^# Date \d+ \d+', # hg patch header + ] + + err = False + read_hint = '' + + try: + try: + # eat first prompt string from shell + self.read(read_hint) + # setup env and prompt + ps, output = self.sendreceive('source %s\n' % rcfile, + read_hint) + for hunk in self.parse(): + # is this line a processing instruction? + m = self.pi_re.match(hunk) + if m: + pi, rest = m.groups() + if pi == 'name': + self.status('.') + out = rest + if out in ('err', 'lxo', 'out', 'run', 'tmp'): + print >> sys.stderr, ('%s: illegal section ' + 'name %r' % + (self.name, out)) + return 1 + assert os.sep not in out + if ofp is not None: + ofp.close() + err |= self.rename_output(ofp_basename, ignore) + if out: + ofp_basename = '%s.%s' % (self.name, out) + read_hint = ofp_basename + ' ' + ofp = open(ofp_basename + '.tmp', 'w') + else: + ofp = None + elif pi == 'ignore': + ignore.append(rest) + elif hunk.strip(): + # it's something we should execute + newps, output = self.sendreceive(hunk, read_hint) + if not ofp: + continue + # first, print the command we ran + if not hunk.startswith('#'): + nl = hunk.endswith('\n') + hunk = ('%s \\textbf{%s}' % + (prompts[ps], + tex_escape(hunk.rstrip('\n')))) + if nl: hunk += '\n' + ofp.write(hunk) + # then its output + ofp.write(tex_escape(output)) + ps = newps + self.status('\n') + except: + print >> sys.stderr, '(killed)' + os.kill(self.pid, signal.SIGKILL) + pid, rc = os.wait() + raise + else: + try: + ps, output = self.sendreceive('exit\n', read_hint) + if ofp is not None: + ofp.write(output) + ofp.close() + err |= self.rename_output(ofp_basename, ignore) + os.close(self.cfd) + except IOError: + pass + os.kill(self.pid, signal.SIGTERM) + pid, rc = os.wait() + err = err or rc + if err: + if os.WIFEXITED(rc): + print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc) + elif os.WIFSIGNALED(rc): + print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc) + else: + open(self.name + '.run', 'w') + return err + finally: + shutil.rmtree(tmpdir) + + def rename_output(self, base, ignore): + mangle_re = re.compile('(?:' + '|'.join(ignore) + ')') + def mangle(s): + return mangle_re.sub('', s) + def matchfp(fp1, fp2): + while True: + s1 = mangle(fp1.readline()) + s2 = mangle(fp2.readline()) + if cmp(s1, s2): + break + if not s1: + return True + return False + + oldname = base + '.out' + tmpname = base + '.tmp' + errname = base + '.err' + errfp = open(errname, 'w+') + for line in open(tmpname): + errfp.write(mangle_re.sub('', line)) + os.rename(tmpname, base + '.lxo') + errfp.seek(0) + try: + oldfp = open(oldname) + except IOError, err: + if err.errno != errno.ENOENT: + raise + os.rename(errname, oldname) + return False + if matchfp(oldfp, errfp): + os.unlink(errname) + return False + else: + print >> sys.stderr, '\nOutput of %s has changed!' % base + os.system('diff -u %s %s 1>&2' % (oldname, errname)) + return True + +def print_help(exit, msg=None): + if msg: + print >> sys.stderr, 'Error:', msg + print >> sys.stderr, 'Usage: run-example [options] [test...]' + print >> sys.stderr, 'Options:' + print >> sys.stderr, ' -a --all run all tests in this directory' + print >> sys.stderr, ' -h --help print this help message' + print >> sys.stderr, ' -v --verbose display extra debug output' + sys.exit(exit) + +def main(path='.'): + opts, args = getopt.getopt(sys.argv[1:], '?ahv', + ['all', 'help', 'verbose']) + verbose = False + run_all = False + for o, a in opts: + if o in ('-h', '-?', '--help'): + print_help(0) + if o in ('-a', '--all'): + run_all = True + if o in ('-v', '--verbose'): + verbose = True + errs = 0 + if args: + for a in args: + try: + st = os.lstat(a) + except OSError, err: + print >> sys.stderr, '%s: %s' % (a, err.strerror) + errs += 1 + continue + if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: + if example(a, verbose).run(): + errs += 1 + else: + print >> sys.stderr, '%s: not a file, or not executable' % a + errs += 1 + elif run_all: + names = os.listdir(path) + names.sort() + for name in names: + if name == 'run-example' or name.startswith('.'): continue + if name.endswith('.out') or name.endswith('~'): continue + if name.endswith('.run'): continue + pathname = os.path.join(path, name) + try: + st = os.lstat(pathname) + except OSError, err: + # could be an output file that was removed while we ran + if err.errno != errno.ENOENT: + raise + continue + if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: + if example(pathname, verbose).run(): + errs += 1 + print >> open(os.path.join(path, '.run'), 'w'), time.asctime() + else: + print_help(1, msg='no test names given, and --all not provided') + return errs + +if __name__ == '__main__': + try: + sys.exit(main()) + except KeyboardInterrupt: + print >> sys.stderr, 'interrupted!' + sys.exit(1)