Source code for psychopy.sound.backend_ptb

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
New backend for the Psychtoolbox portaudio engine
"""
import sys
import os
import time
import re
import weakref
from pathlib import Path

from psychopy import prefs, logging, exceptions
from psychopy.constants import (STARTED, PAUSED, FINISHED, STOPPING,
                                NOT_STARTED)
from psychopy.tools import systemtools
from psychopy.tools import filetools as ft
from .exceptions import SoundFormatError, DependencyError
from ._base import _SoundBase, HammingWindow
from ..hardware import DeviceManager

try:
    from psychtoolbox import audio
    import psychtoolbox as ptb
except Exception:
    raise DependencyError("psychtoolbox audio failed to import")
try:
    import soundfile as sf
except Exception:
    raise DependencyError("soundfile not working")

import numpy as np

try:
    defaultLatencyClass = int(prefs.hardware['audioLatencyMode'][0])
except (TypeError, IndexError):  # maybe we were given a number instead
    defaultLatencyClass = prefs.hardware['audioLatencyMode']
"""vals in prefs.hardware['audioLatencyMode'] are:
     {0:_translate('Latency not important'),
      1:_translate('Share low-latency driver'),
      2:_translate('Exclusive low-latency'),
      3:_translate('Aggressive low-latency'),
      4:_translate('Latency critical')}
Based on help at http://psychtoolbox.org/docs/PsychPortAudio-Open
"""
# suggestedLatency = 0.005  ## Not currently used. Keep < 1 scr refresh

if prefs.hardware['audioDriver']=='auto':
    audioDriver = None
else:
    audioDriver = prefs.hardware['audioDriver']

if prefs.hardware['audioDevice']=='auto':
    audioDevice = None
else:
    audioDevice = prefs.hardware['audioDevice']

# these will be used by sound.__init__.py
defaultInput = None
defaultOutput = audioDevice

logging.info("Loaded psychtoolbox audio version {}"
             .format(audio.get_version_info()['version']))

# ask PTB to align verbosity with our current logging level at console
_verbosities = ((logging.DEBUG, 5),
                (logging.INFO, 4),
                (logging.EXP, 3),
                (logging.WARNING, 2),
                (logging.ERROR, 1))

for _logLevel, _verbos in _verbosities:
    if logging.console.level <= _logLevel:
        audio.verbosity(_verbos)
        break


def init(rate=48000, stereo=True, buffer=128):
    pass  # for compatibility with other backends


def getDevices(kind=None):
    """Returns a dict of dict of audio devices of specified `kind`

    kind can be None, 'input' or 'output'
    The dict keys are names, and items are dicts of properties
    """
    if sys.platform == 'win32':
        deviceTypes = 13  # only WASAPI drivers need apply!
    else:
        deviceTypes = None
    devs = {}
    if systemtools.isVM_CI():  # GitHub actions VM does not have a sound device
        return devs
    else:
        allDevs = audio.get_devices(device_type=deviceTypes)

    # annoyingly query_devices is a DeviceList or a dict depending on number
    if isinstance(allDevs, dict):
        allDevs = [allDevs]

    for ii, dev in enumerate(allDevs):
        if kind and kind.startswith('in'):
            if dev['NrInputChannels'] < 1:
                continue
        elif kind and kind.startswith('out'):
            if dev['NrOutputChannels'] < 1:
                continue
        # we have a valid device so get its name
        # newline characters must be removed
        devName = dev['DeviceName'].replace('\r\n', '')
        devs[devName] = dev
        dev['id'] = ii
    return devs


def getStreamLabel(sampleRate, channels, blockSize):
    """Returns the string repr of the stream label
    """
    return "{}_{}_{}".format(sampleRate, channels, blockSize)


class _StreamsDict(dict):
    """Keeps track of what streams have been created. On macOS we can have
    multiple streams under portaudio but under windows we can only have one.

    use the instance `streams` rather than creating a new instance of this
    """

    def getStream(self, sampleRate, channels, blockSize):
        """Gets a stream of exact match or returns a new one
        (if possible for the current operating system)
        """
        # if the query looks flexible then try getSimilar
        if channels == -1 or blockSize == -1:
            return self._getSimilar(sampleRate,
                                    channels=channels,
                                    blockSize=blockSize)
        else:
            return self._getStream(sampleRate,
                                   channels=channels,
                                   blockSize=blockSize)

    def _getSimilar(self, sampleRate, channels=-1, blockSize=-1):
        """Do we already have a compatible stream?

        Many sounds can allow channels and blocksize to change but samplerate
        is generally fixed. Any values set to -1 above will be flexible. Any
        values set to an alternative number will be fixed

        usage:

            label, stream = streams._getSimilar(sampleRate=44100,  # must match
                                               channels=-1,  # any
                                               blockSize=-1)  # wildcard
        """
        label = getStreamLabel(sampleRate, channels, blockSize)
        # replace -1 with any regex integer
        simil = re.compile(label.replace("-1", r"[-+]?(\d+)"))  # I hate REGEX!
        for thisFormat in self:
            if simil.match(thisFormat):  # we found a close-enough match
                return thisFormat, self[thisFormat]
        # if we've been given values in each place then create stream
        if (sampleRate not in [None, -1, 0] and
                channels not in [None, -1] and
                blockSize not in [None, -1]):
            return self._getStream(sampleRate, channels, blockSize)

    def _getStream(self, sampleRate, channels, blockSize):
        """Strict check for this format or create new
        """
        label = getStreamLabel(sampleRate, channels, blockSize)
        # try to retrieve existing stream of that name
        if label in self:
            pass
        # todo: check if this is still needed on win32
        # on some systems more than one stream isn't supported so check
        elif sys.platform == 'win32' and len(self):
            raise SoundFormatError(
                "Tried to create audio stream {} but {} already exists "
                "and {} doesn't support multiple portaudio streams"
                .format(label, list(self.keys())[0], sys.platform)
            )
        else:

            # create new stream
            self[label] = _MasterStream(sampleRate, channels, blockSize,
                                        device=defaultOutput)
        return label, self[label]


streams = _StreamsDict()


class _MasterStream(audio.Stream):
    def __init__(self, sampleRate, channels, blockSize,
                 device=None, duplex=False, mode=1,
                 audioLatencyClass=None):
        # initialise thread
        if audioLatencyClass is None:
            audioLatencyClass = defaultLatencyClass
        self.streamLabel = None
        self.streams = []
        self.list = []
        # sound stream info
        self.sampleRate = sampleRate
        self.channels = channels
        self.duplex = duplex
        self.blockSize = blockSize
        self.label = getStreamLabel(sampleRate, channels, blockSize)
        if isinstance(device, list) and len(device):
            device = device[0]
        if isinstance(device, str):  # we need to convert name to an ID or make None
            devs = getDevices('output')
            if device in devs:
                deviceID = devs[device]['DeviceIndex']
            else:
                deviceID = None
        else:
            deviceID = device
        self.sounds = []  # list of dicts for sounds currently playing
        self.takeTimeStamp = False
        self.frameN = 1
        # self.frameTimes = range(5)  # DEBUGGING: store the last 5 callbacks
        if not systemtools.isVM_CI():  # Github Actions VM does not have a sound device
            try:
                audio.Stream.__init__(self, device_id=deviceID, mode=mode+8,
                                      latency_class=audioLatencyClass,
                                      freq=sampleRate,
                                      channels=channels,
                                      )  # suggested_latency=suggestedLatency
            except OSError as e:  # noqa: F841
                audio.Stream.__init__(self, device_id=deviceID, mode=mode+8,
                                      latency_class=audioLatencyClass,
                                      # freq=sampleRate,
                                      channels=channels,
                                      )
                self.sampleRate = self.status['SampleRate']
                print("Failed to start PTB.audio with requested rate of "
                      "{} but succeeded with a default rate ({}). "
                      "This is depends on the selected latency class and device."
                      .format(sampleRate, self.sampleRate))
            except TypeError as e:
                print("device={}, mode={}, latency_class={}, freq={}, channels={}"
                      .format(device, mode+8, audioLatencyClass, sampleRate, channels))
                raise e
            except Exception as e:
                if deviceID == -1:
                    # if default device doesn't setup from ptb, pick first device
                    for device in audio.get_devices(device_type=13):
                        logging.error(
                            f"System default audio device failed to connect, so using first found "
                            f"device: {device['DeviceName']}"
                        )
                        audio.Stream.__init__(
                            self,mode=mode+8,
                            device_id=device['DeviceIndex'],
                            freq=device['DefaultSampleRate'],
                            channels=device['NrOutputChannels']
                        )
                        break
                else:
                    # any other problem, try with no device ID
                    audio.Stream.__init__(
                        self, mode=mode+8,
                        latency_class=audioLatencyClass,
                        freq=sampleRate,
                        channels=channels,
                    )

                if "there isn't any audio output device" in str(e):
                    print("Failed to load audio device:\n"
                          "    '{}'\n"
                          "so fetching default audio device instead: \n"
                          "    '{}'"
                          .format(device, 'test'))
            self.start(0, 0, 1)
            # self.device = self._sdStream.device
            # self.latency = self._sdStream.latency
            # self.cpu_load = self._sdStream.cpu_load
        self._tSoundRequestPlay = 0


[docs]class SoundPTB(_SoundBase): """Play a variety of sounds using the new PsychPortAudio library """ def __init__(self, value="C", secs=0.5, octave=4, stereo=-1, volume=1.0, loops=0, sampleRate=None, blockSize=128, preBuffer=-1, hamming=True, startTime=0, stopTime=-1, name='', autoLog=True, syncToWin=None, speaker=None): """ :param value: note name ("C","Bfl"), filename or frequency (Hz) :param secs: duration (for synthesised tones) :param octave: which octave to use for note names (4 is middle) :param stereo: -1 (auto), True or False to force sounds to stereo or mono :param volume: float 0-1 :param loops: number of loops to play (-1=forever, 0=single repeat) :param sampleRate: sample rate for synthesized tones :param blockSize: the size of the buffer on the sound card (small for low latency, large for stability) :param preBuffer: integer to control streaming/buffering - -1 means store all - 0 (no buffer) means stream from disk - potentially we could buffer a few secs(!?) :param hamming: boolean (default True) to indicate if the sound should be apodized (i.e., the onset and offset smoothly ramped up from down to zero). The function apodize uses a Hanning window, but arguments named 'hamming' are preserved so that existing code is not broken by the change from Hamming to Hanning internally. Not applied to sounds from files. :param startTime: for sound files this controls the start of snippet :param stopTime: for sound files this controls the end of snippet :param name: string for logging purposes :param autoLog: whether to automatically log every change :param syncToWin: if you want start/stop to sync with win flips add this """ self.speaker = self._parseSpeaker(speaker) self.sound = value self.name = name self.secs = secs # for any synthesised sounds (notesand freqs) self.octave = octave # for note name sounds self.loops = self._loopsRequested = loops self._loopsFinished = 0 self.volume = volume self.startTime = startTime # for files self.stopTime = stopTime # for files specify thesection to be played self.blockSize = blockSize # can be per-sound unlike other backends self.preBuffer = preBuffer self.frameN = 0 self._tSoundRequestPlay = 0 self.sampleRate = sampleRate self.channels = None # let this be set by stereo self.stereo = stereo self.duplex = None self.autoLog = autoLog self.streamLabel = "" self.sourceType = 'unknown' # set to be file, array or freq self.sndFile = None self.sndArr = None self.hamming = hamming self._hammingWindow = None # will be created during setSound self.win = syncToWin # setSound (determines sound type) self.setSound(value, secs=self.secs, octave=self.octave, hamming=self.hamming) self._isPlaying = False # set `True` after `play()` is called self._isFinished = False self.status = NOT_STARTED @property def isPlaying(self): """`True` if the audio playback is ongoing.""" # This will update _isPlaying if sound has stopped by _EOS() _ = self._checkPlaybackFinished() return self._isPlaying @property def isFinished(self): """`True` if the audio playback has completed.""" return self._checkPlaybackFinished()
[docs] def _getDefaultSampleRate(self): """Check what streams are open and use one of these""" if len(streams): return list(streams.values())[0].sampleRate else: return 48000 # seems most widely supported
@property def statusDetailed(self): if not self.track: return None return self.track.status @property def volume(self): return self.__dict__['volume'] @volume.setter def volume(self, newVolume): self.__dict__['volume'] = newVolume if 'track' in self.__dict__: # Update volume of an existing track, if it exists. # (BUGFIX, otherwise only the member variable is updated, but the sound # volume does not change while playing - Suddha Sourav, 14.10.2020) self.__dict__['track']().volume = newVolume else: return None @property def stereo(self): return self.__dict__['stereo'] @stereo.setter def stereo(self, val): self.__dict__['stereo'] = val if val is True: self.__dict__['channels'] = 2 elif val is False: self.__dict__['channels'] = 1 elif val == -1: self.__dict__['channels'] = -1
[docs] def setSound(self, value, secs=0.5, octave=4, hamming=None, log=True): """Set the sound to be played. Often this is not needed by the user - it is called implicitly during initialisation. :parameters: value: can be a number, string or an array: * If it's a number between 37 and 32767 then a tone will be generated at that frequency in Hz. * It could be a string for a note ('A', 'Bfl', 'B', 'C', 'Csh'. ...). Then you may want to specify which octave. * Or a string could represent a filename in the current location, or mediaLocation, or a full path combo * Or by giving an Nx2 numpy array of floats (-1:1) you can specify the sound yourself as a waveform secs: duration (only relevant if the value is a note name or a frequency value) octave: is only relevant if the value is a note name. Middle octave of a piano is 4. Most computers won't output sounds in the bottom octave (1) and the top octave (8) is generally painful """ # reset self.loops to what was requested (in case altered for infinite play of tones) self.loops = self._loopsRequested # start with the base class method _SoundBase.setSound(self, value, secs, octave, hamming, log)
def _setSndFromFile(self, filename): # alias default names (so it always points to default.png) if filename in ft.defaultStim: filename = Path(prefs.paths['assets']) / ft.defaultStim[filename] self.sndFile = f = sf.SoundFile(filename) self.sourceType = 'file' self.sampleRate = f.samplerate if self.channels == -1: # if channels was auto then set to file val self.channels = f.channels fileDuration = float(len(f)) / f.samplerate # needed for duration? # process start time if self.startTime and self.startTime > 0: startFrame = self.startTime * self.sampleRate self.sndFile.seek(int(startFrame)) self.t = self.startTime else: self.t = 0 # process stop time if self.stopTime and self.stopTime > 0: requestedDur = self.stopTime - self.t self.duration = min(requestedDur, fileDuration) else: self.duration = fileDuration - self.t # can now calculate duration in frames self.durationFrames = int(round(self.duration * self.sampleRate)) # are we preloading or streaming? if self.preBuffer == 0: # no buffer - stream from disk on each call to nextBlock pass elif self.preBuffer == -1: # full pre-buffer. Load requested duration to memory sndArr = self.sndFile.read( frames=int(self.sampleRate * self.duration)) self.sndFile.close() self._setSndFromArray(sndArr) self._channelCheck( self.sndArr) # Check for fewer channels in stream vs data array def _setSndFromArray(self, thisArray): self.sndArr = np.asarray(thisArray).astype('float32') if thisArray.ndim == 1: self.sndArr.shape = [len(thisArray), 1] # make 2D for broadcasting if self.channels == 2 and self.sndArr.shape[1] == 1: # mono -> stereo self.sndArr = self.sndArr.repeat(2, axis=1) elif self.sndArr.shape[1] == 1: # if channels in [-1,1] then pass pass else: try: self.sndArr.shape = [len(thisArray), 2] except ValueError: raise ValueError("Failed to format sound with shape {} " "into sound with channels={}" .format(self.sndArr.shape, self.channels)) # is this stereo? if self.stereo == -1: # auto stereo. Try to detect if self.sndArr.shape[1] == 1: self.stereo = 0 elif self.sndArr.shape[1] == 2: self.stereo = 1 else: raise IOError("Couldn't determine whether array is " "stereo. Shape={}".format(self.sndArr.shape)) self._nSamples = thisArray.shape[0] if self.stopTime == -1: self.duration = self._nSamples / float(self.sampleRate) # set to run from the start: self.seek(0) self.sourceType = "array" if not self.track: # do we have one already? self.track = audio.Slave(self.stream.handle, data=self.sndArr, volume=self.volume) else: self.track.stop() self.track.fill_buffer(self.sndArr)
[docs] def _channelCheck(self, array): """Checks whether stream has fewer channels than data. If True, ValueError""" if self.channels < array.shape[1]: msg = ( "The sound stream is set up incorrectly. You have fewer channels in the buffer " "than in data file ({} vs {}).\n**Ensure you have selected 'Force stereo' in " "experiment settings**".format(self.channels, array.shape[1])) logging.error(msg) raise ValueError(msg)
[docs] def _checkPlaybackFinished(self): """Checks whether playback has finished by looking up the status. """ # get detailed status from backend pa_status = self.statusDetailed # was the sound already finished? wasFinished = self._isFinished # is it finished now? isFinished = self._isFinished = not pa_status['Active'] and pa_status['State'] == 0 # if it wasn't finished but now is, do end of stream behaviour if isFinished and not wasFinished: self._EOS() return self._isFinished
[docs] def play(self, loops=None, when=None, log=True): """Start the sound playing. Calling this after the sound has finished playing will restart the sound. """ if self._checkPlaybackFinished(): self.stop(reset=True) if loops is not None and self.loops != loops: self.setLoops(loops) self._tSoundRequestPlay = time.time() if hasattr(when, 'getFutureFlipTime'): logTime = when.getFutureFlipTime(clock=None) when = when.getFutureFlipTime(clock='ptb') elif when is None and hasattr(self.win, 'getFutureFlipTime'): logTime = self.win.getFutureFlipTime(clock=None) when = self.win.getFutureFlipTime(clock='ptb') else: logTime = None self.track.start(repetitions=loops, when=when) self._isPlaying = True self._isFinished = False # time.sleep(0.) if log and self.autoLog: logging.exp(u"Sound %s started" % (self.name), obj=self, t=logTime)
[docs] def pause(self, log=True): """Stops the sound without reset, so that play will continue from here if needed """ if self._isPlaying: self.stop(reset=False, log=False) if log and self.autoLog: logging.exp(u"Sound %s paused" % (self.name), obj=self)
[docs] def stop(self, reset=True, log=True): """Stop the sound and return to beginning """ # this uses FINISHED for some reason, all others use STOPPED if not self._isPlaying: return self.track.stop() self._isPlaying = False if reset: self.seek(0) if log and self.autoLog: logging.exp(u"Sound %s stopped" % (self.name), obj=self)
def seek(self, t): self.t = t self.frameN = int(round(t * self.sampleRate)) if self.sndFile and not self.sndFile.closed: self.sndFile.seek(self.frameN) self._isFinished = t >= self.duration
[docs] def _EOS(self, reset=True, log=True): """Function called on End Of Stream """ self._loopsFinished += 1 if self._loopsFinished >= self._loopsRequested: # if we have finished all requested loops self.stop(reset=reset, log=False) else: # reset _isFinished back to False self._isFinished = False if log and self.autoLog: logging.exp(u"Sound %s reached end of file" % self.name, obj=self)
@property def stream(self): """Read-only property returns the stream on which the sound will be played """ if not self.streamLabel: try: label, s = streams.getStream(sampleRate=self.sampleRate, channels=self.channels, blockSize=self.blockSize) except SoundFormatError as err: # try to use something similar (e.g. mono->stereo) # then check we have an appropriate stream open altern = streams._getSimilar(sampleRate=self.sampleRate, channels=-1, blockSize=-1) if altern is None: raise SoundFormatError(err) else: # safe to extract data label, s = altern # update self in case it changed to fit the stream self.sampleRate = s.sampleRate self.channels = s.channels self.blockSize = s.blockSize self.streamLabel = label return streams[self.streamLabel] def __del__(self): if self.track: self.track.close() self.track = None @property def track(self): """The track on the master stream to which we belong""" # the track is actually a weak reference to avoid circularity if 'track' in self.__dict__: return self.__dict__['track']() else: return None @track.setter def track(self, track): if track is None: self.__dict__['track'] = None else: self.__dict__['track'] = weakref.ref(track)

Back to top