# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
from pyqtgraph.Qt import QtCore
import pyqtgraph as pg
from pyqtgraph.util.mutex import Mutex
import numpy as np
from ..core import (Node, register_node_type, ThreadPollInput, StreamConverter)
import distutils.version
try:
import scipy.signal
HAVE_SCIPY = True
# scpy.signal.sosfilt was introduced in scipy 0.16
assert distutils.version.LooseVersion(scipy.__version__)>'0.16'
except ImportError:
HAVE_SCIPY = False
try:
import pyopencl
mf = pyopencl.mem_flags
HAVE_PYOPENCL = True
except ImportError:
HAVE_PYOPENCL = False
#See
#http://scipy.github.io/devdocs/generated/scipy.signal.sosfilt.html
#TODO: make a kernel that mix SosFilter_OpenCL_V1 and SosFilter_OpenCL_V2 approach
class SosFilter_Scipy:
"""
Implementation with scipy.
"""
def __init__(self, coefficients, nb_channel, dtype, chunksize):
self.coefficients = coefficients
self.nb_section = coefficients.shape[0]
self.nb_channel = nb_channel
self.zi = np.zeros((self.nb_section, 2, self.nb_channel), dtype= dtype)
self.dtype=dtype
self.chunksize = chunksize
def compute_one_chunk(self, pos, chunk):
chunk_filtered, self.zi = scipy.signal.sosfilt(self.coefficients, chunk, zi = self.zi, axis = 0)
chunk_filtered = chunk_filtered.astype(self.dtype)
return chunk_filtered
class SosFilter_OpenCl_Base:
def __init__(self, coefficients, nb_channel, dtype, chunksize):
self.dtype = np.dtype(dtype)
assert self.dtype == np.dtype('float32')
self.nb_channel = nb_channel
self.chunksize = chunksize
assert self.chunksize is not None, 'chunksize for opencl must be fixed'
self.coefficients = coefficients.astype(self.dtype)
if self.coefficients.ndim==2: #(nb_section, 6) to (nb_channel, nb_section, 6)
self.coefficients = np.tile(self.coefficients[None,:,:], (nb_channel, 1,1))
if not self.coefficients.flags['C_CONTIGUOUS']:
self.coefficients = self.coefficients.copy()
self.nb_section = self.coefficients.shape[1]
assert self.coefficients.shape[0]==self.nb_channel, 'wrong coefficients.shape'
assert self.coefficients.shape[2]==6, 'wrong coefficients.shape'
self.ctx = pyopencl.create_some_context()
#TODO : add arguments gpu_platform_index/gpu_device_index
#self.devices = [pyopencl.get_platforms()[self.gpu_platform_index].get_devices()[self.gpu_device_index] ]
#self.ctx = pyopencl.Context(self.devices)
self.queue = pyopencl.CommandQueue(self.ctx)
#host arrays
self.zi = np.zeros((nb_channel, self.nb_section, 2), dtype= self.dtype)
#GPU buffers
nbytes = self.chunksize * self.nb_channel * self.dtype.itemsize
self.coefficients_cl = pyopencl.Buffer(self.ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=self.coefficients)
self.zi_cl = pyopencl.Buffer(self.ctx, mf.READ_WRITE | mf.COPY_HOST_PTR, hostbuf=self.zi)
self.input_cl = pyopencl.Buffer(self.ctx, mf.READ_WRITE, size=nbytes)
self.output_cl = pyopencl.Buffer(self.ctx, mf.READ_WRITE, size=nbytes)
#nb works
kernel = self.kernel%dict(chunksize = self.chunksize, nb_section=self.nb_section, nb_channel=self.nb_channel)
prg = pyopencl.Program(self.ctx, kernel)
self.opencl_prg = prg.build(options='-cl-mad-enable')
class SosFilter_OpenCL_V1(SosFilter_OpenCl_Base):
"""
Implementation with OpenCL : this version scale nb_channel.
"""
def __init__(self, coefficients, nb_channel, dtype, chunksize):
SosFilter_OpenCl_Base.__init__(self, coefficients, nb_channel, dtype, chunksize)
self.global_size = (self.nb_channel, )
self.local_size = (self.nb_channel, )
self.output = np.zeros((self.chunksize, self.nb_channel), dtype= self.dtype)
self.kernel_func_name = 'sos_filter'
def compute_one_chunk(self, pos, chunk):
assert chunk.dtype==self.dtype
assert chunk.shape==(self.chunksize, self.nb_channel), 'wrong shape'
if not chunk.flags['C_CONTIGUOUS']:
chunk = chunk.copy()
pyopencl.enqueue_copy(self.queue, self.input_cl, chunk)
kern_call = getattr(self.opencl_prg, self.kernel_func_name)
event = kern_call(self.queue, self.global_size, self.local_size,
self.input_cl, self.output_cl, self.coefficients_cl, self.zi_cl)
event.wait()
pyopencl.enqueue_copy(self.queue, self.output, self.output_cl)
chunk_filtered = self.output
return chunk_filtered
kernel = """
#define chunksize %(chunksize)d
#define nb_section %(nb_section)d
#define nb_channel %(nb_channel)d
__kernel void sos_filter(__global float *input, __global float *output, __constant float *coefficients,
__global float *zi) {
int chan = get_global_id(0); //channel indice
int offset_filt2; //offset channel within section
int offset_zi = chan*nb_section*2;
int idx;
float w0, w1,w2;
float res;
for (int section=0; section<nb_section; section++){
offset_filt2 = chan*nb_section*6+section*6;
w1 = zi[offset_zi+section*2+0];
w2 = zi[offset_zi+section*2+1];
for (int s=0; s<chunksize;s++){
idx = s*nb_channel+chan;
if (section==0) {w0 = input[idx];}
else {w0 = output[idx];}
w0 -= coefficients[offset_filt2+4] * w1;
w0 -= coefficients[offset_filt2+5] * w2;
res = coefficients[offset_filt2+0] * w0 + coefficients[offset_filt2+1] * w1 + coefficients[offset_filt2+2] * w2;
w2 = w1; w1 =w0;
output[idx] = res;
}
zi[offset_zi+section*2+0] = w1;
zi[offset_zi+section*2+1] = w2;
}
}
"""
class SosFilter_OpenCL_V2:
"""
Implementation with OpenCL : this version scale nb_section.
"""
def __init__(self, coefficients, nb_channel, dtype, chunksize):
SosFilter_OpenCl_Base.__init__(self, coefficients, nb_channel, dtype, chunksize)
self.global_size = (self.nb_channel, self.nb_section)
self.local_size = (1, self.nb_section, )
self.output = np.zeros((self.nb_channel,self.chunksize), dtype= self.dtype)
self.kernel_func_name = 'sos_filter'
def compute_one_chunk(self, pos, chunk):
assert chunk.dtype==self.dtype
assert chunk.shape==(self.chunksize, self.nb_channel), 'wrong shape'
chunk = chunk.transpose()
if not chunk.flags['C_CONTIGUOUS']:
chunk = chunk.copy()
pyopencl.enqueue_copy(self.queue, self.input_cl, chunk)
kern_call = getattr(self.opencl_prg, self.kernel_func_name)
event = kern_call(self.queue, self.global_size, self.local_size,
self.input_cl, self.output_cl, self.coefficients_cl, self.zi_cl)
event.wait()
pyopencl.enqueue_copy(self.queue, self.output, self.output_cl)
chunk_filtered = self.output.transpose()
return chunk_filtered
kernel = """
#define chunksize %(chunksize)d
#define nb_section %(nb_section)d
#define nb_channel %(nb_channel)d
__kernel void sos_filter(__global float *input, __global float *output,
__constant float *coefficients, __global float *zi) {
int chan = get_global_id(0); //channel indice
int section = get_global_id(1); //section indice
int offset_buf = chan*chunksize;
int offset_filt = chan*nb_section*6; //offset channel
int offset_filt2; //offset channel within section
int offset_zi = chan*nb_section*2;
// copy channel to local group
__local float out_channel[chunksize];
if (section ==0) for (int s=0; s<chunksize;s++) out_channel[s] = input[offset_buf+s];
float w0, w1,w2;
float y0;
w1 = zi[offset_zi+section*2+0];
w2 = zi[offset_zi+section*2+1];
int s2;
for (int s=0; s<chunksize+(3*nb_section);s++){
barrier(CLK_LOCAL_MEM_FENCE);
s2 = s-section*3;
if (s2>=0 && (s2<chunksize)){
// if (direction==-1) s2 = chunksize - s2 - 1; //this is for bacward
offset_filt2 = offset_filt+section*6;
w0 = out_channel[s2];
w0 -= coefficients[offset_filt2+4] * w1;
w0 -= coefficients[offset_filt2+5] * w2;
out_channel[s2] = coefficients[offset_filt2+0] * w0 + coefficients[offset_filt2+1] * w1 + coefficients[offset_filt2+2] * w2;
w2 = w1; w1 =w0;
}
}
zi[offset_zi+section*2+0] = w1;
zi[offset_zi+section*2+1] = w2;
if (section ==(nb_section-1)){
for (int s=0; s<chunksize;s++) output[offset_buf+s] = out_channel[s];
}
}
"""
class SosFilter_OpenCL_V3(SosFilter_OpenCl_Base):
"""
Implementation with OpenCL : similar to SosFilter_OpenCL_V2 but with global
memory and no transpose on host.
"""
def __init__(self, coefficients, nb_channel, dtype, chunksize):
SosFilter_OpenCl_Base.__init__(self, coefficients, nb_channel, dtype, chunksize)
self.global_size = (self.nb_channel, self.nb_section)
self.local_size = (1, self.nb_section)
self.output = np.zeros((self.chunksize, self.nb_channel), dtype= self.dtype)
self.kernel_func_name = 'sos_filter'
def compute_one_chunk(self, pos, chunk):
assert chunk.dtype==self.dtype
assert chunk.shape==(self.chunksize, self.nb_channel), 'wrong shape'
if not chunk.flags['C_CONTIGUOUS']:
chunk = chunk.copy()
pyopencl.enqueue_copy(self.queue, self.input_cl, chunk)
kern_call = getattr(self.opencl_prg, self.kernel_func_name)
event = kern_call(self.queue, self.global_size, self.local_size,
self.input_cl, self.output_cl, self.coefficients_cl, self.zi_cl)
event.wait()
pyopencl.enqueue_copy(self.queue, self.output, self.output_cl)
chunk_filtered = self.output
return chunk_filtered
kernel = """
#define chunksize %(chunksize)d
#define nb_section %(nb_section)d
#define nb_channel %(nb_channel)d
__kernel void sos_filter(__global float *input, __global float *output, __constant float *coefficients,
__global float *zi) {
int chan = get_global_id(0); //channel indice
int section = get_global_id(1); //section indice
int offset_filt2; //offset channel within section
int offset_zi = chan*nb_section*2;
int idx;
float w0, w1,w2;
float res;
int s2;
w1 = zi[offset_zi+section*2+0];
w2 = zi[offset_zi+section*2+1];
for (int s=0; s<chunksize+(3*nb_section);s++){
barrier(CLK_GLOBAL_MEM_FENCE);
s2 = s-section*3;
if (s2>=0 && (s2<chunksize)){
offset_filt2 = chan*nb_section*6+section*6;
idx = s2*nb_channel+chan;
if (section==0) {w0 = input[idx];}
else {w0 = output[idx];}
w0 -= coefficients[offset_filt2+4] * w1;
w0 -= coefficients[offset_filt2+5] * w2;
res = coefficients[offset_filt2+0] * w0 + coefficients[offset_filt2+1] * w1 + coefficients[offset_filt2+2] * w2;
w2 = w1; w1 =w0;
output[idx] = res;
}
}
zi[offset_zi+section*2+0] = w1;
zi[offset_zi+section*2+1] = w2;
}
"""
sosfilter_engines = { 'scipy' : SosFilter_Scipy, 'opencl' : SosFilter_OpenCL_V1,
'opencl2' : SosFilter_OpenCL_V2, 'opencl3' : SosFilter_OpenCL_V3, }
class SosFilterThread(ThreadPollInput):
def __init__(self, input_stream, output_stream, timeout = 200, parent = None):
ThreadPollInput.__init__(self, input_stream, timeout = timeout, return_data=True, parent = parent)
self.output_stream = output_stream
self.mutex = Mutex()
def process_data(self, pos, data):
with self.mutex:
chunk_filtered = self.filter_engine.compute_one_chunk(pos, data)
self.output_stream.send(chunk_filtered, index=pos)
def set_params(self, engine, coefficients, nb_channel, dtype, chunksize):
assert engine in sosfilter_engines
EngineClass = sosfilter_engines[engine]
with self.mutex:
self.filter_engine = EngineClass(coefficients, nb_channel, dtype, chunksize)
[docs]class SosFilter(Node, QtCore.QObject):
"""
Node for filtering multi channel signals.
This uses a second order filter, it is a casde of IIR filter of order 2.
Example::
dev = NumpyDeviceBuffer()
dev.configure(...)
dev.output.configure(...)
dev.initialize(...)
f1, f2 = 40., 60.
coefficients = scipy.signal.iirfilter(7, [f1/sample_rate*2, f2/sample_rate*2],
btype='bandpass', ftype='butter', output='sos')
filter = SosFilter()
filter.configure(coefficients=coefficients)
filter.input.connect(dev.output)
filter.output.configure(...)
filter.initialize()
The ``coefficients.shape`` must be (nb_section, 6).
If pyopencl is avaible you can use ``SosFilter.configure(engine='opencl')``.
In that case the coefficients.shape can also be (nb_channel, nb_section, 6)
this helps for having different filters on each channel.
The opencl engine inernally requires data to be in (channel, sample) order.
If the input data does not have this order, then it must be copied and
performance will be affected.
"""
_input_specs = {'signals' : dict(streamtype = 'signals')}
_output_specs = {'signals' : dict(streamtype = 'signals')}
def __init__(self, parent = None, **kargs):
QtCore.QObject.__init__(self, parent)
Node.__init__(self, **kargs)
assert HAVE_SCIPY, "SosFilter need scipy>0.16"
def _configure(self, coefficients = None, engine='scipy', chunksize=None):
"""
Set the coefficient of the filter.
See http://scipy.github.io/devdocs/generated/scipy.signal.sosfilt.html for details.
"""
self.set_coefficients(coefficients)
self.engine = engine
self.chunksize = chunksize
def _initialize(self):
self.thread = SosFilterThread(self.input, self.output)
self.thread.set_params(self.engine, self.coefficients, self.nb_channel,
self.output.params['dtype'], self.chunksize)
def _start(self):
self.thread.last_pos = None
self.thread.start()
def _stop(self):
self.thread.stop()
self.thread.wait()
def set_coefficients(self, coefficients):
self.coefficients = coefficients
if self.initialized():
self.thread.set_params(self.engine, self.coefficients, self.nb_channel,
self.output.params['dtype'], self.chunksize)
register_node_type(SosFilter)