Source code for pyacq.core.rpc.processspawner

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

import sys
import os
import json
import subprocess
import atexit
import zmq
import logging
import threading
import time
from pyqtgraph.Qt import QtCore

from .client import RPCClient
from .log import get_logger_address, LogSender


logger = logging.getLogger(__name__)


[docs]class ProcessSpawner(object): """Utility for spawning and bootstrapping a new process with an :class:`RPCServer`. Automatically creates an :class:`RPCClient` that is connected to the remote process (``spawner.client``). Parameters ---------- name : str | None Optional process name that will be assigned to all remote log records. address : str ZMQ socket address that the new process's RPCServer will bind to. Default is ``'tcp://127.0.0.1:*'``. **Note:** binding RPCServer to a public IP address is a potential security hazard (see :class:`RPCServer`). qt : bool If True, then start a Qt application in the remote process, and use a :class:`QtRPCServer`. log_addr : str Optional log server address to which the new process will send its log records. This will also cause the new process's stdout and stderr to be captured and forwarded as log records. log_level : int Optional initial log level to assign to the root logger in the new process. executable : str | None Optional python executable to invoke. The default value is `sys.executable`. Examples -------- :: # start a new process proc = ProcessSpawner() # ask the child process to do some work mod = proc._import('my.module') mod.do_work() # close the child process proc.close() proc.wait() """ def __init__(self, name=None, address="tcp://127.0.0.1:*", qt=False, log_addr=None, log_level=None, executable=None): #logger.warn("Spawning process: %s %s %s", name, log_addr, log_level) assert qt in (True, False) assert isinstance(address, (str, bytes)) assert name is None or isinstance(name, str) assert log_addr is None or isinstance(log_addr, (str, bytes)), "log_addr must be str or None; got %r" % log_addr if log_addr is None: log_addr = get_logger_address() assert log_level is None or isinstance(log_level, int) if log_level is None: log_level = logger.getEffectiveLevel() self.qt = qt self.name = name # temporary socket to allow the remote process to report its status. bootstrap_addr = 'tcp://127.0.0.1:*' bootstrap_sock = zmq.Context.instance().socket(zmq.PAIR) bootstrap_sock.setsockopt(zmq.RCVTIMEO, 10000) bootstrap_sock.bind(bootstrap_addr) bootstrap_sock.linger = 1000 # don't let socket deadlock when exiting bootstrap_addr = bootstrap_sock.last_endpoint # Spawn new process class_name = 'QtRPCServer' if qt else 'RPCServer' args = {'address': address} bootstrap_conf = dict( class_name=class_name, args=args, bootstrap_addr=bootstrap_addr.decode(), loglevel=log_level, logaddr=log_addr.decode() if log_addr is not None else None, qt=qt, ) if executable is None: executable = sys.executable cmd = (executable, '-m', 'pyacq.core.rpc.bootstrap') if name is not None: cmd = cmd + (name,) if log_addr is not None: # start process with stdout/stderr piped self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE) self.proc.stdin.write(json.dumps(bootstrap_conf).encode()) self.proc.stdin.close() # create a logger for handling stdout/stderr and forwarding to log server self.logger = logging.getLogger(__name__ + '.' + str(id(self))) self.logger.propagate = False self.log_handler = LogSender(log_addr, self.logger) if log_level is not None: self.logger.level = log_level # create threads to poll stdout/stderr and generate / send log records self.stdout_poller = PipePoller(self.proc.stdout, self.logger.info, '[%s.stdout] '%name) self.stderr_poller = PipePoller(self.proc.stderr, self.logger.warn, '[%s.stderr] '%name) else: # don't intercept stdout/stderr self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE) self.proc.stdin.write(json.dumps(bootstrap_conf).encode()) self.proc.stdin.close() logger.info("Spawned process: %d", self.proc.pid) # Receive status information (especially the final RPC address) try: status = bootstrap_sock.recv_json() except zmq.error.Again: raise TimeoutError("Timed out waiting for response from spawned process.") logger.debug("recv status %s", status) bootstrap_sock.send(b'OK') bootstrap_sock.close() if 'address' in status: self.address = status['address'] #: An RPCClient instance that is connected to the RPCServer in the remote process self.client = RPCClient(self.address.encode()) else: err = ''.join(status['error']) self.kill() raise RuntimeError("Error while spawning process:\n%s" % err) # Automatically shut down process when we exit. atexit.register(self.stop)
[docs] def wait(self, timeout=10): """Wait for the process to exit and return its return code. """ # Using proc.wait() can deadlock; use communicate() instead. # see: https://docs.python.org/2/library/subprocess.html#subprocess.Popen.wait try: self.proc.communicate() except (AttributeError, ValueError): # Python bug: http://bugs.python.org/issue30203 # Calling communicate on process with closed i/o can generate # exceptions. pass start = time.time() sleep = 1e-3 while True: rcode = self.proc.poll() if rcode is not None: return rcode if time.time() - start > timeout: raise TimeoutError("Timed out waiting on process exit for %s" % self.name) time.sleep(sleep) sleep = min(sleep*2, 100e-3)
[docs] def kill(self): """Kill the spawned process immediately. """ if self.proc.poll() is not None: return logger.info("Kill process: %d", self.proc.pid) self.proc.kill() self.wait()
[docs] def stop(self): """Stop the spawned process by asking its RPC server to close. """ if self.proc.poll() is not None: return logger.info("Close process: %d", self.proc.pid) closed = self.client.close_server() assert closed is True, "Server refused to close. (reply: %s)" % closed self.wait()
[docs] def poll(self): """Return the spawned process's return code, or None if it has not exited yet. """ return self.proc.poll()
class PipePoller(threading.Thread): def __init__(self, pipe, callback, prefix): threading.Thread.__init__(self, daemon=True) self.pipe = pipe self.callback = callback self.prefix = prefix self.start() def run(self): callback = self.callback prefix = self.prefix pipe = self.pipe while True: line = pipe.readline().decode() if line == '': break callback(prefix + line[:-1])