Source code for psychopy.sound.microphone

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Audio recording using a microphone.
"""

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2022 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

__all__ = ['Microphone']

import sys
import psychopy.logging as logging
from psychopy.constants import NOT_STARTED
from psychopy.preferences import prefs
from .audioclip import *
from .audiodevice import *
from .exceptions import *
import numpy as np

_hasPTB = True
try:
    import psychtoolbox.audio as audio
except (ImportError, ModuleNotFoundError):
    logging.warning(
        "The 'psychtoolbox' library cannot be loaded but is required for audio "
        "capture (use `pip install psychtoolbox` to get it). Microphone "
        "recording will be unavailable this session. Note that opening a "
        "microphone stream will raise an error.")
    _hasPTB = False


class RecordingBuffer:
    """Class for a storing a recording from a stream.

    Think of instances of this class behaving like an audio tape whereas the
    `Microphone` class is the tape recorder. Samples taken from the stream are
    written to the tape which stores the data.

    Used internally by the `Microphone` class, users usually do not create
    instances of this class themselves.

    Parameters
    ----------
    sampleRateHz : int
        Sampling rate for audio recording in Hertz (Hz). By default, 48kHz
        (``sampleRateHz=48000``) is used which is adequate for most consumer
        grade microphones (headsets and built-in).
    channels : int
        Number of channels to record samples to `1=Mono` and `2=Stereo`.
    maxRecordingSize : int
        Maximum recording size in kilobytes (Kb). Since audio recordings tend to
        consume a large amount of system memory, one might want to limit the
        size of the recording buffer to ensure that the application does not run
        out of memory. By default, the recording buffer is set to 24000 KB (or
        24 MB). At a sample rate of 48kHz, this will result in 62.5 seconds of
        continuous audio being recorded before the buffer is full.
    policyWhenFull : str
        What to do when the recording buffer is full and cannot accept any more
        samples. If 'ignore', samples will be silently dropped and the `isFull`
        property will be set to `True`. If 'warn', a warning will be logged and
        the `isFull` flag will be set. Finally, if 'error' the application will
        raise an exception.

    """
    def __init__(self, sampleRateHz=SAMPLE_RATE_48kHz, channels=2,
                 maxRecordingSize=24000, policyWhenFull='ignore'):
        self._channels = channels
        self._sampleRateHz = sampleRateHz
        self._maxRecordingSize = maxRecordingSize
        self._samples = None  # `ndarray` created in _allocRecBuffer`
        self._offset = 0  # recording offset
        self._lastSample = 0  # offset of the last sample from stream
        self._spaceRemaining = None  # set in `_allocRecBuffer`
        self._totalSamples = None  # set in `_allocRecBuffer`

        # check if the value is valid
        if policyWhenFull not in ['ignore', 'warn', 'error']:
            raise ValueError("Invalid value for `policyWhenFull`.")

        self._policyWhenFull = policyWhenFull
        self._warnedRecBufferFull = False
        self._loops = 0

        self._allocRecBuffer()

    def _allocRecBuffer(self):
        """Allocate the recording buffer. Called internally if properties are
        changed."""
        # allocate another array
        nBytes = self._maxRecordingSize * 1000
        recArraySize = int((nBytes / self._channels) / (np.float32()).itemsize)

        self._samples = np.zeros(
            (recArraySize, self._channels), dtype=np.float32, order='C')

        # sanity check
        assert self._samples.nbytes == nBytes
        self._totalSamples = len(self._samples)
        self._spaceRemaining = self._totalSamples

    @property
    def samples(self):
        """Reference to the actual sample buffer (`ndarray`)."""
        return self._samples

    @property
    def bufferSecs(self):
        """Capacity of the recording buffer in seconds (`float`)."""
        return self._totalSamples / self._sampleRateHz

    @property
    def nbytes(self):
        """Number of bytes the recording buffer occupies in memory (`int`)."""
        return self._samples.nbytes

    @property
    def sampleBytes(self):
        """Number of bytes per sample (`int`)."""
        return np.float32().itemsize

    @property
    def spaceRemaining(self):
        """The space remaining in the recording buffer (`int`). Indicates the
        number of samples that the buffer can still add before overflowing.
        """
        return self._spaceRemaining

    @property
    def isFull(self):
        """Is the recording buffer full (`bool`)."""
        return self._spaceRemaining <= 0

    @property
    def totalSamples(self):
        """Total number samples the recording buffer can hold (`int`)."""
        return self._totalSamples

    @property
    def writeOffset(self):
        """Index in the sample buffer where new samples will be written when
        `write()` is called (`int`).
        """
        return self._offset

    @property
    def lastSample(self):
        """Index of the last sample recorded (`int`). This can be used to slice
        the recording buffer, only getting data from the beginning to place
        where the last sample was written to.
        """
        return self._lastSample

    @property
    def loopCount(self):
        """Number of times the recording buffer restarted (`int`). Only valid if
        `loopback` is ``True``."""
        return self._loops

    @property
    def maxRecordingSize(self):
        """Maximum recording size in kilobytes (`int`).

        Since audio recordings tend to consume a large amount of system memory,
        one might want to limit the size of the recording buffer to ensure that
        the application does not run out of memory. By default, the recording
        buffer is set to 24000 KB (or 24 MB). At a sample rate of 48kHz, this
        will result in 62.5 seconds of continuous audio being recorded before
        the buffer is full.

        Setting this value will allocate another recording buffer of appropriate
        size. Avoid doing this in any time sensitive parts of your application.

        """
        return self._maxRecordingSize

    @maxRecordingSize.setter
    def maxRecordingSize(self, value):
        value = int(value)

        # don't do this unless the value changed
        if value == self._maxRecordingSize:
            return

        # if different than last value, update the recording buffer
        self._maxRecordingSize = value
        self._allocRecBuffer()

    def seek(self, offset, absolute=False):
        """Set the write offset.

        Use this to specify where to begin writing samples the next time `write`
        is called. You should call `seek(0)` when starting a new recording.

        Parameters
        ----------
        offset : int
            Position in the sample buffer to set.
        absolute : bool
            Use absolute positioning. Use relative positioning if `False` where
            the value of `offset` will be added to the current offset. Default
            is `False`.

        """
        if not absolute:
            self._offset += offset
        else:
            self._offset = absolute

        assert 0 <= self._offset < self._totalSamples
        self._spaceRemaining = self._totalSamples - self._offset

    def write(self, samples):
        """Write samples to the recording buffer.

        Parameters
        ----------
        samples : ArrayLike
            Samples to write to the recording buffer, usually of a stream. Must
            have the same number of dimensions as the internal array.

        Returns
        -------
        int
            Number of samples overflowed. If this is zero then all samples have
            been recorded, if not, the number of samples rejected is given.

        """
        nSamples = len(samples)
        if self.isFull:
            if self._policyWhenFull == 'ignore':
                return nSamples  # samples lost
            elif self._policyWhenFull == 'warn':
                if not self._warnedRecBufferFull:
                    logging.warning(
                        f"Audio recording buffer filled! This means that no "
                        f"samples are saved beyond {round(self.bufferSecs, 6)} "
                        f"seconds. Specify a larger recording buffer next time "
                        f"to avoid data loss.")
                    logging.flush()
                    self._warnedRecBufferFull = True
                return nSamples
            elif self._policyWhenFull == 'error':
                raise AudioRecordingBufferFullError(
                    "Cannot write samples, recording buffer is full.")
            else:
                return nSamples  # whatever

        if not nSamples:  # no samples came out of the stream, just return
            return

        if self._spaceRemaining >= nSamples:
            self._lastSample = self._offset + nSamples
            audioData = samples[:, :]
        else:
            self._lastSample = self._offset + self._spaceRemaining
            audioData = samples[:self._spaceRemaining, :]

        self._samples[self._offset:self._lastSample, :] = audioData
        self._offset += nSamples

        self._spaceRemaining -= nSamples

        # Check if the recording buffer is now full. Next call to `poll` will
        # not record anything.
        if self._spaceRemaining <= 0:
            self._spaceRemaining = 0

        d = nSamples - self._spaceRemaining
        return 0 if d < 0 else d

    def clear(self):
        # reset all live attributes
        self._samples = None
        self._offset = 0
        self._lastSample = 0
        self._spaceRemaining = None
        self._totalSamples = None
        # reallocate buffer
        self._allocRecBuffer()

    def getSegment(self, start=0, end=None):
        """Get a segment of recording data as an `AudioClip`.

        Parameters
        ----------
        start : float or int
            Absolute time in seconds for the start of the clip.
        end : float or int
            Absolute time in seconds for the end of the clip. If `None` the time
            at the last sample is used.

        Returns
        -------
        AudioClip
            Audio clip object with samples between `start` and `end`.

        """
        idxStart = int(start * self._sampleRateHz)
        idxEnd = self._lastSample if end is None else int(
            end * self._sampleRateHz)

        return AudioClip(
            np.array(self._samples[idxStart:idxEnd, :],
                     dtype=np.float32, order='C'),
            sampleRateHz=self._sampleRateHz)


[docs]class Microphone: """Class for recording audio from a microphone or input stream. Creating an instance of this class will open a stream using the specified device. Streams should remain open for the duration of your session. When a stream is opened, a buffer is allocated to store samples coming off it. Samples from the input stream will writen to the buffer once :meth:`~Microphone.start()` is called. Parameters ---------- device : int or `~psychopy.sound.AudioDevice` Audio capture device to use. You may specify the device either by index (`int`) or descriptor (`AudioDevice`). sampleRateHz : int Sampling rate for audio recording in Hertz (Hz). By default, 48kHz (``sampleRateHz=48000``) is used which is adequate for most consumer grade microphones (headsets and built-in). channels : int Number of channels to record samples to `1=Mono` and `2=Stereo`. streamBufferSecs : float Stream buffer size to pre-allocate for the specified number of seconds. The default is 2.0 seconds which is usually sufficient. maxRecordingSize : int Maximum recording size in kilobytes (Kb). Since audio recordings tend to consume a large amount of system memory, one might want to limit the size of the recording buffer to ensure that the application does not run out of memory. By default, the recording buffer is set to 24000 KB (or 24 MB). At a sample rate of 48kHz, this will result in 62.5 seconds of continuous audio being recorded before the buffer is full. audioLatencyMode : int or None Audio latency mode to use, values range between 0-4. If `None`, the setting from preferences will be used. Using `3` (exclusive mode) is adequate for most applications and required if using WASAPI on Windows for other settings (such audio quality) to take effect. Symbolic constants `psychopy.sound.audiodevice.AUDIO_PTB_LATENCY_CLASS_` can also be used. audioRunMode : int Run mode for the recording device. Default is standby-mode (`0`) which allows the system to put the device to sleep. However, when the device is needed, waking the device results in some latency. Using a run mode of `1` will keep the microphone running (or 'hot') with reduces latency when th recording is started. Cannot be set when after initialization at this time. Examples -------- Capture 10 seconds of audio from the primary microphone:: import psychopy.core as core import psychopy.sound.Microphone as Microphone mic = Microphone(bufferSecs=10.0) # open the microphone mic.start() # start recording core.wait(10.0) # wait 10 seconds mic.stop() # stop recording audioClip = mic.getRecording() print(audioClip.duration) # should be ~10 seconds audioClip.save('test.wav') # save the recorded audio as a 'wav' file The prescribed method for making long recordings is to poll the stream once per frame (or every n-th frame):: mic = Microphone(bufferSecs=2.0) mic.start() # start recording # main trial drawing loop mic.poll() win.flip() # calling the window flip function mic.stop() # stop recording audioClip = mic.getRecording() """ # Force the use of WASAPI for audio capture on Windows. If `True`, only # WASAPI devices will be returned when calling static method # `Microphone.getDevices()` enforceWASAPI = True def __init__(self, device=None, sampleRateHz=None, channels=None, streamBufferSecs=2.0, maxRecordingSize=24000, policyWhenFull='warn', audioLatencyMode=None, audioRunMode=0): if not _hasPTB: # fail if PTB is not installed raise ModuleNotFoundError( "Microphone audio capture requires package `psychtoolbox` to " "be installed.") def _getDeviceByIndex(deviceIndex): """Subroutine to get a device by index. Used to handle the case where the user specifies a device by index. Parameters ---------- deviceIndex : int, float or str Index of the device to get. Returns ------- AudioDeviceInfo Audio device information object. """ # convert to `int` first, sometimes strings can specify the enum # value from builder deviceIndex = int(deviceIndex) # get all audio devices devices_ = Microphone.getDevices() # get information about the selected device devicesByIndex = {d.deviceIndex: d for d in devices_} if deviceIndex in devicesByIndex: useDevice = devicesByIndex[deviceIndex] else: raise AudioInvalidCaptureDeviceError( 'No suitable audio recording devices found matching index ' '{}.'.format(deviceIndex)) return useDevice # get information about the selected device if isinstance(device, AudioDeviceInfo): self._device = device elif isinstance(device, (int, float, str)): self._device = _getDeviceByIndex(device) else: # get default device, first enumerated usually devices = Microphone.getDevices() if not devices: raise AudioInvalidCaptureDeviceError( 'No suitable audio recording devices found on this system. ' 'Check connections and try again.') self._device = devices[0] # use first logging.info('Using audio device #{} ({}) for audio capture'.format( self._device.deviceIndex, self._device.deviceName)) # error if specified device is not suitable for capture if not self._device.isCapture: raise AudioInvalidCaptureDeviceError( 'Specified audio device not suitable for audio recording. ' 'Has no input channels.') # get the sample rate self._sampleRateHz = \ self._device.defaultSampleRate if sampleRateHz is None else int( sampleRateHz) logging.debug('Set stream sample rate to {} Hz'.format( self._sampleRateHz)) # set the audio latency mode if audioLatencyMode is None: self._audioLatencyMode = int(prefs.hardware["audioLatencyMode"]) else: self._audioLatencyMode = audioLatencyMode logging.debug('Set audio latency mode to {}'.format( self._audioLatencyMode)) assert 0 <= self._audioLatencyMode <= 4 # sanity check for pref # set the number of recording channels self._channels = \ self._device.inputChannels if channels is None else int(channels) logging.debug('Set recording channels to {} ({})'.format( self._channels, 'stereo' if self._channels > 1 else 'mono')) if self._channels > self._device.inputChannels: raise AudioInvalidDeviceError( 'Invalid number of channels for audio input specified.') # internal recording buffer size in seconds assert isinstance(streamBufferSecs, (float, int)) self._streamBufferSecs = float(streamBufferSecs) # PTB specific stuff self._mode = 2 # open a stream in capture mode # Handle for the recording stream, should only be opened once per # session logging.debug('Opening audio stream for device #{}'.format( self._device.deviceIndex)) self._stream = audio.Stream( device_id=self._device.deviceIndex, latency_class=self._audioLatencyMode, mode=self._mode, freq=self._sampleRateHz, channels=self._channels) logging.debug('Stream opened') assert isinstance(audioRunMode, (float, int)) and \ (audioRunMode == 0 or audioRunMode == 1) self._audioRunMode = int(audioRunMode) self._stream.run_mode = self._audioRunMode logging.debug('Set run mode to `{}`'.format( self._audioRunMode)) # set latency bias self._stream.latency_bias = 0.0 logging.debug('Set stream latency bias to {} ms'.format( self._stream.latency_bias)) # pre-allocate recording buffer, called once self._stream.get_audio_data(self._streamBufferSecs) logging.debug( 'Allocated stream buffer to hold {} seconds of data'.format( self._streamBufferSecs)) # status flag for Builder self._statusFlag = NOT_STARTED # setup recording buffer self._recording = RecordingBuffer( sampleRateHz=self._sampleRateHz, channels=self._channels, maxRecordingSize=maxRecordingSize, policyWhenFull=policyWhenFull ) # setup clips and transcripts dicts self.clips = {} self.lastClip = None self.scripts = {} self.lastScript = None self._isStarted = False # internal state logging.debug('Audio capture device #{} ready'.format( self._device.deviceIndex))
[docs] @staticmethod def getDevices(): """Get a `list` of audio capture device (i.e. microphones) descriptors. On Windows, only WASAPI devices are used. Returns ------- list List of `AudioDevice` descriptors for suitable capture devices. If empty, no capture devices have been found. """ try: Microphone.enforceWASAPI = bool(prefs.hardware["audioForceWASAPI"]) except KeyError: pass # use default if option not present in settings # query PTB for devices if Microphone.enforceWASAPI and sys.platform == 'win32': allDevs = audio.get_devices(device_type=13) else: allDevs = audio.get_devices() # make sure we have an array of descriptors allDevs = [allDevs] if isinstance(allDevs, dict) else allDevs # create list of descriptors only for capture devices inputDevices = [desc for desc in [ AudioDeviceInfo.createFromPTBDesc(dev) for dev in allDevs] if desc.isCapture] return inputDevices
# def warmUp(self): # """Warm-/wake-up the audio stream. # # On some systems the first time `start` is called incurs additional # latency, whereas successive calls do not. To deal with this, it is # recommended that you run this warm-up routine prior to capturing audio # samples. By default, this routine is called when instancing a new # microphone object. # # """ # # We should put an actual test here to see if timing stabilizes after # # multiple invocations of this function. # self._stream.start() # self._stream.stop() @property def recording(self): """Reference to the current recording buffer (`RecordingBuffer`).""" return self._recording @property def recBufferSecs(self): """Capacity of the recording buffer in seconds (`float`).""" return self.recording.bufferSecs @property def maxRecordingSize(self): """Maximum recording size in kilobytes (`int`). Since audio recordings tend to consume a large amount of system memory, one might want to limit the size of the recording buffer to ensure that the application does not run out. By default, the recording buffer is set to 64000 KB (or 64 MB). At a sample rate of 48kHz, this will result in about. Using stereo audio (``nChannels == 2``) requires twice the buffer over mono (``nChannels == 2``) for the same length clip. Setting this value will allocate another recording buffer of appropriate size. Avoid doing this in any time sensitive parts of your application. """ return self._recording.maxRecordingSize @maxRecordingSize.setter def maxRecordingSize(self, value): self._recording.maxRecordingSize = value @property def latencyBias(self): """Latency bias to add when starting the microphone (`float`). """ return self._stream.latency_bias @latencyBias.setter def latencyBias(self, value): self._stream.latency_bias = float(value) @property def audioLatencyMode(self): """Audio latency mode in use (`int`). Cannot be set after initialization. """ return self._audioLatencyMode @property def streamBufferSecs(self): """Size of the internal audio storage buffer in seconds (`float`). To ensure all data is captured, there must be less time elapsed between subsequent `getAudioClip` calls than `bufferSecs`. """ return self._streamBufferSecs @property def status(self): """Status flag for the microphone. Value can be one of ``psychopy.constants.STARTED`` or ``psychopy.constants.NOT_STARTED``. This is attribute is used by Builder and does not correspond to the actual state of the microphone. Use `streamStatus` and `isStarted` instead. For detailed stream status information, use the :attr:`~psychopy.sound.microphone.Microphone.streamStatus` property. """ if hasattr(self, "_statusFlag"): return self._statusFlag @status.setter def status(self, value): self._statusFlag = value @property def streamStatus(self): """Status of the audio stream (`AudioDeviceStatus` or `None`). See :class:`~psychopy.sound.AudioDeviceStatus` for a complete overview of available status fields. This property has a value of `None` if the stream is presently closed. Examples -------- Get the capture start time of the stream:: # assumes mic.start() was called captureStartTime = mic.status.captureStartTime Check if microphone recording is active:: isActive = mic.status.active Get the number of seconds recorded up to this point:: recordedSecs = mic.status.recordedSecs """ currentStatus = self._stream.status if currentStatus != -1: return AudioDeviceStatus.createFromPTBDesc(currentStatus) @property def isRecBufferFull(self): """`True` if there is an overflow condition with the recording buffer. If this is `True`, then `poll()` is still collecting stream samples but is no longer writing them to anything, causing stream samples to be lost. """ return self._recording.isFull @property def isStarted(self): """``True`` if stream recording has been started (`bool`).""" return self._isStarted @property def isRecording(self): """``True`` if stream recording has been started (`bool`). Alias of `isStarted`.""" return self.isStarted
[docs] def start(self, when=None, waitForStart=0, stopTime=None): """Start an audio recording. Calling this method will begin capturing samples from the microphone and writing them to the buffer. Parameters ---------- when : float, int or None When to start the stream. If the time specified is a floating point (absolute) system time, the device will attempt to begin recording at that time. If `None` or zero, the system will try to start recording as soon as possible. waitForStart : bool Wait for sound onset if `True`. stopTime : float, int or None Number of seconds to record. If `None` or `-1`, recording will continue forever until `stop` is called. Returns ------- float Absolute time the stream was started. """ # check if the stream has been if self.isStarted: raise AudioStreamError( "Cannot start a stream, already started.") if self._stream is None: raise AudioStreamError("Stream not ready.") # reset the writing 'head' self._recording.seek(0, absolute=True) # reset warnings # self._warnedRecBufferFull = False startTime = self._stream.start( repetitions=0, when=when, wait_for_start=int(waitForStart), stop_time=stopTime) # recording has begun or is scheduled to do so self._isStarted = True logging.debug( 'Scheduled start of audio capture for device #{} at t={}.'.format( self._device.deviceIndex, startTime)) return startTime
[docs] def record(self, when=None, waitForStart=0, stopTime=None): """Start an audio recording (alias of `.start()`). Calling this method will begin capturing samples from the microphone and writing them to the buffer. Parameters ---------- when : float, int or None When to start the stream. If the time specified is a floating point (absolute) system time, the device will attempt to begin recording at that time. If `None` or zero, the system will try to start recording as soon as possible. waitForStart : bool Wait for sound onset if `True`. stopTime : float, int or None Number of seconds to record. If `None` or `-1`, recording will continue forever until `stop` is called. Returns ------- float Absolute time the stream was started. """ return self.start( when=when, waitForStart=waitForStart, stopTime=stopTime)
[docs] def stop(self, blockUntilStopped=True, stopTime=None): """Stop recording audio. Call this method to end an audio recording if in progress. This will simply halt recording and not close the stream. Any remaining samples will be polled automatically and added to the recording buffer. Parameters ---------- blockUntilStopped : bool Halt script execution until the stream has fully stopped. stopTime : float or None Scheduled stop time for the stream in system time. If `None`, the stream will stop as soon as possible. Returns ------- tuple or None Tuple containing `startTime`, `endPositionSecs`, `xruns` and `estStopTime`. Returns `None` if `stop` or `pause` was called previously before `start`. """ # This function must be idempotent since it can be invoked at any time # whether a stream is started or not. if not self.isStarted: return # poll remaining samples, if any if not self.isRecBufferFull: self.poll() startTime, endPositionSecs, xruns, estStopTime = self._stream.stop( block_until_stopped=int(blockUntilStopped), stopTime=stopTime) self._isStarted = False logging.debug( ('Device #{} stopped capturing audio samples at estimated time ' 't={}. Total overruns: {} Total recording time: {}').format( self._device.deviceIndex, estStopTime, xruns, endPositionSecs)) return startTime, endPositionSecs, xruns, estStopTime
[docs] def pause(self, blockUntilStopped=True, stopTime=None): """Pause a recording (alias of `.stop`). Call this method to end an audio recording if in progress. This will simply halt recording and not close the stream. Any remaining samples will be polled automatically and added to the recording buffer. Parameters ---------- blockUntilStopped : bool Halt script execution until the stream has fully stopped. stopTime : float or None Scheduled stop time for the stream in system time. If `None`, the stream will stop as soon as possible. Returns ------- tuple or None Tuple containing `startTime`, `endPositionSecs`, `xruns` and `estStopTime`. Returns `None` if `stop()` or `pause()` was called previously before `start()`. """ return self.stop(blockUntilStopped=blockUntilStopped, stopTime=stopTime)
[docs] def close(self): """Close the stream. Should not be called until you are certain you're done with it. Ideally, you should never close and reopen the same stream within a single session. """ self._stream.close() logging.debug('Stream closed')
[docs] def poll(self): """Poll audio samples. Calling this method adds audio samples collected from the stream buffer to the recording buffer that have been captured since the last `poll` call. Time between calls of this function should be less than `bufferSecs`. You do not need to call this if you call `stop` before the time specified by `bufferSecs` elapses since the `start` call. Can only be called between called of `start` (or `record`) and `stop` (or `pause`). Returns ------- int Number of overruns in sampling. """ if not self.isStarted: raise AudioStreamError( "Cannot poll samples from audio device, not started.") # figure out what to do with this other information audioData, absRecPosition, overflow, cStartTime = \ self._stream.get_audio_data() if overflow: logging.warning( "Audio stream buffer overflow, some audio samples have been " "lost! To prevent this, ensure `Microphone.poll()` is being " "called often enough, or increase the size of the audio buffer " "with `bufferSecs`.") overruns = self._recording.write(audioData) return overruns
[docs] def bank(self, tag=None, transcribe=False, **kwargs): """Store current buffer as a clip within the microphone object. This method is used internally by the Microphone component in Builder, don't use it for other applications. Either `stop()` or `pause()` must be called before calling this method. Parameters ---------- tag : str or None Label for the clip. transcribe : bool or str Set to the name of a transcription engine (e.g. "GOOGLE") to transcribe using that engine, or set as `False` to not transcribe. kwargs : dict Additional keyword arguments to pass to :class:`~psychopy.sound.AudioClip.transcribe()`. """ # make sure the tag exists in both clips and transcripts dicts if tag not in self.clips: self.clips[tag] = [] if tag not in self.scripts: self.scripts[tag] = [] # append current recording to clip list according to tag self.lastClip = self.getRecording() self.clips[tag].append(self.lastClip) # synonymise null values nullVals = ( 'undefined', 'NONE', 'None', 'none', 'False', 'false', 'FALSE') if transcribe in nullVals: transcribe = False # append current clip's transcription according to tag if transcribe: if transcribe in ('Built-in', True, 'BUILT_IN', 'BUILT-IN', 'Built-In', 'built-in'): engine = "sphinx" elif type(transcribe) == str: engine = transcribe else: raise ValueError( "Invalid transcription engine {} specified.".format( transcribe)) self.lastScript = self.lastClip.transcribe( engine=engine, **kwargs) else: self.lastScript = "Transcription disabled." self.scripts[tag].append(self.lastScript) # clear recording buffer self._recording.clear() # return banked items if transcribe: return self.lastClip, self.lastScript else: return self.lastClip
[docs] def clear(self): """Wipe all clips. Deletes previously banked audio clips. """ # clear clips self.clips = {} # clear recording self._recording.clear()
[docs] def flush(self): """Get a copy of all banked clips, then clear the clips from storage.""" # get copy of clips dict clips = self.clips.copy() self.clear() return clips
[docs] def getRecording(self): """Get audio data from the last microphone recording. Call this after `stop` to get the recording as an `AudioClip` object. Raises an error if a recording is in progress. Returns ------- AudioClip Recorded data between the last calls to `start` (or `record`) and `stop`. """ if self.isStarted: raise AudioStreamError( "Cannot get audio clip, recording was in progress. Be sure to " "call `Microphone.stop` first.") return self._recording.getSegment() # full recording
if __name__ == "__main__": pass

Back to top