Source code for pyacq.viewers.qoscilloscope

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

from pyqtgraph.Qt import QtCore, QtGui
import pyqtgraph as pg

import numpy as np
import weakref

from ..core import (WidgetNode, register_node_type, InputStream,
        ThreadPollInput, StreamConverter)


class MyViewBox(pg.ViewBox):
    doubleclicked = QtCore.pyqtSignal()
    gain_zoom = QtCore.pyqtSignal(float)
    xsize_zoom = QtCore.pyqtSignal(float)
    def __init__(self, *args, **kwds):
        pg.ViewBox.__init__(self, *args, **kwds)
        self.disableAutoRange()
    def mouseClickEvent(self, ev):
        ev.accept()
    def mouseDoubleClickEvent(self, ev):
        self.doubleclicked.emit()
        ev.accept()
    def mouseDragEvent(self, ev):
        ev.ignore()
    def wheelEvent(self, ev, axis=None):
        if ev.modifiers() == QtCore.Qt.ControlModifier:
            z = 10 if ev.delta()>0 else 1/10.
        else:
            z = 1.3 if ev.delta()>0 else 1/1.3
        self.gain_zoom.emit(z)
        ev.accept()
    def mouseDragEvent(self, ev):
        ev.accept()
        self.xsize_zoom.emit((ev.pos()-ev.lastPos()).x())

class BaseOscilloscope(WidgetNode):
    """
    Base Class for QOscilloscope and QOscilloscopeDigital
    
    The BaseOscilloscope requires its input stream to have the following properties:
    
    * transfermode==sharedarray
    
    If the input stream does not meet these requirements, then a StreamConverter
    is created to proxy the input. This can degrade performance when multiple
    Oscilloscopes are used to view data from the same device; in this case it is
    better to manually create single StreamConverter to provide shared input
    for all Oscilloscopes.
    """
    
    #In the code  willingly : self.input is self.inputs['signal'] because some subclass can have several inputs
    
    def __init__(self, **kargs):
        WidgetNode.__init__(self, **kargs)
        
        self.layout = QtGui.QVBoxLayout()
        self.setLayout(self.layout)
        
        self.graphicsview = pg.GraphicsView()
        self.layout.addWidget(self.graphicsview)
        
        # create graphic view and plot item
        self.viewBox = MyViewBox()
        
        self.plot = pg.PlotItem(viewBox=self.viewBox)
        self.graphicsview.setCentralItem(self.plot)
        self.plot.hideButtons()
        self.plot.showAxis('left', False)
        self.plot.showAxis('bottom', False)
        
        self.all_mean, self.all_sd = None, None
        
    def show_params_controller(self):
        self.params_controller.show()
        # TODO deal with modality
    
    def _configure(self, with_user_dialog=True, max_xsize=60.):
        self.with_user_dialog = with_user_dialog
        self.max_xsize = max_xsize
    

    def _check_nb_channel(self):
        self.nb_channel = self.inputs['signals'].params['shape'][1]
    
    def _initialize(self):
        self._check_nb_channel()
        assert len(self.inputs['signals'].params['shape']) == 2, 'Are you joking ?'
        
        self.sample_rate = self.inputs['signals'].params['sample_rate']
        buf_size = int(self.sample_rate * self.max_xsize)
        self.inputs['signals'].set_buffer(size=buf_size, axisorder=[1,0], double=True)
        #TODO : check that this not lead 
        
        # channel names
        channel_info = self.inputs['signals'].params.get('channel_info', None)
        if channel_info is None:
            self.channel_names = ['ch{}'.format(c) for c in range(self.nb_channel)]
        else:
            self.channel_names = [ch_info['name'] for ch_info in channel_info]
    
        # Create parameters
        all = []
        for i in range(self.nb_channel):
            by_chan_p = [{'name': 'label', 'type': 'str', 'value': self.channel_names[i], 'readonly':True}] + list(self._default_by_channel_params)
            all.append({'name': 'ch{}'.format(i), 'type': 'group', 'children': by_chan_p})
        self.by_channel_params = pg.parametertree.Parameter.create(name='AnalogSignals', type='group', children=all)
        self.params = pg.parametertree.Parameter.create(name='Global options',
                                                    type='group', children=self._default_params)
        self.all_params = pg.parametertree.Parameter.create(name='all param',
                                    type='group', children=[self.params, self.by_channel_params])
        self.all_params.sigTreeStateChanged.connect(self.on_param_change)
        
        
        if self.with_user_dialog and self._ControllerClass:
            self.params_controller = self._ControllerClass(parent=self, viewer=self)
            self.params_controller.setWindowFlags(QtCore.Qt.Window)
            self.viewBox.doubleclicked.connect(self.show_params_controller)
        else:
            self.params_controller = None
        
        # poller
        self.poller = ThreadPollInput(input_stream=self.inputs['signals'], return_data=None)
        self.poller.new_data.connect(self._on_new_data)
        # timer
        self._head = 0
        self.timer = QtCore.QTimer(singleShot=False, interval=100)
        self.timer.timeout.connect(self.refresh)

    def _start(self):
        self._head = 0
        self.estimate_decimate()
        self.reset_curves_data()
        self.poller.start()
        self.timer.start()
    
    def _stop(self):
        self.poller.stop()
        self.poller.wait()
        self.timer.stop()
    
    def _close(self):
        if self.running():
            self.stop()
        if self.with_user_dialog:
            self.params_controller.close()

    def _on_new_data(self, pos, data):
        self._head = pos
    
    def refresh(self):
        self._refresh()

    def reset_curves_data(self):
        xsize = self.params['xsize']
        decimate = self.params['decimate']
        #~ sr = self.input.params['sample_rate']
        self.full_size = int(xsize*self.sample_rate)
        self.small_size = self.full_size//decimate
        if self.small_size%2!=0:  # ensure for min_max decimate
            self.small_size -=1
        self.full_size = self.small_size*decimate
        self.t_vect = np.arange(0,self.small_size, dtype=float)/(self.sample_rate/decimate)
        self.t_vect -= self.t_vect[-1]
        self.curves_data = [np.zeros((self.small_size), dtype=float) for i in range(self.nb_channel)]

    def estimate_decimate(self, nb_point=4000):
        xsize = self.params['xsize']
        self.params['decimate'] = max(int(xsize*self.sample_rate)//nb_point, 1)

    def apply_xsize_zoom(self, xmove):
        factor = xmove/100.
        newsize = self.params['xsize']*(factor+1.)
        limits = self.params.param('xsize').opts['limits']
        if newsize>limits[0] and newsize<limits[1]:
            self.params['xsize'] = newsize


class OscilloscopeController(QtGui.QWidget):
    channel_visibility_changed = QtCore.pyqtSignal()
    
    def __init__(self, parent=None, viewer=None):
        QtGui.QWidget.__init__(self, parent)
        
        self._viewer = weakref.ref(viewer)
        
        # layout
        self.mainlayout = QtGui.QVBoxLayout()
        self.setLayout(self.mainlayout)
        t = 'Options for {}'.format(self.viewer.name)
        self.setWindowTitle(t)
        self.mainlayout.addWidget(QtGui.QLabel('<b>'+t+'<\b>'))
        
        h = QtGui.QHBoxLayout()
        self.mainlayout.addLayout(h)
        
        self.v1 = QtGui.QVBoxLayout()
        h.addLayout(self.v1)
        self.tree_params = pg.parametertree.ParameterTree()
        self.tree_params.setParameters(self.viewer.params, showTop=True)
        self.tree_params.header().hide()
        self.v1.addWidget(self.tree_params)

        self.tree_by_channel_params = pg.parametertree.ParameterTree()
        self.tree_by_channel_params.header().hide()
        h.addWidget(self.tree_by_channel_params)
        self.tree_by_channel_params.setParameters(self.viewer.by_channel_params, showTop=True)

        v = QtGui.QVBoxLayout()
        h.addLayout(v)
        
        self.channel_visibility_changed.connect(self.on_channel_visibility_changed)

        but = QtGui.QPushButton('Auto scale')
        v.addWidget(but)
        #~ but.clicked.connect(self.compute_rescale)
        but.clicked.connect(self.on_channel_visibility_changed)
        
        
        if self.viewer.nb_channel>1:
            v.addWidget(QtGui.QLabel('<b>Select channel...</b>'))
            names = [ '{}: {}'.format(c, name) for c, name in enumerate(self.viewer.channel_names)]
            self.qlist = QtGui.QListWidget()
            v.addWidget(self.qlist, 2)
            self.qlist.addItems(names)
            self.qlist.setSelectionMode(QtGui.QAbstractItemView.ExtendedSelection)
            self.qlist.doubleClicked.connect(self.on_double_clicked)
            
            for i in range(len(names)):
                self.qlist.item(i).setSelected(True)            
            v.addWidget(QtGui.QLabel('<b>and apply...<\b>'))
            
        # Gain and offset
        but = QtGui.QPushButton('set visble')
        v.addWidget(but)
        but.clicked.connect(self.on_set_visible)
        
    @property
    def viewer(self):
        return self._viewer()

    @property
    def selected(self):
        selected = np.ones(self.viewer.nb_channel, dtype=bool)
        if self.viewer.nb_channel>1:
            selected[:] = False
            selected[[ind.row() for ind in self.qlist.selectedIndexes()]] = True
        return selected
    
    @property
    def visible_channels(self):
        visible = [self.viewer.by_channel_params['ch{}'.format(i), 'visible'] for i in range(self.viewer.nb_channel)]
        return np.array(visible, dtype='bool')

    @property
    def gains(self):
        gains = [self.viewer.by_channel_params['ch{}'.format(i), 'gain'] for i in range(self.viewer.nb_channel)]
        return np.array(gains)

    @gains.setter
    def gains(self, val):
        for c, v in enumerate(val):
            self.viewer.by_channel_params['ch{}'.format(c), 'gain'] = v

    @property
    def offsets(self):
        offsets = [self.viewer.by_channel_params['ch{}'.format(i), 'offset'] for i in range(self.viewer.nb_channel)]
        return np.array(offsets)

    @offsets.setter
    def offsets(self, val):
        for c, v in enumerate(val):
            self.viewer.by_channel_params['ch{}'.format(c), 'offset'] = v
    
    def on_set_visible(self):
        # apply
        self.viewer.by_channel_params.blockSignals(True)
        visibles = self.selected
        for i,param in enumerate(self.viewer.by_channel_params.children()):
            param['visible'] = visibles[i]
            if visibles[i]:
                self.viewer.curves[i].show()
            else:
                self.viewer.curves[i].hide()
        self.viewer.by_channel_params.blockSignals(False)
        self.channel_visibility_changed.emit()
    
    def on_double_clicked(self, index):
        self.viewer.by_channel_params.blockSignals(True)
        for i, p in enumerate(self.viewer.by_channel_params.children()):
            p['visible'] = (i==index.row())
            if p['visible']:
                self.viewer.curves[i].show()
            else:
                self.viewer.curves[i].hide()
        self.viewer.by_channel_params.blockSignals(False)
        self.channel_visibility_changed.emit()

    def on_channel_visibility_changed(self):
        self.compute_rescale()
        self.viewer.refresh()

    def estimate_median_mad(self):
        sigs = self.viewer.get_visible_chunk()
        self.signals_med = med = np.nanmedian(sigs, axis=0)
        self.signals_mad = np.nanmedian(np.abs(sigs-med),axis=0)*1.4826
        self.signals_min = np.nanmin(sigs, axis=0)
        self.signals_max = np.nanmax(sigs, axis=0)
    
    def compute_rescale(self, spacing_factor=9.):
        scale_mode = self.viewer.params['scale_mode']
        
        self.viewer.by_channel_params.blockSignals(True)
        
        gains = np.ones(self.viewer.nb_channel)
        offsets = np.zeros(self.viewer.nb_channel)
        nb_visible = np.sum(self.visible_channels)
        self.estimate_median_mad()
        
        if scale_mode=='real_scale':
            self.viewer.params['ylim_min'] = np.nanmin(self.signals_min[self.visible_channels])
            self.viewer.params['ylim_max'] = np.nanmax(self.signals_max[self.visible_channels])
        else:
            if scale_mode=='same_for_all':
                inv_scale =  max(self.signals_mad[self.visible_channels]) * spacing_factor
                if inv_scale == 0:
                    inv_scale = 1.
            elif scale_mode=='by_channel':
                inv_scale = self.signals_mad[self.visible_channels] * spacing_factor
                inv_scale[inv_scale==0.] = 1.
            gains[self.visible_channels] = np.ones(nb_visible, dtype=float) / inv_scale
            offsets[self.visible_channels] = np.arange(nb_visible)[::-1] - self.signals_med[self.visible_channels]*gains[self.visible_channels]
            self.viewer.params['ylim_min'] = -0.5
            self.viewer.params['ylim_max'] = nb_visible-0.5
        
        self.gains = gains
        self.offsets = offsets
        self.viewer.by_channel_params.blockSignals(False)

    def apply_ygain_zoom(self, factor_ratio):
        
        scale_mode = self.viewer.params['scale_mode']
        
        self.viewer.all_params.blockSignals(True)
        if scale_mode=='real_scale':
            ymin, ymax = self.viewer.params['ylim_min'], self.viewer.params['ylim_max']
            d = (ymax-ymin) * factor_ratio / 2.
            self.viewer.params['ylim_max'] = (ymin+ymax)/2. + d
            self.viewer.params['ylim_min'] = (ymin+ymax)/2. - d
        else :
            if not hasattr(self, 'self.signals_med'):
                self.estimate_median_mad()
            vis_offset = self.offsets + self.signals_med*self.gains
            self.gains = self.gains * factor_ratio
            #~ self.offsets = self.offsets + self.signals_med*self.gains * (1-factor_ratio)
            self.offsets = vis_offset - self.signals_med*self.gains
        self.viewer.all_params.blockSignals(False)
        
        self.viewer.refresh()
        
    def apply_xsize_zoom(self, xmove):
        factor = xmove/100.
        factor = max(factor, -0.999999999)
        factor = min(factor, 1)
        newsize = self.viewer.params['xsize']*(factor+1.)
        self.viewer.params['xsize'] = max(newsize, MIN_XSIZE)


default_params = [
    {'name': 'xsize', 'type': 'float', 'value': 3., 'step': 0.1},
    {'name': 'ylim_max', 'type': 'float', 'value': 10.},
    {'name': 'ylim_min', 'type': 'float', 'value': -10.},
    {'name': 'scale_mode', 'type': 'list', 'value': 'real_scale', 
        'values':['real_scale', 'same_for_all', 'by_channel'] },
    {'name': 'background_color', 'type': 'color', 'value': 'k'},
    {'name': 'refresh_interval', 'type': 'int', 'value': 100, 'limits':[5, 1000]},
    {'name': 'mode', 'type': 'list', 'value': 'scan', 'values': ['scan', 'scroll']},
    {'name': 'auto_decimate', 'type': 'bool', 'value': True},
    {'name': 'decimate', 'type': 'int', 'value': 1, 'limits': [1, None], },
    {'name': 'decimation_method', 'type': 'list', 'value': 'pure_decimate', 'values': ['pure_decimate', 'min_max', 'mean']},
    {'name': 'display_labels', 'type': 'bool', 'value': False},
    {'name': 'show_bottom_axis', 'type': 'bool', 'value': False},
    {'name': 'show_left_axis', 'type': 'bool', 'value': False},
    ]

default_by_channel_params = [ 
    {'name': 'gain', 'type': 'float', 'value': 1, 'step': 0.1},
    {'name': 'offset', 'type': 'float', 'value': 0., 'step': 0.1},
    {'name': 'visible', 'type': 'bool', 'value': True},
    ]


[docs]class QOscilloscope(BaseOscilloscope): """ Continuous, multi-channel oscilloscope based on Qt and pyqtgraph. """ _input_specs = {'signals': dict(streamtype='signals')} _default_params = default_params _default_by_channel_params = default_by_channel_params _ControllerClass = OscilloscopeController def __init__(self, **kargs): BaseOscilloscope.__init__(self, **kargs) def _configure(self, with_user_dialog=True, max_xsize = 60.): BaseOscilloscope._configure(self, with_user_dialog=with_user_dialog, max_xsize = max_xsize) def _initialize(self): BaseOscilloscope._initialize(self) if self.params_controller is not None: self.viewBox.gain_zoom.connect(self.params_controller.apply_ygain_zoom) self.viewBox.xsize_zoom.connect(self.apply_xsize_zoom) self.params.param('xsize').setLimits([2./self.sample_rate, self.max_xsize*.95]) self.curves = [] self.channel_labels = [] for i in range(self.nb_channel): color = '#7FFF00' # TODO curve = pg.PlotCurveItem(pen=color) self.plot.addItem(curve) self.curves.append(curve) txt = '{}: {}'.format(i, self.channel_names[i]) label = pg.TextItem(txt, color=color, anchor=(0.5, 0.5), border=None, fill=pg.mkColor((128,128,128, 200))) self.plot.addItem(label) self.channel_labels.append(label) self.reset_curves_data() def _refresh(self): mode = self.params['mode'] decimate = int(self.params['decimate']) gains = self.params_controller.gains offsets = self.params_controller.offsets visibles = self.params_controller.visible_channels xsize = self.params['xsize'] head = self._head if decimate>1: if self.params['decimation_method'] == 'min_max': head = head - head%(decimate*2) else: head = head - head%decimate full_arr = self.get_visible_chunk(head=head, limit_to_head_0=False).T full_arr = full_arr.astype(float) if decimate>1: if self.params['decimation_method'] == 'pure_decimate': small_arr = full_arr[:, ::decimate].copy() elif self.params['decimation_method'] == 'min_max': arr = full_arr.reshape(full_arr.shape[0], -1, decimate*2) small_arr = np.empty((full_arr.shape[0], self.small_size), dtype=full_arr.dtype) small_arr[:, ::2] = arr.max(axis=2) small_arr[:, 1::2] = arr.min(axis=2) elif self.params['decimation_method'] == 'mean': arr = full_arr.reshape(full_arr.shape[0], -1, decimate) small_arr = arr.mean(axis=2) else: raise(NotImplementedError) else: small_arr = full_arr.copy() # gain/offset small_arr[visibles, :] *= gains[visibles, None] small_arr[visibles, :] += offsets[visibles, None] if mode=='scroll': for c, visible in enumerate(visibles): if visible: self.curves_data[c] = small_arr[c,:] elif mode =='scan': ind = (head//decimate)%self.small_size+1 for c, visible in enumerate(visibles): if visible: self.curves_data[c] = np.concatenate((small_arr[c,-ind:], small_arr[c,:-ind])) for c, visible in enumerate(visibles): if visible: self.curves[c].setData(self.t_vect, self.curves_data[c], antialias=False) self.plot.setXRange(self.t_vect[0], self.t_vect[-1]) self.plot.setYRange(self.params['ylim_min'], self.params['ylim_max']) for c, visible in enumerate(visibles): label = self.channel_labels[c] if visible and self.params['display_labels']: if self.all_mean is not None: label.setPos(-self.params['xsize'], self.all_mean[c]*gains[c]+offsets[c]) else: label.setPos(-self.params['xsize'], offsets[c]) label.setVisible(True) else: label.setVisible(False) self.plot.showAxis('left', self.params['show_left_axis']) self.plot.showAxis('bottom', self.params['show_bottom_axis']) def on_param_change(self, params, changes): for param, change, data in changes: if change != 'value': continue if param.name()=='visible': c = self.by_channel_params.children().index(param.parent()) if data: self.curves[c].show() else: self.curves[c].hide() if param.name()=='background_color': self.graphicsview.setBackground(data) if param.name()=='xsize': if self.params['auto_decimate']: self.estimate_decimate() self.reset_curves_data() if param.name()=='decimate': self.reset_curves_data() if param.name()=='auto_decimate': if data: self.estimate_decimate() self.reset_curves_data() if param.name()=='refresh_interval': self.timer.setInterval(data) if param.name()=='mode': self.reset_curves_data() if param.name()=='scale_mode': self.params_controller.compute_rescale() def gain_zoom(self, factor, selected=None): for i, p in enumerate(self.by_channel_params.children()): if selected is not None and not selected[i]: continue if self.all_mean is not None: p['offset'] = p['offset'] + self.all_mean[i]*p['gain'] - self.all_mean[i]*p['gain']*factor p['gain'] = p['gain']*factor def get_visible_chunk(self, head=None, limit_to_head_0=True): if head is None: head = self._head if limit_to_head_0: sigs = self.inputs['signals'].get_data(max(-1, head-self.full_size), head, copy=False, join=True) # this ensure having at least one sample else: # get signal even before head=0 sigs = self.inputs['signals'].get_data(head-self.full_size, head, copy=False, join=True) return sigs def auto_scale(self, spacing_factor=9.): self.params_controller.compute_rescale(spacing_factor=spacing_factor) self.refresh()
register_node_type(QOscilloscope)