# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
from .rpc import ProcessSpawner, RPCServer, RPCClient
from . import nodelist
[docs]class NodeGroup(object):
"""
NodeGroup is responsible for managing a collection of Nodes within a single
process.
NodeGroups themselves are created and destroyed by Hosts, which manage all
NodeGroups on a particular machine.
"""
def __init__(self, host, manager):
self.host = host
self.manager = manager
self.nodes = set()
[docs] def create_node(self, node_class, *args, **kwds):
"""Create a new Node and add it to this NodeGroup.
Return the new Node.
"""
assert isinstance(node_class, str)
cls = nodelist.all_nodes[node_class]
node = cls(*args, **kwds)
self.add_node(node)
return node
[docs] def list_node_types(self):
"""Return a list of the class names for all registered node types.
"""
return list(nodelist.all_nodes.keys())
[docs] def register_node_type_from_module(self, modname, classname):
"""Register a Node subclass with this NodeGroup.
This allows custom Node subclasses to be instantiated in this NodeGroup
using :func:`NodeGroup.create_node`.
"""
nodelist.register_node_type_from_module(modname, classname)
[docs] def add_node(self, node):
"""Add a Node to this NodeGroup.
"""
self.nodes.add(node)
[docs] def remove_node(self, node):
"""Remove a Node from this NodeGroup.
"""
if node.running():
raise RuntimeError("Refusing to remove Node while it is running.")
self.nodes.remove(node)
def list_nodes(self):
return list(self.nodes)
[docs] def start_all_nodes(self):
"""Call `Node.start()` for all Nodes in this group.
"""
for node in self.nodes:
node.start()
[docs] def stop_all_nodes(self):
"""Call `Node.stop()` for all Nodes in this group.
"""
for node in self.nodes:
if node.running():
node.stop()
[docs] def any_node_running(self):
"""Return True if any of the Nodes in this group are running.
"""
return any(node.running() for node in self.nodes)
def close(self):
self.stop_all_nodes()
cli = RPCServer.local_client()
cli.close_server(sync='off')