Module pyhamilton.util
Expand source code
class ChannelHead:
    """Base class for a pipetting channel head.

    Subclasses assign ``alignment_constraints``; ``can_move_simul`` requires
    every entry of that list to be among the constraints reported by
    ``resource.alignment_delta`` for a pair of positions.
    """

    def __init__(self):
        pass

    def pack_moves(self, moves):
        """Placeholder; currently does nothing and returns ``None``."""
        pass  # return (binary channel pattern string, remaining moves)

    def can_move_simul(self, move1, move2):
        """Return whether this head can perform two moves at the same time.

        Args:
            move1, move2: 4-tuples (or iterables) of ``(tip, ml, source, dest)``.

        Returns:
            ``True`` only if, for the source, tip and destination position
            pairs alike, both positions share the same parent resource, the
            resource reports every constraint in ``self.alignment_constraints``
            for the pair, and the (dx, dy) offset is identical across all
            three pairs. ``False`` otherwise.
        """
        tips, _mls, sources, dests = zip(move1, move2)
        # One (dx, dy) per component, in source/tip/dest order. This used to
        # be written into an undefined name, with an immutable tuple declared
        # as the placeholder.
        alignment_deltas = []
        for pos1, pos2 in (sources, tips, dests):
            resource = pos1.parent_resource
            if resource is not pos2.parent_resource:
                return False
            dx, dy, constraints = resource.alignment_delta(pos1, pos2)
            if not all(c in constraints for c in self.alignment_constraints):
                return False
            alignment_deltas.append((dx, dy))
        ad_source, ad_tips, ad_dests = alignment_deltas
        return ad_source == ad_tips == ad_dests
class Independent8Channel(ChannelHead):
    """Channel head whose only alignment constraint is ``DeckResource.align.VERTICAL``."""

    def __init__(self):
        required = [DeckResource.align.VERTICAL]
        self.alignment_constraints = required
class Standard96Channel(ChannelHead):
    """Channel head whose only alignment constraint is ``DeckResource.align.STD_96``."""

    def __init__(self):
        required = [DeckResource.align.STD_96]
        self.alignment_constraints = required
class HamiltonDevice:
    """Just a structure with some guaranteed pieces.

    Binds itself to the interface, to the optional layout manager and to
    every channel head, and keeps the heads on ``self.heads``.
    """

    def __init__(self, interface, heads, layout_mgr=None):
        """Validate the pieces and bind this device to each of them.

        Args:
            interface: a ``HamiltonInterface``.
            heads: iterable of ``ChannelHead`` instances; iterated twice, so
                it must be re-iterable (e.g. a list or tuple).
            layout_mgr: optional ``LayoutManager``. The original body checked
                and bound a layout manager through two different undefined
                names; it is now an explicit, optional argument so existing
                two-argument calls keep working.

        Raises:
            TypeError: if any supplied piece has the wrong type.
        """
        if not (isinstance(interface, HamiltonInterface)
                and all(isinstance(h, ChannelHead) for h in heads)
                and (layout_mgr is None or isinstance(layout_mgr, LayoutManager))):
            raise TypeError("HamiltonDevice instantiated with wrong types") # todo maybe more informative
        interface.bind_device(self)
        if layout_mgr is not None:
            layout_mgr.bind_device(self)
        self.heads = heads
        for head in heads:
            # NOTE(review): ChannelHead as visible in this file defines no
            # bind_device(); confirm it is provided elsewhere.
            head.bind_device(self)
class HamiltonAction:
    """A robot action that some finite sequence of sent OEM commands can accomplish.

    HamiltonActions are nestable: each implements execute(), which might call
    child execute()s. They are either instantiated with a particular
    HamiltonDevice or are implicitly tied to one. execute() is responsible for
    doing its own error handling; base-level actions like aspirate and
    dispense interpret the responses and errors from the robot device.
    possible() is implemented to facilitate search over HamiltonActions.
    """

    def possible(self):
        """Return whether the action can be performed; the base class always says yes."""
        return True

    def execute(self):
        """Perform the action; the base class has no implementation."""
        raise NotImplementedError()
class GroupableAction(HamiltonAction):
    """Marker subclass with no behaviour of its own.

    HamiltonCoordinator.execute() uses ``isinstance(action, GroupableAction)``
    to tell these actions apart from ones it runs immediately.
    """
class TipPickup(GroupableAction):
    """Groupable action that picks up one tip."""

    def __init__(self, tip):
        self.tip = tip

    def execute(self):
        """Send a PICKUP command for ``self.tip`` and wait for its response.

        NOTE(review): ``hammy`` and ``PICKUP`` are not defined in the visible
        source; presumably module-level names — confirm.
        """
        tip = self.tip  # the body previously read an unbound bare name `tip`
        labware_positions = str(tip.parent_resource.layout_name()) + ', ' + str(tip.index) + ';'
        # Renamed from `id`, which shadowed the builtin.
        command_id = hammy.send_command(PICKUP, {'labwarePositions': labware_positions})
        hammy.wait_on_response(command_id)
class Transfer(GroupableAction):
    """Groupable action parameterised by ``(tip, ml, source, dest)``.

    Iterating a Transfer yields those four values in that order, which is the
    4-tuple shape ChannelHead.can_move_simul documents for its arguments.
    """

    def __init__(self, tip, ml, source, dest):
        self.tip, self.ml = tip, ml
        self.source, self.dest = source, dest
        # NOTE(review): MOVE is not defined on HamiltonAction in the visible
        # source — confirm where it comes from.
        self.action_type = HamiltonAction.MOVE

    def execute(self):
        """Run the pickup, aspirate and dispense sub-actions, in that order.

        NOTE(review): none of these three attributes is assigned anywhere in
        the visible source.
        """
        for step_name in ('pickup', 'aspirate', 'dispense'):
            getattr(self, step_name).execute()

    def __iter__(self):
        fields = (self.tip, self.ml, self.source, self.dest)
        return iter(fields)
class FlyTransfer(Transfer):
    """Transfer subclass; construction is the same as for Transfer."""

    def __init__(self, tip, ml, source, dest):
        # Transfer.__init__ stores tip, ml, source, dest and action_type,
        # which is everything this constructor used to repeat.
        super().__init__(tip, ml, source, dest)
class Mix(HamiltonAction):
    """Action that keeps ``(tip, ml, target, times)`` as a tuple on ``params``."""

    def __init__(self, tip, ml, target, times):
        params = tip, ml, target, times
        self.params = params
        # NOTE(review): MIX is not defined on HamiltonAction in the visible
        # source — confirm where it comes from.
        self.action_type = HamiltonAction.MIX
class HamiltonCoordinator:
    """Queues HamiltonActions and runs them against an open HamiltonInterface."""

    def __init__(self, hamilton_interface, channel_heads):
        """Check the interface, remember it and the heads, then send INITIALIZE.

        Raises:
            ValueError: if ``hamilton_interface`` is not an open
                HamiltonInterface.
        """
        if not (isinstance(hamilton_interface, HamiltonInterface) and hamilton_interface.is_open()):
            raise ValueError('Coodinator can only start with an open HamiltonInterface')
        # execute() and wait_for_all() read this attribute; it was never stored.
        self.hamilton_interface = hamilton_interface
        self.queued_actions = []
        self.heads = channel_heads
        hamilton_interface.send_command(None, INITIALIZE, block=True)
        print('Initialized Hamilton')

    def stage(self, action):
        """Append ``action`` to the queue used by a no-argument execute().

        Raises:
            TypeError: if ``action`` is not a HamiltonAction.
        """
        if not isinstance(action, HamiltonAction):
            raise TypeError('Coordinator can only stage or execute HamiltonActions')
        # The method was previously referenced without being called, so
        # nothing was ever queued.
        self.queued_actions.append(action)

    def execute(self, specific_actions=None):
        """Run the staged actions, or ``specific_actions`` when given.

        Args:
            specific_actions: one HamiltonAction or an iterable of them.
                ``None`` runs a copy of the staged queue; the queue itself is
                not cleared.

        Raises:
            ValueError: if the interface is not open.
            TypeError: if an item to run is not a HamiltonAction.
        """
        if not self.hamilton_interface.is_open():
            raise ValueError('Coodinator can only execute commands with an open HamiltonInterface')
        if specific_actions is None:
            work_list = self.queued_actions[:]
        elif isinstance(specific_actions, HamiltonAction):
            # Transfer defines __iter__, so a lone action has to be wrapped
            # explicitly instead of being detected by trying to iterate it.
            work_list = [specific_actions]
        else:
            try:
                # A real list is needed: the loop relies on pop(0) and on
                # truthiness, neither of which a bare iterator supports.
                work_list = list(specific_actions)
            except TypeError:
                work_list = [specific_actions]
        while work_list:
            action = work_list.pop(0)
            if not isinstance(action, HamiltonAction):
                raise TypeError('Coordinator can only stage or execute HamiltonActions')
            # TODO: batch GroupableActions using _simultaneous_candidates() and
            # head.pack_moves(). pack_moves() is still a stub, so every action
            # runs on its own; previously groupable actions were popped and
            # never executed.
            action.execute()

    def _simultaneous_candidates(self, first_move, pending):
        """Per head, list the pending moves that could run alongside ``first_move``."""
        candidates = tuple([] for _ in self.heads)
        for bucket, head in zip(candidates, self.heads):
            for move in pending:
                # Moves sharing a source or destination with first_move are skipped.
                if move.source is first_move.source or move.dest is first_move.dest:
                    continue
                if head.can_move_simul(first_move, move):
                    bucket.append(move)
        return candidates

    def wait_for_all(self):
        """Delegate to ``hamilton_interface.block_until_clear()``."""
        self.hamilton_interface.block_until_clear()
Classes
class ChannelHead
-
Expand source code
class ChannelHead:
    """Base class for a pipetting channel head.

    Subclasses assign ``alignment_constraints``; ``can_move_simul`` requires
    every entry of that list to be among the constraints reported by
    ``resource.alignment_delta`` for a pair of positions.
    """

    def __init__(self):
        pass

    def pack_moves(self, moves):
        """Placeholder; currently does nothing and returns ``None``."""
        pass  # return (binary channel pattern string, remaining moves)

    def can_move_simul(self, move1, move2):
        """Return whether this head can perform two moves at the same time.

        Args:
            move1, move2: 4-tuples (or iterables) of ``(tip, ml, source, dest)``.

        Returns:
            ``True`` only if, for the source, tip and destination position
            pairs alike, both positions share the same parent resource, the
            resource reports every constraint in ``self.alignment_constraints``
            for the pair, and the (dx, dy) offset is identical across all
            three pairs. ``False`` otherwise.
        """
        tips, _mls, sources, dests = zip(move1, move2)
        # One (dx, dy) per component, in source/tip/dest order. This used to
        # be written into an undefined name, with an immutable tuple declared
        # as the placeholder.
        alignment_deltas = []
        for pos1, pos2 in (sources, tips, dests):
            resource = pos1.parent_resource
            if resource is not pos2.parent_resource:
                return False
            dx, dy, constraints = resource.alignment_delta(pos1, pos2)
            if not all(c in constraints for c in self.alignment_constraints):
                return False
            alignment_deltas.append((dx, dy))
        ad_source, ad_tips, ad_dests = alignment_deltas
        return ad_source == ad_tips == ad_dests
Subclasses
Methods
def can_move_simul(move1, move2)
-
Expand source code
def can_move_simul(move1, move2): # both arguments are 4-tuples, (tips, ml, source, dest) tips, mls, sources, dests = zip(move1, move2) move_components = zip(sources, tips, dests) xy_deltas = (None, None, None) for i, (pos1, pos2) in enumerate(zip(*move_components)): resource = pos1.parent_resource if resource is not pos2.parent_resource: return False dx, dy, constraints = resource.alignment_delta(pos1, pos2) if not all(c in constraints for c in self.alignment_constraints): return False alignment_deltas[i] = (dx, dy) ad_source, ad_tips, ad_dests = alignment_deltas return ad_source == ad_tips == ad_dests
def pack_moves(self, moves)
-
Expand source code
def pack_moves(self, moves): pass # return (binary channel pattern string, remaining moves)
class FlyTransfer (tip, ml, source, dest)
-
Expand source code
class FlyTransfer(Transfer):
    """Transfer subclass; construction is the same as for Transfer."""

    def __init__(self, tip, ml, source, dest):
        # Transfer.__init__ stores tip, ml, source, dest and action_type,
        # which is everything this constructor used to repeat.
        super().__init__(tip, ml, source, dest)
Ancestors
class GroupableAction
-
Expand source code
class GroupableAction(HamiltonAction):
    """Marker subclass with no behaviour of its own.

    HamiltonCoordinator.execute() uses ``isinstance(action, GroupableAction)``
    to tell these actions apart from ones it runs immediately.
    """
Ancestors
Subclasses
class HamiltonAction
-
Expand source code
class HamiltonAction:
    """A robot action that some finite sequence of sent OEM commands can accomplish.

    HamiltonActions are nestable: each implements execute(), which might call
    child execute()s. They are either instantiated with a particular
    HamiltonDevice or are implicitly tied to one. execute() is responsible for
    doing its own error handling; base-level actions like aspirate and
    dispense interpret the responses and errors from the robot device.
    possible() is implemented to facilitate search over HamiltonActions.
    """

    def possible(self):
        """Return whether the action can be performed; the base class always says yes."""
        return True

    def execute(self):
        """Perform the action; the base class has no implementation."""
        raise NotImplementedError()
Subclasses
Methods
def execute(self)
-
Expand source code
def execute(self): raise NotImplementedError()
def possible(self)
-
Expand source code
def possible(self): return True
class HamiltonCoordinator (hamilton_interface, channel_heads)
-
Expand source code
class HamiltonCoordinator:
    """Queues HamiltonActions and runs them against an open HamiltonInterface."""

    def __init__(self, hamilton_interface, channel_heads):
        """Check the interface, remember it and the heads, then send INITIALIZE.

        Raises:
            ValueError: if ``hamilton_interface`` is not an open
                HamiltonInterface.
        """
        if not (isinstance(hamilton_interface, HamiltonInterface) and hamilton_interface.is_open()):
            raise ValueError('Coodinator can only start with an open HamiltonInterface')
        # execute() and wait_for_all() read this attribute; it was never stored.
        self.hamilton_interface = hamilton_interface
        self.queued_actions = []
        self.heads = channel_heads
        hamilton_interface.send_command(None, INITIALIZE, block=True)
        print('Initialized Hamilton')

    def stage(self, action):
        """Append ``action`` to the queue used by a no-argument execute().

        Raises:
            TypeError: if ``action`` is not a HamiltonAction.
        """
        if not isinstance(action, HamiltonAction):
            raise TypeError('Coordinator can only stage or execute HamiltonActions')
        # The method was previously referenced without being called, so
        # nothing was ever queued.
        self.queued_actions.append(action)

    def execute(self, specific_actions=None):
        """Run the staged actions, or ``specific_actions`` when given.

        Args:
            specific_actions: one HamiltonAction or an iterable of them.
                ``None`` runs a copy of the staged queue; the queue itself is
                not cleared.

        Raises:
            ValueError: if the interface is not open.
            TypeError: if an item to run is not a HamiltonAction.
        """
        if not self.hamilton_interface.is_open():
            raise ValueError('Coodinator can only execute commands with an open HamiltonInterface')
        if specific_actions is None:
            work_list = self.queued_actions[:]
        elif isinstance(specific_actions, HamiltonAction):
            # Transfer defines __iter__, so a lone action has to be wrapped
            # explicitly instead of being detected by trying to iterate it.
            work_list = [specific_actions]
        else:
            try:
                # A real list is needed: the loop relies on pop(0) and on
                # truthiness, neither of which a bare iterator supports.
                work_list = list(specific_actions)
            except TypeError:
                work_list = [specific_actions]
        while work_list:
            action = work_list.pop(0)
            if not isinstance(action, HamiltonAction):
                raise TypeError('Coordinator can only stage or execute HamiltonActions')
            # TODO: batch GroupableActions using _simultaneous_candidates() and
            # head.pack_moves(). pack_moves() is still a stub, so every action
            # runs on its own; previously groupable actions were popped and
            # never executed.
            action.execute()

    def _simultaneous_candidates(self, first_move, pending):
        """Per head, list the pending moves that could run alongside ``first_move``."""
        candidates = tuple([] for _ in self.heads)
        for bucket, head in zip(candidates, self.heads):
            for move in pending:
                # Moves sharing a source or destination with first_move are skipped.
                if move.source is first_move.source or move.dest is first_move.dest:
                    continue
                if head.can_move_simul(first_move, move):
                    bucket.append(move)
        return candidates

    def wait_for_all(self):
        """Delegate to ``hamilton_interface.block_until_clear()``."""
        self.hamilton_interface.block_until_clear()
Methods
def execute(self, specific_actions=None)
-
Expand source code
def execute(self, specific_actions=None):
    """Run the staged actions, or ``specific_actions`` when given.

    Args:
        specific_actions: one HamiltonAction or an iterable of them. ``None``
            runs a copy of the staged queue; the queue itself is not cleared.

    Raises:
        ValueError: if the interface is not open.
        TypeError: if an item to run is not a HamiltonAction.
    """
    if not self.hamilton_interface.is_open():
        raise ValueError('Coodinator can only execute commands with an open HamiltonInterface')
    if specific_actions is None:
        work_list = self.queued_actions[:]
    elif isinstance(specific_actions, HamiltonAction):
        # Transfer defines __iter__, so a lone action has to be wrapped
        # explicitly instead of being detected by trying to iterate it.
        work_list = [specific_actions]
    else:
        try:
            # A real list is needed: the loop relies on pop(0) and on
            # truthiness, neither of which a bare iterator supports.
            work_list = list(specific_actions)
        except TypeError:
            work_list = [specific_actions]
    while work_list:
        action = work_list.pop(0)
        if not isinstance(action, HamiltonAction):
            raise TypeError('Coordinator can only stage or execute HamiltonActions')
        # TODO: batch GroupableActions using _simultaneous_candidates() and
        # head.pack_moves(). pack_moves() is still a stub, so every action
        # runs on its own; previously groupable actions were popped and never
        # executed (and the grouping loop read an undefined `first_move`).
        action.execute()

def _simultaneous_candidates(self, first_move, pending):
    """Per head, list the pending moves that could run alongside ``first_move``."""
    candidates = tuple([] for _ in self.heads)
    for bucket, head in zip(candidates, self.heads):
        for move in pending:
            # Moves sharing a source or destination with first_move are skipped.
            if move.source is first_move.source or move.dest is first_move.dest:
                continue
            if head.can_move_simul(first_move, move):
                bucket.append(move)
    return candidates
def stage(self, action)
-
Expand source code
def stage(self, action):
    """Append ``action`` to ``self.queued_actions``.

    Raises:
        TypeError: if ``action`` is not a HamiltonAction.
    """
    if not isinstance(action, HamiltonAction):
        raise TypeError('Coordinator can only stage or execute HamiltonActions')
    # The method was previously referenced without being called, so nothing
    # was ever queued.
    self.queued_actions.append(action)
def wait_for_all()
-
Expand source code
def wait_for_all(self):
    """Delegate to ``hamilton_interface.block_until_clear()``.

    The method previously took no ``self`` although its body reads it.
    """
    self.hamilton_interface.block_until_clear()
class HamiltonDevice (interface, heads)
-
Expand source code
class HamiltonDevice:
    """Just a structure with some guaranteed pieces.

    Binds itself to the interface, to the optional layout manager and to
    every channel head, and keeps the heads on ``self.heads``.
    """

    def __init__(self, interface, heads, layout_mgr=None):
        """Validate the pieces and bind this device to each of them.

        Args:
            interface: a ``HamiltonInterface``.
            heads: iterable of ``ChannelHead`` instances; iterated twice, so
                it must be re-iterable (e.g. a list or tuple).
            layout_mgr: optional ``LayoutManager``. The original body checked
                and bound a layout manager through two different undefined
                names; it is now an explicit, optional argument so existing
                two-argument calls keep working.

        Raises:
            TypeError: if any supplied piece has the wrong type.
        """
        if not (isinstance(interface, HamiltonInterface)
                and all(isinstance(h, ChannelHead) for h in heads)
                and (layout_mgr is None or isinstance(layout_mgr, LayoutManager))):
            raise TypeError("HamiltonDevice instantiated with wrong types") # todo maybe more informative
        interface.bind_device(self)
        if layout_mgr is not None:
            layout_mgr.bind_device(self)
        self.heads = heads
        for head in heads:
            # NOTE(review): ChannelHead as visible in this file defines no
            # bind_device(); confirm it is provided elsewhere.
            head.bind_device(self)
class Independent8Channel
-
Expand source code
class Independent8Channel(ChannelHead):
    """Channel head whose only alignment constraint is ``DeckResource.align.VERTICAL``."""

    def __init__(self):
        required = [DeckResource.align.VERTICAL]
        self.alignment_constraints = required
Ancestors
class Mix (tip, ml, target, times)
-
Expand source code
class Mix(HamiltonAction):
    """Action that keeps ``(tip, ml, target, times)`` as a tuple on ``params``."""

    def __init__(self, tip, ml, target, times):
        params = tip, ml, target, times
        self.params = params
        # NOTE(review): MIX is not defined on HamiltonAction in the visible
        # source — confirm where it comes from.
        self.action_type = HamiltonAction.MIX
Ancestors
class Standard96Channel
-
Expand source code
class Standard96Channel(ChannelHead):
    """Channel head whose only alignment constraint is ``DeckResource.align.STD_96``."""

    def __init__(self):
        required = [DeckResource.align.STD_96]
        self.alignment_constraints = required
Ancestors
class TipPickup (tip)
-
Expand source code
class TipPickup(GroupableAction):
    """Groupable action that picks up one tip."""

    def __init__(self, tip):
        self.tip = tip

    def execute(self):
        """Send a PICKUP command for ``self.tip`` and wait for its response.

        NOTE(review): ``hammy`` and ``PICKUP`` are not defined in the visible
        source; presumably module-level names — confirm.
        """
        tip = self.tip  # the body previously read an unbound bare name `tip`
        labware_positions = str(tip.parent_resource.layout_name()) + ', ' + str(tip.index) + ';'
        # Renamed from `id`, which shadowed the builtin.
        command_id = hammy.send_command(PICKUP, {'labwarePositions': labware_positions})
        hammy.wait_on_response(command_id)
Ancestors
Methods
def execute(self)
-
Expand source code
def execute(self):
    """Send a PICKUP command for ``self.tip`` and wait for its response.

    NOTE(review): ``hammy`` and ``PICKUP`` are not defined in the visible
    source; presumably module-level names — confirm.
    """
    tip = self.tip  # the body previously read an unbound bare name `tip`
    labware_positions = str(tip.parent_resource.layout_name()) + ', ' + str(tip.index) + ';'
    # Renamed from `id`, which shadowed the builtin.
    command_id = hammy.send_command(PICKUP, {'labwarePositions': labware_positions})
    hammy.wait_on_response(command_id)
class Transfer (tip, ml, source, dest)
-
Expand source code
class Transfer(GroupableAction):
    """Groupable action parameterised by ``(tip, ml, source, dest)``.

    Iterating a Transfer yields those four values in that order, which is the
    4-tuple shape ChannelHead.can_move_simul documents for its arguments.
    """

    def __init__(self, tip, ml, source, dest):
        self.tip, self.ml = tip, ml
        self.source, self.dest = source, dest
        # NOTE(review): MOVE is not defined on HamiltonAction in the visible
        # source — confirm where it comes from.
        self.action_type = HamiltonAction.MOVE

    def execute(self):
        """Run the pickup, aspirate and dispense sub-actions, in that order.

        NOTE(review): none of these three attributes is assigned anywhere in
        the visible source.
        """
        for step_name in ('pickup', 'aspirate', 'dispense'):
            getattr(self, step_name).execute()

    def __iter__(self):
        fields = (self.tip, self.ml, self.source, self.dest)
        return iter(fields)
Ancestors
Subclasses
Methods
def execute(self)
-
Expand source code
def execute(self):
    """Run the pickup, aspirate and dispense sub-actions, in that order.

    NOTE(review): none of these three attributes is assigned anywhere in the
    visible source.
    """
    for step_name in ('pickup', 'aspirate', 'dispense'):
        getattr(self, step_name).execute()