Module pyhamilton.interface

Classes and utilities for automatic connection to a Hamilton robot.

Classes and utilities for automatic connection to a Hamilton robot.

import time, json, signal, os, requests, string, logging, subprocess, win32gui, win32con
from http import server
from threading import Thread
from multiprocessing import Process
from pyhamilton import OEM_RUN_EXE_PATH, OEM_HSL_PATH
from .oemerr import * #TODO: specify
from .defaultcmds import defaults_by_cmd
class HamiltonCmdTemplate:
    Formatter object to create valid `pyhamilton` command dicts.

    Use of this class to assemble JSON pyhamilton commands enables keyword access to command attributes, which cuts down on string literals. It also helps to fail malformed commands early, before they are sent.
    Several default `HamiltonCmdTemplate`s are defined in `pyhamilton.defaultcmds`, such as `INITIALIZE`, `ASPIRATE`, and `DISPENSE`. Casual users will most likely never need to manually instantiate a HamiltonCmdTemplate.
    def unique_id():
        """Return a "uniqe" hexadecimal string (`'0x...'`) based on time of call."""
        return hex(int((time.time()%3600e4)*1e6))

    def __init__(self, cmd_name, params_list):
        Creates a `HamiltonCmdTemplate` with a command name and required parameters.

        The command name must be one of the command names accepted by the
        `pyhamilton` interpreter and a list of expected parameters for this command.

          cmd_name (str): One of the set of string literals recognized as command names
            by the `pyhamilton` interpreter, e.g. `'mph96Dispense'`. See `pyhamilton.defaultcmds` for examples.
          params_list (list): exact list of string parameters that must have associated
            values for the command to be valid, other than those that are always present
            (`'command'` and `'id'`)
        self.cmd_name = cmd_name
        self.params_list = params_list
        if cmd_name in defaults_by_cmd:
            const_name, default_dict = defaults_by_cmd[cmd_name]
            self.defaults = {k:v for k, v in default_dict.items() if v is not None}
            self.defaults = {}

    def assemble_cmd(self, *args, **kwargs):
        Use keyword args to assemble this command. Default values auto-filled.

          kwargs (dict): map of any parameters (str) to values that should be different
            from the defaults supplied for this command in `pyhamilton.defaultcmds`
        if args:
            raise ValueError('assemble_cmd can only take keyword arguments.')
        assembled_cmd = {'command':self.cmd_name, 'id':HamiltonCmdTemplate.unique_id()}
        return assembled_cmd

    def assert_valid_cmd(self, cmd_dict):
        """Validate a finished command. Do nothing if it is valid.

        `ValueError` will be raised if the supplied command did not have all required
        parameters for this command, as well as values for keys `'id'` and `'command'`, which
        are always required.

          cmd_dict (dict): A fully assembled `pyhamilton` command

          ValueError: The command dict is not ready to send. Specifics of mismatch
            summarized in exception description.
        prefix = 'Assert valid command "' + self.cmd_name + '" failed: '
        if 'id' not in cmd_dict:
            raise ValueError(prefix + 'no key "id"')
        if 'command' not in cmd_dict:
            raise ValueError(prefix + 'no key "command"')
        if cmd_dict['command'] != self.cmd_name:
            raise ValueError(prefix + 'command name "' + cmd_dict['command'] + '" does not match')
        needs = set(['command', 'id'])
        givens = set(cmd_dict.keys())
        if givens != needs:
            prints = [prefix + 'template parameter keys (left) do not match given keys (right)\n']
            q_mark = ' (?)  '
            l_col_space = 4
            r_col_space = max((len(key) for key in needs)) + len(q_mark) + 1
            needs_l = sorted(list(needs))
            givens_l = sorted(list(givens))
            while needs_l or givens_l:
                if needs_l:
                    lval = needs_l.pop(0)
                    if lval not in givens:
                        lval = q_mark + lval
                    lval = ''
                if givens_l:
                    rval = givens_l.pop(0)
                    if rval not in needs:
                        rval = q_mark + rval
                    rval = ''
                prints.append(' '*l_col_space + lval + ' '*(r_col_space - len(lval)) + rval)
            raise ValueError('\n'.join(prints))

_builtin_templates_by_cmd = {}

for cmd in defaults_by_cmd:
    const_name, default_dict = defaults_by_cmd[cmd]
    const_template = HamiltonCmdTemplate(cmd, list(default_dict.keys()))
    globals()[const_name] = const_template
    _builtin_templates_by_cmd[cmd] = const_template

def _make_new_hamilton_serv_handler(resp_indexing_fn):
    """Make HTTP request handler to aggregate responses according to an index function.

    A new class is defined each time, bound to a specific indexing function, to keep it
    agnostic to any particular indexing scheme. In practice, the current implementation
    uses the value of the key 'id'; that is the scheme for the `pyhamilton` interpreter.

      indexed_responses (dict): aggregated responses received by this handler, keyed by
        the values returned by `resp_indexing_fn`.

      resp_indexing_fn (Callable[[str], Hashable]): Called on every response body (str)
        to extract a hashable index. Later, the response can be retrieved by this index
        from `indexed_responses`.


    class HamiltonServerHandler(server.BaseHTTPRequestHandler):
        _send_queue = []
        indexed_responses = {}
        indexing_fn = resp_indexing_fn

        def send_str(cmd_str):
            if not isinstance(cmd_str, b''.__class__):
                if isinstance(cmd_str, ''.__class__):
                    cmd_str = cmd_str.encode()
                    raise ValueError('send_command can only send strings, not ' + str(cmd_str))

        def has_queued_cmds():
            return bool(HamiltonServerHandler._send_queue)

        def pop_response(idx):
            ir = HamiltonServerHandler.indexed_responses
            if idx not in ir:
                raise KeyError('No response received with index ' + str(idx))
            return ir.pop(idx).decode()

        def _set_headers(self):
            self.send_header('Content-type', 'text/HTML')

        def do_GET(self):
            sq = HamiltonServerHandler._send_queue
            response_to_send = sq.pop(0) if sq else b''

        def do_HEAD(self):

        def do_POST(self):
            content_len = int(self.headers.get('content-length', 0))
            post_body =
            ir = HamiltonServerHandler.indexed_responses
            index = HamiltonServerHandler.indexing_fn(post_body)
            if index is None:
            ir[index] = post_body

        def log_message(self, *args, **kwargs):

    return HamiltonServerHandler

def run_hamilton_process():
    """Start the interpreter in a separate python process.

    Starts the pyhamilton interpreter, which is an HSL file to be passed to the
    RunHSLExecutor.exe executable from Hamilton. This should always be done in a
    separate python process using the subprocess module, not a Thread.
    import clr
    from pyhamilton import OEM_STAR_PATH, OEM_HSL_PATH
    clr.AddReference(os.path.join(OEM_STAR_PATH, 'RunHSLExecutor'))
    clr.AddReference(os.path.join(OEM_STAR_PATH, 'HSLHttp'))
        from RunHSLExecutor import Class1
    except ModuleNotFoundError:
        raise RuntimeError('RunHSLExecutor DLLs successfully located, but an internal '
                           'error prevented import as a CLR module. You might be '
                           'missing the standard Hamilton software suite HSL '
                           'executables, their DLLs may not be registered with Windows, '
                           'or they may not be located in the expected system '
    C = Class1()
        while True:
            pass # Send external signal to end process

_block_numfield = 'Num'
_block_mainerrfield = 'MainErr'
BLOCK_FIELDS = _block_numfield, _block_mainerrfield, 'SlaveErr', 'RecoveryBtnId', 'StepData', 'LabwareName', 'LabwarePos'
_block_field_types = int, int, int, int, str, str, str

class HamiltonInterface:
    """Main class to automatically set up and tear down an interface to a Hamilton robot.

    HamiltonInterface is the primary class offered by this module. It creates a Hamilton
    HSL background process running the `pyhamilton` interpreter, along with a `localhost`
    connection to act as a bridge. It is recommended to create a `HamiltonInterface` using
    a `with:` block to ensure proper startup and shutdown of its async components, even if
    exceptions are raised. It may be used with explicit `start()` and `stop()` calls.

      Typical usage:
      with HamiltonInterface() as ham_int:
          cmd_id = ham_int.send_command(INITIALIZE)

    known_templates = _builtin_templates_by_cmd
    default_port = 3221
    default_address = '' # localhost

    class HamiltonServerThread(Thread):
        """Private threaded local HTTP server with graceful shutdown flag."""

        def __init__(self, address, port):
            self.server_address = (address, port)
            self.should_continue = True
            self.exited = False
            def index_on_resp_id(response_str):
                    response = json.loads(response_str)
                    if 'id' in response:
                        return response['id']
                    return None
                except json.decoder.JSONDecodeError:
                    return None
            self.server_handler_class = _make_new_hamilton_serv_handler(index_on_resp_id)
            self.httpd = None

        def run(self):
            self.exited = False
            self.httpd = server.HTTPServer(self.server_address, self.server_handler_class)
            while self.should_continue:
            self.exited = True

        def disconnect(self):
            self.should_continue = False

        def has_exited(self):
            return self.exited

    def __init__(self, address=None, port=None, simulate=False):
        self.address = HamiltonInterface.default_address if address is None else address
        self.port = HamiltonInterface.default_port if port is None else port
        self.simulate = simulate
        self.server_thread = None
        self.oem_process = None = False
        self.logger = None
        self.log_queue = []

    def start(self):
        """Starts the extra processes, threads, and servers for the Hamilton connection.

        Launches: 1) the pyhamilton interpreter using the Hamilton Run Control
        executable, either in the background for normal use, or in the foreground with a
        GUI for simulation; 2) a local HTTP server to ferry messages between the python
        module and the interpreter.

        When used with a `with:` block, called automatically upon entering the block.

        self.log('starting a Hamilton interface')
        if self.simulate:
            sim_window_handle = None
                sim_window_handle = win32gui.FindWindow(None, 'Hamilton Run Control - ' + os.path.basename(OEM_HSL_PATH))
            except win32gui.error:
            if sim_window_handle:
                    win32gui.SendMessage(sim_window_handle, win32con.WM_CLOSE, 0, 0)
                    os.system('taskkill /f /im HxRun.exe')
                except win32gui.error:
                    self.log_and_raise(OSError('Simulator already open'))
            subprocess.Popen([OEM_RUN_EXE_PATH, OEM_HSL_PATH])
            self.log('started the oem application for simulation')
            self.oem_process = Process(target=run_hamilton_process, args=())
            self.log('started the oem process')
        self.server_thread = HamiltonInterface.HamiltonServerThread(self.address, self.port)
        self.log('started the server thread') = True

    def stop(self):
        """Stop this HamiltonInterface and clean up associated async processes.

        Kills the pyhamilton interpreter subprocess and executable and stops the local
        web server thread.

        When used with a `with` block, called automatically on exiting the block.
        if not
            if self.simulate:
                self.log('sending end run command to simulator')
                    self.wait_on_response(self.send_command(command='end', id=hex(0)), timeout=1.5)
                except HamiltonTimeoutError:
                for i in range(2):
                        os.kill(, signal.SIGTERM)
                        self.log('sent sigterm to oem process')
                        self.log('oem process exited')
                    except PermissionError:
                        self.log('permission denied, trying again...', 'warn')
                    self.log('Could not kill oem process, moving on with shutdown', 'warn')
   = False
            self.log('disconnected from server')
            if not self.server_thread.has_exited():
                self.log('server did not exit yet, sending dummy request to exit its loop')
                session = requests.Session()
                adapter = requests.adapters.HTTPAdapter(max_retries=20)
                session.mount('http://', adapter)
                session.get('http://' + HamiltonInterface.default_address + ':' + str(HamiltonInterface.default_port))
                self.log('dummy get request sent to server')
            self.log('server thread exited')

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):

    def is_open(self):
        """Return `True` if the HamiltonInterface has been started and not stopped."""

    def send_command(self, template=None, block_until_sent=False, *args, **cmd_dict): # returns unique id of command
        """Add a command templated after HamiltonCmdTemplate to the server send queue.

          template (HamiltonCmdTemplate): Optional; a template to provide default
            arguments not specified in `cmd_dict`. 
          block_until_sent (bool): Optional; if `True`, wait for all queued messages,
            including this one, to get picked up by the local server and sent across
            the HTTP connection, before returning. Default is False.
          cmd_dict (dict): keyword arguments to be forwarded to `template` when building
            the command, overriding its defaults. If `template` not given, cmd_dict must
            either have a 'command' key with value matching one of the command names in
            `defaultcmds` and might be missing an 'id' key, or itself be a fully formed
            and correct pyhamilton command with its own 'id' key.

          unique id (str) of the command that can be used to index it later, either
            newly generated or same as originally present in cmd_dict.
        if not self.is_open():
            self.log_and_raise(RuntimeError('Cannot send a command from a closed HamiltonInterface'))
        if template is None:
            if 'command' not in cmd_dict:
                self.log_and_raise(ValueError('Command dicts from HamiltonInterface must have a \'command\' key'))
            cmd_name = cmd_dict['command']
            if cmd_name in HamiltonInterface.known_templates:
                # raises if this is a known command but some fields in cmd_dict are invalid
                send_cmd_dict = HamiltonInterface.known_templates[cmd_name].assemble_cmd(**cmd_dict)
                send_cmd_dict = cmd_dict
            send_cmd_dict = template.assemble_cmd(**cmd_dict)
        if 'id' not in send_cmd_dict:
            self.log_and_raise(ValueError("Command dicts sent from HamiltonInterface must have a unique id with key 'id'"))
        if block_until_sent:
        return send_cmd_dict['id']

    def wait_on_response(self, id, timeout=0, raise_first_exception=False):
        """Wait and do not return until the response for the specified id comes back.

        When the command corresponding to `id` regards multiple distinct pipette channels
        or devices, responses may contain encoded errors that might be different for
        different channels or devices. For this reason, the default behavior of
        `wait_on_response` is to not raise exceptions, but to delegate handling
        exceptions to the caller. For convenience, this method can optionally raise the
        first exception it encounters, often a useful behavior for succinct scripted
        commands that regard only one device, when raise_first_exception is `True`.

          id (str): The unique id of a previously sent command
          timeout (float): Optional; maximum time in seconds to wait before raising
            `HamiltonTimeoutError`. Default is no timeout (forever).
          raise_first_exception: Optional; if True, may raise if there is an error
            encoded in the response. Default is False.

          The response dict from the hamilton interpreter.

          `HamiltonTimeoutError`: after `timeout` seconds elapse with no response, if
          `timeout` was specified.

        if timeout:
            start_time = time.time()
            start_time = float('inf')

        response_tup = None
        while time.time() - start_time < timeout:
                response_tup = self.pop_response(id, raise_first_exception)
            except KeyError:
            if response_tup is not None:
                return response_tup
        self.log_and_raise(HamiltonTimeoutError('Timed out after ' + str(timeout) + ' sec while waiting for response id ' + str(id)))

    def pop_response(self, id, raise_first_exception=False):
        """Remove and return the response with the specified id from the response queue.

        If there is a response, remove it and return the Hamilton-formatted response
        dict, like that returned from `HamiltonInterface.parse_hamilton_return`. Otherwise, raise

          id (str): Unique id of the command that initiated the response
          raise_first_exception (bool): Optional; forwarded to `wait_on_response`.
            Default is `False`.

          A 2-tuple:

            1. parsed response block dict from Hamilton as in `parse_hamilton_return`
            2. Error map, a dict mapping int keys (data block Num field) that had
                exceptions, if any, to an exception that was coded in block;
                `None` to any error not associated with a block. `{}` if no error

          KeyError: if `id` has no matching response in the queue.

            response = self.server_thread.server_handler_class.pop_response(id)
        except KeyError:
            raise KeyError('No Hamilton interface response indexed for id ' + str(id))
        errflag, blocks = self.parse_hamilton_return(response)
        err_map = {}
        if errflag:
            for blocknum in sorted(blocks.keys()):
                errcode = blocks[blocknum][_block_mainerrfield]
                if errcode != 0:
                    self.log('Exception encoded in Hamilton return.', 'warn')
                        decoded_exception = HAMILTON_ERROR_MAP[errcode]()
                    except KeyError:
                        self.log_and_raise(InvalidErrCodeError('Response returned had an unknown error code: ' + str(errcode)))
                    self.log('Exception: ' + repr(decoded_exception), 'warn')
                    if raise_first_exception:
                        self.log('Raising first exception.', 'warn')
                        raise decoded_exception
                    err_map[blocknum] = decoded_exception
                unknown_exc = HamiltonStepError('Hamilton step did not execute correctly; no error code given.')
                err_map[None] = unknown_exc
                if raise_first_exception:
                    self.log('Raising first exception; exception has no error code.', 'warn')
                    raise unknown_exc
        return blocks, err_map

    def _block_until_sq_clear(self):
        while HamiltonServerHandler.has_queued_cmds():

    def parse_hamilton_return(self, return_str):
        Return a 2-tuple:
        - [0] errflag: any error code present in response
        - [1] Block map: dict mapping int keys to dicts with str keys (MainErr, SlaveErr, RecoveryBtnId, StepData, LabwareName, LabwarePos)

        Result value 3 is the field that is returned by the OEM interface.
        "Result value 3 contains one error flag (ErrFlag) and the block data package."
        ### Data Block Format Rules

        - The error flag is set once only at the beginning of result value 3. The error flag
        does not belong to the block data but may be used for a simpler error recovery.
        If this flag is set, an error code has been set in any of the block data entries.

        - Each block data package starts with the opening square bracket character '['

        - The information within the block data package is separated by the comma delimiter ','

        - Block data information may be empty; anyway a comma delimiter is set.

        - The result value may contain more than one block data package.

        - Block data packages are returned independent of Num value ( unsorted ).

        ### Block data information

        - Num 
            - Step depended information (e.g. the channel number, a loading position etc.).
            - Note: The meaning and data type for this information is described in the corresponding help of single step.

        - MainErr
            - Main error code which occurred on instrument.

        - SlaveErr
            - Detailed error code of depended slave (e.g. auto load, washer etc.).

        - RecoveryBtnId
            - Recovery which has been used to handle this error.

        - StepData
            - Step depended information, e.g. the barcode read, the volume aspirated etc.

            - Note: The meaning and data type for this information is described in the corresponding help of single step.

        - LabwareName
            - Labware name of used labware.

        - LabwarePos
            - Used labware position.

        def raise_parse_error():
            msg = 'Could not parse response ' + repr(return_str)
            self.log(msg, 'error')
            raise HamiltonReturnParseError(msg)

            block_data_str = str(json.loads(return_str)['step-return1'])
        except KeyError:
        blocks = block_data_str.split('[')
            errflag = int(blocks.pop(0)) != 0
        except ValueError:

        blocks_by_blocknum = {}
        any_error_code = False
        for block_str in blocks:
            field_vals = block_str.split(',')
            if len(field_vals) != len(BLOCK_FIELDS):
                block_contents = {field:cast(val) for field, cast, val in zip(BLOCK_FIELDS, _block_field_types, field_vals)}
            except ValueError:
            if block_contents[_block_mainerrfield] != 0:
                any_error_code = True
            blocks_by_blocknum[block_contents.pop(_block_numfield)] = block_contents
        if blocks and errflag != any_error_code:

        return errflag, blocks_by_blocknum

    def set_log_dir(self, log_dir):
        self.logger = logging.getLogger(__name__)
        hdlr = logging.FileHandler(log_dir)
        formatter = logging.Formatter('[%(asctime)s] %(name)s %(levelname)s %(message)s')
    def log(self, msg, msg_type='info'):
        self.log_queue.append((msg, msg_type))

    def _dump_log_queue(self):
        if self.logger is None:
        log_actions = {'error':self.logger.error,
        while self.log_queue:
            msg, msg_type = self.log_queue.pop(0)
            log_actions.get(msg_type.lower(), # prints if no log path set

    def log_and_raise(self, err):
        self.log(repr(err), 'error')
        raise err


