Source code for pyacq.core.nodegroup

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

from .rpc import ProcessSpawner, RPCServer, RPCClient
from . import nodelist


[docs]class NodeGroup(object): """ NodeGroup is responsible for managing a collection of Nodes within a single process. NodeGroups themselves are created and destroyed by Hosts, which manage all NodeGroups on a particular machine. """ def __init__(self, host, manager): self.host = host self.manager = manager self.nodes = set()
[docs] def create_node(self, node_class, *args, **kwds): """Create a new Node and add it to this NodeGroup. Return the new Node. """ assert isinstance(node_class, str) cls = nodelist.all_nodes[node_class] node = cls(*args, **kwds) self.add_node(node) return node
[docs] def list_node_types(self): """Return a list of the class names for all registered node types. """ return list(nodelist.all_nodes.keys())
[docs] def register_node_type_from_module(self, modname, classname): """Register a Node subclass with this NodeGroup. This allows custom Node subclasses to be instantiated in this NodeGroup using :func:`NodeGroup.create_node`. """ nodelist.register_node_type_from_module(modname, classname)
[docs] def add_node(self, node): """Add a Node to this NodeGroup. """ self.nodes.add(node)
[docs] def remove_node(self, node): """Remove a Node from this NodeGroup. """ if node.running(): raise RuntimeError("Refusing to remove Node while it is running.") self.nodes.remove(node)
def list_nodes(self): return list(self.nodes)
[docs] def start_all_nodes(self): """Call `Node.start()` for all Nodes in this group. """ for node in self.nodes: node.start()
[docs] def stop_all_nodes(self): """Call `Node.stop()` for all Nodes in this group. """ for node in self.nodes: if node.running(): node.stop()
[docs] def any_node_running(self): """Return True if any of the Nodes in this group are running. """ return any(node.running() for node in self.nodes)
def close(self): self.stop_all_nodes() cli = RPCServer.local_client() cli.close_server(sync='off')