PiDP-8/I Software

Artifact [2b365fe69f]
Log In

Artifact 2b365fe69fa9793bc019df4108996aed6249fcf1:


#!/usr/bin/env @PYCMD@
# -*- coding: utf-8 -*-
########################################################################
# simh/__init__.py - A wrapper class around pexpect for communicating
#   with an instance of the PiDP-8/I SIMH simulator running OS/8.
#
#   See ../doc/class-simh.md for a usage tutorial.
#
# Copyright © 2017 by Jonathan Trites, © 2017-2020 by William Cattey
# and Warren Young.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS LISTED ABOVE BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
# OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the names of the authors above
# shall not be used in advertising or otherwise to promote the sale,
# use or other dealings in this Software without prior written
# authorization from those authors.
########################################################################

import os
import pexpect
import pkg_resources
import subprocess
import tempfile
import time
import re
import sys

import pidp8i

#### simh #############################################################
# Object to manage a running SIMH process programmatically with pexpect.
# Functions in this file are organized from lower level to higher
# level where possible.


#### Private functions ################################################


#######################################################################
#  simh class

class simh:
  #### constants ######################################################
  # pexpect object instance, set by ctor
  _child = None

  # Constant used by os8_kbd_delay, assembled in stages:
  #
  # 1. PDP-8 RS-232 bits per character: 7-bit ASCII plus necessary
  #    start, stop, and parity bits.
  #
  # 2. The ratio of the instructions per second ratios of a PDP-8/I to
  #    that of the host hardware running the simulator.  The former is
  #    an approximate value; see lib/pidp8i/ips.py.in for the value and
  #    its defense.  The latter is either the IPS rate for:
  #
  #    a) a Raspberry Pi Model B+, that being the slowest host system
  #       we run this simulator on; or
  #
  #    b) the IPS rate of the actual host hardware if you have run the
  #       "bin/teco-pi-demo -b" benchmark on it.
  #
  # 2. The fact that real PDP-8s ran OS/8 reliably at 300 bps, and have
  #    been claimed to get flaky as early as 600 bps by some.  (Others
  #    claim to have run them up to 9,600 bps.)
  #
  # 3. The "safe BPS" value is the fastest bit per second speed actual
  #    PDP-8 hardware was known to run OS/8 terminal I/O at.  In this
  #    case, it is the high-speed tape reader.
  #
  #    TODO: We may be able to increase this.
  #
  #    We have one report that OS/8 was tested with terminals up to
  #    about ~600 bps before becoming unreliable.
  #
  #    We have another report that OS/8 could run under ETOS with
  #    9,600 bps terminals, but we don't know if that tells us anything
  #    about OS/8 running without the ETOS multitasking hardware.
  #
  # 4. Given above, calculate safe characters per second for host HW.
  #
  # 5. Invert to get seconds per character, that being the delay value.
  _bpc = 7 + 1 + 1 + 1                                       # [1]
  _ips_ratio = float (pidp8i.ips.current) / pidp8i.ips.pdp8i # [2]
  _pdp8i_safe_bps = 300                                      # [3]
  _host_safe_cps = _pdp8i_safe_bps * _ips_ratio / _bpc       # [4]
  _os8_kbd_delay = 1 / _host_safe_cps                        # [5]


  #### _simh_replies #################################################
  # Array of items that describe replies one might get from SIMH.
  # Each item is a 2 element array consisting of:
  #    [0] A help string that describes the reply.
  #        Match against this string when testing error values.
  #        This value may not be unique. When not unique
  #        the multiple values should represent the same error.
  #
  #    [1] A regular expression for pexpect to use to match on it.
  #        It is a good idea to terminate status messages with "\r"
  #        in order to confirm we've got the whole line,
  #        and prompts with "$" to terminate the search.
  #
  #        For speed and efficiency an array of compiled
  #        regular expressions, indexed to _simh_replies
  #        is created by __init__: simh_replies_rex
  
  _simh_replies = [
    # Prompts:
    ["Prompt", "sim> $"],
    # Guard against case we used the simh table when we meant the OS/8 table. 
    ["Monitor Prompt", "\n\\.$"],
    ["Non-existent device", "Non-existent device"],
    ]

  ####_os8_replies ##################################################
  # Simiar to _simh_replies, but describing replies from OS/8
  # when it is running under SIMH.
  # Each item is a 3 element array consisting of:
  #    [0] A help string that describes the reply.
  #        Match against this string when testing error values.
  #        This value may not be unique. When not unique
  #        the multiple values should represent the same error.
  #
  #    [1] A regular expression for pexpect to use to match on it.
  #        It is a good idea to terminate status messages with "\r"
  #        in order to confirm we've got the whole line,
  #        and prompts with "$" to terminate the search.
  #
  #        For speed and efficiency an array of compiled
  #        regular expressions, indexed to _simh_replies
  #        is created by __init__: os8_replies_rex
  #
  #    [2] A True/False indicator if the command returned to the
  #        keyboard monitor, rather than continuing in the program.    

  _os8_replies = [
    # Prompts:
    ["Monitor Prompt", "\n\\.$", True],
    ["Command Decoder Prompt", "\n\\*$", False],
    ["PIP Continue", "\\^$", True],                  # Newline NOT always present! 
    # OS/8 Handbook 1974 page 1-43/81 Keyboard Monitor Error Messages:
    ["Directory I/O Error", "MONITOR ERROR 2 AT \d+ \\(DIRECTORY I/O ERROR\\)", True],
    ["I/O Error on SYS", "MONITOR ERROR 5 AT \d+ \\(I/O ERROR ON SYS\\)", True],
    ["Directory I/O Error", "MONITOR ERROR 6 AT \d+ \\(DIRECTORY I/O ERROR\\)\r", True],
    ["Device not available", "(\S+) NOT AVAILABLE", False],
    ["File not found", "(\S+) NOT FOUND", False],
    #  OS/8 Handbook 1974 page 1-51/89 Command Decoder Error Messages
    ["Illegal Syntax", "ILLEGAL SYNTAX", False],
    ["File does not exist", "(\S+) DOES NOT EXIST", False],
    #  ["(\S+) NOT FOUND", False],                                 # See above
    ["Too many files", "TOO MANY FILES", False],
    #  OS/8 Handbook 1974 page 1-75/113 CCL Error Messages
    ["Bad Device", "BAD DEVICE", False],
    ["Bad Extension", "BAD EXTENSION", False],
    #"",  OS/8 Handbook 1974 page 1-106/144 PIP Error Messages
    ["ARE YOU SURE?", "ARE YOU SURE\\?", False],
    ["BAD DIRECTORY ON DEVICE", "BAD DIRECTORY ON DEVICE #\s?\d+", False],
    ["BAD SYSTEM HEAD", "BAD SYSTEM HEAD", False],
    ["CAN'T OPEN OUTPUT FILE", "CAN'T OPEN OUTPUT FILE", False],
    ["DEVICE NOT A DIRECTORY DEVICE", "DEVICE #\d+ NOT A DIRECTORY DEVICE", False],
    ["DIRECTORY ERROR", "DIRECTORY ERROR", False],
    ["ERROR DELETING FILE", "ERROR DELETING FILE", False],
    ["ILLEGIAL BINARY INPUT, FILE", "ILLEGIAL BINARY INPUT, FILE #\d+", False],
    ["INPUT ERROR", "INPUT ERROR, FILE #\s?\d+", False],
    ["IO ERROR--CONTINUING", "IO ERROR IN \\(file name\\) --CONTINUING", False],
    ["NO ROOM FOR OUTPUT FILE", "NO ROOM FOR OUTPUT FILE", False],
    ["NO ROOM--CONTINUING", "NO ROOM IN \\(file name\\) --CONTINUING", False],
    ["OUTPUT ERROR", "OUTPUT ERROR", False],
    ["PREMATURE END OF FILE", "PREMATURE END OF FILE, FILE #\s?\d+", False],
    ["ZERO SYS?", "ZERO SYS?", False],
    #"",  OS/8 Handbook 1974 page 2-81/244: DIRECT Error Messages
    ["BAD INPUT DIRECTORY", "BAD INPUT DIRECTORY", False],
    ["DEVICE DOES NOT HAVE A DIRECTORY", "DEVICE DOES NOT HAVE A DIRECTORY", False],
    ["ERROR CLOSING FILE", "ERROR CLOSING FILE", False],
    ["ERROR CLOSING FILE", "ERROR CLOSING FILE", False],
    ["ERROR READING INPUT DIRECTORY", "ERROR READING INPUT DIRECTORY", False],
    ["ILLEGAL *", "ILLEGAL \\*", False],
    #"",  OS/8 Handbook 1974 page: 2-109/272: FOTP Error Messages
    ["ERROR ON INPUT DEVICE, SKIPPING", "ERROR ON INPUT DEVICE, SKIPPING \\((\S+)\\)", False],
    ["ERROR ON OUTPUT DEVICE, SKIPPING", "ERROR ON OUTPUT DEVICE, SKIPPING \\((\S+)\\)", False],
    ["ERROR READING INPUT DIRECTORY", "ERROR READING INPUT DIRECTORY", False],
    ["ERROR READING OUTPUT DIRECTORY", "ERROR READING OUTPUT DIRECTORY", False],
    ["ILLEGAL ?", "ILLEGAL \\?", False],
    ["NO FILES OF THE FORM", "NO FILES OF THE FORM: (\S+)", False],
    ["NO ROOM, SKIPPING", "NO ROOM, SKIPPING \\((\S+)\\)", False],
    ["SYSTEM ERROR-CLOSING FILE", "SYSTEM ERROR-CLOSING FILE", False],
    ["USE PIP FOR NON-FILE STRUCTURED DEVICE", "USE PIP FOR NON-FILE STRUCTURED DEVICE", False],
    ["LINE TOO LONG IN FILE", "LINE TOO LONG IN FILE#\d+", False],
    ]
  

  # Pattern to match a SIMH command.  The command verb ends up in
  # match().group(1), and anything after the verb in group(3).
  _simh_comm_re = re.compile ("^\s*(\S+)(\s+(.*))?$")

  # Significant prefixes of SIMH command verbs that transition from SIMH
  # command context back into the simulation: BOOT, CONTINUE, and GO.
  # We need only the first letter in all cases, since these particular
  # commands are not ambiguous.  They're uppercase because the code that
  # uses this always uppercases the command before searching this list.
  _enters_os8_context = ["B", "C", "G"]


  #### ctor ############################################################
  # basedir is the parent of bin/{pdp8,pidp8i-sim}.
  #
  # Setting the skip_gpio flag to True causes us to use bin/pdp8 instead
  # of bin/pidp8i-sim, which runs the simulator with the PiDP-8/I GPIO 
  # thread disabled.  You might do this because you don't need any front
  # panel interaction or because you know you're not running on an RPi
  # in the first place.
  
  def __init__ (self, basedir, skip_gpio = False):
    # Start the simulator instance
    sim = 'pdp8' if skip_gpio else 'pidp8i-sim';
    self._child = pexpect.spawn (os.path.join (basedir, 'bin', sim))
    self._valid_pip_options = ["/A", "/B", "/I"]
    self._os8_file_re = re.compile("(\S+):(\S+)?")
    self._os8_error_match_strings = []
    self._os8_fatal_check = []
    self.verbose = False

    # We keep track of the command context and transition automatically.
    self._context = "simh"
    
    # Pre-compile our os8_replies regexps.
    self._os8_replies_rex = []
    for item in self._os8_replies:
      self._os8_replies_rex.append(re.compile(item[1].encode()))

    # Pre-compile our simh_replies regexps.
    self._simh_replies_rex = []
    for item in self._simh_replies:
      self._simh_replies_rex.append(re.compile(item[1].encode()))
        
    # Turn off pexpect's default inter-send() delay.  We add our own as
    # necessary.  The conditional tracks an API change between 3 and 4.
    pev4 = (pkg_resources.get_distribution("pexpect").parsed_version >
            pkg_resources.parse_version("4.0"))
    self._child.delaybeforesend = None if pev4 else 0

    # Wait for the simulator's startup message.
    if not self.try_wait ('PDP-8 simulator V.*git commit id: [0-9a-f]', 10):
      raise RuntimeError ('Simulator failed to start')


  #################### simh pexpect Abstraction layer ##################
  # Ideally these should be the public interface to pexpect
  # used with the simh object
  
  #### set_logfile #####################################################

  def set_logfile (self, lf):
    self._child.logfile = lf


  #### send_line #######################################################
  # Sends the given line "blind", without before or after checks

  def send_line (self, line):
    self._child.sendline (line)


  #### read_tail #######################################################
  # Watch for a literal string, then get what follows on that line.

  def read_tail (self, head, timeout = -1):
    self._child.expect_exact ([head], timeout)
    return self._child.readline ()


  #### spin ############################################################
  # Let child run without asking anything more from it, with an optional
  # timeout value.  If no value is given, lets child run indefinitely.

  def spin (self, timeout = None):
    self._child.expect (pexpect.EOF, timeout = timeout)


  #### child_expect ###################################################
  # Higher level accessor to _child.expect that catches the timeout.

  def child_expect(self, replies, timeout=10):
    try:
      reply = self._child.expect(replies, timeout=timeout)
    except pexpect.exceptions.TIMEOUT:
      print ("pexpect timeout")
      print("\tmatch before: {" + self._child.before.decode() + "}")
      return -1      
    return reply

  #### child_after #####################################################
  # Abstract accessor for the pexpect object's after contents.
  # Copes with the possibility that we timed out or hit end of file.

  def child_after (self):
    obj = self._child.after
    if obj == pexpect.TIMEOUT: return "TIMEOUT"
    elif obj == pexpect.EOF: return "End Of File"
    return self._child.after.decode()
      

  #### try_wait ########################################################
  # A wrapper around self._child.expect which catches exceptions and
  # returns false on pexpect timeout.  If you pass a list instead of a
  # string, it also returns true if the match wasn't for the first
  # element, so you can pass [success, failure1, failure2, etc.] to
  # check for a known-success case and one or more failure cases.

  def try_wait (self, matches, timeout = -1):
    try:
      return self._child.expect (matches, timeout = timeout) == 0
    except pexpect.exceptions.TIMEOUT:
      sys.stderr.write ("Exceeded " + str(timeout) + " sec timeout " +
          "waiting for " + str(matches) + "\n")
      return False
    except:
      sys.stderr.write ("Failed to match " + str(matches) + 
          ": unknown exception.\n")
      return False

  ######################## Utility Functions #############################
    
  #### test_result ######################################################
  # Given a result number, an expected result, and an array of result elements,
  # return True if the result matched expected value.
  # If caller is not the empty string, print a message about it.
  #
  # This routine does the repetitive work of testing results returned
  # from running a command, and optionally printing status.

  def test_result (self, reply, expected, results, caller, debug=False):
    # Cover case of utter failure.
    if reply == -1:
      if debug or caller != "":
        print (caller + ": failure.")
      return False
    reply_ID = results[reply][0]
    if debug:
      print ("test_result: Got reply: " + str(reply) + " -> "  + reply_ID)
      print("\tmatch before: {" + self._child.before.decode() + "}")
      print("\tmatch after: {" + self.child_after() + "}")
    if reply_ID.lower() == expected.lower():
      return True
    else:
      if caller != "":
        print (caller + ": Expected: \"" + expected + "\". Instead got: \"" + reply_ID + "\".")
      return False      

  
  ##################### Basic SIMH Interaction ###########################
  # Use these to work with simh under pexpect

  
  #### esc_to_simh  ######################################################
  # Unconditionally go to SIMH
  def esc_to_simh (self, debug=False):
    self._child.sendcontrol ('e')
    reply = self.child_expect (self._simh_replies_rex)  # Wait to get simh prompt
    self.simh_test_result(reply, "Prompt", "esc_to_simh")
    self._context = "simh"
    if debug:
      print("esc_to_simh: reply: " + str(reply) + " -> " + self._simh_replies[reply][0])
      print("\tmatch before: {" + self._child.before.decode() + "}")
      print("\tmatch after: {" + self.child_after() + "}")
    return reply


  #### simh_test_result #################################################
  # Convenience wrapper for test_result that uses SIMH replies.

  def simh_test_result (self, reply, expected, caller, debug=False):
    return self.test_result (reply, expected, self._simh_replies, caller)

  
  #### simh_send_line ########################################################
  # Send a line to simh while managing context.
  # If we are not in the simh context call esc_to_simh which will
  #    send ^e, and set the context to simh.
  # If we issue a command that enters os8 context, set context "os8".
  # It is up to the caller to then do an expect to confirm success.

  def simh_send_line (self, cmd, debug=False):
    if debug: print ("Context: " + self._context)
    if self._context != "simh":
      self.esc_to_simh()
    if debug: print ("simh_send_line: Sending: " + cmd)
    self._child.sendline (cmd)
    m = re.match (self._simh_comm_re, cmd)
    if m != None and m.group(1)[:1].upper() in self._enters_os8_context:
      self._context = "os8"

  

  #### simh_cmd ########################################################
  # If we are not in the simh context call esc_to_simh which will
  #    send ^e, and set the context to simh.
  # If we issue a command that enters os8 context, set context "os8".
  # Wait for a reply for error checking.
  # replies is an optional argument that defaults to _simh_replies_rex
  
  def simh_cmd (self, cmd, replies=None, debug=False):
    if replies == None: replies = self._simh_replies_rex
    self.simh_send_line (cmd, debug=debug)
    reply = self.child_expect(replies)
    if reply == -1:
      print ("simh_cmd: was sending:" + cmd)
    if debug:
      print("\tGot reply: " + str(reply) + " -> " + self._simh_replies[reply][0])
      print("\tmatch before: {" + self._child.before.decode() + "}")
      print("\tmatch after: {" + self.child_after() + "}")
    return reply


  #### quit ############################################################
  # Quits the simulator and waits for it to exit
  # By calling simh_send_line, we are careful to look to our context
  # and escape to SIMH if necessary.

  def quit (self):
    self.simh_send_line("quit")
  

  #### zero_core #######################################################
  # From SIMH context, zero the entire contents of core, which is
  # assumed to be 32 kWords.
  #
  # SIMH's PDP-8 simulator doesn't start with core zeroed, on purpose,
  # because the actual hardware did not do that.  SIMH does not attempt
  # to simulate the persistence of core memory by saving it to disk
  # between runs, but the SIMH developers are right to refuse to do this
  # by default: you cannot trust the prior state of a PDP-8's core
  # memory before initializing it yourself.
  #
  # See os8_zero_core () for a less heavy-handed alternative for use
  # when running under OS/8.

  def zero_core (self):
    reply = self.simh_cmd ('de all 0')
    self.simh_test_result(reply, "Prompt", "simh_zero_core")


  ################# PDP-8 SIMH Interaction #############################
  # High level interfaces to learn about and change device configurations
  # for the PDP-8 under SIMH.
  
  #### describe_dev_config #############################################
  # We provide an interface to alter SIMH device configurations for
  # specific parameters and specific devices
  #
  # dev configs supported:  rx, tti, tape
  #
  # rx:     RX8E, RX28   RX8E is the simh name for RX01 support.
  #                      RX28 is the simh name for RX02 support.
  # tti:    KSR, 7b      7b is full keyboard support.
  #                      KSR forces upcase of lower case keys on input.
  # tape:   td, dt       td is the TD8E DECtape device
  #                      dt is the TC08 DECtape device

  def describe_dev_config (self, name):
    if name == "tape":
      lines = self.do_simh_show("dt")
      dev_status = self.parse_show_tape_dev(lines)

      if dev_status == "dt": return "dt"
      else:
        lines = self.do_simh_show("td")
        return self.parse_show_tape_dev(lines)
    elif name == "rx":
      lines = self.do_simh_show("rx")
      return self.parse_show_rx_dev(lines)
    elif name == "tti":
      lines = self.do_simh_show("tti")
      return self.parse_show_tti(lines)
    else: return None

      
  #### do_simh_show  ###################################################
  # Calls show on the device name.
  # Returns array of lines from output.
  # This is focused on OS/8 devices.

  def do_simh_show (self, name):
    supported_shows = ["dt", "td", "tti", "rx"]
    if name not in supported_shows: return None
    
    ucname = name.upper()
    self.simh_send_line("show " + name)
    if self.child_expect(ucname + "\s+(.+)\r") == -1: return []
    lines = self.child_after().split ("\r")
    return lines


  #### parse_show_tape_dev  ############################################
  # Returns current state of DECtape support.
  # One of: disabled, td, dt, or None if parse fails.

  def parse_show_tape_dev (self, lines):
    if lines == None: return None
    is_enabled_re = re.compile("^(TD|DT)\s+(disabled|(devno=\S+,\s(\d)\s+units))$")
    m = re.match(is_enabled_re, lines[0])

    if m == None or m.group(1) == None or m.group(2) == None: return None
    if m.group(2) == "disabled": return "disabled"
    elif m.group(1) == "TD": return "td"
    elif m.group(1) == "DT": return "dt"
    else: return None


  #### parse_show_tape_attached  ########################################
  # Returns an ordered list of files attached or None if disabled.
  def parse_show_tape_attached (self, lines):
    if lines == None: return None
    if len(lines) < 2: return None
    attached = {}
    attachment_re = re.compile("^\s+(((DT|TD)(\d)(.+),\s+(not\s+attached|attached\s+to\s+(\S+)),(.+))|12b)$")
    for line in lines[1:]:
      m = re.match(attachment_re, line)
      if m == None or m.group(1) == None or m.group(1) == "12b": continue
      filename = m.group(7)
      if filename == None: filename = ""
      attached[m.group(4)] = filename
    return attached

  #### do_print_lines ###################################################
  # Debugging aid. Prints what we parsed out of child_after.
  def do_print_lines (self, lines):
    for line in lines:
      print(line)

  #### simh_configure routines #########################################
  # These routines affect the state of device configuration in SIMH.
  # They are intended as robust ways to toggle between incompatible
  # configurations of SIMH:
  # Choice of TD8E or TC08 DECtape (SIMH td and dt devices).
  # Choice of RX01 or RX02 Floppy emulation.
  # The SIMH rx device sets RX8E for RX01, and RX28 for RX02.  
  # Choice of KSR or 7bit console configuration.
  #
  # When re-configuring dt, dt, and rx devices, any attached
  # images are detached before reconfiguration is attempted.
  # (SIMH errors out if you don't detach them.)
  #
  # The check to see if the change is unnecessary.
  # For now they return None if no change necessary.
  #
  # After re-configuring the device, the SIMH show command is used
  # to confirm the re-configuration was successful.
  #
  # In future, we should add exception handling for no change necessary.
  # For now, return True if the change was successful and False if not.


  def set_tape_config (self, to_tape):
    if to_tape == "dt": from_tape = "td"
    elif to_tape == "td": from_tape = "dt"
    else:
      print("Cannot set_tape_config for " + to_tape)
      return False

    if self.verbose:
      print("Disable: " + from_tape + ", and enable: " + to_tape)
    
    lines = self.do_simh_show(from_tape)
    from_status = self.parse_show_tape_dev(lines)

    if from_status == None:
      print("do_tape_change: Trouble parsing \'show " + from_tape + \
          "\' output from simh. Giving up on:")
      self.do_print_lines (lines)
      return False

    if from_status == "disabled":
      print(from_tape + " already is disabled.")
    else:
      attached_from = self.parse_show_tape_attached(lines)
      if attached_from == None:
        print("do_tape_change: Trouble parsing \'show " + from_tape + \
            "\' output from simh. Giving up on:")
        self.do_print_lines (lines)
        return False
      else:
        for unit in attached_from.keys():
          if attached_from[unit] != "":
            det_comm = "det " + from_tape + unit
            if self.verbose:
              print(det_comm + "(Had: " + attached_from[unit] + ")")
            reply = self.simh_cmd(det_comm)
            self.simh_test_result(reply, "Prompt", "set_tape_config: " + det_com)
        reply = self.simh_cmd("set " + from_tape + " disabled")
        self.simh_test_result(reply, "Prompt", "set_tape_config disable " + from_tape)

    lines = self.do_simh_show(to_tape)
    to_status = self.parse_show_tape_dev(lines)

    if to_status == None:
      print("do_tape_change: Trouble parsing \'show " + to_tape + \
          "\' output from simh. Giving up on:")
      self.do_print_lines (lines)
      return False
    elif to_status != "disabled":
      print(to_tape + " already is enabled.")
    else:
      reply = self.simh_cmd("set " + to_tape + " enabled")
      self.simh_test_result(reply, "Prompt", "set_tape_config enable " + to_tape)

    # Test to confirm to_tape is now enabled.

    lines = self.do_simh_show(to_tape)
    to_status = self.parse_show_tape_dev(lines)

    if to_status == None:
      print("Failed enable of " + to_tape + \
          ". Parse fail on \'show " + to_tape + "\'. Got:")
      self.do_print_lines (lines)
      return False
    elif to_status == "disabled":
      print("Failed enable of " + to_tape + ". Device still disabled.")
      return False
    else: 
      return True


  #### parse_show_rx_dev ###############################################
  # Show the rx device configuration.

  def parse_show_rx_dev (self, lines):
    if lines == None: return None
    is_enabled_re = re.compile("^\s*(RX)\s+(disabled|((RX8E|RX28),\s+devno=\S+,\s+(\d)\s+units))$")
    m = re.match(is_enabled_re, lines[0])
    if m == None or m.group(2) == None: return None
    if m.group(2) == "disabled": return "disabled"
    return m.group(4)


   #### parse_show_rx_attached ##########################################
  # Returns an ordered list of files attached or None if disabled.

  def parse_show_rx_attached (self, lines):
    if len(lines) < 2: return None
    attached = {}
    attachment_re = re.compile("^\s+(((RX)(\d)(.+),\s+(not\s+attached|attached\s+to\s+(\S+)),(.+))|autosize)$")
    for line in lines[1:]:
      m = re.match(attachment_re, line)
      if m == None or m.group(1) == None or m.group(1) == "autosize": continue
      filename = m.group(7)
      if filename == None: filename = ""
      attached[m.group(4)] = filename
    return attached


  #### set_rx_config ####################################################
  
  def set_rx_config (self, to_rx):
    to_rx = to_rx.lower()
    if to_rx == "rx8e": from_rx = "rx28"
    elif to_rx == "rx01":
      to_rx = "rx8e"
      from_rx = "rx28"
    elif to_rx == "rx28": from_rx = "rx8e"
    elif to_rx == "rx02":
      to_rx = "rx28"
      from_rx = "rx8e"
    else:
      print("Cannot set_rx_config for " + to_rx)
      return False
      
    if self.verbose:
      print("Switch rx driver: " + from_rx + ", to: " + to_rx)
    lines = self.do_simh_show("rx")

    rx_type = self.parse_show_rx_dev (lines)
    if rx_type == None:
      print("do_rx_change: Trouble parsing \'show rx\' output from simh.  Giving up on:")
      self.do_print_lines(lines)
      return False
    elif rx_type == "disabled":
      if self.verbose: print("rx is disabled. Enabling...")
      reply = self.simh_cmd("set rx enabled")
      self.simh_test_result(reply, "Prompt", "set_rx_config enable  rx")
      # Retry getting rx info
      lines = self.do_simh_show("rx")
      rx_type = self.parse_show_rx_dev (lines)
      if rx_type == None:
        print("do_rx_change after re-enable: Trouble parsing \`show rx\` output from simh. Giving up on:")
        self.do_print_lines(lines)
        return False
      elif rx_type == "disabled":
        print("do_rx_change re-enable of rx failed. Giving up.")
        return False

    if rx_type.lower() == to_rx:
      print("rx device is already set to " + to_rx)
      return None
      
    attached_rx= self.parse_show_rx_attached(lines)
    if attached_rx == None:
      print("do_rx_change: Trouble parsing /'show rx\' from simh to find rx attachments. Got:")
      self.do_print_lines(lines)
    else:
      for unit in attached_rx.keys():
        if attached_rx[unit] != "":
          det_comm = "det rx" + unit
          if self.verbose:
            print(det_comm + "(Had: " + attached_rx[unit] + ")")
          reply = self.simh_cmd(det_comm)
          self.simh_test_result(reply, "Prompt", "set_rx_config detach: " + det_comm)

    reply = self.simh_cmd("set rx " + to_rx)
    self.simh_test_result(reply, "Prompt", "set_rx_config set rx " + to_rx)

    # Test to confirm new setting of RX
    lines = self.do_simh_show("rx")
    rx_type = self.parse_show_rx_dev (lines)

    if rx_type == None:
      print("Failed change of rx to " + to_rx + \
          ". Parse fail on \'show rx\'.")
      return False
    elif rx_type.lower() != to_rx: 
      print("Failed change of rx to " + to_rx + ". Instead got: " + \
          rx_type)
      return False
    return True


  #### get_tti ##################################################
  # Returns an ordered list of files attached or None if disabled.
  def parse_show_tti (self, lines):
    if lines == None: return None
    is_enabled_re = re.compile("^(KSR|7b)$")
    if len(lines) < 2: return None

    # That second line of output contains embedded newlines.
    m = re.match(is_enabled_re, lines[1].strip())
    if m == None or m.group(1) == None: return None
    return m.group(1)



  #### do_tti_change ###################################################

  def set_tti_config (self, to_tti):
    if to_tti == "KSR": from_tti = "7b"
    elif to_tti == "7b": from_tti = "KSR"
    else:
      print("Cannot set_tti_config to " + to_tti)
      return
    
    if self.verbose:
      print("Switch tti driver from: " + from_tti + ", to: " + to_tti)

    lines = self.do_simh_show("tti")
    tti_type = self.parse_show_tti (lines)
    if tti_type == None:
      print("do_tti_change: Trouble parsing \'show tti\' output from simh. Giving up on:")
      self.do_print_lines(lines)
      return False
    elif tti_type == to_tti:
      print("tti device is already set to " + to_tti)
      return None

    reply = self.simh_cmd("set tti " + to_tti)
    self.simh_test_result(reply, "Prompt", "set_tti_config setting tti " + to_tti)

    # Test to confirm new setting of tti
    lines = self.do_simh_show("tti")
    tti_type = self.parse_show_tti (lines)
    
    if tti_type == None:
      print("Failed change of tti to " + to_tti + \
          ". Parse fail on \'show tti\'.")
      return False
    elif tti_type != to_tti: 
      print("Failed change of tti to " + to_tti + ". Instead got: " + \
          tti_type)
      return False
    else:
      return True

  ############### Basic OS/8 Interaction ###############################
  # Intereact with OS/8 under SIMH.

  #### os8_kbd_delay ###################################################
  # Artificially delay the media generation process to account for the
  # fact that OS/8 lacks a modern multi-character keyboard input buffer.
  # It is unsafe to send text faster than a contemporary terminal could,
  # though we can scale it based on how much faster this host is than a
  # real PDP-8.  See the constants above for the calculation.

  def os8_kbd_delay (self):
    time.sleep (self._os8_kbd_delay)


  #### os8_send_str ########################################################
  # Core of os8_send_line.  Also used by code that needs to send text
  # "blind" to OS/8, without expecting a prompt and without a CR, as
  # when driving TECO.

  def os8_send_str (self, str):
    for i in range (0, len (str)):
      self.os8_kbd_delay ()
      self._child.send (str[i])


  #### os8_send_ctrl ###################################################
  # Send a control character to OS/8 corresponding to the ASCII letter
  # given.  We precede it with the OS/8 keyboard delay, since we're
  # probably following a call to os8_send_line or os8_cmd.

  def os8_send_ctrl (self, char):
    cc = char[0].lower ()
    self.os8_kbd_delay ()
    self._child.sendcontrol (cc)
    
    if cc == 'e': self._context = 'simh'


  #### os8_send_line ###################################################
  # Core of os8_cmd.  Also used by code that needs to send text
  # "blind" to OS/8, without expecting a prompt, as when driving EDIT.

  def os8_send_line (self, line):
    self.os8_send_str (line)
    self._child.send ("\r")


  #### os8_test_result ##################################################
  # Convenience wrapper for test_result that uses OS/8 replies.

  def os8_test_result (self, reply, expected, caller, debug=False):
    return self.test_result (reply, expected, self._os8_replies, caller, debug)


  #### mk_os8_name # ###################################################
  # Create an OS/8 filename: of the form XXXXXX.YY
  # From a POSIX path.

  def mk_os8_name(self, dev, path):
    bns = os.path.basename (path)
    bns = re.sub("-|:|\(|\)|!", "", bns)
    bns = bns.upper()
    if "." not in bns:
      return dev + bns[:min(6, len(bns))]
    else:
      dot = bns.index('.')
      return dev + bns[:min(6, dot, len(bns))] + "." + bns[dot+1: dot+3]


  #### os8_cmd_ ###################################################
  # Send the given command to OS/8.
  # replies is an array of possible replies that command will get.
  # Returns the index into the replies array that pexpect got,
  # or -1 if the command could not be run.
  # replies is an optional argument which defaults to _os8_replies_rex

  def os8_cmd (self, cmd, replies=None, debug=False, timeout=60):
    if replies == None: replies = self._os8_replies_rex
    if self._context != 'os8': 
      print("OS/8 is not running. Cannot execute: " + cmd)
      return -1
    self.os8_send_line (cmd)

    reply = self.child_expect (replies, timeout = timeout)
    return reply
    

  #### os8_cfm_monitor ##################################################
  # Confirm return to OS/8 monitor.
  # This function is necessary so that we know our command has returned to OS/8.
  # Without this test, the next child_expect hits a monitor prompt instead of
  # what it will be looking for.
  # If caller is not empty, it will emit an error string if the prompt wasn't seen.
  # Returns True if we are where we expect.

  def os8_cfm_monitor (self, caller, debug=False):
    reply = self.child_expect(self._os8_replies_rex)
    return self.os8_test_result(reply, "Monitor Prompt", caller, debug=debug)


  #### os8_ctrl_c ##################################################
  # Return to OS/8 monitor using the ^C given escape character.
  # We will probably, but not always get an echo of ^C.
  # For example BUILD is known NOT to.
  # Our testing makes it optional.
  # We listen for an uparow and assume the C follows, and listen again
  # for the monitor prompt.
  # Optional caller argument enables a message if the ^C escape failed.
  # Note: OS/8 will respond to this escape IMMEDIATELY,
  # even if it has pending output.
  # You will need to make sure all pending output is in
  # a known state and the running program is quiescent
  # before calling this method. Otherwise pexpect may get lost.

  def os8_ctrl_c (self, caller = "", debug=False):
    self.os8_send_ctrl ("c")
    reply = self.child_expect(self._os8_replies_rex)
    uparrow = self.os8_test_result(reply, "PIP Continue", "")
    if uparrow:
      reply = self.child_expect(self._os8_replies_rex)
    return self.os8_test_result(reply, "Monitor Prompt", caller)


  #### os8_escape ##################################################
  # Return to OS/8 monitor using the escape (^]) character.
  # We need to listen for the $ echo or else cfm_monitor gets confused.
  # Confirm we got our monitor prompt.
  # Optional caller argument enables a message if escape failed.
  # Note: OS/8 will respond to this escape IMMEDIATELY,
  # even if it has pending output.
  # You will need to make sure all pending output is in
  # a known state and the running program is quiescent
  # before calling this method. Otherwise pexpect may get lost.

  def os8_escape (self, caller = "", debug=False):
    self.os8_send_ctrl ('[')
    self.child_expect("\\$")
    return self.os8_cfm_monitor (caller)


  #### simh_restart_os8 #######################################################
  # Abstraction on returning to OS/8 monitor from within SIMH.
  # It is common practice to "load address 7600; start" at the console.
  
  def simh_restart_os8 (self, caller = "", debug=False):
    # Note we're calling simh with os8 replies because we will
    # be switching contexts.
    # simh_cmd manages the context, and checks for success
    # by expecting a command prompt.
    reply = self.simh_cmd ('go 7600', self._os8_replies_rex, debug=debug)
    # We test our reply with os8_test result because we're in OS/8 now.
    self.os8_test_result(reply, "Monitor Prompt", caller)

    
  #### simh_resume_os8 ##########################################################
  # Continue execution of OS/8.
  # This has been extremely tricky to get right.
  # What we must do is give OS/8 a chance to wake up.
  # We do this by asking pexpect to show us our "cont\n" echo and then
  # we do an os8_kbd_delay()
  # We carefully use simh_send_line to send the command so that we
  # know to set the os8 context.
  # WARNING! Using simh_resume_os8 without having booted OS/8
  # has undefined reslts.

  def simh_resume_os8 (self):
    self.simh_send_line ("cont")
    # Give OS/8 a chance to wake up.
    self.child_expect("cont\r")
    self.os8_kbd_delay()


  #### os8_zero_core ###################################################
  # Starting from OS/8 context, bounce out to SIMH context and zero all
  # of core excepting:
  #
  # 0. zero page - many apps put temporary data here
  # 1. the top pages of fields 1 & 2 - OS/8 is resident here
  # 2. the top page of field 2 - OS/8's TD8E driver (if any) lives here
  #
  # We then restart OS/8, which means we absolutely need to do #1 and
  # may need to do #2.  We could probably get away with zeroing page 0.
  #
  # All of the above explains why we have this special OS/8 alternative
  # to the zero_core() method.

  def os8_zero_core (self):
    reply = self.simh_cmd ('de 00200-07577 0')
    self.simh_test_result(reply, "Prompt", "os8_zero_core 00200-07577")
    reply = self.simh_cmd ('de 10000-17577 0')
    self.simh_test_result(reply, "Prompt", "os8_zero_core de 10000-17577 0")
    reply = self.simh_cmd ('de 20000-27577 0')
    self.simh_test_result(reply, "Prompt", "os8_zero_core de 20000-27577 0")
    reply = self.simh_cmd ('de 30000-77777 0')
    self.simh_test_result(reply, "Prompt", "os8_zero_core de 30000-37577 0")
    self.simh_restart_os8 ()


  #### os8_squish ########################################################
  # Wraps the OS/8 SQUISH command for a given device.

  def os8_squish (self, device, caller = ""):
    reply = self.os8_cmd ("SQUISH " + device + ":")
    self.os8_test_result (reply, "ARE YOU SURE?", caller)
    reply = self.os8_cmd ("Y")
    self.os8_test_result (reply, "Monitor Prompt", caller)


  #### os8_pip_to ###################################################
  # Send a copy of a local file to OS/8 using PIP.
  #
  # The file is sent via the SIMH paper tape device through PIP
  # specifying a transfer option.  If no option is specified,
  # ASCII is assumed.
  #
  # In ASCII mode, we pre-process with txt2ptp which translates
  # POSIX ASCII conventions to OS/8 conventions.  In all other
  # modes, we do not do any translation.
  #
  # However, we should supply a sacrificial NULL as an additional character
  # because the OS/8 PTR driver throws the last character away. (NOT DONE YET)
  #
  # Entry context should be inside OS/8.  Exit context is inside OS/8.
  #

  def os8_pip_to (self, path, os8name, option = None, debug=False):
    if option == None: option = ""
    # If os8name is just a device, synthesize an upcased name from
    # the POSIX file basename.
    if not os.path.exists(path):
      print(path + " not found. Skipping.")
      return
    m = re.match(self._os8_file_re, os8name)
    if m != None and (m.group(2) == None or m.group(2) == ""):
        dest = self.mk_os8_name(os8name, path)
    else:
        dest = os8name

    # Gross hack:
    # The command decoder prompt that comes when we continue a PTR/PTP
    # does not have a newline.  But everything else that matches on
    # a command decoder prompt NEEDS that newline. Otherwise we match
    # on asterisks in file specs.
    # Remember to decrement replies on this rex list if we use
    # _os8_replies

    pip_rex = [re.compile("\\*$".encode())]
    pip_rex.extend(self._os8_replies_rex)
    
    did_conversion = False
    if option == "" or option == "/A":
      # Convert text file to SIMH paper tape format in current dir of path.
      if self.verbose: print("Format converting " + path)
      bdir = pidp8i.dirs.build
      # Create uniquified temp path name.
      pt   = path + "-" + str(os.getpid()) + ".pt_temp"
      tool = os.path.join (bdir, 'bin', 'txt2ptp')
      subprocess.call (tool + ' < ' + path + ' > ' + pt, shell = True)
      did_conversion = True
    elif option not in self._valid_pip_options:
      print("Invalid PIP option: " + option + ". Ignoring: " + path + \
          " to OS/8.")
      return
    else:
      pt = path

    # TODO: Sacrificial extra character code goes here.

    # Paper tape created, so attach it read-only and copy it in.  We're
    # relying on txt2ptp to insert the Ctrl-Z EOF marker at the end of
    # the file.
    reply = self.simh_cmd ('attach -r ptr ' + pt, debug=debug)
    self.simh_test_result(reply, "Prompt", "os8_pip_to attaching ptr")
    # Enter OS/8.
    self.simh_restart_os8 (caller="os8_pip_to")
    
    reply = self.os8_cmd ('R PIP', debug=debug)
    # Has PIP Startup been successful?
    if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_to 0", \
                             debug=debug) == False: return

    # Give file and test for ready to send.
    if debug: print ("os8_pip_to: Sending: " + dest + '<PTR:' + option)
    reply = self.os8_cmd (dest + '<PTR:' + option, debug=debug)
    if self.os8_test_result (reply, "PIP Continue", "os8_pip_to 1", debug=debug) == False:
      # If we got "PIP Continue" we keep going. Otherwise return.
      # Send escape if reply indicates we didn't go back to the monitor.
      if self._os8_replies[reply][2] == False:
        self.os8_send_ctrl ('[')
      return

    # Finish transfer
    # The test for success is implicit.
    self.os8_send_ctrl ('[')
    reply = self.child_expect (pip_rex)
    if reply != 0:
      self.os8_test_result (reply - 1, "Command Decoder Prompt", "os8_pip_to 2", debug=debug)
    # If we did not return to monitor hit escape to exit PIP.
    if self._os8_replies[reply - 1][2] == False:
        self.os8_send_ctrl ('[')      # Success or Non-fatal error.
        # Must exit PIP and wait for monitor prompt. Complain if we don't get it!
        reply = self.child_expect (self._os8_replies_rex)
        self.os8_test_result (reply, "Monitor Prompt", "os8_pip_to 3")
    # detach ptr
    # self.esc_to_simh ()
    # self.simh_cmd ('detach ptr ')
    # self.os8_restart ()
    
    # Remove the temp file if we created one.
    if did_conversion:
      os.remove (pt)


  #### os8_pip_from ###################################################
  # Fetch a file from OS/8 to a local path using PIP.
  #
  # The OS/8 source filename is synthesized from the basename of the path,
  # upcasing if necessary.
  #
  # The file is sent via the SIMH paper tape device through PIP
  # specifying a transfer option.  If no option is specified,
  # ASCII is assumed.
  #
  # In ASCII mode, we post-process with ptp2txt which translates
  # POSIX ASCII conventions to OS/8 conventions.  In all other
  # modes, we do not do any translation.
  #
  # Entry context should be inside OS/8.  Exit context is inside OS/8.

  def os8_pip_from (self, os8name, path, option = None, debug=False):
    if option == None: option = ""
    # If path is not a file, use the name portion of os8name.
    if os.path.isdir(path):
      colon = os8name.find(':')
      if colon == -1:                # No dev, just a name.
        path = path + "/" + os8name
      else:
        path = path + "/" + os8name[colon+1:]

    if option != "" and option not in self._valid_pip_options:
      print("Invalid PIP option: " + option + \
          ". Ignoring os8_pip_from on: " + path)
      return

    reply = self.simh_cmd ('attach ptp ' + path, debug=debug)
    self.simh_test_result(reply, "Prompt", "os8_pip_from attachng ptp")
    # Enter OS/8.
    self.simh_restart_os8 (caller="os8_pip_from 0")
    
    reply = self.os8_cmd ('R PIP', debug=debug)
    # Has PIP Startup been successful?
    if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_from 0") == False: return
    
    # Issue file transfer spec.
    reply = self.os8_cmd ('PTP:<' + os8name + option, debug=debug)
    # Test for Success
    if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_from 1") == False:
      # There is an empty PTP file we need to remove.
      os.remove(path)
      # Was this a fatal error?
      if self._os8_replies[reply][2] == False:
        self.os8_send_ctrl ('[')      # Non-fatal error. Must exit PIP
      return

    self.os8_send_ctrl ('[')      # exit PIP
    # Must wait for monitor prompt. Complain if we don't get it!
    reply = self.child_expect (self._os8_replies_rex)
    self.os8_test_result (reply, "Monitor Prompt", "os8_pip_from 2")

    reply = self.simh_cmd ('detach ptp', debug=debug)  # Clean flush of buffers.
    self.simh_test_result(reply, "Prompt", "os8_pip_from detaching ptp")
    # Enter OS/8.
    self.simh_restart_os8(caller="os8_pip_from 3")

    if option == "" or option == "/A":
      if self.verbose: print("Format converting " + path)
      # Convert text file to SIMH paper tape format
      bdir = pidp8i.dirs.build
      # Create uniquified temp path name.
      pf = path + "-" + str(os.getpid()) + ".pf_temp"
      os.rename(path, pf)
      tool = os.path.join (bdir, 'bin', 'ptp2txt')
      subprocess.call (tool + ' < ' + pf + ' > ' + path, shell = True)
      os.remove(pf)