Source code for pyacq.devices.webcam_av

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

"""
av is python binding to libav or ffmpeg and this is so great.
http://mikeboers.github.io/PyAV/index.html
"""
import time
import sys
import subprocess
import os
import re

import numpy as np

from ..core import Node, register_node_type
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.util.mutex import Mutex

try:
    import av
    HAVE_AV = True
except ImportError:
    HAVE_AV = False



class AVThread(QtCore.QThread):
    def __init__(self, out_stream, container, parent=None):
        QtCore.QThread.__init__(self)
        self.out_stream = out_stream
        self.container = container

        self.lock = Mutex()
        self.running = False
        
    def run(self):
        with self.lock:
            self.running = True
        n = 0
        stream = self.container.streams[0]

        for packet in self.container.demux(stream):
            with self.lock:
                if not self.running:
                    break
            for frame in packet.decode():
                arr = frame.to_rgb().to_nd_array()
                n += 1
                self.out_stream.send(arr, index=n)

    def stop(self):
        with self.lock:
            self.running = False


[docs]class WebCamAV(Node): """ Simple webcam device using the `av` python module, which is a wrapper around ffmpeg or libav. See http://mikeboers.github.io/PyAV/index.html. """ _output_specs = {'video': dict(streamtype='video',dtype='uint8', shape=(4800, 6400, 3), compression ='', sample_rate = 1.) } def __init__(self, **kargs): Node.__init__(self, **kargs) assert HAVE_AV, "WebCamAV node depends on the `av` package, but it could not be imported." def _configure(self, camera_num=0, **options): self.camera_num = camera_num self.options = options # todo 'dshow' under windows if sys.platform.startswith('win'): self.format = 'dshow' dev_names = get_device_list_dshow() self.filepath = "video={}".format(dev_names[camera_num]) else: self.filepath = '/dev/video{}'.format(self.camera_num) self.format = 'video4linux2' container = av.open(self.filepath, 'r', self.format , self.options) stream = next(s for s in container.streams if s.type == 'video') self.output.spec['shape'] = (stream.format.height, stream.format.width, 3) self.output.spec['sample_rate'] = float(stream.average_rate) def _initialize(self): pass def _start(self): self.container = av.open(self.filepath, 'r', self.format , self.options) self._thread = AVThread(self.output, self.container) self._thread.start() def _stop(self): self._thread.stop() self._thread.wait() self._running = False # this delete container (+thread) to close the device del(self.container) del(self._thread) def _close(self): pass
register_node_type(WebCamAV) def get_device_list_dshow(): """ Some uggly code to get get device name list under windows directshow """ cmd = "ffmpeg -list_devices true -f dshow -i dummy" proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) txt = proc.stdout.read().decode('ascii') txt = txt.split("DirectShow video devices")[1].split("DirectShow audio devices")[0] pattern = '"([^"]*)"' l = re.findall(pattern, txt, ) l = [e for e in l if not e.startswith('@')] return l