#!/usr/bin/env @PYCMD@
# -*- coding: utf-8 -*-
########################################################################
# simh/__init__.py - A wrapper class around pexpect for communicating
# with an instance of the PiDP-8/I SIMH simulator running OS/8.
#
# See ../doc/class-simh.md for a usage tutorial.
#
# Copyright © 2017 by Jonathan Trites, © 2017-2020 by William Cattey
# and Warren Young.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS LISTED ABOVE BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
# OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the names of the authors above
# shall not be used in advertising or otherwise to promote the sale,
# use or other dealings in this Software without prior written
# authorization from those authors.
########################################################################
import os
import pexpect
import pkg_resources
import subprocess
import tempfile
import time
import re
import sys
import pidp8i
#### simh #############################################################
# Object to manage a running SIMH process programmatically with pexpect.
# Functions in this file are organized from lower level to higher
# level where possible.
#### Private functions ################################################
#######################################################################
# simh class
class simh:
#### constants ######################################################
# pexpect object instance, set by ctor
_child = None
# Constant used by os8_kbd_delay, assembled in stages:
#
# 1. PDP-8 RS-232 bits per character: 7-bit ASCII plus necessary
# start, stop, and parity bits.
#
# 2. The ratio of the instructions per second ratios of a PDP-8/I to
# that of the host hardware running the simulator. The former is
# an approximate value; see lib/pidp8i/ips.py.in for the value and
# its defense. The latter is either the IPS rate for:
#
# a) a Raspberry Pi Model B+, that being the slowest host system
# we run this simulator on; or
#
# b) the IPS rate of the actual host hardware if you have run the
# "bin/teco-pi-demo -b" benchmark on it.
#
# 2. The fact that real PDP-8s ran OS/8 reliably at 300 bps, and have
# been claimed to get flaky as early as 600 bps by some. (Others
# claim to have run them up to 9,600 bps.)
#
# 3. The "safe BPS" value is the fastest bit per second speed actual
# PDP-8 hardware was known to run OS/8 terminal I/O at. In this
# case, it is the high-speed tape reader.
#
# TODO: We may be able to increase this.
#
# We have one report that OS/8 was tested with terminals up to
# about ~600 bps before becoming unreliable.
#
# We have another report that OS/8 could run under ETOS with
# 9,600 bps terminals, but we don't know if that tells us anything
# about OS/8 running without the ETOS multitasking hardware.
#
# 4. Given above, calculate safe characters per second for host HW.
#
# 5. Invert to get seconds per character, that being the delay value.
_bpc = 7 + 1 + 1 + 1 # [1]
_ips_ratio = float (pidp8i.ips.current) / pidp8i.ips.pdp8i # [2]
_pdp8i_safe_bps = 300 # [3]
_host_safe_cps = _pdp8i_safe_bps * _ips_ratio / _bpc # [4]
_os8_kbd_delay = 1 / _host_safe_cps # [5]
#### _simh_replies #################################################
# Array of items that describe replies one might get from SIMH.
# Each item is a 2 element array consisting of:
# [0] A help string that describes the reply.
# Match against this string when testing error values.
# This value may not be unique. When not unique
# the multiple values should represent the same error.
#
# [1] A regular expression for pexpect to use to match on it.
# It is a good idea to terminate status messages with "\r"
# in order to confirm we've got the whole line,
# and prompts with "$" to terminate the search.
#
# For speed and efficiency an array of compiled
# regular expressions, indexed to _simh_replies
# is created by __init__: simh_replies_rex
_simh_replies = [
# Prompts:
["Prompt", "sim> $"],
# Guard against case we used the simh table when we meant the OS/8 table.
["Monitor Prompt", "\n\\.$"],
["Non-existent device", "Non-existent device"],
]
####_os8_replies ##################################################
# Simiar to _simh_replies, but describing replies from OS/8
# when it is running under SIMH.
# Each item is a 3 element array consisting of:
# [0] A help string that describes the reply.
# Match against this string when testing error values.
# This value may not be unique. When not unique
# the multiple values should represent the same error.
#
# [1] A regular expression for pexpect to use to match on it.
# It is a good idea to terminate status messages with "\r"
# in order to confirm we've got the whole line,
# and prompts with "$" to terminate the search.
#
# For speed and efficiency an array of compiled
# regular expressions, indexed to _simh_replies
# is created by __init__: os8_replies_rex
#
# [2] A True/False indicator if the command returned to the
# keyboard monitor, rather than continuing in the program.
_os8_replies = [
# Prompts:
["Monitor Prompt", "\n\\.$", True],
["Command Decoder Prompt", "\n\\*$", False],
["PIP Continue", "\\^$", True], # Newline NOT always present!
# OS/8 Handbook 1974 page 1-43/81 Keyboard Monitor Error Messages:
["Directory I/O Error", "MONITOR ERROR 2 AT \d+ \\(DIRECTORY I/O ERROR\\)", True],
["I/O Error on SYS", "MONITOR ERROR 5 AT \d+ \\(I/O ERROR ON SYS\\)", True],
["Directory I/O Error", "MONITOR ERROR 6 AT \d+ \\(DIRECTORY I/O ERROR\\)\r", True],
["Device not available", "(\S+) NOT AVAILABLE", False],
["File not found", "(\S+) NOT FOUND", False],
# OS/8 Handbook 1974 page 1-51/89 Command Decoder Error Messages
["Illegal Syntax", "ILLEGAL SYNTAX", False],
["File does not exist", "(\S+) DOES NOT EXIST", False],
# ["(\S+) NOT FOUND", False], # See above
["Too many files", "TOO MANY FILES", False],
# OS/8 Handbook 1974 page 1-75/113 CCL Error Messages
["Bad Device", "BAD DEVICE", False],
["Bad Extension", "BAD EXTENSION", False],
#"", OS/8 Handbook 1974 page 1-106/144 PIP Error Messages
["ARE YOU SURE?", "ARE YOU SURE\\?", False],
["BAD DIRECTORY ON DEVICE", "BAD DIRECTORY ON DEVICE #\s?\d+", False],
["BAD SYSTEM HEAD", "BAD SYSTEM HEAD", False],
["CAN'T OPEN OUTPUT FILE", "CAN'T OPEN OUTPUT FILE", False],
["DEVICE NOT A DIRECTORY DEVICE", "DEVICE #\d+ NOT A DIRECTORY DEVICE", False],
["DIRECTORY ERROR", "DIRECTORY ERROR", False],
["ERROR DELETING FILE", "ERROR DELETING FILE", False],
["ILLEGIAL BINARY INPUT, FILE", "ILLEGIAL BINARY INPUT, FILE #\d+", False],
["INPUT ERROR", "INPUT ERROR, FILE #\s?\d+", False],
["IO ERROR--CONTINUING", "IO ERROR IN \\(file name\\) --CONTINUING", False],
["NO ROOM FOR OUTPUT FILE", "NO ROOM FOR OUTPUT FILE", False],
["NO ROOM--CONTINUING", "NO ROOM IN \\(file name\\) --CONTINUING", False],
["OUTPUT ERROR", "OUTPUT ERROR", False],
["PREMATURE END OF FILE", "PREMATURE END OF FILE, FILE #\s?\d+", False],
["ZERO SYS?", "ZERO SYS?", False],
#"", OS/8 Handbook 1974 page 2-81/244: DIRECT Error Messages
["BAD INPUT DIRECTORY", "BAD INPUT DIRECTORY", False],
["DEVICE DOES NOT HAVE A DIRECTORY", "DEVICE DOES NOT HAVE A DIRECTORY", False],
["ERROR CLOSING FILE", "ERROR CLOSING FILE", False],
["ERROR CLOSING FILE", "ERROR CLOSING FILE", False],
["ERROR READING INPUT DIRECTORY", "ERROR READING INPUT DIRECTORY", False],
["ILLEGAL *", "ILLEGAL \\*", False],
#"", OS/8 Handbook 1974 page: 2-109/272: FOTP Error Messages
["ERROR ON INPUT DEVICE, SKIPPING", "ERROR ON INPUT DEVICE, SKIPPING \\((\S+)\\)", False],
["ERROR ON OUTPUT DEVICE, SKIPPING", "ERROR ON OUTPUT DEVICE, SKIPPING \\((\S+)\\)", False],
["ERROR READING INPUT DIRECTORY", "ERROR READING INPUT DIRECTORY", False],
["ERROR READING OUTPUT DIRECTORY", "ERROR READING OUTPUT DIRECTORY", False],
["ILLEGAL ?", "ILLEGAL \\?", False],
["NO FILES OF THE FORM", "NO FILES OF THE FORM: (\S+)", False],
["NO ROOM, SKIPPING", "NO ROOM, SKIPPING \\((\S+)\\)", False],
["SYSTEM ERROR-CLOSING FILE", "SYSTEM ERROR-CLOSING FILE", False],
["USE PIP FOR NON-FILE STRUCTURED DEVICE", "USE PIP FOR NON-FILE STRUCTURED DEVICE", False],
["LINE TOO LONG IN FILE", "LINE TOO LONG IN FILE#\d+", False],
]
# Pattern to match a SIMH command. The command verb ends up in
# match().group(1), and anything after the verb in group(3).
_simh_comm_re = re.compile ("^\s*(\S+)(\s+(.*))?$")
# Significant prefixes of SIMH command verbs that transition from SIMH
# command context back into the simulation: BOOT, CONTINUE, and GO.
# We need only the first letter in all cases, since these particular
# commands are not ambiguous. They're uppercase because the code that
# uses this always uppercases the command before searching this list.
_enters_os8_context = ["B", "C", "G"]
#### ctor ############################################################
# basedir is the parent of bin/{pdp8,pidp8i-sim}.
#
# Setting the skip_gpio flag to True causes us to use bin/pdp8 instead
# of bin/pidp8i-sim, which runs the simulator with the PiDP-8/I GPIO
# thread disabled. You might do this because you don't need any front
# panel interaction or because you know you're not running on an RPi
# in the first place.
def __init__ (self, basedir, skip_gpio = False):
# Start the simulator instance
sim = 'pdp8' if skip_gpio else 'pidp8i-sim';
self._child = pexpect.spawn (os.path.join (basedir, 'bin', sim))
self._valid_pip_options = ["/A", "/B", "/I"]
self._os8_file_re = re.compile("(\S+):(\S+)?")
self._os8_error_match_strings = []
self._os8_fatal_check = []
self.verbose = False
# We keep track of the command context and transition automatically.
self._context = "simh"
# Pre-compile our os8_replies regexps.
self._os8_replies_rex = []
for item in self._os8_replies:
self._os8_replies_rex.append(re.compile(item[1].encode()))
# Pre-compile our simh_replies regexps.
self._simh_replies_rex = []
for item in self._simh_replies:
self._simh_replies_rex.append(re.compile(item[1].encode()))
# Turn off pexpect's default inter-send() delay. We add our own as
# necessary. The conditional tracks an API change between 3 and 4.
pev4 = (pkg_resources.get_distribution("pexpect").parsed_version >
pkg_resources.parse_version("4.0"))
self._child.delaybeforesend = None if pev4 else 0
# Wait for the simulator's startup message.
if not self.try_wait ('PDP-8 simulator Open SIMH V.*git commit id: [0-9a-f]', 10):
raise RuntimeError ('Simulator failed to start')
#################### simh pexpect Abstraction layer ##################
# Ideally these should be the public interface to pexpect
# used with the simh object
#### set_logfile #####################################################
def set_logfile (self, lf):
self._child.logfile = lf
#### send_line #######################################################
# Sends the given line "blind", without before or after checks
def send_line (self, line):
self._child.sendline (line)
#### read_tail #######################################################
# Watch for a literal string, then get what follows on that line.
def read_tail (self, head, timeout = -1):
self._child.expect_exact ([head], timeout)
return self._child.readline ()
#### spin ############################################################
# Let child run without asking anything more from it, with an optional
# timeout value. If no value is given, lets child run indefinitely.
def spin (self, timeout = None):
self._child.expect (pexpect.EOF, timeout = timeout)
#### child_expect ###################################################
# Higher level accessor to _child.expect that catches the timeout.
def child_expect(self, replies, timeout=10):
try:
reply = self._child.expect(replies, timeout=timeout)
except pexpect.exceptions.TIMEOUT:
print ("pexpect timeout")
print("\tmatch before: {" + self._child.before.decode() + "}")
return -1
return reply
#### child_after #####################################################
# Abstract accessor for the pexpect object's after contents.
# Copes with the possibility that we timed out or hit end of file.
def child_after (self):
obj = self._child.after
if obj == pexpect.TIMEOUT: return "TIMEOUT"
elif obj == pexpect.EOF: return "End Of File"
return self._child.after.decode()
#### try_wait ########################################################
# A wrapper around self._child.expect which catches exceptions and
# returns false on pexpect timeout. If you pass a list instead of a
# string, it also returns true if the match wasn't for the first
# element, so you can pass [success, failure1, failure2, etc.] to
# check for a known-success case and one or more failure cases.
def try_wait (self, matches, timeout = -1):
try:
return self._child.expect (matches, timeout = timeout) == 0
except pexpect.exceptions.TIMEOUT:
sys.stderr.write ("Exceeded " + str(timeout) + " sec timeout " +
"waiting for " + str(matches) + "\n")
return False
except:
sys.stderr.write ("Failed to match " + str(matches) +
": unknown exception.\n")
return False
######################## Utility Functions #############################
#### test_result ######################################################
# Given a result number, an expected result, and an array of result elements,
# return True if the result matched expected value.
# If caller is not the empty string, print a message about it.
#
# This routine does the repetitive work of testing results returned
# from running a command, and optionally printing status.
def test_result (self, reply, expected, results, caller, debug=False):
# Cover case of utter failure.
if reply == -1:
if debug or caller != "":
print (caller + ": failure.")
return False
reply_ID = results[reply][0]
if debug:
print ("test_result: Got reply: " + str(reply) + " -> " + reply_ID)
print("\tmatch before: {" + self._child.before.decode() + "}")
print("\tmatch after: {" + self.child_after() + "}")
if reply_ID.lower() == expected.lower():
return True
else:
if caller != "":
print (caller + ": Expected: \"" + expected + "\". Instead got: \"" + reply_ID + "\".")
return False
##################### Basic SIMH Interaction ###########################
# Use these to work with simh under pexpect
#### esc_to_simh ######################################################
# Unconditionally go to SIMH
def esc_to_simh (self, debug=False):
self._child.sendcontrol ('e')
reply = self.child_expect (self._simh_replies_rex) # Wait to get simh prompt
self.simh_test_result(reply, "Prompt", "esc_to_simh")
self._context = "simh"
if debug:
print("esc_to_simh: reply: " + str(reply) + " -> " + self._simh_replies[reply][0])
print("\tmatch before: {" + self._child.before.decode() + "}")
print("\tmatch after: {" + self.child_after() + "}")
return reply
#### simh_test_result #################################################
# Convenience wrapper for test_result that uses SIMH replies.
def simh_test_result (self, reply, expected, caller, debug=False):
return self.test_result (reply, expected, self._simh_replies, caller)
#### simh_send_line ########################################################
# Send a line to simh while managing context.
# If we are not in the simh context call esc_to_simh which will
# send ^e, and set the context to simh.
# If we issue a command that enters os8 context, set context "os8".
# It is up to the caller to then do an expect to confirm success.
def simh_send_line (self, cmd, debug=False):
if debug: print ("Context: " + self._context)
if self._context != "simh":
self.esc_to_simh()
if debug: print ("simh_send_line: Sending: " + cmd)
self._child.sendline (cmd)
m = re.match (self._simh_comm_re, cmd)
if m != None and m.group(1)[:1].upper() in self._enters_os8_context:
self._context = "os8"
#### simh_cmd ########################################################
# If we are not in the simh context call esc_to_simh which will
# send ^e, and set the context to simh.
# If we issue a command that enters os8 context, set context "os8".
# Wait for a reply for error checking.
# replies is an optional argument that defaults to _simh_replies_rex
def simh_cmd (self, cmd, replies=None, debug=False):
if replies == None: replies = self._simh_replies_rex
self.simh_send_line (cmd, debug=debug)
reply = self.child_expect(replies)
if reply == -1:
print ("simh_cmd: was sending:" + cmd)
if debug:
print("\tGot reply: " + str(reply) + " -> " + self._simh_replies[reply][0])
print("\tmatch before: {" + self._child.before.decode() + "}")
print("\tmatch after: {" + self.child_after() + "}")
return reply
#### quit ############################################################
# Quits the simulator and waits for it to exit
# By calling simh_send_line, we are careful to look to our context
# and escape to SIMH if necessary.
def quit (self):
self.simh_send_line("quit")
#### zero_core #######################################################
# From SIMH context, zero the entire contents of core, which is
# assumed to be 32 kWords.
#
# SIMH's PDP-8 simulator doesn't start with core zeroed, on purpose,
# because the actual hardware did not do that. SIMH does not attempt
# to simulate the persistence of core memory by saving it to disk
# between runs, but the SIMH developers are right to refuse to do this
# by default: you cannot trust the prior state of a PDP-8's core
# memory before initializing it yourself.
#
# See os8_zero_core () for a less heavy-handed alternative for use
# when running under OS/8.
def zero_core (self):
reply = self.simh_cmd ('de all 0')
self.simh_test_result(reply, "Prompt", "simh_zero_core")
################# PDP-8 SIMH Interaction #############################
# High level interfaces to learn about and change device configurations
# for the PDP-8 under SIMH.
#### describe_dev_config #############################################
# We provide an interface to alter SIMH device configurations for
# specific parameters and specific devices
#
# dev configs supported: rx, tti, tape
#
# rx: RX8E, RX28 RX8E is the simh name for RX01 support.
# RX28 is the simh name for RX02 support.
# tti: KSR, 7b 7b is full keyboard support.
# KSR forces upcase of lower case keys on input.
# tape: td, dt td is the TD8E DECtape device
# dt is the TC08 DECtape device
def describe_dev_config (self, name):
if name == "tape":
lines = self.do_simh_show("dt")
dev_status = self.parse_show_tape_dev(lines)
if dev_status == "dt": return "dt"
else:
lines = self.do_simh_show("td")
return self.parse_show_tape_dev(lines)
elif name == "rx":
lines = self.do_simh_show("rx")
return self.parse_show_rx_dev(lines)
elif name == "tti":
lines = self.do_simh_show("tti")
return self.parse_show_tti(lines)
else: return None
#### do_simh_show ###################################################
# Calls show on the device name.
# Returns array of lines from output.
# This is focused on OS/8 devices.
def do_simh_show (self, name):
supported_shows = ["dt", "td", "tti", "rx"]
if name not in supported_shows: return None
ucname = name.upper()
self.simh_send_line("show " + name)
if self.child_expect(ucname + "\s+(.+)\r") == -1: return []
lines = self.child_after().split ("\r")
return lines
#### parse_show_tape_dev ############################################
# Returns current state of DECtape support.
# One of: disabled, td, dt, or None if parse fails.
def parse_show_tape_dev (self, lines):
if lines == None: return None
is_enabled_re = re.compile("^(TD|DT)\s+(disabled|(devno=\S+,\s(\d)\s+units))$")
m = re.match(is_enabled_re, lines[0])
if m == None or m.group(1) == None or m.group(2) == None: return None
if m.group(2) == "disabled": return "disabled"
elif m.group(1) == "TD": return "td"
elif m.group(1) == "DT": return "dt"
else: return None
#### parse_show_tape_attached ########################################
# Returns an ordered list of files attached or None if disabled.
def parse_show_tape_attached (self, lines):
if lines == None: return None
if len(lines) < 2: return None
attached = {}
attachment_re = re.compile("^\s+(((DT|TD)(\d)(.+),\s+(not\s+attached|attached\s+to\s+(\S+)),(.+))|12b)$")
for line in lines[1:]:
m = re.match(attachment_re, line)
if m == None or m.group(1) == None or m.group(1) == "12b": continue
filename = m.group(7)
if filename == None: filename = ""
attached[m.group(4)] = filename
return attached
#### do_print_lines ###################################################
# Debugging aid. Prints what we parsed out of child_after.
def do_print_lines (self, lines):
for line in lines:
print(line)
#### simh_configure routines #########################################
# These routines affect the state of device configuration in SIMH.
# They are intended as robust ways to toggle between incompatible
# configurations of SIMH:
# Choice of TD8E or TC08 DECtape (SIMH td and dt devices).
# Choice of RX01 or RX02 Floppy emulation.
# The SIMH rx device sets RX8E for RX01, and RX28 for RX02.
# Choice of KSR or 7bit console configuration.
#
# When re-configuring dt, dt, and rx devices, any attached
# images are detached before reconfiguration is attempted.
# (SIMH errors out if you don't detach them.)
#
# The check to see if the change is unnecessary.
# For now they return None if no change necessary.
#
# After re-configuring the device, the SIMH show command is used
# to confirm the re-configuration was successful.
#
# In future, we should add exception handling for no change necessary.
# For now, return True if the change was successful and False if not.
def set_tape_config (self, to_tape):
if to_tape == "dt": from_tape = "td"
elif to_tape == "td": from_tape = "dt"
else:
print("Cannot set_tape_config for " + to_tape)
return False
if self.verbose:
print("Disable: " + from_tape + ", and enable: " + to_tape)
lines = self.do_simh_show(from_tape)
from_status = self.parse_show_tape_dev(lines)
if from_status == None:
print("do_tape_change: Trouble parsing \'show " + from_tape + \
"\' output from simh. Giving up on:")
self.do_print_lines (lines)
return False
if from_status == "disabled":
print(from_tape + " already is disabled.")
else:
attached_from = self.parse_show_tape_attached(lines)
if attached_from == None:
print("do_tape_change: Trouble parsing \'show " + from_tape + \
"\' output from simh. Giving up on:")
self.do_print_lines (lines)
return False
else:
for unit in attached_from.keys():
if attached_from[unit] != "":
det_comm = "det " + from_tape + unit
if self.verbose:
print(det_comm + "(Had: " + attached_from[unit] + ")")
reply = self.simh_cmd(det_comm)
self.simh_test_result(reply, "Prompt", "set_tape_config: " + det_com)
reply = self.simh_cmd("set " + from_tape + " disabled")
self.simh_test_result(reply, "Prompt", "set_tape_config disable " + from_tape)
lines = self.do_simh_show(to_tape)
to_status = self.parse_show_tape_dev(lines)
if to_status == None:
print("do_tape_change: Trouble parsing \'show " + to_tape + \
"\' output from simh. Giving up on:")
self.do_print_lines (lines)
return False
elif to_status != "disabled":
print(to_tape + " already is enabled.")
else:
reply = self.simh_cmd("set " + to_tape + " enabled")
self.simh_test_result(reply, "Prompt", "set_tape_config enable " + to_tape)
# Test to confirm to_tape is now enabled.
lines = self.do_simh_show(to_tape)
to_status = self.parse_show_tape_dev(lines)
if to_status == None:
print("Failed enable of " + to_tape + \
". Parse fail on \'show " + to_tape + "\'. Got:")
self.do_print_lines (lines)
return False
elif to_status == "disabled":
print("Failed enable of " + to_tape + ". Device still disabled.")
return False
else:
return True
#### parse_show_rx_dev ###############################################
# Show the rx device configuration.
def parse_show_rx_dev (self, lines):
if lines == None: return None
is_enabled_re = re.compile("^\s*(RX)\s+(disabled|((RX8E|RX28),\s+devno=\S+,\s+(\d)\s+units))$")
m = re.match(is_enabled_re, lines[0])
if m == None or m.group(2) == None: return None
if m.group(2) == "disabled": return "disabled"
return m.group(4)
#### parse_show_rx_attached ##########################################
# Returns an ordered list of files attached or None if disabled.
def parse_show_rx_attached (self, lines):
if len(lines) < 2: return None
attached = {}
attachment_re = re.compile("^\s+(((RX)(\d)(.+),\s+(not\s+attached|attached\s+to\s+(\S+)),(.+))|autosize)$")
for line in lines[1:]:
m = re.match(attachment_re, line)
if m == None or m.group(1) == None or m.group(1) == "autosize": continue
filename = m.group(7)
if filename == None: filename = ""
attached[m.group(4)] = filename
return attached
#### set_rx_config ####################################################
def set_rx_config (self, to_rx):
to_rx = to_rx.lower()
if to_rx == "rx8e": from_rx = "rx28"
elif to_rx == "rx01":
to_rx = "rx8e"
from_rx = "rx28"
elif to_rx == "rx28": from_rx = "rx8e"
elif to_rx == "rx02":
to_rx = "rx28"
from_rx = "rx8e"
else:
print("Cannot set_rx_config for " + to_rx)
return False
if self.verbose:
print("Switch rx driver: " + from_rx + ", to: " + to_rx)
lines = self.do_simh_show("rx")
rx_type = self.parse_show_rx_dev (lines)
if rx_type == None:
print("do_rx_change: Trouble parsing \'show rx\' output from simh. Giving up on:")
self.do_print_lines(lines)
return False
elif rx_type == "disabled":
if self.verbose: print("rx is disabled. Enabling...")
reply = self.simh_cmd("set rx enabled")
self.simh_test_result(reply, "Prompt", "set_rx_config enable rx")
# Retry getting rx info
lines = self.do_simh_show("rx")
rx_type = self.parse_show_rx_dev (lines)
if rx_type == None:
print("do_rx_change after re-enable: Trouble parsing \`show rx\` output from simh. Giving up on:")
self.do_print_lines(lines)
return False
elif rx_type == "disabled":
print("do_rx_change re-enable of rx failed. Giving up.")
return False
if rx_type.lower() == to_rx:
print("rx device is already set to " + to_rx)
return None
attached_rx= self.parse_show_rx_attached(lines)
if attached_rx == None:
print("do_rx_change: Trouble parsing /'show rx\' from simh to find rx attachments. Got:")
self.do_print_lines(lines)
else:
for unit in attached_rx.keys():
if attached_rx[unit] != "":
det_comm = "det rx" + unit
if self.verbose:
print(det_comm + "(Had: " + attached_rx[unit] + ")")
reply = self.simh_cmd(det_comm)
self.simh_test_result(reply, "Prompt", "set_rx_config detach: " + det_comm)
reply = self.simh_cmd("set rx " + to_rx)
self.simh_test_result(reply, "Prompt", "set_rx_config set rx " + to_rx)
# Test to confirm new setting of RX
lines = self.do_simh_show("rx")
rx_type = self.parse_show_rx_dev (lines)
if rx_type == None:
print("Failed change of rx to " + to_rx + \
". Parse fail on \'show rx\'.")
return False
elif rx_type.lower() != to_rx:
print("Failed change of rx to " + to_rx + ". Instead got: " + \
rx_type)
return False
return True
#### get_tti ##################################################
# Returns an ordered list of files attached or None if disabled.
def parse_show_tti (self, lines):
if lines == None: return None
is_enabled_re = re.compile("^(KSR|7b)$")
if len(lines) < 2: return None
# That second line of output contains embedded newlines.
m = re.match(is_enabled_re, lines[1].strip())
if m == None or m.group(1) == None: return None
return m.group(1)
#### do_tti_change ###################################################
def set_tti_config (self, to_tti):
if to_tti == "KSR": from_tti = "7b"
elif to_tti == "7b": from_tti = "KSR"
else:
print("Cannot set_tti_config to " + to_tti)
return
if self.verbose:
print("Switch tti driver from: " + from_tti + ", to: " + to_tti)
lines = self.do_simh_show("tti")
tti_type = self.parse_show_tti (lines)
if tti_type == None:
print("do_tti_change: Trouble parsing \'show tti\' output from simh. Giving up on:")
self.do_print_lines(lines)
return False
elif tti_type == to_tti:
print("tti device is already set to " + to_tti)
return None
reply = self.simh_cmd("set tti " + to_tti)
self.simh_test_result(reply, "Prompt", "set_tti_config setting tti " + to_tti)
# Test to confirm new setting of tti
lines = self.do_simh_show("tti")
tti_type = self.parse_show_tti (lines)
if tti_type == None:
print("Failed change of tti to " + to_tti + \
". Parse fail on \'show tti\'.")
return False
elif tti_type != to_tti:
print("Failed change of tti to " + to_tti + ". Instead got: " + \
tti_type)
return False
else:
return True
############### Basic OS/8 Interaction ###############################
# Intereact with OS/8 under SIMH.
#### os8_kbd_delay ###################################################
# Artificially delay the media generation process to account for the
# fact that OS/8 lacks a modern multi-character keyboard input buffer.
# It is unsafe to send text faster than a contemporary terminal could,
# though we can scale it based on how much faster this host is than a
# real PDP-8. See the constants above for the calculation.
def os8_kbd_delay (self):
time.sleep (self._os8_kbd_delay)
#### os8_send_str ########################################################
# Core of os8_send_line. Also used by code that needs to send text
# "blind" to OS/8, without expecting a prompt and without a CR, as
# when driving TECO.
def os8_send_str (self, str):
for i in range (0, len (str)):
self.os8_kbd_delay ()
self._child.send (str[i])
#### os8_send_ctrl ###################################################
# Send a control character to OS/8 corresponding to the ASCII letter
# given. We precede it with the OS/8 keyboard delay, since we're
# probably following a call to os8_send_line or os8_cmd.
def os8_send_ctrl (self, char):
cc = char[0].lower ()
self.os8_kbd_delay ()
self._child.sendcontrol (cc)
if cc == 'e': self._context = 'simh'
#### os8_send_line ###################################################
# Core of os8_cmd. Also used by code that needs to send text
# "blind" to OS/8, without expecting a prompt, as when driving EDIT.
def os8_send_line (self, line):
self.os8_send_str (line)
self._child.send ("\r")
#### os8_test_result ##################################################
# Convenience wrapper for test_result that uses OS/8 replies.
def os8_test_result (self, reply, expected, caller, debug=False):
return self.test_result (reply, expected, self._os8_replies, caller, debug)
#### mk_os8_name # ###################################################
# Create an OS/8 filename: of the form XXXXXX.YY
# From a POSIX path.
def mk_os8_name(self, dev, path):
bns = os.path.basename (path)
bns = re.sub("-|:|\(|\)|!", "", bns)
bns = bns.upper()
if "." not in bns:
return dev + bns[:min(6, len(bns))]
else:
dot = bns.index('.')
return dev + bns[:min(6, dot, len(bns))] + "." + bns[dot+1: dot+3]
#### os8_cmd_ ###################################################
# Send the given command to OS/8.
# replies is an array of possible replies that command will get.
# Returns the index into the replies array that pexpect got,
# or -1 if the command could not be run.
# replies is an optional argument which defaults to _os8_replies_rex
def os8_cmd (self, cmd, replies=None, debug=False, timeout=60):
if replies == None: replies = self._os8_replies_rex
if self._context != 'os8':
print("OS/8 is not running. Cannot execute: " + cmd)
return -1
self.os8_send_line (cmd)
reply = self.child_expect (replies, timeout = timeout)
return reply
#### os8_cfm_monitor ##################################################
# Confirm return to OS/8 monitor.
# This function is necessary so that we know our command has returned to OS/8.
# Without this test, the next child_expect hits a monitor prompt instead of
# what it will be looking for.
# If caller is not empty, it will emit an error string if the prompt wasn't seen.
# Returns True if we are where we expect.
def os8_cfm_monitor (self, caller, debug=False):
reply = self.child_expect(self._os8_replies_rex)
return self.os8_test_result(reply, "Monitor Prompt", caller, debug=debug)
#### os8_ctrl_c ##################################################
# Return to OS/8 monitor using the ^C given escape character.
# We will probably, but not always get an echo of ^C.
# For example BUILD is known NOT to.
# Our testing makes it optional.
# We listen for an uparow and assume the C follows, and listen again
# for the monitor prompt.
# Optional caller argument enables a message if the ^C escape failed.
# Note: OS/8 will respond to this escape IMMEDIATELY,
# even if it has pending output.
# You will need to make sure all pending output is in
# a known state and the running program is quiescent
# before calling this method. Otherwise pexpect may get lost.
def os8_ctrl_c (self, caller = "", debug=False):
self.os8_send_ctrl ("c")
reply = self.child_expect(self._os8_replies_rex)
uparrow = self.os8_test_result(reply, "PIP Continue", "")
if uparrow:
reply = self.child_expect(self._os8_replies_rex)
return self.os8_test_result(reply, "Monitor Prompt", caller)
#### os8_escape ##################################################
# Return to OS/8 monitor using the escape (^]) character.
# We need to listen for the $ echo or else cfm_monitor gets confused.
# Confirm we got our monitor prompt.
# Optional caller argument enables a message if escape failed.
# Note: OS/8 will respond to this escape IMMEDIATELY,
# even if it has pending output.
# You will need to make sure all pending output is in
# a known state and the running program is quiescent
# before calling this method. Otherwise pexpect may get lost.
def os8_escape (self, caller = "", debug=False):
self.os8_send_ctrl ('[')
self.child_expect("\\$")
return self.os8_cfm_monitor (caller)
#### simh_restart_os8 #######################################################
# Abstraction on returning to OS/8 monitor from within SIMH.
# It is common practice to "load address 7600; start" at the console.
def simh_restart_os8 (self, caller = "", debug=False):
# Note we're calling simh with os8 replies because we will
# be switching contexts.
# simh_cmd manages the context, and checks for success
# by expecting a command prompt.
reply = self.simh_cmd ('go 7600', self._os8_replies_rex, debug=debug)
# We test our reply with os8_test result because we're in OS/8 now.
self.os8_test_result(reply, "Monitor Prompt", caller)
#### simh_resume_os8 ##########################################################
# Continue execution of OS/8.
# This has been extremely tricky to get right.
# What we must do is give OS/8 a chance to wake up.
# We do this by asking pexpect to show us our "cont\n" echo and then
# we do an os8_kbd_delay()
# We carefully use simh_send_line to send the command so that we
# know to set the os8 context.
# WARNING! Using simh_resume_os8 without having booted OS/8
# has undefined reslts.
def simh_resume_os8 (self):
self.simh_send_line ("cont")
# Give OS/8 a chance to wake up.
self.child_expect("cont\r")
self.os8_kbd_delay()
#### os8_zero_core ###################################################
# Starting from OS/8 context, bounce out to SIMH context and zero all
# of core excepting:
#
# 0. zero page - many apps put temporary data here
# 1. the top pages of fields 1 & 2 - OS/8 is resident here
# 2. the top page of field 2 - OS/8's TD8E driver (if any) lives here
#
# We then restart OS/8, which means we absolutely need to do #1 and
# may need to do #2. We could probably get away with zeroing page 0.
#
# All of the above explains why we have this special OS/8 alternative
# to the zero_core() method.
def os8_zero_core (self):
reply = self.simh_cmd ('de 00200-07577 0')
self.simh_test_result(reply, "Prompt", "os8_zero_core 00200-07577")
reply = self.simh_cmd ('de 10000-17577 0')
self.simh_test_result(reply, "Prompt", "os8_zero_core de 10000-17577 0")
reply = self.simh_cmd ('de 20000-27577 0')
self.simh_test_result(reply, "Prompt", "os8_zero_core de 20000-27577 0")
reply = self.simh_cmd ('de 30000-77777 0')
self.simh_test_result(reply, "Prompt", "os8_zero_core de 30000-37577 0")
self.simh_restart_os8 ()
#### os8_squish ########################################################
# Wraps the OS/8 SQUISH command for a given device.
def os8_squish (self, device, caller = ""):
reply = self.os8_cmd ("SQUISH " + device + ":")
self.os8_test_result (reply, "ARE YOU SURE?", caller)
reply = self.os8_cmd ("Y")
self.os8_test_result (reply, "Monitor Prompt", caller)
#### os8_pip_to ###################################################
# Send a copy of a local file to OS/8 using PIP.
#
# The file is sent via the SIMH paper tape device through PIP
# specifying a transfer option. If no option is specified,
# ASCII is assumed.
#
# In ASCII mode, we pre-process with txt2ptp which translates
# POSIX ASCII conventions to OS/8 conventions. In all other
# modes, we do not do any translation.
#
# However, we should supply a sacrificial NULL as an additional character
# because the OS/8 PTR driver throws the last character away. (NOT DONE YET)
#
# Entry context should be inside OS/8. Exit context is inside OS/8.
#
def os8_pip_to (self, path, os8name, option = None, debug=False):
if option == None: option = ""
# If os8name is just a device, synthesize an upcased name from
# the POSIX file basename.
if not os.path.exists(path):
print(path + " not found. Skipping.")
return
m = re.match(self._os8_file_re, os8name)
if m != None and (m.group(2) == None or m.group(2) == ""):
dest = self.mk_os8_name(os8name, path)
else:
dest = os8name
# Gross hack:
# The command decoder prompt that comes when we continue a PTR/PTP
# does not have a newline. But everything else that matches on
# a command decoder prompt NEEDS that newline. Otherwise we match
# on asterisks in file specs.
# Remember to decrement replies on this rex list if we use
# _os8_replies
pip_rex = [re.compile("\\*$".encode())]
pip_rex.extend(self._os8_replies_rex)
did_conversion = False
if option == "" or option == "/A":
# Convert text file to SIMH paper tape format in current dir of path.
if self.verbose: print("Format converting " + path)
bdir = pidp8i.dirs.build
# Create uniquified temp path name.
pt = path + "-" + str(os.getpid()) + ".pt_temp"
tool = os.path.join (bdir, 'bin', 'txt2ptp')
subprocess.call (tool + ' < ' + path + ' > ' + pt, shell = True)
did_conversion = True
elif option not in self._valid_pip_options:
print("Invalid PIP option: " + option + ". Ignoring: " + path + \
" to OS/8.")
return
else:
pt = path
# TODO: Sacrificial extra character code goes here.
# Paper tape created, so attach it read-only and copy it in. We're
# relying on txt2ptp to insert the Ctrl-Z EOF marker at the end of
# the file.
reply = self.simh_cmd ('attach -r ptr ' + pt, debug=debug)
self.simh_test_result(reply, "Prompt", "os8_pip_to attaching ptr")
# Enter OS/8.
self.simh_restart_os8 (caller="os8_pip_to")
reply = self.os8_cmd ('R PIP', debug=debug)
# Has PIP Startup been successful?
if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_to 0", \
debug=debug) == False: return
# Give file and test for ready to send.
if debug: print ("os8_pip_to: Sending: " + dest + '<PTR:' + option)
reply = self.os8_cmd (dest + '<PTR:' + option, debug=debug)
if self.os8_test_result (reply, "PIP Continue", "os8_pip_to 1", debug=debug) == False:
# If we got "PIP Continue" we keep going. Otherwise return.
# Send escape if reply indicates we didn't go back to the monitor.
if self._os8_replies[reply][2] == False:
self.os8_send_ctrl ('[')
return
# Finish transfer
# The test for success is implicit.
self.os8_send_ctrl ('[')
reply = self.child_expect (pip_rex)
if reply != 0:
self.os8_test_result (reply - 1, "Command Decoder Prompt", "os8_pip_to 2", debug=debug)
# If we did not return to monitor hit escape to exit PIP.
if self._os8_replies[reply - 1][2] == False:
self.os8_send_ctrl ('[') # Success or Non-fatal error.
# Must exit PIP and wait for monitor prompt. Complain if we don't get it!
reply = self.child_expect (self._os8_replies_rex)
self.os8_test_result (reply, "Monitor Prompt", "os8_pip_to 3")
# detach ptr
# self.esc_to_simh ()
# self.simh_cmd ('detach ptr ')
# self.os8_restart ()
# Remove the temp file if we created one.
if did_conversion:
os.remove (pt)
#### os8_pip_from ###################################################
# Fetch a file from OS/8 to a local path using PIP.
#
# The OS/8 source filename is synthesized from the basename of the path,
# upcasing if necessary.
#
# The file is sent via the SIMH paper tape device through PIP
# specifying a transfer option. If no option is specified,
# ASCII is assumed.
#
# In ASCII mode, we post-process with ptp2txt which translates
# POSIX ASCII conventions to OS/8 conventions. In all other
# modes, we do not do any translation.
#
# Entry context should be inside OS/8. Exit context is inside OS/8.
def os8_pip_from (self, os8name, path, option = None, debug=False):
if option == None: option = ""
# If path is not a file, use the name portion of os8name.
if os.path.isdir(path):
colon = os8name.find(':')
if colon == -1: # No dev, just a name.
path = path + "/" + os8name
else:
path = path + "/" + os8name[colon+1:]
if option != "" and option not in self._valid_pip_options:
print("Invalid PIP option: " + option + \
". Ignoring os8_pip_from on: " + path)
return
reply = self.simh_cmd ('attach ptp ' + path, debug=debug)
self.simh_test_result(reply, "Prompt", "os8_pip_from attachng ptp")
# Enter OS/8.
self.simh_restart_os8 (caller="os8_pip_from 0")
reply = self.os8_cmd ('R PIP', debug=debug)
# Has PIP Startup been successful?
if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_from 0") == False: return
# Issue file transfer spec.
reply = self.os8_cmd ('PTP:<' + os8name + option, debug=debug)
# Test for Success
if self.os8_test_result (reply, "Command Decoder Prompt", "os8_pip_from 1") == False:
# There is an empty PTP file we need to remove.
os.remove(path)
# Was this a fatal error?
if self._os8_replies[reply][2] == False:
self.os8_send_ctrl ('[') # Non-fatal error. Must exit PIP
return
self.os8_send_ctrl ('[') # exit PIP
# Must wait for monitor prompt. Complain if we don't get it!
reply = self.child_expect (self._os8_replies_rex)
self.os8_test_result (reply, "Monitor Prompt", "os8_pip_from 2")
reply = self.simh_cmd ('detach ptp', debug=debug) # Clean flush of buffers.
self.simh_test_result(reply, "Prompt", "os8_pip_from detaching ptp")
# Enter OS/8.
self.simh_restart_os8(caller="os8_pip_from 3")
if option == "" or option == "/A":
if self.verbose: print("Format converting " + path)
# Convert text file to SIMH paper tape format
bdir = pidp8i.dirs.build
# Create uniquified temp path name.
pf = path + "-" + str(os.getpid()) + ".pf_temp"
os.rename(path, pf)
tool = os.path.join (bdir, 'bin', 'ptp2txt')
subprocess.call (tool + ' < ' + pf + ' > ' + path, shell = True)
os.remove(pf)