# Source code for psychopy.sound.microphone

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Audio recording using a microphone.
"""

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2025 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

# Names exported by `from psychopy.sound.microphone import *`
__all__ = ['Microphone']

from pathlib import Path
from psychopy import logging
from psychopy.constants import NOT_STARTED
from psychopy.hardware import DeviceManager
from psychopy.tools.attributetools import logAttrib


class Microphone:
    """Record audio through a microphone device and store the results as clips.

    This class wraps a device object obtained from `DeviceManager` (created as
    a `psychopy.hardware.microphone.MicrophoneDevice` when no existing device
    matches). Most properties and the start/stop/poll methods are forwarded to
    that device; this class adds storage of banked clips and transcripts, and
    saving of clips to files.

    Parameters
    ----------
    device : str, int or None
        Looked up first as a device name and then as a device index via
        `DeviceManager`; if neither matches, a new device is added using this
        value as both its name and its index.
    sampleRateHz, channels, streamBufferSecs, exclusive, audioRunMode
        Passed through unchanged to the device when a new one is created.
    maxRecordingSize : int
        How much data (in kb) to allow in the recording buffer. Only passed
        to the device when a new one is created.
    policyWhenFull : str
        What to do when the recording buffer is full (see `policyWhenFull`).
        Applied to the device even if it already existed.
    name : str
        Name of this object, used when constructing clip filenames.
    recordingFolder : str or Path
        Folder in which `saveClips` writes audio files.
    recordingExt : str
        File extension for saved clips; any leading dots are removed.
    audioLatencyMode
        Legacy parameter, accepted but not used by this class.
    """

    def __init__(
            self,
            device=None,
            sampleRateHz=None,
            channels=None,
            streamBufferSecs=2.0,
            maxRecordingSize=24000,
            policyWhenFull='warn',
            exclusive=False,
            audioRunMode=0,
            name="mic",
            recordingFolder=Path.home(),
            recordingExt="wav",
            # legacy
            audioLatencyMode=None,
    ):
        # store name
        self.name = name
        # store folder
        self.recordingFolder = Path(recordingFolder)
        # store ext (without any leading dots)
        self.recordingExt = recordingExt.lstrip(".")
        # look for device if initialised
        self.device = DeviceManager.getDevice(device)
        # if no matching name, try matching index
        if self.device is None:
            self.device = DeviceManager.getDeviceBy("index", device)
        # if still no match, make a new device
        if self.device is None:
            self.device = DeviceManager.addDevice(
                deviceClass="psychopy.hardware.microphone.MicrophoneDevice",
                deviceName=device,
                index=device,
                sampleRateHz=sampleRateHz,
                channels=channels,
                streamBufferSecs=streamBufferSecs,
                maxRecordingSize=maxRecordingSize,
                policyWhenFull=policyWhenFull,
                exclusive=exclusive,
                audioRunMode=audioRunMode
            )
        # set policy when full (in case device already existed)
        self.device.policyWhenFull = policyWhenFull
        # setup clips and transcripts dicts
        self.clips = {}
        self.lastClip = None
        self.scripts = {}
        self.lastScript = None
        # set initial status
        self.status = NOT_STARTED

    def __del__(self):
        """Save any clips still stored on this object when it is destroyed."""
        # `clips` may be missing if __init__ raised before it was assigned
        if getattr(self, "clips", None):
            self.saveClips()

    @property
    def maxRecordingSize(self):
        """
        Until a file is saved, the audio data from a Microphone needs to be
        stored in RAM. To avoid a memory leak, we limit the amount which can
        be stored by a single Microphone object. The `maxRecordingSize`
        parameter defines what this limit is.

        Parameters
        ----------
        value : int
            How much data (in kb) to allow, default is 24mb (so 24,000kb)
        """
        return self.device.maxRecordingSize

    @maxRecordingSize.setter
    def maxRecordingSize(self, value):
        # set size
        self.device.maxRecordingSize = value

    def setMaxRecordingSize(self, value):
        self.maxRecordingSize = value
        # log
        logAttrib(
            obj=self, log=True, attrib="maxRecordingSize", value=value
        )

    # share the property's documentation with its setter method
    setMaxRecordingSize.__doc__ = maxRecordingSize.__doc__

    # the Builder param has a different name
    setMaxSize = setMaxRecordingSize

    @property
    def policyWhenFull(self):
        """
        Until a file is saved, the audio data from a Microphone needs to be
        stored in RAM. To avoid a memory leak, we limit the amount which can
        be stored by a single Microphone object. The `policyWhenFull`
        parameter tells the Microphone what to do when it's reached that
        limit.

        Parameters
        ----------
        value : str
            One of:
            - "ignore": When full, just don't record any new samples
            - "warn": Same as ignore, but will log a warning
            - "error": When full, will raise an error
            - "rolling": When full, clears the start of the buffer to make
              room for new samples
        """
        return self.device.policyWhenFull

    @policyWhenFull.setter
    def policyWhenFull(self, value):
        # forward the new policy to the device
        self.device.policyWhenFull = value

    def setPolicyWhenFull(self, value):
        self.policyWhenFull = value
        # log
        logAttrib(
            obj=self, log=True, attrib="policyWhenFull", value=value
        )

    # share the property's documentation with its setter method
    setPolicyWhenFull.__doc__ = policyWhenFull.__doc__

    @property
    def recording(self):
        """The device's `recording` attribute (read-only pass-through)."""
        return self.device.recording

    @property
    def recBufferSecs(self):
        """The device's `recBufferSecs` attribute (read-only pass-through)."""
        return self.device.recBufferSecs

    @property
    def latencyBias(self):
        """The device's `latencyBias` attribute."""
        return self.device.latencyBias

    @latencyBias.setter
    def latencyBias(self, value):
        # write to the same device attribute which the getter reads
        self.device.latencyBias = value

    @property
    def audioLatencyMode(self):
        """The device's `audioLatencyMode` attribute (read-only pass-through)."""
        return self.device.audioLatencyMode

    @property
    def streamBufferSecs(self):
        """The device's `streamBufferSecs` attribute (read-only pass-through)."""
        return self.device.streamBufferSecs

    @property
    def streamStatus(self):
        """The device's `streamStatus` attribute (read-only pass-through)."""
        return self.device.streamStatus

    @property
    def isRecBufferFull(self):
        """The device's `isRecBufferFull` attribute (read-only pass-through)."""
        return self.device.isRecBufferFull

    @property
    def isStarted(self):
        """The device's `isStarted` attribute (read-only pass-through)."""
        return self.device.isStarted

    @property
    def isRecording(self):
        """The device's `isRecording` attribute (read-only pass-through)."""
        return self.device.isRecording

    def start(self, when=None, waitForStart=0, stopTime=None):
        """Call `start` on the device with the same arguments and return its
        result."""
        return self.device.start(
            when=when, waitForStart=waitForStart, stopTime=stopTime
        )

    def record(self, when=None, waitForStart=0, stopTime=None):
        """Alias of `start`."""
        return self.start(
            when=when, waitForStart=waitForStart, stopTime=stopTime
        )

    def stop(self, blockUntilStopped=True, stopTime=None):
        """Call `stop` on the device with the same arguments and return its
        result."""
        return self.device.stop(
            blockUntilStopped=blockUntilStopped, stopTime=stopTime
        )

    def pause(self, blockUntilStopped=True, stopTime=None):
        """Alias of `stop`."""
        return self.stop(
            blockUntilStopped=blockUntilStopped, stopTime=stopTime
        )

    def close(self):
        """Call `close` on the device and return its result."""
        return self.device.close()

    def reopen(self):
        """Call `reopen` on the device and return its result."""
        return self.device.reopen()

    def poll(self):
        """Call `poll` on the device and return its result."""
        return self.device.poll()

    def saveClips(self, clear=True):
        """
        Save all stored clips to audio files.

        Parameters
        ----------
        clear : bool
            If True, clips will be removed from this object once saved to
            files.
        """
        # iterate through all clips
        for tag, clips in self.clips.items():
            logging.info(f"Saving {len(clips)} audio clips with tag {tag}")
            for i, clip in enumerate(clips):
                # construct filename
                filename = self.getClipFilename(tag, i)
                # save clip
                clip.save(self.recordingFolder / filename)
            # clear only once every clip for this tag is saved: deleting
            # items while enumerating would skip clips and shift indices
            if clear:
                clips.clear()

    def getClipFilename(self, tag, i=0):
        """
        Get the filename for a particular clip.

        Parameters
        ----------
        tag : str
            Tag assigned to the clip when `bank` was called
        i : int
            Index of clip within this tag (default is 0, i.e. the first clip)

        Returns
        -------
        str
            Constructed filename for this clip
        """
        # if this isn't the first clip with this tag, append a counter
        counter = f"_{i}" if i > 0 else ""
        # construct filename
        return f"recording_{self.name}_{tag}{counter}.{self.recordingExt}"

    def bank(self, tag=None, transcribe=False, **kwargs):
        """Store current buffer as a clip within the microphone object.

        This method is used internally by the Microphone component in Builder,
        don't use it for other applications. Either `stop()` or `pause()` must
        be called before calling this method.

        Parameters
        ----------
        tag : str or None
            Label for the clip.
        transcribe : bool or str
            Set to the name of a transcription engine (e.g. "GOOGLE") to
            transcribe using that engine, or set as `False` to not transcribe.
        kwargs : dict
            Additional keyword arguments to pass to
            :class:`~psychopy.sound.AudioClip.transcribe()`.

        Returns
        -------
        The banked clip, or a `(clip, transcript)` pair when transcription is
        requested. If there is no recording to bank, the clip (and the
        transcript, if requested) is None.

        Raises
        ------
        ValueError
            If `transcribe` is truthy but is neither a string nor one of the
            recognised "built-in" values.
        """
        # synonymise null values before anything else, so that the shape of
        # the return value doesn't depend on whether there was a recording
        nullVals = (
            'undefined', 'NONE', 'None', 'none', 'False', 'false', 'FALSE')
        if transcribe in nullVals:
            transcribe = False

        # make sure the tag exists in both clips and transcripts dicts
        if tag not in self.clips:
            self.clips[tag] = []
        if tag not in self.scripts:
            self.scripts[tag] = []

        # append current recording to clip list according to tag
        lastClip = self.getRecording()
        if lastClip is None:
            # if no recording, return the correct number of items
            return (None, None) if transcribe else None
        self.lastClip = lastClip
        self.clips[tag].append(lastClip)

        # append current clip's transcription according to tag
        if transcribe:
            if transcribe in ('Built-in', True, 'BUILT_IN', 'BUILT-IN',
                              'Built-In', 'built-in'):
                engine = "sphinx"
            elif isinstance(transcribe, str):
                engine = transcribe
            else:
                raise ValueError(
                    "Invalid transcription engine {} specified.".format(
                        transcribe))
            self.lastScript = self.lastClip.transcribe(
                engine=engine, **kwargs)
        else:
            self.lastScript = "Transcription disabled."
        self.scripts[tag].append(self.lastScript)

        # clear recording buffer
        self.device._recording.clear()

        # return banked items
        if transcribe:
            return self.lastClip, self.lastScript
        return self.lastClip

    def clear(self):
        """Wipe all clips. Deletes previously banked audio clips.
        """
        # clear clips
        self.clips = {}
        # clear recording (the buffer belongs to the device, not this object)
        self.device._recording.clear()

    def flush(self):
        """Get a copy of all banked clips, then clear the clips from storage."""
        # get copy of clips dict
        clips = self.clips.copy()
        self.clear()

        return clips

    def getRecording(self):
        """Call `getRecording` on the device and return its result."""
        return self.device.getRecording()
# Nothing is done when this module is executed directly as a script
if __name__ == "__main__": pass

# Back to top