Source code for psychopy_pyo.backend_pyo

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Originally part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2022 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

__all__ = [
    'init',
    'get_devices_infos',
    'get_input_devices',
    'get_output_devices',
    'getDevices',
    'SoundPyo'
]

import sys
import os
import atexit
import threading
from itertools import chain
from numpy import float64
from psychopy import prefs
from psychopy.sound.exceptions import DependencyError
from psychopy import core, logging
from psychopy.constants import (STARTED, FINISHED, STOPPED, NOT_STARTED,
                                FOREVER)
from psychopy.sound._base import _SoundBase


travisCI = bool(str(os.environ.get('TRAVIS')).lower() == 'true')
try:
    import pyo
except ImportError as err:
    if not travisCI:
        # convert this import error to our own, pyo probably not installed
        raise DependencyError(repr(err))

pyoSndServer = None
audioDriver = None


def _bestDriver(devNames, devIDs):
    """Find ASIO or Windows sound drivers
    """
    preferredDrivers = prefs.hardware['audioDriver']
    outputID = None
    audioDriver = None
    for prefDriver in preferredDrivers:
        logging.info(u'Looking for {}'.format(prefDriver))
        if prefDriver.lower() == 'directsound':
            prefDriver = u'Primary Sound'
        # look for that driver in available devices
        for devN, devString in enumerate(devNames):
            logging.info(u'Examining for {}'.format(devString))
            try:
                ds = devString.lower()
                if prefDriver.lower() in ds:
                    audioDriver = devString
                    outputID = devIDs[devN]
                    logging.info(u'Success: {}'.format(devString))
                    # we found a driver don't look for others
                    return audioDriver, outputID
            except (UnicodeDecodeError, UnicodeEncodeError):
                logging.info(u'Failed: {}'.format(devString))
                logging.warn('find best sound driver - could not '
                             'interpret unicode in driver name')
    else:
        return None, None


def get_devices_infos():
    in_devices, out_devices = pyo.pa_get_devices_infos()
    for index, device in chain(in_devices.items(), out_devices.items()):
        device.update({
            'default sr': '{} Hz'.format(device['default sr']),
            'host api index': str(device['host api index']),
            'latency': '{} s'.format(round(device['latency'], 6)),
        })
    return (in_devices, out_devices)


def get_output_devices():
    _, out_devices = get_devices_infos()
    return tuple(zip(*[
        (device['name'], dev_id) for dev_id, device in out_devices.items()
    ]))


def get_input_devices():
    in_devices, _ = get_devices_infos()
    return tuple(zip(*[
        (device['name'], dev_id) for dev_id, device in in_devices.items()
    ]))


def getDevices(kind=None):
    """Returns a dict of dict of audio devices of specified `kind`

    The dict keys are names and items are dicts of properties
    """
    inputs, outputs = get_devices_infos()
    if kind is None:
        allDevs = inputs.copy()
        allDevs.update(outputs)
    elif kind == 'output':
        allDevs = outputs
    else:
        allDevs = inputs
    devs = {}
    for ii in allDevs:  # in pyo this is a dict but keys are ii ! :-/
        dev = allDevs[ii]
        # newline characters must be removed
        devName = dev['name'].replace('\r\n', '')
        devs[devName] = dev
        dev['id'] = ii
    return devs


# these will be controlled by sound.__init__.py
defaultInput = None
defaultOutput = None


def init(rate=44100, stereo=True, buffer=128):
    """setup the pyo (sound) server
    """
    global pyoSndServer, Sound, audioDriver, duplex, maxChnls
    Sound = SoundPyo
    global pyo
    try:
        assert pyo
    except NameError:  # pragma: no cover
        import pyo
        # can be needed for microphone.switchOn(), which calls init even
        # if audioLib is something else

    # subclass the pyo.Server so that we can insert a __del__ function that
    # shuts it down skip coverage since the class is never used if we have
    # a recent version of pyo

    class _Server(pyo.Server):  # pragma: no cover
        # make libs class variables so they don't get deleted first
        core = core
        logging = logging

        def __del__(self):
            self.stop()
            # make sure enough time passes for the server to shutdown
            self.core.wait(0.5)
            self.shutdown()
            # make sure enough time passes for the server to shutdown
            self.core.wait(0.5)
            # this may never get printed
            self.logging.debug('pyo sound server shutdown')
    if '.'.join(map(str, pyo.getVersion())) < '0.6.4':
        Server = _Server
    else:
        Server = pyo.Server

    # if we already have a server, just re-initialize it
    if 'pyoSndServer' in globals() and hasattr(pyoSndServer, 'shutdown'):
        pyoSndServer.stop()
        # make sure enough time passes for the server to shutdown
        core.wait(0.5)
        pyoSndServer.shutdown()
        core.wait(0.5)
        pyoSndServer.reinit(sr=rate, nchnls=maxChnls,
                            buffersize=buffer, audio=audioDriver)
        pyoSndServer.boot()
    else:
        if sys.platform == 'win32':
            # check for output device/driver
            #todo: Throwing errors on one users' config https://discourse.psychopy.org/t/error-with-microphone-component-on-psychopy-2020/13168
            devNames, devIDs = get_output_devices()
            audioDriver, outputID = _bestDriver(devNames, devIDs)
            if outputID is None:
                # using the default output because we didn't find the one(s)
                # requested
                audioDriver = 'Windows Default Output'
                outputID = pyo.pa_get_default_output()
            if outputID is not None:
                logging.info(u'Using sound driver: %s (ID=%i)' %
                             (audioDriver, outputID))
                maxOutputChnls = pyo.pa_get_output_max_channels(outputID)
            else:
                logging.warning(
                    'No audio outputs found (no speakers connected?')
                return -1
            # check for valid input (mic)
            # If no input device is available, devNames and devIDs are empty
            # lists.
            devNames, devIDs = get_input_devices()
            audioInputName, inputID = _bestDriver(devNames, devIDs)
            # Input devices were found, but requested devices were not found
            if len(devIDs) > 0 and inputID is None:
                defaultID = pyo.pa_get_default_input()
                if defaultID is not None and defaultID != -1:
                    # default input is found
                    # use the default input because we didn't find the one(s)
                    # requested
                    audioInputName = 'Windows Default Input'
                    inputID = defaultID
                else:
                    # default input is not available
                    inputID = None
            if inputID is not None:
                msg = u'Using sound-input driver: %s (ID=%i)'
                logging.info(msg % (audioInputName, inputID))
                maxInputChnls = pyo.pa_get_input_max_channels(inputID)
                duplex = bool(maxInputChnls > 0)
            else:
                maxInputChnls = 0
                duplex = False
        # for other platforms set duplex to True (if microphone is available)
        else:
            audioDriver = prefs.hardware['audioDriver'][0]
            maxInputChnls = pyo.pa_get_input_max_channels(
                pyo.pa_get_default_input())
            maxOutputChnls = pyo.pa_get_output_max_channels(
                pyo.pa_get_default_output())
            duplex = bool(maxInputChnls > 0)

        maxChnls = min(maxInputChnls, maxOutputChnls)
        if maxInputChnls < 1:  # pragma: no cover
            msg = (u'%s.init could not find microphone hardware; '
                   u'recording not available')
            logging.warning(msg % __name__)
            maxChnls = maxOutputChnls
        if maxOutputChnls < 1:  # pragma: no cover
            msg = (u'%s.init could not find speaker hardware; '
                   u'sound not available')
            logging.error(msg % __name__)
            return -1

        # create the instance of the server:
        if sys.platform == 'darwin' or sys.platform.startswith('linux'):
            # for mac/linux we set the backend using the server audio param
            pyoSndServer = Server(sr=rate, nchnls=maxChnls,
                                  buffersize=buffer, audio=audioDriver)
        else:
            # with others we just use portaudio and then set the OutputDevice
            # below
            pyoSndServer = Server(sr=rate, nchnls=maxChnls, buffersize=buffer)

        pyoSndServer.setVerbosity(1)
        if sys.platform == 'win32':
            pyoSndServer.setOutputDevice(outputID)
            if inputID is not None:
                pyoSndServer.setInputDevice(inputID)
        # do other config here as needed (setDuplex? setOutputDevice?)
        pyoSndServer.setDuplex(duplex)
        pyoSndServer.boot()
    core.wait(0.5)  # wait for server to boot before starting the sound stream
    pyoSndServer.start()

    # atexit is filo, will call stop then shutdown upon closing
    atexit.register(pyoSndServer.shutdown)
    atexit.register(pyoSndServer.stop)
    try:
        Sound()  # test creation, no play
    except pyo.PyoServerStateException:
        msg = "Failed to start pyo sound Server"
        if sys.platform == 'darwin' and audioDriver != 'portaudio':
            msg += "; maybe try prefs.general.audioDriver 'portaudio'?"
        logging.error(msg)
        core.quit()
    logging.debug('pyo sound server started')
    logging.flush()


[docs]class SoundPyo(_SoundBase): """Create a sound object, from one of MANY ways. """ def __init__(self, value="C", secs=0.5, octave=4, stereo=True, volume=1.0, loops=0, sampleRate=44100, bits=16, hamming=True, start=0, stop=-1, name='', autoLog=True): """ value: can be a number, string or an array: * If it's a number between 37 and 32767 then a tone will be generated at that frequency in Hz. * It could be a string for a note ('A', 'Bfl', 'B', 'C', 'Csh', ...). Then you may want to specify which octave as well * Or a string could represent a filename in the current location, or mediaLocation, or a full path combo * Or by giving an Nx2 numpy array of floats (-1:1) you can specify the sound yourself as a waveform By default, a Hanning window (5ms duration) will be applied to a generated tone, so that onset and offset are smoother (to avoid clicking). To disable the Hanning window, set `hamming=False`. secs: Duration of a tone. Not used for sounds from a file. start : float Where to start playing a sound file; default = 0s (start of the file). stop : float Where to stop playing a sound file; default = end of file. octave: is only relevant if the value is a note name. Middle octave of a piano is 4. Most computers won't output sounds in the bottom octave (1) and the top octave (8) is generally painful stereo: True (= default, two channels left and right), False (one channel) volume: loudness to play the sound, from 0.0 (silent) to 1.0 (max). Adjustments are not possible during playback, only before. loops : int How many times to repeat the sound after it plays once. If `loops` == -1, the sound will repeat indefinitely until stopped. sampleRate (= 44100): if the psychopy.sound.init() function has been called or if another sound has already been created then this argument will be ignored and the previous setting will be used bits: has no effect for the pyo backend hamming: boolean (default True) to indicate if the sound should be apodized (i.e., the onset and offset smoothly ramped up from down to zero). The function apodize uses a Hanning window, but arguments named 'hamming' are preserved so that existing code is not broken by the change from Hamming to Hanning internally. Not applied to sounds from files. """ global pyoSndServer if pyoSndServer is None or pyoSndServer.getIsBooted() == 0: init(rate=sampleRate) # Check if the user tried to change the sample rate. Since this isn't # possible presently, just warn the user for now. actualSampleRate = pyoSndServer.getSamplingRate() if actualSampleRate != sampleRate: logging.warning( "Cannot change sample rate to {} since audio server has " "already started, using {} instead.".format( sampleRate, actualSampleRate)) self.sampleRate = actualSampleRate self.format = bits self.isStereo = stereo self.channels = 1 + int(stereo) self.secs = secs self.startTime = start self.stopTime = stop self.autoLog = autoLog self.name = name # try to create sound; set volume and loop before setSound (else # needsUpdate=True) self._snd = None self.volume = min(1.0, max(0.0, volume)) # distinguish the loops requested from loops actual because of # infinite tones (which have many loops but none requested) # -1 for infinite or a number of loops self.requestedLoops = self.loops = int(loops) self.setSound(value=value, secs=secs, octave=octave, hamming=hamming) self.needsUpdate = False self._isPlaying = False @property def isPlaying(self): """`True` if the audio playback is ongoing.""" return self._isPlaying
[docs] def play(self, loops=None, autoStop=True, log=True, when=None): """Starts playing the sound on an available channel. loops : int How many times to repeat the sound after it plays once. If `loops` == -1, the sound will repeat indefinitely until stopped. when: not used but included for compatibility purposes For playing a sound file, you cannot specify the start and stop times when playing the sound, only when creating the sound initially. Playing a sound runs in a separate thread i.e. your code won't wait for the sound to finish before continuing. To pause while playing, you need to use a `psychopy.core.wait(mySound.getDuration())`. If you call `play()` while something is already playing the sounds will be played over each other. """ if self.isPlaying: return if loops is not None and self.loops != loops: self.setLoops(loops) if self.needsUpdate: # ~0.00015s, regardless of the size of self._sndTable self._updateSnd() self._snd.out() self._isPlaying = True if autoStop or self.loops != 0: # pyo looping is boolean: loop forever or not at all # so track requested loops using time; limitations: not # sample-accurate if self.loops >= 0: duration = self.getDuration() * (self.loops + 1) else: duration = FOREVER self.terminator = threading.Timer(duration, self._onEOS) self.terminator.start() if log and self.autoLog: logging.exp(u"Sound %s started" % (self.name), obj=self) return self
def _onEOS(self): # call _onEOS from a thread based on time, enables loop termination if self.loops != 0: # then its looping forever as a pyo object self._snd.stop() if self.isPlaying: # in case of multiple successive trials self._isPlaying = False return True
[docs] def stop(self, log=True): """Stops the sound immediately""" if not self.isPlaying: return self._snd.stop() try: self.terminator.cancel() except Exception: # pragma: no cover pass self._isPlaying = False if log and self.autoLog: logging.exp(u"Sound %s stopped" % (self.name), obj=self)
def _updateSnd(self): self.needsUpdate = False doLoop = bool(self.loops != 0) # if True, end it via threading.Timer if type(self.volume) == float64: self.volume = self.volume.item() self._snd = pyo.TableRead(self._sndTable, freq=self._sndTable.getRate(), loop=doLoop, mul=self.volume) def _setSndFromFile(self, fileName): # want mono sound file played to both speakers, not just left / 0 self.fileName = fileName self._sndTable = pyo.SndTable(initchnls=self.channels) # in case a tone with inf loops had been used before self.loops = self.requestedLoops # mono file loaded to all chnls: try: self._sndTable.setSound(self.fileName, start=self.startTime, stop=self.stopTime) except Exception: msg = (u'Could not open sound file `%s` using pyo; not found ' 'or format not supported.') logging.error(msg % fileName) raise TypeError(msg % fileName) self._updateSnd() self.duration = self._sndTable.getDur() def _setSndFromArray(self, thisArray): self._sndTable = pyo.DataTable(size=len(thisArray), init=thisArray.T.tolist(), chnls=self.channels) self._updateSnd() # a DataTable has no .getDur() method, so just store the duration: self.duration = float(len(thisArray)) / self.sampleRate
if __name__ == "__main__": pass

Back to top