#!/usr/bin/env @PYCMD@
# -*- coding: utf-8 -*-
########################################################################
# simh-os8-script.py Library for scripting OS/8 under SIMH
# Contains validators and callers for os8 and simh commands to make
# it easier to create scripts.
#
# Copyright © 2017-2019 by Jonathan Trites, William Cattey, and
# Warren Young.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS LISTED ABOVE BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
# OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the names of the authors above
# shall not be used in advertising or otherwise to promote the sale,
# use or other dealings in this Software without prior written
# authorization from those authors.
########################################################################
# Bring in just the basics so we can bring in our local modules
import os
import sys
import tempfile
import time
sys.path.insert (0, os.path.dirname (__file__) + '/../lib')
sys.path.insert (0, os.getcwd () + '/lib')
# Python core modules we use
import re
from string import Template
import shutil
import subprocess
# Our local modules
from pidp8i import *
from simh import *
# Script Language Version
# Update this version number as the language evolves.
# Version 1.0 is the first public version.
LANG_VERSION = "1.0"
# Error Class Definitions ##############################################
# Enables us to use exceptions from within this module.
class Error(Exception):
    """Root of the exception hierarchy raised by this module."""


class InputError(Error):
    """Raised when script or command input cannot be accepted.

    Attributes:
        msg -- human readable explanation of what was wrong
    """

    def __init__(self, msg):
        # The message is kept on the instance so __str__ can report it.
        self.msg = msg

    def __str__(self):
        return self.msg
# Private globals ######################################################
# Visible within this file, but not to the outside.

# Identify a conditional "begin" command.  group(1) holds the block kind,
# one of "enabled", "default" or "version".  The rest of the line (the
# option name or version spec) is in group(2).
_begin_en_dis_comm_re = re.compile (r"^begin\s+(enabled|default|version)\s+(.+)$")

# Identify a conditional "end" command.  group(1) holds the block kind,
# one of "enabled", "default" or "version".  The rest of the line is in
# group(2).
_end_en_dis_comm_re = re.compile (r"^end\s+(enabled|default|version)\s+(.+)$")

# Identify an end command and put the rest of the line in group(1).
# A bare "end" with nothing after it does not match because at least one
# whitespace character is required after the keyword.
_end_comm_re = re.compile (r"^end\s+(.+)?$")

# Identify an end option command and put the rest of the line in group(1)
_end_option_comm_re = re.compile (r"^end\s+option\s+(.+)$")

# Identify a begin option command and put the rest of the line in group(1)
_begin_option_comm_re = re.compile (r"^begin\s+option\s+(.+)$")

# A valid version spec: dot separated runs of digits.
_version_parse_re = re.compile (r"^((\d+\.)*)?(\d+)?$")

# Name of the DECtape image file we create
_new_sys_tape_prefix = "system"

# Parser regexps used in patcher
# An OS/8 command line: leading dot, keyword in group(1), rest in group(2).
_com_os8_parse_str = r"^\.([a-zA-Z]+)\s*(.*)$"
_com_os8_parse = re.compile(_com_os8_parse_str)
# Same split without the leading dot.
_com_split_str = r"^([a-zA-Z]+)\s*(.*)$"
_com_split_parse = re.compile(_com_split_str)
# An ODT line: octal address in group(1), old value in group(2),
# new octal value(s) in group(3).
_odt_parse_str = r"^([0-7]+)\s*/\s*(\S+)\s+([0-7;]+)"
_odt_parse = re.compile(_odt_parse_str)

# Put command keyword in group(1) and the rest is in group(3)
_comm_re_str = r"^(\S+)(\s+(.+))?$"
_comm_re = re.compile(_comm_re_str)
# Parse an argument string into a sys device with
# device name in group(1), unit number in group(2)
# We put all bootable devices into this string so that when
# we add more devices, for example rl for RL01, we change one
# string not many.
# The unit number is optional: (\d*) also matches the empty string.
_simh_boot_dev_str = r"(rk|td|dt|rx)(\d*)"
_simh_boot_re = re.compile("^" + _simh_boot_dev_str + "$")
# Parse an argument string for mount into SIMH device
# device name in group(1), unit number in group(2)
# And the rest in group (3)
_mount_regex_str = "^" + _simh_boot_dev_str + r"\s+(.+)$"
_mount_re = re.compile(_mount_regex_str)
# Map of SIMH device names to OS/8 device name prefixes.
# Both SIMH tape device names ("td" and "dt") map to the same "DTA" prefix.
_os8_from_simh_dev = {"rk" : "RK", "td" : "DTA", "dt" : "DTA", "rx" : "RX"}
# OS/8 device prefixes that carry partition letters, and those letters.
_os8_partitions = {"RK": ["A", "B"]}
# OS/8 file name matching regex
# Device (text before the colon) in group(1), optional file name in group(2).
# Not anchored: callers choosing match() vs search() decide where it applies.
_os8_file_re = re.compile(r"(\S+):(\S+)?")
# Regular expression for syntax checking inside FOTP
# Destination is in group(1), Source is in group(3)
_fotp_re = re.compile (r"^((\S+:)?\S+)<((\S+:)?\S+)$")
# Regular expression for detecting the 2 arg and 3 arg forms
# of the "pal8" script command.
# OS/8 name regex template:
# Optional device spec, i.e. DTA0:
# File spec with a specific extension or no extension.
# $ext is filled in by Template.substitute below.
_os8_fspec = Template (r"((\S+:)?([A-Z0-9]{1,6}|[A-Z0-9]{1,6}\.$ext))")
_os8_BN_fspec = _os8_fspec.substitute(ext="BN")
_os8_PA_fspec = _os8_fspec.substitute(ext="PA")
_os8_LS_fspec = _os8_fspec.substitute(ext="LS")
# Regex to parse a valid OS/8 option string either with slash or parens.
_opt_str = r"((/[A-Z0-9])+|\([A-Z0-9]+\))?"
# Regular expression for syntax checking inside ABSLDR
# FIXME: Use (and test) _opt_str for full OS/8 Option spec compatibility
# One or more OS/8 binary files and optional args beginning with a slash.
_absldr_re = re.compile ("^" + _os8_BN_fspec + "(," + _os8_BN_fspec + r")*(/\S)*$")
# Regular expressions for syntax checking for cpto and cpfrom.
# The argument string may be:
#   <source>                        destination and option /A implied.
#   <source> <option>               destination implied, option set.
#   <source> <destination>          option /A implied.
#   <source> <destination> <option> everything explicit.
# Valid options are "/A", "/I", and "/B"
# Use two regex's in order:
# First form: source in group 1, option in group 3
# (group 2 is the option together with its leading whitespace).
_from_to_re_1 = re.compile (r"^(\S+)(\s+(/[AIB]))?$")
# source in group 1, destination in group 2, option in group 4.
_from_to_re_2 = re.compile (r"^(\S+)\s+(\S+)(\s+(/[AIB]))?$")
# Table of regular expressions for syntax checking inside BUILD, keyed by
# BUILD sub-command.  A value of None means the sub-command takes no
# argument pattern to check.
# The dot of the optional ".BN" extension is escaped: an unescaped dot
# matches any character, which let whitespace-separated junk such as
# "FOO BN" pass validation.
_build_comm_regs = {"LOAD"  : re.compile(r"^(\S+:)?\S+(\.BN)?$"),
                    "UNLOAD": re.compile(r"^\S+(,\S+)?$"),
                    "INSERT": re.compile(r"^\S+,\S+(,\S+)?$"),
                    "DELETE": re.compile(r"^\S+(,\S+)?$"),
                    "SYSTEM": re.compile(r"^\S+$"),
                    "DSK"   : re.compile(r"^(\S+:)?\S+$"),
                    "BUILD" : re.compile(r"^(\S+(\.BN)?)\s+(\S+(\.BN)?)$"),
                    "PRINT" : None,
                    "BOOT"  : None,
                    "end"   : None}

# Parse two whitespace separated arguments into group(1) and group(2)
_two_args_re = re.compile(r"^(\S+)\s+(\S+)$")

# Legal settings for each item the "configure" script command accepts.
_rx_settings = ["rx01", "rx02", "RX8E", "RX28"]
_tape_settings = ["td", "dt"]
_tti_settings = ["KSR", "7b"]
_configurables = {"rx": _rx_settings, "tape": _tape_settings,
                  "tti": _tti_settings}

# Matches if the string begins with a dollar sign, and has at least
# one slash, returning the string between the dollar sign and the
# first slash in group 1 and the rest in group 2.
# No whitespace in the string.
_expandable_re = re.compile (r"^\$([^/\s]+)/(\S*)$")

# Parse an exit arg for an integer or an integer in parentheses.
# The bare integer lands in group(1), the parenthesized one in group(2).
# NOTE(review): "|" binds looser than the anchors, so "^" applies only to
# the first alternative and "$" only to the second; trailing text after a
# bare integer is therefore tolerated.  Left as is to keep accepted input
# unchanged.
_exit_arg_re = re.compile (r"^(\s*[+-]?\s*\d+)|\s*\(\s*([+-]?\s*\d+)\s*\)\s*$")
# Options enabled/not_disabled for conditional execution in scripts.
#
# Earlier code allowed --enable and --disable. We interface to it.
# We maintain two arrays: options_enable and options_disabled for those
# two argument constructs.
#
# Argument parsing of repeated enable and disable arguments is as follows:
# --enable looks on options_disabled and if present removes it, then adds
# to options_enabled.
# --disable looks on options_enabled and if present removes it, then adds
# to options_disabled.
#
# Last seen enable/disable command line or executed command for a
# particular option wins.
#
# Scripts have enable/disable commands that are run-time
# changers of the contents of options_enabled and options_disabled.
#
# When we run a script we have begin/end blocks for enabled/not_disabled options:
# "begin enabled <option name>" ... "end enabled <option name>
# "begin not_disabled <option name>" ... "end not_disabled <option name>
#
# The enabled block looks for an explicit enablement on the options_enabled
# list. If none is found we default to ignoring the contents of the block.
#
# The not_disabled block looks for an explicit disablement on the
# options_disabled list. If found, the block is ignored. Otherwise
# the block defaults to being executed.
#
# begin/end blocks can be nested. We track the nesting with options_stack.
# Testing for options happens when the begin command is evaluated.
# So changing an enable/disable option inside a begin/end block
# takes effect at the next begin statement.
# You can write a script as follows:
# enable foo
# begin enabled foo
# # Commands to executed
# disable foo
# # Commands still being executed.
# begin enabled foo
# # Commands to ignore
# end enabled foo
# end enabled foo
# Local routine to perform a save of a pre-existing file
# because we do this in a couple places
def save_if_needed(path):
    """Move an existing file at path aside to path + ".save".

    Does nothing when path is not an existing regular file.  An older
    ".save" copy is deleted first so the rename cannot collide with it.
    """
    if not os.path.isfile(path):
        return
    backup = path + ".save"
    print("Pre-existing " + path + " found.  Saving as " + path + ".save")
    if os.path.isfile(backup):
        print("Overwriting old " + path + ".save")
        os.remove(backup)
    os.rename(path, backup)
def version_to_array (version):
    """Split a dotted version string into a list of component strings.

    Empty components between dots are kept ("1..2" gives ["1", "", "2"]),
    but nothing is emitted for an empty piece after the final dot, so
    "1." gives ["1"] and the empty string gives [].
    """
    pieces = version.split(".")
    # split() always yields a last element; drop it when it is empty.
    if pieces[-1] == "":
        pieces.pop()
    return pieces
class os8script:
# Contains a simh object, other global state and methods
# for running OS/8 scripts under simh.
#### globals and constants ###########################################
# Replies that pal8 adds.
# Every entry in the three tables below is a three element list:
#   [reply name, regular expression source text, boolean flag]
# intern_replies compiles element 1 (after .encode()) into the matching
# replies_rex table.
# NOTE(review): the meaning of the boolean third element is not shown in
# this part of the file -- confirm against the reply tables in the simh
# module before relying on it.
_pal8_rep_adds = [
# Prompts come from the existing _os8_replies
# Status:
["ERRORS DETECTED", "ERRORS DETECTED: ", False],
["LINKS GENERATED", "LINKS GENERATED: ", False],
# Errors:
# Most patterns are a two letter code followed by whitespace, text,
# and a carriage return.
["Buffer Exceeded", r"BE\s+\S+.*\r", True],
["Cref not Found", r"CF\s+\S+.*\r", False],
["Device Error", r"DE\s+\S+.*\r", True],
["Device Full", r"DF\r", True],
["Illegal Character", r"IC\s+\S+.*\r", False],
["Illegal re-Definition", r"ID\s+\S+.*\r", False],
["Illegal Equals", r"IE\s+\S+.*\r", False],
["Illegal Indirect", r"II\s+\S+.*\r", False],
["Illegal PseudoOp", r"IP\s+\S+.*\r", False],
["Illegal page Zero reference", r"IZ\s+\S+.*\r", False],
["Loader not founD", r"LD\s+\S+.*\r", False],
["Link Generated", r"LG\s+\S+.*\r", False],
["current Page Exceeded", r"PE\s+\S+.*\r", False],
["PHase Error", r"PH\s+\S+.*\r", False],
["Re Definition", r"RD\s+\S+.*\r", False],
["Symbol table Exceeded", r"SE\s+\S+.*\r", False],
["Undefined Origin", r"UO\s+\S+.*\r", False],
["Undefined Symbol", r"US\s+\S.*\r", False],
["page Zero Exceeded", r"ZE\s+\S+.*\r", False],
["NOT FOUND", r"\S+ NOT FOUND", False],
]
# Replies that BUILD adds.
_build_rep_adds = [
# Prompts:
# Add the BUILD prompt
# Subtle point: Dollar sign appears in all kinds of output
# so we try to minimize where we look for it so as to avoid
# confusing pexpect.
["BUILD Prompt", "\n\\$$", True],
# Status:
["SYS BUILT", "SYS BUILT", False],
["WRITE ZERO DIRECT?", "WRITE ZERO DIRECT\\?", False],
["LOAD OS8", "LOAD OS/8: ", False],
["LOAD CD", "LOAD CD: ", False],
# Errors:
# Patterns beginning with \\? match a literal question mark.
["BAD ARG", "\\?BAD ARG", False],
["BAD INPUT", "\\?BAD INPUT", False],
["BAD LOAD", "\\?BAD LOAD", False],
["BAD ORIGIN", "\\?BAD ORIGIN", False],
["CORE", "\\?CORE", False],
["DSK", "\\?DSK", False],
["HANDLERS", "\\?HANDLERS", False],
["I/O ERR", "I/O ERR", False],
["NAME", "\\?NAME", False],
["NO ROOM", "NO ROOM", False],
["SYS NOT FOUND", "SYS NOT FOUND", False],
["PLAT", "\\?PLAT", False],
["SYNTAX", "\\?SYNTAX", False],
["SYS", "\\?SYS", False],
["SYS ERR", "SYS ERR", False],
["File NOT FOUND", r"\S+ NOT FOUND", False]
]
# Replies that ocomp adds.
_ocomp_rep_adds = [
["NOTHING OUTPUT", "NOTHING OUTPUT", False],
["RELATIVE BLOCK", "RELATIVE BLOCK", False],
["USER ERROR", "USER ERROR", False],
]
#### intern_replies ##############################################################
# Teach the os8script object about another set of replies.
# Return True if successful, False if the replies of that name are already in place
# If with_os8 is True, append the simh _os8_replies array.
def intern_replies (self, name, replies, with_os8):
if name in self.replies: return False
self.replies[name] = replies
if with_os8: self.replies[name].extend(self.simh._os8_replies)
# Create the pre-compiled regex array too.
self.replies_rex[name] = []
for item in self.replies[name]:
self.replies_rex[name].append(re.compile(item[1].encode()))
return True
def __init__ (self, simh, enabled_options, disabled_options, verbose=False, debug=True):
    """Set up script-runner state around an existing simh object.

    simh             -- object driving the simulator; its reply tables
                        are exposed through self.replies.
    enabled_options  -- list of option names currently enabled.
    disabled_options -- list of option names currently disabled.
    verbose, debug   -- control how chatty the runner is.
    """
    self.simh = simh
    self.lang_version = LANG_VERSION
    self.verbose = verbose
    self.debug = debug

    # The caller's lists are used directly; enable/disable commands
    # modify them in place.
    self.options_enabled = enabled_options
    self.options_disabled = disabled_options
    # Do we need separate stacks for enabled/disabled options?
    self.options_stack = []

    # Scratch files to delete when we are done with all script runs.
    self.scratch_list = []
    self.booted = False

    # The line count stack starts with one element of value 0 so the
    # API works gracefully when called without an open script file.
    self.line_ct_stack = [0]

    # Table of reply tables, seeded with the simh and os8 tables.
    self.replies = {"simh": self.simh._simh_replies, "os8": self.simh._os8_replies}
    self.replies_rex = {"simh": self.simh._simh_replies_rex, "os8": self.simh._os8_replies_rex}

    # Intern the replies of every known sub-command.
    for table_name, additions in (("build", self._build_rep_adds),
                                  ("pal8", self._pal8_rep_adds),
                                  ("ocomp", self._ocomp_rep_adds)):
        self.intern_replies(table_name, additions, True)
#### path_expand #######################################################
# Simple minded variable substitution in a path.
# A path beginning with a dollar sign parses the characters between
# the dollar sign and the first slash seen becomes a name to
# expand with a couple local names: $home and the anchor directories
# defined in lib/pidp8i/dirs.py.
# Returns None if the expansion fails. That signals the caller to fail.
def path_expand (self, path):
    """Expand a leading $name/ in path using attributes of dirs.

    A path that does not have the $name/rest shape is returned as is.
    Returns None, after printing a diagnostic, when name is not an
    attribute of dirs; callers treat that as a failure.
    """
    parsed = _expandable_re.match(path)
    if parsed is None:
        return path

    var = parsed.group(1)
    base = getattr (dirs, var, None)
    if base is None:
        print("At line " + str(self.line_ct_stack[0]) + \
              ": {$" + var + "} is not a valid path expansion in " + path)
        return None
    return os.path.join(base, parsed.group(2))
#### print_expand ######################################################
# Close kin to path_expand. Takes a string that may name a path
# substitution or the magic $version value and performs the appropriate
# value substitution.
def print_expand (self,str):
    """Return str with each $name token replaced by its value.

    A token is a dollar sign followed by non-whitespace characters.
    The value is the attribute of dirs with that name when there is
    one, the script language version for $version, and otherwise the
    token itself, unchanged.
    """
    if str.find("$") == -1:
        return str

    pieces = []
    start = 0
    for name in re.findall(r"\$\S+", str):
        end = str.index(name, start)
        pieces.append(str[start:end])
        sub = getattr (dirs, name[1:], None)
        if sub is None:
            sub = self.lang_version if name == "$version" else name
        pieces.append(sub)
        start = end + len(name)
    # Keep whatever follows the last token.  Dropping it truncated the
    # output, and returned "" for input with a "$" but no token.
    pieces.append(str[start:])
    return "".join(pieces)
#### version_test ######################################################
# Compare each component of the version test agains the actual version
# Return true if actual version is greater than or equal to the test
# version.
# Caller validates test with _version_parse_re so we only
# need to return True or False, not error.
def version_test (self, test):
    """Return True when the language version is at least test.

    Components are compared numerically, left to right, over as many
    components as test has.  A component missing from the language
    version counts as 0.  The caller validates test with
    _version_parse_re, so this only answers True or False.
    """
    wanted = version_to_array(test)
    actual = version_to_array(self.lang_version)

    for pos, test_item in enumerate(wanted):
        # Pad the actual version with zeros when test is longer.
        vers_item = actual[pos] if pos < len(actual) else "0"
        if self.debug:
            print("version_test: vers_item: " + vers_item + \
                  ", test_item: " + test_item)
        vers_num = int(vers_item)
        test_num = int(test_item)

        # The first unequal component decides the outcome.
        if vers_num > test_num:
            if self.debug:
                print("version_test: Success: version greater than test.")
            return True
        if test_num > vers_num:
            if self.debug: print("version_test: Fails on sub compare.")
            return False

    # Every compared component was equal.
    if self.debug:
        print("version_test: Success.  Made it thru test string.")
    return True
#### basic_line_parse ################################################
# Returns stripped line and any other cleanup we want.
# Returns None if we should just 'continue' on to the next line.
# Filters out comments.
# Processes the option begin/end blocks.
def basic_line_parse (self, line, script_file):
    """Pre-process one script line.

    Counts the line, strips it, and filters out blank lines, comments
    and the conditional begin/end statements.  Returns the stripped
    line when the caller should execute it, or None when the caller
    should just continue with the next line.

    For an inactive conditional block the lines up to the matching end
    statement are consumed from script_file here.
    """
    self.line_ct_stack[0] += 1
    retval = line.strip()
    if retval == "" or retval[0] == "#":
        return None

    # Is this the start of a conditional block?
    begin_m = _begin_en_dis_comm_re.match(retval)
    if begin_m is not None:
        en_dis = begin_m.group(1)
        rest = begin_m.group(2)
        if self.verbose:
            print("Line " + str(self.line_ct_stack[0]) + \
                  ": doing_begin_option: " + en_dis + " " + rest)
        if self.debug:
            print("options_enabled: " + str (self.options_enabled))
            print("options_disabled: " + str (self.options_disabled))
            print("options_stack: " + str(self.options_stack))

        # Decide whether the block is active; the three kinds differ
        # only in that test and in the wording of the debug output.
        if en_dis == "version":
            # Check for mal-formed version match first
            if _version_parse_re.match(rest) is None:
                print("Mal-formed version match string {" + rest + "} at line " + \
                      str(self.line_ct_stack[0]) + ". Ignoring this block.")
                self.ignore_to_subcomm_end (retval, script_file, en_dis + " " + rest)
                return None
            active = self.version_test (rest)
            push_note = "Pushing version enabled block "
            stack_note = " new options_stack: "
        elif en_dis == "enabled":
            active = rest in self.options_enabled
            push_note = "Pushing enabled block "
            stack_note = "new options_stack: "
        else:
            # The regex leaves "default" as the only other kind: the
            # block runs unless the option is explicitly disabled.
            active = rest not in self.options_disabled
            push_note = "Pushing not_disabled block "
            stack_note = "new options_stack: "

        if active:
            # Block is active.  We push it onto the stack
            if self.debug:
                print(push_note + rest + " onto options_stack")
            self.options_stack.insert(0, rest)
            if self.debug:
                print(stack_note + str(self.options_stack))
        else:
            # Block is inactive.  Ignore all subsequent lines until we
            # get to an end command that matches our option.
            self.ignore_to_subcomm_end (retval, script_file, en_dis + " " + rest)
        return None

    # Is this the end of a conditional block?
    end_m = _end_en_dis_comm_re.match(retval)
    if end_m is None:
        return retval

    rest = end_m.group(2)
    if self.verbose:
        print("Line " + str(self.line_ct_stack[0]) + ": end rest = " + rest)
    if rest is None or rest == "":
        print("Warning! option end statement at line " + \
              str(self.line_ct_stack[0]) + " encountered with no argument.")
        return None
    if len(self.options_stack) == 0:
        print("Warning! option end statement at line " + \
              str(self.line_ct_stack[0]) + \
              " found with no matching begin for option: " + rest)
        return None
    if rest != self.options_stack[0]:
        print("Warning! Mismatched option begin/end group at line " + \
              str(self.line_ct_stack[0]) + ". Currently inside option: " + \
              self.options_stack[0] + " not " + rest)
        return None

    if self.debug: print("Popping " + self.options_stack[0])
    self.options_stack.pop(0)
    if self.debug:
        print("new options_stack: " + str(self.options_stack))
    return None
#### ignore_to_subcomm_end ###########################################
def ignore_to_subcomm_end (self, old_line, script_file, end_str):
    """Consume script lines up to and including "end <end_str>".

    Each consumed line is counted in the current line counter.  When
    the file runs out before the end statement is seen, the method
    simply returns.  old_line is accepted but not used.
    """
    if self.debug: print("ignore to: " + end_str)

    for raw in script_file:
        self.line_ct_stack[0] += 1
        text = raw.strip()
        if self.verbose:
            print("Ignore line " + str(self.line_ct_stack[0]) + ": " + text)

        found = _end_comm_re.match(text)
        if found is None:
            continue
        # An end statement with nothing after it compares as "".
        tail = found.group(1)
        if tail is None:
            tail = ""
        if tail == end_str:
            return
#### include_command #################################################
# Call run_script_file recursively on the file path provided.
def include_command (self, line, script_file):
    """Run another script file named by line.

    The path goes through path_expand first.  Returns "fail" when the
    expansion fails or the file does not exist; otherwise returns the
    result of run_script_file on the expanded path.
    """
    path = self.path_expand(line)
    if path is None:
        print("Ignoring: \n\tinclude " + line)
        return "fail"

    if os.path.isfile(path):
        if self.verbose:
            print("line: " + str(self.line_ct_stack[0]) + ": include " + path)
        return self.run_script_file (path)

    print("Line " + str(self.line_ct_stack[0]) + \
          ": Could not find include file: " + path)
    return "fail"
#### enable_option_command ###########################################
# Adds an option to the list of enabled options, removing it from the
# list of disabled options if it is there.
# Parses the first argument after "enable" as the key to enable.
# The end of the key is the end of the line or the first whitespace
# character.
def enable_option_command (self, line, script_file):
if line == "":
print("Empty option to enable at line: " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
m = re.match(_comm_re, line)
if m == None:
print("Could not parse enable command at line " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
option = m.group(1)
if option == None:
print("Empty option to enable command at line: " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
if self.verbose:
print("Line " + str(self.line_ct_stack[0]) + \
": enable option: " + option)
# Remove it from other set if present
if option in self.options_disabled:
self.options_disabled.remove(option)
# Add it if not already present.
if option not in self.options_enabled:
self.options_enabled.append(option)
return "success"
#### disable_option_command ###########################################
# Adds an option to the list of disabled options, removing it from the
# list of enabled options if it is there.
# Parses the first argument after "disable" as the key to disable.
# The end of the key is the end of the line or the first whitespace
# character.
def disable_option_command (self, line, script_file):
if line == "":
print("Empty option to disable at line: " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
m = re.match(_comm_re, line)
if m == None:
print("Could not parse disable option command at line " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
option = m.group(1)
if option == None:
print("Empty option to disable command at line " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
if self.verbose:
print("line: " + str(self.line_ct_stack[0]) + \
": disable option: " + option)
# Remove it from other set if present
if option in self.options_enabled:
self.options_enabled.remove(option)
# Add it if not already present.
if option not in self.options_disabled:
self.options_disabled.append(option)
return "success"
#### configure_command ###############################################
# First arg is the item to configure.
# Second arg is the setting.
# This enables adding option setting inside a script file.
def configure_command (self, line, script_file):
    """Apply a "configure <item> <setting>" script command.

    item must be a key of _configurables and setting one of the values
    listed for it.  The matching set_*_config method of self.simh is
    called with the setting.  Returns "success" or "fail".
    """
    parsed = _two_args_re.match(line)
    if parsed is None or parsed.group(1) is None or parsed.group(2) is None:
        print("Could not parse configure command arguments at line " + \
              str(self.line_ct_stack[0]) + ": {" + line + "}")
        return "fail"

    item, setting = parsed.group(1), parsed.group(2)
    if item not in _configurables:
        print("Ignoring invalid configuration item at line " + \
              str(self.line_ct_stack[0]) + ": " + item)
        return "fail"
    if setting not in _configurables[item]:
        print("At line " + str(self.line_ct_stack[0]) + \
              ": Cannot set " + item + " to " + setting + ".")
        return "fail"

    # Name of the simh method that applies each configurable item.
    setter_names = {"tape": "set_tape_config",
                    "rx": "set_rx_config",
                    "tti": "set_tti_config"}
    setter_name = setter_names.get(item)
    if setter_name is not None:
        getattr(self.simh, setter_name)(setting)
    return "success"
#### cpto_command ###########################################
# Calls os8_pip_to with the command line arguments.
def cpto_command (self, line, script_file):
if not self.booted:
print("Cannot run cpto command at line " + \
str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
return "die"
# Is 2nd and final arg the option?
m = re.match(_from_to_re_1, line)
if m != None:
# Yes. Expand Source first.
path = self.path_expand(m.group(1))
if path == None:
print("Ignoring: \n\tcpto " + line)
return "fail"
self.simh.os8_pip_to (path, "DSK:", m.group(2))
else:
# Is this normal case of source, dest, with possibly empty option?
m = re.match(_from_to_re_2, line)
if m == None:
print("Could not parse cpto command at line " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
path = self.path_expand(m.group(1))
if path == None:
print("Ignoring: \n\tcpto " + line)
return "fail"
self.simh.os8_pip_to (path, m.group(2), m.group(4), self.debug)
return "success"
#### cpfrom_command #########################################
# Calls os8_pip_from with the command line arguments.
def cpfrom_command (self, line, script_file):
if not self.booted:
print("Cannot run cpfrom command at line " + \
str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
return "die"
m = re.match(_from_to_re_2, line)
if m == None:
print("Could not parse cpfrom command at line " + \
str(self.line_ct_stack[0]) + ".")
return "fail"
path = self.path_expand(m.group(2))
if path == None:
print("Ignoring: \n\tcpfrom " + line)
return "fail"
self.simh.os8_pip_from (m.group(1), path, m.group(4))
return "success"
#### copy_command ###############################################
# Simple script interface to create a copy of a file.
def copy_command (self, line, script_file):
    """Copy one POSIX file to another: "copy <from-file> <to-file>".

    Both paths go through path_expand.  An existing destination is
    moved aside by save_if_needed before the copy.  Returns "success"
    or "fail".
    """
    parsed = _two_args_re.match(line)
    if parsed is None or parsed.group(1) is None or parsed.group(2) is None:
        print("Could not copy command: " + line)
        return "fail"

    # Expand both before testing either, so each bad path is reported.
    from_path = self.path_expand(parsed.group(1))
    to_path = self.path_expand(parsed.group(2))
    if from_path is None or to_path is None:
        print("Ignoring: \n\t copy " + line)
        return "fail"

    print("copy command: \n\tfrom: " + from_path + ", \n\tto: " + to_path)
    if not os.path.isfile(from_path):
        print("At line " + str(self.line_ct_stack[0]) + \
              ": Required copy input file: " + from_path + " not found.")
        return "fail"

    save_if_needed(to_path)
    try:
        shutil.copyfile(from_path, to_path)
    except shutil.Error as e:
        print("copy command failed with error: " + str(e))
        return "fail"
    except IOError as e:
        print("copy command failed with IOError: " + str(e))
        return "fail"
    return "success"
#### resume_command #############################################
# Call simh_resume_os8 in simh to resume OS/8.
def resume_command (self, line, script_file):
if not self.booted:
print("Cannot run resume command at line " + \
str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
return "die"
if self.verbose:
print("Resuming OS/8 at line " + str(self.line_ct_stack[0]) + ".")
self.simh.simh_resume_os8()
return "success"
#### restart_command #############################################
# Call simh_restart_os8 in simh to restart OS/8.
def restart_command (self, line, script_file):
if not self.booted:
print("Cannot run restart command at line " + \
str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
return "die"
if self.verbose:
print("Restarting OS/8 at line " + str(self.line_ct_stack[0]) + ".")
self.simh.simh_restart_os8()
return "success"
#### patch_command ##############################################
# Read the named patch file and perform its actions.
def patch_command (self, line, script_file):
if not self.booted:
print("Cannot run patch command at line " + \
str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
return "die"
path = self.path_expand(line)
if path == None:
print("Ignoring: \n\t" + "patch " + line)
return "fail"
if not os.path.isfile(path):
print("At line " + str(self.line_ct_stack[0]) + \
": Patch file: " + path + " not found.")
return "fail"
self.run_patch_file (path, line, script_file)
return "success"
#### exit_command ###############################################
# Make a graceful exit:
# Remove scratch files.
# Detach all devices from running image.
# Quit SIMH.
# Parse an exit status value from line. Default to 0.
# Call POSIX exit to exit the running program,
def exit_command (self, line, script_file):
    """Clean up and leave the program.

    Removes the scratch files, detaches all devices, tells SIMH to
    quit and calls sys.exit.  The exit status is parsed from line,
    either as a bare integer or as an integer in parentheses; it is 0
    when line holds neither.

    Does not return: sys.exit raises SystemExit.
    """
    # Start from the default so status is bound on every path.
    status = 0
    m = _exit_arg_re.match(line)
    if m is not None:
        digits = m.group(1) if m.group(1) is not None else m.group(2)
        if digits is not None:
            # The pattern allows blanks around the sign, e.g. "- 5",
            # which int() rejects.  Squeeze them out first.
            status = int("".join(digits.split()))

    for filename in self.scratch_list:
        if self.verbose: print("Deleting scratch_copy: " + filename)
        try:
            os.remove(filename)
        except OSError as e:
            # A scratch file we cannot delete must not stop us from
            # shutting the simulator down.
            print("Could not delete scratch file " + filename + ": " + str(e))

    self.simh.simh_cmd ("detach all")
    self.simh._child.sendline("quit")
    if self.verbose:
        print("Calling sys.exit (" + str(status) + ") at line: " + \
              str(self.line_ct_stack[0]) + ".")
    sys.exit(status)
#### print_command ###############################################
# Print text from the running script
# If verbose is set, say what line in the script containing
# the print command.
def print_command (self, line, script_file):
if self.verbose:
print("Line: " + str(self.line_ct_stack[0]) + ": " + line)
else:
print(self.print_expand(line))
return "success"
#### _command ###########################################
#
def _command (self, line, script_file):
return "success"
#### run_script_file ############################################
# Run os8 command script file
# Call parsers as needed for supported sub commands.
#
# Commands:
# mount <simh-dev> <image-file> [<option> ...]
# option: required | preserve | readonly | ro | scratch
# umount <simh-dev>
# boot <simh-dev>
# os8 <command-line>
# the rest of the line is passed verbatim to OS/8
# pal8 <pal-command line>
# include <script-file>
# configure <device> <parameter>
# device: tti | tape | rx
# tt parameter: KSR | 7b
# tape parameter: td | dt
# rx parameter: rx8e | rx28 | rx01 | rx02
# enable <parameter>
# disable <parameter>
# cpto <posix-file> [<os8-file>] [<format>]
# cpfrom <os8-file> <posix-file> [<format>]
# format: /A | /I | /B
# copy <from-file> <to-file>
# patch <patch-file>
# resume
# restart
# begin <sub-command> <os8-path>
# end <sub-command>
# print <output text>
# exit <status>
# Sub-commands:
# build, fotp, absldr
#
# Commands return, "success", "fail", or "die".
def run_script_file (self, script_path):
try:
script_file = open(script_path, "r")
except IOError:
print(script_path + " not found.")
return "fail"
return self.run_script_handle(script_file, script_path)
def run_script_handle (self, script_file, script_path):
    """Run every command found in the already-open script_file.

    Each line is cleaned up by basic_line_parse, split with _comm_re
    into a command word and the rest of the line, and dispatched to the
    matching handler.  Lines that do not parse, and unknown command
    words, are reported and skipped.

    A handler returning "die" ends the whole program with
    sys.exit(-1); "fail" is reported and the script carries on.
    script_path is only used in those messages.

    Returns "success" once the file has been read to the end.
    """
    # Script command word -> handler method.
    dispatch = {
        "mount":     self.mount_command,
        "boot":      self.boot_command,
        "os8":       self.os8_command,
        "ocomp":     self.ocomp_command,
        "pal8":      self.pal8_command,
        "include":   self.include_command,
        "begin":     self.begin_command,
        "end":       self.end_command,
        "exit":      self.exit_command,
        "print":     self.print_command,
        "umount":    self.umount_command,
        "simh":      self.simh_command,
        "configure": self.configure_command,
        "enable":    self.enable_option_command,
        "disable":   self.disable_option_command,
        "cpto":      self.cpto_command,
        "cpfrom":    self.cpfrom_command,
        "copy":      self.copy_command,
        "resume":    self.resume_command,
        "restart":   self.restart_command,
        "patch":     self.patch_command,
    }

    # Each script gets its own line counter, pushed on the front of
    # line_ct_stack for the duration of the run.
    self.line_ct_stack.insert(0, 0)
    if self.debug:
        print("New line_ct_stack: " + str(self.line_ct_stack))

    for raw_line in script_file:
        line = self.basic_line_parse (raw_line, script_file)
        if line is None:
            continue

        parsed = re.match(_comm_re, line)
        if parsed is None:
            print("Ignoring command line at line " +
                  str(self.line_ct_stack[0]) + ": " + line)
            continue

        keyword = parsed.group(1)
        handler = dispatch.get(keyword)
        if handler is None:
            print("Unrecognized script command at line " +
                  str(self.line_ct_stack[0]) + ": " + keyword)
            continue

        # A command with no arguments is handed an empty string.
        rest = parsed.group(3) or ""
        outcome = handler(rest, script_file)

        # The line number is read after the handler has run, because
        # block commands consume further lines of the script.
        if outcome == "die":
            print("\nFatal error encountered in " + script_path +
                  " at line " + str(self.line_ct_stack[0]) + ":")
            print("\t" + line)
            sys.exit(-1)
        if outcome == "fail":
            print("Non-fatal error encountered in " + script_path +
                  " at line " + str(self.line_ct_stack[0]) + ":")
            print("\t" + line + "\n")

    # Done.  Drop this script's line counter again.
    self.line_ct_stack.pop(0)
    if self.debug:
        print("popped line_ct_stack: " + str(self.line_ct_stack))
    return "success"
#### end_command #####################################################
def end_command (self, line, script_file):
    """Report an unexpected `end` command.

    Prints the current script line number and the text following
    `end`, then returns "fail".  script_file is not used.
    """
    where = str(self.line_ct_stack[0])
    print("Unexpectedly encountered end command at line " + where + ": " + line)
    return "fail"
#### parse_odt #######################################################
def parse_odt (self, line):
    """Apply one line of an ODT patch.

    A line consisting of a backslash followed by "c" ends the ODT
    session.  Any other line is split by _odt_parse into a location,
    an old value and a new value.  The location is opened, the old
    value is checked when it is numeric, and the new value is
    deposited.

    Returns:
      "break"  the end-of-session line was seen
      "err"    the line did not parse, or the old value did not match;
               ^C has already been sent
      "cont"   the new value was sent
    """
    if self.debug: print(line)
    if line == "\\c":
        return "break"

    fields = _odt_parse.match(line)
    if fields is None:
        print("Aborting because of bad ODT line: " + line)
        self.simh.os8_ctrl_c (caller="parse_odt")
        return "err"

    loc, old_val, new_val = fields.group(1, 2, 3)
    if self.debug:
        print("Loc: " + loc + ", old_val: " + old_val + ", new_val: " +
              new_val)

    # Open the location and wait for the four octal digits ODT prints.
    self.simh.os8_send_str (loc + "/")
    self.simh._child.expect(r"\s*[0-7]{4} ")

    if old_val.isdigit():   # The patch wants the old value verified.
        found_val = self.simh.child_after().strip()
        if found_val != old_val:
            print("\tOld value: " + found_val + " does not match " +
                  old_val + ". Aborting patch.")
            # Abort out of ODT back to the OS/8 Monitor
            self.simh.os8_ctrl_c ("parse_odt 2")
            return "err"

    self.simh.os8_send_line (new_val)
    return "cont"
#### futil_exit ########################################################
def futil_exit (self, line):
    """Handle the FUTIL EXIT command.

    Passes the line to OS/8 unchanged and returns "break".
    """
    send = self.simh.os8_send_line
    send(line)
    return "break"
#### futil_file ########################################################
def futil_file (self, line):
    """Handle the FUTIL FILE command and check that the lookup worked.

    The line is re-parsed with _com_split_parse to get the file name,
    sent to FUTIL, and the reply that starts with that file name is
    inspected.

    Returns "cont" on success.  Returns "err", after sending ^C, when
    the command or the reply does not parse or when FUTIL answers
    "LOOKUP FAILED".
    """
    # Redundant re-parse of line but nobody else wants args right now.
    parsed = _com_split_parse.match(line)
    if parsed is None:
        print("Aborting because of mal-formed FUTIL FILE command: " + line)
        self.simh.os8_ctrl_c (caller="futil_file")
        return "err"

    fname = parsed.group(2)
    reply_pattern = "".join(["\n", fname, r"\s+(?!", fname, r")?(\S+)(.*)$"])

    self.simh.os8_send_line (line)
    self.simh._child.expect(reply_pattern)
    result = self.simh.child_after().strip()

    status = _com_split_parse.match(result)
    if status is None:
        print("Aborting because unexpected return status: " + result +
              " from: " + line)
        self.simh.os8_ctrl_c (caller="futil_file 2")
        return "err"

    if status.group(2).strip() != "LOOKUP FAILED":
        return "cont"

    print("Aborting because of FUTIL lookup failure on: " + fname)
    self.simh.os8_ctrl_c (caller="futil_file 3")
    return "err"
#### parse_futil #####################################################
#
# Very simple minded:
# If first char on line is an alpha, run the command.
# If the first char on line is number, do the substitute command.
#
# Substitute command acts like ODT.
# Future version should support the IF construct.
#
# When we encounter the EXIT command, we return success.
def parse_futil (self, line):
    """Process one line of a FUTIL patch.

    A line starting with a digit is handled exactly like an ODT line.
    Any other line is split with _com_split_parse.  EXIT and FILE have
    their own handlers; every other command is sent to FUTIL
    unchecked.

    Returns the handler's result, or "cont" when the line was sent
    unchecked or could not be parsed.
    """
    special_handlers = {
        "EXIT": self.futil_exit,
        "FILE": self.futil_file,
    }

    if line[0].isdigit():
        # Treat the line as ODT
        return self.parse_odt(line)

    parsed = _com_split_parse.match(line)
    if parsed is None:
        print("Ignoring failed FUTIL command parse of: " + line)
        return "cont"

    handler = special_handlers.get(parsed.group(1))
    if handler is not None:
        return handler(line)

    # Blind faith and no error checking.
    self.simh.os8_send_line(line)
    return "cont"
#### run_patch_file ##################################################
def run_patch_file (self, pathname, line, script_file):
    """Apply the patch file at pathname to the running OS/8 system.

    Blank lines and lines starting with '#' are skipped.  A line
    starting with '.' is an OS/8 command, and only the commands in
    allowed_commands are accepted.  ODT, or "R" followed by the name of
    a supported parser, starts a sub-session: every following line is
    given to that parser until it returns "break".  Other allowed
    commands are run and the monitor prompt is awaited.  Any other line
    outside a sub-session is ignored.

    line and script_file are only used to resume OS/8 when the
    simulator was left in simh context.

    Returns "success" when the whole file was processed.  Returns
    "fail" when the file cannot be opened, an OS/8 command line does
    not parse, a program that may not be run from a patch is requested,
    or a parser returns "err".  The patch file is closed on every one
    of those paths.
    """
    sys.stdout.write ("Applying patch " + os.path.basename (pathname) + "...")
    sys.stdout.flush ()

    try:
        patch_file = open(pathname, "r")
    except IOError:
        print(pathname + " not found. Skipping.")
        return "fail"

    # The with block closes the patch file on every exit path,
    # including the early "fail" returns inside the loop.
    with patch_file:
        # Supported parsers.
        # Note that FUTIL has not been tested since we flipped the expect order.
        parsers = {
            "ODT": self.parse_odt,
            "FUTIL": self.parse_futil,
        }

        allowed_commands = ["ODT", "R", "GET", "GE", "SAVE", "SA"]
        inside_a_command = False
        the_command_parser = None

        # Any os8-run command may be called after a simh command that left us
        # in simh context.  Check to see if we need to restart OS/8.
        # We resume OS/8.  It should be non-disruptive and work well.
        if self.simh._context == "simh":
            self.resume_command(line, script_file)

        for patch_line in patch_file:
            patch_line = patch_line.rstrip()
            if patch_line == "": continue
            if patch_line[0] == '#': continue       # Ignore comments

            if inside_a_command:
                retval = the_command_parser (patch_line)
                if retval == "break":
                    inside_a_command = False
                    self.simh.os8_ctrl_c (caller="run_patch_file")
                elif retval == "err":
                    return "fail"
                continue

            if patch_line[0] != '.':
                # Not an OS/8 command and not inside a sub-session: ignored.
                continue

            # New OS/8 command.
            match = _com_os8_parse.match(patch_line)
            if match is None:
                print("Aborting patch on failed OS/8 command parse of: " + patch_line)
                return "fail"
            com = match.group(1)
            rest = match.group(2)

            if com not in allowed_commands:
                print ("Command " + com + " not allowed in patch. Ignoring.")
                continue

            # Maybe command is ".ODT", maybe it's "R FUTIL"
            if com == "R":
                # Check to see if we can run this.
                if rest not in parsers:
                    print ("Not allowed to run " + rest + " in a patch file. Aborting.")
                    return "fail"
                com = rest

            # Noisy debug output.  Our line.
            if self.verbose and self.debug: print(patch_line)

            if com in parsers:
                # We will be inside a command after the run.
                the_command_parser = parsers[com]
                inside_a_command = True
                # Run the line blind, skipping over the dot at the beginning.
                # We won't get a prompt.
                self.simh.os8_send_line (patch_line[1:])
            else:
                # It's an OS/8 command.
                # We must test for monitor prompt to signify success.
                # Otherwise a race condition has been shown to prevent save of
                # patched binary because our exit of SIMH happens too quickly!
                reply = self.simh.os8_cmd (patch_line[1:])  # Skip the dot.
                # If we fail, we just complain and keep going.
                self.simh.os8_test_result (reply, "Monitor Prompt",
                                           "run_patch_file command " + patch_line[1:])

    # Done with all patch file lines.
    print("Success.")
    return "success"
#### skip_patch ######################################################
# Returns true if the given filename matches one of the regex string
# keys of the given skips dict and the flag value for that key is set.
# See skips definition in make_patch, which calls this.
def skip_patch (fn, skips):
    """Tell whether the patch file named fn should be skipped.

    skips maps regex strings to flags.  The result is True when at
    least one regex is found in fn and its flag is truthy, else False.
    """
    return any(re.search (pattern, fn) and flag
               for pattern, flag in skips.items())
#### simh_command ####################################################
# I tried to avoid including this command but sometimes you just
# have to reconfigure subtle bits of device drivers.
# We assume we can call a simh command at any time, but
# doing so puts us in the simh context that persists until we
# issue a boot or go command.
def simh_command (self, line, script_file):
    """Reject the `simh` script command, which is disabled.

    Prints a notice naming the current script line and returns "fail".
    Neither line nor script_file is used; nothing is sent to SIMH.

    The pass-through code that used to follow the return (send the
    line with simh_cmd, then test for the prompt) could never run and
    has been removed.
    """
    print("simh command is disabled. Line " + \
          str(self.line_ct_stack[0]) + " ignored.")
    return "fail"
#### umount_command ##################################################
def umount_command (self, line, script_file):
    """Detach the SIMH device named in line.

    Sends "det <line>" to SIMH and passes the reply to
    simh_test_result.  The outcome of that test is not examined; the
    return value is always "success".  script_file is not used.
    """
    detach_comm = "det " + line
    if self.verbose:
        line_no = str(self.line_ct_stack[0])
        print("line: " + line_no + ": " + detach_comm)
    reply = self.simh.simh_cmd(detach_comm, debug=self.debug)
    self.simh.simh_test_result(reply, "Prompt", "umount_command")
    return "success"
#### make_scratch ####################################################
# Create a copy of the contents of image file using the python
# method to create a secure, named temp filename.
# The caller has split the image file name into a base path,
# and an extension which we use.
# After copying the source image, the file is closed and the name
# of the scratch file is returned.
def make_scratch (self, base_imagename, extension):
    """Copy an image file to a uniquely named scratch file.

    The source is base_imagename + extension.  The copy is created by
    tempfile.NamedTemporaryFile with the prefix
    base_imagename + '-temp-' and the same extension, and it is not
    deleted when closed.

    On success the scratch name is appended to self.scratch_list and
    returned.  Returns None, after printing a message, when the source
    cannot be opened or the copy fails.  Both files are closed on every
    path, and a partially written scratch file is removed.
    """
    src_name = base_imagename + extension
    try:
        src_image = open(src_name, "rb")
    except IOError:
        print("Open of imagefile " + src_name + " failed.")
        return None

    failure = None
    with src_image:
        dest_image = tempfile.NamedTemporaryFile(mode='w+b',
                                                 prefix=base_imagename + '-temp-',
                                                 suffix=extension,
                                                 delete=False)
        destname = dest_image.name
        if self.debug: print("Temp file name is: " + destname)
        with dest_image:
            try:
                shutil.copyfileobj(src_image, dest_image)
            except shutil.Error as e:
                failure = " to scratch failed with error: " + str(e)
            except IOError as e:
                failure = ", failed with IOError: " + str(e)

    if failure is not None:
        print("Copy of imagefile " + src_name + failure)
        # The partial copy is not in scratch_list, so nothing else will
        # ever delete it.  Removal is best effort.
        try:
            os.remove(destname)
        except OSError:
            pass
        return None

    self.scratch_list.append(destname)
    return destname
#### mount_command ###################################################
# mount <simh-device> <image-file> [option ...]
#
# Interface to SIMH attach command with options that do a bit more.
#
# Options:
# required: <image-file> is required to exist, otherwise abort the script.
# preserve if <image-file> already exists, create a copy with a
# version number suffix. This is useful when you want to prevent
# overwrites of a good image file with changes that might not work.
# os8-run preserves all versions seen and creates a new
# version that doesn't overwrite any of the previous ones.
# readonly: Passes the `-r` option to SIMH attach to mount the
# device in read only mode.
# ro: abbreviation for readonly.
# scratch: Create a writeable scratch copy of the named image
# file and mount it. This is helpful when you are booting a
# distribution DECtape. Booted DECtape images must be writeable.
# To protect a distribution DECtape, use this option.
# When all script runs are done, the scratch version is deleted.
# new: If there is an existing file, move it aside as a .save because
# we want to create a new empty image file.
#
# If the mount command fails for any reason, we presume
# it is a fatal error and abort the script.
def mount_command (self, line, script_file):
    """Attach an image file to a SIMH device.

    line is "<simh-device> <image-file> [option ...]" as split by
    _mount_re.  Options recognised:
      new                    call save_if_needed on the image first
      required / must_exist  die if the image does not exist
      scratch                attach a scratch copy made by make_scratch
      readonly / ro          pass -r to the SIMH attach command
      preserve / no_overwrite
                             pick the first "<base>-<n><ext>" name that
                             does not exist yet; ignored with scratch

    Returns "die" when the line cannot be parsed, no image is named,
    the unit number is missing, the device is not in
    _os8_from_simh_dev, the path cannot be expanded, a required image
    is missing, or the scratch copy fails.  Otherwise the attach
    command is sent and "success" is returned.  script_file is not
    used.
    """
    m = re.match(_mount_re, line)
    if m is None:
        print("At line " + str(self.line_ct_stack[0]) + \
              ", could not parse mount. Ignoring: {" + line + "}.")
        return "die"
    simh_dev = m.group(1)
    unit = m.group(2)
    parts = m.group(3).split()
    if not parts:
        print("At line " + str(self.line_ct_stack[0]) + \
              ": No image name specified in: {" + line + "}")
        return "die"

    # Validate the device and unit before any option touches the file
    # system, so a bad command cannot move images aside or leave
    # scratch copies behind.
    if unit is None or unit == "":
        print("Need unit number for: " + line)
        return "die"
    if simh_dev not in _os8_from_simh_dev:
        print("At line " + str(self.line_ct_stack[0]) + \
              ": Unrecognized simh dev: " + simh_dev)
        return "die"

    imagename = self.path_expand(parts[0])
    if imagename is None: return "die"
    # splitext copes with names that have no extension and with dots
    # that appear only in the directory part.
    base_imagename, extension = os.path.splitext(imagename)

    options = parts[1:]
    ro_arg = ""
    copy_imagename = ""

    if "new" in options:
        save_if_needed(imagename)

    # Perform must_exist before scratch
    if "must_exist" in options or "required" in options:
        if not os.path.exists(imagename):
            print("At line " + str(self.line_ct_stack[0]) + ", " + \
                  imagename + " must exist but was not found. Not mounting.")
            return "die"

    if "scratch" in options:
        copy_imagename = self.make_scratch(base_imagename, extension)
        if copy_imagename is None: return "die"
        imagename = copy_imagename

    if "readonly" in options or "ro" in options:
        if copy_imagename != "":
            print("At line " + str(self.line_ct_stack[0]) + \
                  ": You don't really need to set readonly on a scratch copy.")
        ro_arg = "-r "

    if "no_overwrite" in options or "preserve" in options:
        if copy_imagename != "":
            print("Ignoring preserve option at line " + \
                  str(self.line_ct_stack[0]) + \
                  " because scratch option is present.")
        else:
            next_tape = 0
            while os.path.isfile(imagename):
                print("Found: " + imagename)
                next_tape += 1
                imagename = base_imagename + "-" + str(next_tape) + extension

    attach_comm = "att " + ro_arg + simh_dev + unit + " " + imagename
    if self.verbose:
        print("Line " + str(self.line_ct_stack[0]) + ": mount: " + \
              attach_comm)
    reply = self.simh.simh_cmd(attach_comm)
    # NOTE(review): the result of simh_test_result is not examined, so
    # a failed attach still returns "success" -- confirm this is wanted.
    self.simh.simh_test_result(reply, "Prompt", "mount_command")
    return "success"
#### boot_command ####################################################
#
# Check to see if the device to be booted has something attached.
# If not die.
# If so, boot it, and set our booted state to True.
def boot_command (self, line, script_file):
    """Boot OS/8 from the SIMH device named in line.

    SIMH is first asked to "show" the device.  The command dies when
    the device does not exist, when the reply cannot be understood, or
    when nothing is attached to the device.  Otherwise "boot <line>"
    is issued, the reply is tested for the monitor prompt, and
    self.booted is set to True.

    Returns "success" or "die".  script_file is not used.
    """
    # First confirm something is attached to boot from.
    device = line.upper()
    show_replies = [device + r"\s+(.+)\r", "Non-existent device"]
    self.simh.simh_send_line("show " + line)
    if self.simh._child.expect(show_replies) == 1:
        print("Attempt to boot non-existent device: " + line)
        return "die"

    reply_str = self.simh.child_after()
    status = re.match(
        r"^(\S+)\s+(\S+),\s+(attached to |not attached)(\S+)?,\s+(.+)\r",
        reply_str)
    if status is None:
        print("Could not determine if device " + line + " is attached; " +
              "got '" + reply_str + "'")
        return "die"

    # Caution: the text we compare against ends with a space.
    if status.group(3) != "attached to ":
        print("Attempt to boot on non-attached device: " + line)
        return "die"

    boot_comm = "boot " + line
    if self.verbose:
        print("Line " + str(self.line_ct_stack[0]) + ": " + boot_comm)
    reply = self.simh.simh_cmd(boot_comm, self.simh._os8_replies_rex)

    # We're in OS/8 now, so use that tester.
    self.simh.os8_test_result (reply, "Monitor Prompt", "boot_command")
    self.booted = True
    return "success"
#### check_and_run ##################################################
# Common routine to check that we are bootable, in the right context
# are able to run a command under OS/8, and that we successfully run it.
# caller is the name we give out as our caller.
# Return -1 if we are not booted and the command needs to die.
# otherwise return the reply to the command.
# We generically support reporting out if transcript is enabled.
def check_and_run (self, caller, os8_comm, script_file, replies=None):
    """Run os8_comm under OS/8 on behalf of the script command caller.

    Returns -1, after printing a message, when OS/8 has not been
    booted.  Otherwise OS/8 is restarted first if the simulator is in
    simh context, the command is sent with os8_cmd (using replies when
    given), and the reply value from os8_cmd is returned.

    When the "transcript" option is enabled, the output collected
    before the reply is printed.
    """
    if not self.booted:
        print("Cannot run " + caller + " command at line " +
              str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
        return -1

    if self.verbose:
        line_no = str(self.line_ct_stack[0])
        print("Line: " + line_no + ": " + caller + "_command: " + os8_comm)

    # Any os8-run command may be called after a simh command that left us
    # in simh context.  We could resume, but restart is safer.
    if self.simh._context == "simh":
        self.restart_command(os8_comm, script_file)

    reply = self.simh.os8_cmd (os8_comm, replies, debug=self.debug)
    if "transcript" in self.options_enabled:
        transcript = self.simh._child.before.decode().strip()
        print (transcript)
    return reply
#### os8_command #####################################################
def os8_command (self, line, script_file):
    """Run one OS/8 command line and get back to the monitor.

    Returns:
      "success"  the command ended at the monitor prompt
      "fail"     the command reported something else, and the monitor
                 was reached again afterwards
      "die"      OS/8 was not booted, or the monitor could not be
                 reached again
    """
    reply = self.check_and_run("os8", line, script_file)
    if reply == -1:
        return "die"

    # We expect that the command will exit to the OS/8 monitor when done.
    if self.simh.os8_test_result (reply, "Monitor Prompt", ""):
        return "success"

    # It did not.  If it stopped in the command decoder, escape from it
    # first, falling back to ^C.
    if self.simh.os8_test_result (reply, "Command Decoder Prompt", ""):
        escaped = self.simh.os8_escape ("")
        if escaped == False:
            print ("os8_command: Escape from command decoder failed. Trying ^C.")
            escaped = self.simh.os8_ctrl_c ("")
            if escaped == False:
                print ("^C failed to return to monitor. Killing script.")
                return "die"

    # We want to print errors
    print ("os8_command Error: \n\t" + line)
    print ("\t" + self.simh.child_after().strip())

    if self.simh._os8_replies[reply][2] == False:
        # Not a fatal error, so we need to ^C to get back to the Monitor.
        # Sometimes OS/8 is not yet ready for ^C, so we wait half a second.
        time.sleep (.5)
        print ("Sending ^C to return to monitor.")
        if self.simh.os8_ctrl_c ("", debug=self.debug):
            return "fail"
        print ("^C failed to return to monitor. Killing script.")
        return "die"

    # Got fatal error.  Expect monitor prompt so state machine is in the right place.
    if self.simh.os8_cfm_monitor(""):
        return "fail"
    print ("os8_command: Failed to return to monitor after fatal error. Killin script.")
    return "die"
#### pal8_command ####################################################
# The "pal8" script command comes in two forms:
# The two argument form where the PAL8 status is printed on the fly
# and the 3 argument form where all status goes into the listing file.
# We do the 3 argument form with a simple "os8" script command.
def pal8_command (self, line, script_file):
    """Run PAL8 on the command line in line and report what it says.

    PAL8 is started through check_and_run with "R PAL8" and must reach
    the Command Decoder prompt.  The command line is then sent, and
    replies matched against self.replies_rex["pal8"] are processed
    until the Monitor Prompt is seen.

    Returns:
      "die"      check_and_run returned -1
      "fail"     PAL8 did not reach the Command Decoder, or an error
                 count above zero or any other message was collected
      "success"  otherwise
    """
    if self.verbose: print("Running PAL8 on: " + line)
    reply = self.check_and_run("pal8", "R PAL8", script_file)
    if reply == -1: return "die"
    if self.simh.os8_test_result (reply, "Command Decoder Prompt", "call_pal8") == False:
        print("PAL8 failed to start at line " + str(self.line_ct_stack[0]))
        return "fail"
    if self.verbose: print("Line: " + \
        str(self.line_ct_stack[0]) + ": pal8_command: " + line)
    # Send our command line args and harvest results.
    reply = self.simh.os8_cmd (line, self.replies_rex["pal8"])
    err_count = 0
    # Text seen before the first reply; printed as context if we fail.
    executed_line = self.simh._child.before.decode().strip()
    mon = self.simh.test_result(reply, "Monitor Prompt", self.replies["pal8"], "")
    err_string = ""
    ret_val = "success"     # Assigned but never read in this method.
    expect_loop = 1         # Only used in the debug output below.
    # We're going to print all the errors we see
    # and keep looking till we get either a monitor or command decoder prompt.
    while not mon:
        if self.debug: print ("Got reply [" + str(expect_loop) +"] " + str(reply) + " -> " + \
            self.replies["pal8"] [reply][0])
        ed = self.simh.test_result(reply, "ERRORS DETECTED", self.replies["pal8"], "")
        cd = self.simh.test_result(reply, "Command Decoder Prompt", self.replies["pal8"], "")
        lg = self.simh.test_result(reply, "LINKS GENERATED", self.replies["pal8"], "")
        if ed: # Got Errors Detected. Count them.
            # The count is the next run of digits in the output.
            self.simh._child.expect(r"\d+")
            err_count = int(self.simh.child_after().strip())
        elif lg:
            # The link count is consumed from the output, but its
            # value is never read in this method.
            self.simh._child.expect(r"\d+")
            link_count = int(self.simh.child_after().strip())
        elif cd: # Got Command Decoder. Exit with failure.
            if self.debug: print ("call_pal8: Non-fatal error: Sending ^C")
            # Exit pal8. We'll confirm we got back to OS/8 monitor below.
            self.simh.os8_send_ctrl ('c')
        else:
            # Any other reply is collected as an error message.
            err_string += ("\t" + self.simh.child_after().strip() + "\n")
        expect_loop += 1
        # Wait for the next reply and test again for the monitor prompt.
        reply = self.simh._child.expect(self.replies_rex["pal8"])
        mon = self.simh.test_result(reply, "Monitor Prompt", self.replies["pal8"], "")
    if err_count > 0 or err_string != "":
        print ("PAL8 Error: \n\t" + executed_line)
        print (err_string)
        return "fail"
    return "success"
#### ocomp_command ###################################################
# This command will be used to compare files in 2 places and
# Return success if they are the same, and return failure if
# one or the other is missing, or if they are different.
# This requires making an interpretation of the output that is
# different from the expected "octal dump of differences" use case.
# "NOTHING OUTPUT" means success.
# Any "USER ERROR" output means failure.
def ocomp_command (self, line, script_file):
    """Compare files with OCOMP and turn its output into a status.

    OCOMP is started through check_and_run with "R OCOMP" and must
    reach the Command Decoder prompt.  The command line is then sent,
    and replies matched against self.replies_rex["ocomp"] are processed
    until the Monitor Prompt is seen.

    Returns:
      "die"      check_and_run returned -1
      "success"  a "NOTHING OUTPUT" reply was seen
      "fail"     otherwise, including when OCOMP did not start
    """
    if self.verbose: print("Running OCOMP on: " + line)
    reply = self.check_and_run("ocomp", "R OCOMP", script_file)
    if reply == -1: return "die"
    if self.simh.os8_test_result (reply, "Command Decoder Prompt", "call_ocomp") == False:
        print("OCOMP failed to start at line " + str(self.line_ct_stack[0]))
        return "fail"
    if self.verbose: print("Line: " + \
        str(self.line_ct_stack[0]) + ": ocomp_command: " + line)
    # Send our command and harvest results.
    reply = self.simh.os8_cmd (line, self.replies_rex["ocomp"])
    if "transcript" in self.options_enabled:
        print (self.simh._child.before.decode().strip())
    # Fail unless a "NOTHING OUTPUT" reply proves the files are the same.
    ret_val = "fail"
    expect_loop = 1         # Only used in the debug output below.
    mon = self.simh.test_result(reply, "Monitor Prompt", self.replies["ocomp"], "")
    # We get return status and then clean up the state machine.
    while not mon:
        if self.debug:
            print ("ocomp gave reply: [" + str(expect_loop) +"] " + str(reply) + " -> " + \
                   self.replies["ocomp"][reply][0])
        ok = self.simh.test_result(reply, "NOTHING OUTPUT", self.replies["ocomp"], "")
        cd = self.simh.test_result(reply, "Command Decoder Prompt", self.replies["ocomp"], "")
        nf = self.simh.test_result(reply, "File not found", self.replies["ocomp"], "")
        ue = self.simh.test_result(reply, "USER ERROR", self.replies["ocomp"], "")
        if ok:
            if self.debug: print ("Success")
            ret_val = "success"
        elif nf or ue:
            # Report the message; ret_val is left unchanged.
            print ("OCOMP:" + self.simh.child_after().strip() + "\n")
        elif cd: # Got Command Decoder. Exit with failure.
            if self.debug: print ("call_ocomp: Non-fatal error: Sending ^C")
            # Exit OCOMP. We'll confirm we got back to OS/8 monitor below.
            self.simh.os8_send_ctrl ('c')
        # Wait for the next reply and test again for the monitor prompt.
        reply = self.simh._child.expect(self.replies_rex["ocomp"])
        if "transcript" in self.options_enabled:
            print (self.simh._child.before.decode().strip())
        expect_loop += 1
        mon = self.simh.test_result(reply, "Monitor Prompt", self.replies["ocomp"], "")
    return ret_val
#### begin_command ###################################################
def begin_command (self, line, script_file):
    """Start a `begin <sub-command> <argument>` block.

    line is split with _comm_re into the sub-command name and its
    argument.  Known sub-commands are "build" and "cdprog"; the handler
    for the block reads the following lines from script_file itself.

    Returns:
      "die"   OS/8 has not been booted
      "fail"  the line cannot be parsed, or the sub-command is unknown
              (the block is then skipped with ignore_to_subcomm_end)
      otherwise whatever the sub-command handler returns
    """
    if not self.booted:
        print("Cannot execute begin subcommand block at line " + \
              str(self.line_ct_stack[0]) + ". OS/8 has not been booted.")
        return "die"

    sub_commands = {"build": self.build_subcomm,
                    "cdprog": self.cdprog_subcomm}

    m = re.match(_comm_re, line)
    if m is None:
        print("Could not parse sub-command at line " + \
              str(self.line_ct_stack[0]) + ": " + line)
        # There is no match object to inspect, so stop here instead of
        # falling through and raising AttributeError on m.group().
        return "fail"

    sub_name = m.group(1)
    if sub_name not in sub_commands:
        print("Ignoring unrecognized sub-command at line " + \
              str(self.line_ct_stack[0]) + ": " + sub_name)
        print("Ignoring everything to next end.")
        self.ignore_to_subcomm_end(line, script_file, "")
        return "fail"

    # Any os8-run command may be called after a simh command that left us
    # in simh context.  Check to see if we need to restart OS/8.
    # We could resume, but restart is safer.
    if self.simh._context == "simh":
        self.restart_command(line, script_file)

    # A sub-command given without an argument gets an empty string
    # rather than None; the handlers build command strings from it.
    argument = m.group(3)
    if argument is None: argument = ""
    return sub_commands[sub_name](argument, script_file)
#### build_subcomm ###################################################
# Run system BUILD command.
# Use RU not R because we will save it.
# Allow specifying WHICH version of build on the command line.
def build_subcomm (self, old_line, script_file):
    """Run the lines of a `begin build` block inside OS/8 BUILD.

    old_line names the BUILD program to run and is sent as
    "RU <old_line>".  Lines are then read from script_file, checked
    against _build_comm_regs, and sent to BUILD until "end build" is
    found.

    Returns:
      "success"  "end build" was reached, or a BOOT sub-command
                 reported SYS BUILT and the monitor was confirmed
      "fail"     BUILD did not start, an "end" without "build" was
                 seen, BUILD reported a fatal error, the monitor was
                 not confirmed after BOOT, or script_file ran out
                 before the end of the block
      "die"      the BUILD sub-command did not produce its expected
                 prompts
    """
    os8_comm = "RU " + old_line
    if self.verbose:
        print("Line " + str(self.line_ct_stack[0]) + ": " + os8_comm)
    # Run BUILD and confirm it has started successfully.
    reply = self.simh.os8_cmd (os8_comm, self.replies_rex["build"], debug=self.debug)
    if self.simh.test_result (reply, "BUILD Prompt", self.replies["build"], \
        "build_subcomm startup " + os8_comm, debug=self.debug) == False:
        print ("Line " + str(self.line_ct_stack[0]) + ": " + os8_comm + " failed.")
        print ("Ignoring the rest of this block.")
        self.ignore_to_subcomm_end(os8_comm, script_file, "build")
        # Confirm we're back to monitor command level.
        self.simh.os8_cfm_monitor ("build_subcomm 1")
        return "fail"
    # Submit the lines from script file to BUILD sub-command.
    for line in script_file:
        line = self.basic_line_parse(line, script_file)
        if line == None: continue
        m = re.match(_comm_re, line)
        if m == None:
            print("Ignoring mal-formed build sub-command at line " + \
                  str(self.line_ct_stack[0]) + ": " + line)
            continue
        build_sub = m.group(1)
        rest = m.group(3)
        if rest == None: rest = ""
        if self.debug: print ("build_sub: " + build_sub)
        if build_sub not in _build_comm_regs:
            print("Unrecognized BUILD command at line " + \
                  str(self.line_ct_stack[0]) + ": " + build_sub)
            continue
        # Handle the exit from BUILD when we hit an "end" statement.
        if build_sub == "end":
            if rest == "":
                print("Warning! end statement encountered inside build with no argument at line " + \
                      str(self.line_ct_stack[0]) + ".\nExiting build.")
                # Exit BUILD. Note! Unlike many commands build does NOT echo "^C!"
                self.simh.os8_ctrl_c ("build_subcomm end with no argument")
                return "fail"
            elif rest != "build":
                print("Warning! Mismatched begin/end blocks in BUILD at line " + \
                      str(self.line_ct_stack[0]) + ".\nEncountered end: {" + \
                      rest + "}. Exiting BUILD.")
                # Exit BUILD. Note! Unlike many commands build does NOT echo "^C!"
                self.simh.os8_ctrl_c ("build_subcomm mismatched begin/end")
                return "fail"
            # We're done. Print any desired verbose or debug status
            # then ^C to exit and confirm we're at monitor level.
            if self.verbose:
                print("Line " + str(self.line_ct_stack[0]) + ": end BUILD")
            if self.debug:
                print("before: " + self.simh._child.before.decode().strip())
                print("after: " + self.simh.child_after().strip())
            # Return to monitor.
            # Note! Unlike many commands build does NOT echo "^C!"
            # NOTE(review): the caller label below repeats the one used
            # for the no-argument case although this is the normal exit.
            self.simh.os8_ctrl_c ("build_subcomm end with no argument.")
            return "success"
        # Now perform sub-commands within BUILD. They have a discernable format.
        # A None entry in _build_comm_regs means the argument is not checked.
        build_re = _build_comm_regs[build_sub]
        if build_re != None:
            m2 = re.match(build_re, rest)
            if m2 == None:
                print("Ignoring mal-formed BUILD at line " + \
                      str(self.line_ct_stack[0]) + ": " + build_sub + \
                      " command: " + rest)
                continue
        # The BUILD sub-command is a three step dialogue: BUILD, then
        # the source of OS8.BN, then the source of CD.BN.
        # NOTE(review): m2 is only bound when build_re is not None;
        # presumably _build_comm_regs["BUILD"] is never None -- confirm.
        if build_sub == "BUILD":
            if m2.group(1) == None or m2.group(1) == "":
                print("Missing source of OS8.BN. Ignoring BUILD command at line " + \
                      str(self.line_ct_stack[0]) + ".")
                continue
            else: kbm_arg = m2.group(1)
            if m2.group(3) == None or m2.group(3) == "":
                print("Missing sorce of CD.BN. Ignoring BUILD command at line " + \
                      str(self.line_ct_stack[0]) + ".")
                continue
            else: cd_arg = m2.group(3)
            if self.verbose:
                print("Line " + str(self.line_ct_stack[0]) + \
                      ": BUILD KBM: " + kbm_arg + ", CD: " + cd_arg)
            # Confirm prompted for OS8 binary.
            reply = self.simh.os8_cmd ("BUILD", self.replies_rex["build"], debug=self.debug)
            if self.simh.test_result(reply, "LOAD OS8", self.replies["build"], "build_subcomm 5") == False:
                print("No prompt for LOAD OS/8 in BUILD command within BUILD at line " + \
                      str(self.line_ct_stack[0]) + ".")
                print("Fatal error. Aborting script.")
                return "die"
            # Send it and confirm prompted for CD binary.
            reply = self.simh.os8_cmd (kbm_arg, self.replies_rex["build"], debug=self.debug)
            if self.simh.test_result(reply, "LOAD CD", self.replies["build"], "build_subcomm 6") == False:
                print("No prompt for LOAD CD in BUILD command within BUILD at line " + \
                      str(self.line_ct_stack[0]) + ".")
                print("Fatal error. Aborting script.")
                return "die"
            # Send it and confirm we're back in BUILD command mode.
            reply = self.simh.os8_cmd (cd_arg, self.replies_rex["build"], debug=self.debug)
            if self.simh.test_result(reply, "BUILD Prompt", self.replies["build"], "build_subcomm 7") == False:
                print ("BUILD command within build failed. Aborting script.")
                return "die"
            if self.debug:
                print("Resume BUILD.SV command loop.")
            continue
        # Normal case: commands within BUILD.
        comm = build_sub + " " + rest
        if self.verbose:
            print("Line " + str(self.line_ct_stack[0]) + ": BUILD-> " + comm)
        reply = self.simh.os8_cmd (comm, self.replies_rex["build"], debug=self.debug)
        # Special case "BOOT" sub-command: May ask, "WRITE ZERO DIRECT?"
        # Will give a monitor prompt when done.
        if build_sub == "BOOT":
            if self.simh.test_result(reply, "WRITE ZERO DIRECT?", self.replies["build"], \
                                     "", debug=self.debug):
                if self.debug:
                    print("Got, \"WRITE ZERO DIRECT?\". Sending \"Y\".")
                reply = self.simh.os8_cmd("Y", self.replies_rex["build"], debug=self.debug)
            if self.simh.test_result(reply, "SYS BUILT", self.replies["build"], "", debug=self.debug):
                # Successful boot. We're now at monitor level.
                # Simplest to ignore anything else in the script up to "end build"
                # Return success or failure based on finding that monitor prompt.
                self.ignore_to_subcomm_end(os8_comm, script_file, "build")
                if self.simh.os8_cfm_monitor ("build_subcomm 8"):
                    return "success"
                else:
                    return "fail"
        # At this point we either have a BUILD prompt or an error message.
        # If it's not the BUILD prompt, we print an error.
        # If the error is fatal, ignore rest of build block,
        # exit back to OS/8 if necessary and return fail.
        # If it's not fatal, continue running BUILD.
        if self.simh.test_result(reply, "BUILD Prompt", self.replies["build"], \
                                 "build_subcomm 9", debug=self.debug) == False:
            print("BUILD error at line " + str(self.line_ct_stack[0]) + \
                  " with command " + self.simh._child.before.decode().strip())
            # Element [2] of the reply entry is tested as the fatal flag.
            if self.replies["build"][reply][2] == True:
                print ("Fatal error. Ignoring rest of this block.")
                self.ignore_to_subcomm_end(os8_comm, script_file, "build")
                # Confirm we're back at the monitor as expected.
                self.simh.os8_cfm_monitor ("build_subcomm 10")
                return "fail"
            # To continue, we need to ask pexpect to get that BUILD prompt before
            # resuming after an error. Ignore everything until we get it.
            self.simh._child.expect("\n\\$$")
    # The script ran out before an "end" line closed the block.
    print("Warning end of file encountered with no end of BUILD command block at line " + \
          str(self.line_ct_stack[0]) + ".")
    return "fail"
#### cdprog_subcomm ##################################################
# Cycle through OS/8 command decoder with the command specified
# in the argument.
def cdprog_subcomm (self, old_line, script_file):
# Run an OS/8 program with "RU" and feed the lines of a cdprog script
# block to its Command Decoder prompt.
#
# Parameters:
#   old_line    -- appended to "RU " to form the OS/8 command and to
#                  "cdprog " to form the argument expected on the closing
#                  "end" line. Presumably the argument text of the cdprog
#                  line in the script -- confirm against the caller.
#   script_file -- iterated for the lines of the block; also handed to
#                  basic_line_parse and ignore_to_subcomm_end.
#
# Returns one of the strings:
#   "success" -- the "end" line carried the expected argument, or the
#                program returned to the Monitor prompt after a line
#                was sent.
#   "fail"    -- the program did not reach the Command Decoder prompt, a
#                reply flagged fatal was seen, the "end" line had a missing
#                or mismatched argument, or the script ran out of lines
#                before "end" (and escape or ^C gave a truthy result).
#   "die"     -- the script ran out of lines and neither escape nor ^C
#                gave a truthy result.
os8_comm = "RU " + old_line
end_str = "cdprog " + old_line
if self.verbose:
print("Line: " + str(self.line_ct_stack[0]) + ": " + os8_comm)
# Start the program. reply is used further down to index
# self.simh._os8_replies.
reply = self.simh.os8_cmd (os8_comm, debug=self.debug)
# Anything other than the Command Decoder prompt means the program did not
# come up as expected: skip the rest of the block and report failure.
if self.simh.os8_test_result (reply, "Command Decoder Prompt", "cdprog: " + \
os8_comm, debug=self.debug) == False:
print (" failed at Line " + str(self.line_ct_stack[0]))
print ("Ignoring the rest of this block.")
self.ignore_to_subcomm_end(os8_comm, script_file, end_str)
# Confirm we're back to monitor command level.
self.simh.os8_cfm_monitor ("cdprog OS/8 restart after failed RU command")
return "fail"
# Submit the lines from script file to the running command.
# Track whether we return to the Command Decoder for more or exit.
# Track whether we need to exit and ignore the rest of the block.
for line in script_file:
line = self.basic_line_parse(line, script_file)
# None from basic_line_parse means there is nothing to send for this line.
if line == None: continue
# Test for special case, "end" and act on it if present.
# _comm_re is defined elsewhere in this file; group(1) is compared with
# "end" and group(3) is taken as the argument of the end statement.
m = re.match(_comm_re, line)
# The first two comparisons on group(1) are implied by the last one, so
# this reduces to: the line matched and group(1) is "end".
if m != None and m.group(1) != None and m.group(1) != "" and m.group(1) == "end":
rest = m.group(3)
retval = "fail" # Return fail unless proven successful.
if rest == None or rest == "":
print("Warning! end statement encountered inside cdprog " + \
"with no argument at line " + \
str(self.line_ct_stack[0]) + ".")
print("Expecting: {" + end_str + "}.")
elif rest != end_str:
print("Warning! Mismatched begin/end blocks in cdprog at line " + \
str(self.line_ct_stack[0]) + ".\n")
print("Expecting: {" + end_str + "}. Got: {" + rest + "}.\n")
else:
retval = "success"
# NOTE(review): presumably this verbose trace belongs to the success
# branch directly above -- confirm the intended nesting.
if self.verbose:
print("Line " + str(self.line_ct_stack[0]) + ": end " + end_str)
if retval == "fail":
print("Exiting cdprog, possibly earlier than expected at line " + \
str(self.line_ct_stack[0]) + ".")
# Return to Monitor
# Both the escape and the ^C are issued without looking at either result;
# the value assigned to reply is not read again before returning. The
# string arguments are presumably what the helpers report on failure.
reply = self.simh.os8_escape (caller="Sending escape failed. Trying ^c")
self.simh.os8_ctrl_c ("cdprog: ^c failed. State machine will be confused.")
return retval
# We could do some basic OS/8 command decoder syntax checking here.
if self.verbose:
print("Line: " + str(self.line_ct_stack[0]) + ": * " + line)
# NOTE(review): confirm whether this trace is meant to be gated by
# self.verbose like the print above.
print("Sending...")
# NOTE(review): unlike the initial RU command, debug is not forwarded to
# os8_cmd here -- confirm that is intentional.
reply = self.simh.os8_cmd (line)
if self.debug:
# Entry [0] of the matched reply is printed as its description.
print ("cdprog sent line. Got reply: " + str(reply) + " -> " + \
self.simh._os8_replies [reply][0])
# Transcribe our output if desired.
# Prints the decoded, stripped contents of the child's "before" buffer
# (presumably the pexpect text preceding the matched reply).
if "transcript" in self.options_enabled:
print (self.simh._child.before.decode().strip())
# Classify the reply: back at the Monitor, still at the Command Decoder,
# or neither (treated below as an error reply).
mon = self.simh.os8_test_result(reply, "Monitor Prompt", "")
cd = self.simh.os8_test_result(reply, "Command Decoder Prompt", "")
if mon:
# Case of command running to completion and returning to OS/8
# Clean up the parse to the end of the block and return success.
self.ignore_to_subcomm_end(os8_comm, script_file, end_str)
return "success"
elif not cd:
# We didn't return to command decoder. That means we have an error.
# Test the fatal bit in the error structure.
# IF True: we returned to OS/8. Say we're ignoring the block and return failure.
# Otherwise we should continue.
# Entry [2] of the matched reply is the fatal flag tested here.
if self.simh._os8_replies[reply][2] == True:
print ("Fatal error. Ignoring rest of this block.")
self.ignore_to_subcomm_end(os8_comm, script_file, end_str)
# Print error message.
self.simh.os8_test_result(reply, "Command Decoder Prompt", "cdprog failure")
# Confirm we're back at Monitor
self.simh.os8_cfm_monitor ("cdprog error handler return to monitor fail")
return "fail"
else:
print ("Non-fatal error encountered with: " + line)
print ("\t" + self.simh._os8_replies[reply][0])
# To continue, we need to ask pexpect to get that Command Decoder
# prompt before resuming after an error. Ignore everything until we get it.
# The pattern is a newline followed by a literal "*" at the end of input.
self.simh._child.expect("\n\\*$")
# Reached when the script lines are exhausted without an "end" line having
# returned from inside the loop.
print("Warning end of file encountered at line " + \
str(self.line_ct_stack[0]) + \
" with no end of cdprog command block.")
# Exit command back to OS/8 monitor.
# Escalate: escape first, ^C only if escape gave a falsy result, and
# "die" only if ^C also gave a falsy result.
mon = self.simh.os8_escape ("")
if mon: return "fail"
else:
print ("cdprog: Escape from Command Decoder failed. Sending ^c")
mon = self.simh.os8_ctrl_c ("")
if mon: return "fail"
else:
print ("Failed to get to monitor with ^c. Killing script.")
return "die"
#### check_exists ####################################################
# Check existence of all files needed
def check_exists (s, image_copyins):
for copyin in image_copyins:
image = copyin[1]
image_path = dirs.os8mi + image
if (not os.path.isfile(image_path)):
print("Required file: " + image_path + " not found.")
return "die"
# else: print("Found " + image_path)