#!/usr/bin/env python# -*- coding: utf-8 -*-# Part of the PsychoPy library# Copyright (C) 2012-2020 iSolver Software Solutions (C) 2021 Open Science Tools Ltd.# Distributed under the terms of the GNU General Public License (GPL).importosimportsysimporttimeimportsubprocessimportjsonimportsignalimportatexitfromweakrefimportproxyimportpsutiltry:importpsychopy.loggingaspsycho_loggingexceptImportError:psycho_logging=Nonefrom..lazy_importimportlazy_importfrom..importIOHUB_DIRECTORYfrom..utilimportyload,yLoaderfrom..errorsimportprint2err,ioHubError,printExceptionDetailsToStdErrfrom..utilimportisIterable,updateDict,win32MessagePumpfrom..devicesimportDeviceEvent,import_devicefrom..devices.computerimportComputerfrom..devices.experimentimportMessageEvent,LogEventfrom..constantsimportDeviceConstants,EventConstantsfrompsychopyimportconstantsgetTime=Computer.getTimeSHUTDOWN_FUNCS=[]_currentSessionInfo=NonedefwindowInfoDict(win):windict=dict(handle=win._hw_handle,pos=win.pos,size=win.size,units=win.units,useRetina=win.useRetina,monitor=None)ifwin.monitor:windict['monitor']=dict(resolution=win.monitor.getSizePix(),width=win.monitor.getWidth(),distance=win.monitor.getDistance())returnwindictdefgetFullClassName(klass):module=klass.__module__ifmodule=='builtins':returnklass.__qualname__# avoid outputs like 'builtins.str'returnmodule+'.'+klass.__qualname__classDeviceRPC():''' ioHubDeviceView creates an RPC interface with the iohub server. Each iohub device method exposed by an ioHubDeviceView is represented by an associated DeviceRPC instance. '''_log_time_index=DeviceEvent.EVENT_HUB_TIME_INDEX_log_text_index=LogEvent.CLASS_ATTRIBUTE_NAMES.index('text')_log_level_index=LogEvent.CLASS_ATTRIBUTE_NAMES.index('log_level')def__init__(self,sendToHub,device_class,method_name):self.device_class=device_classself.method_name=method_nameself.sendToHub=sendToHub@staticmethoddef_returnarg(a):# pragma: no coverreturnadef__call__(self,*args,**kwargs):# Send the device method call request to the ioHub Server and wait# for the method return value sent back from the ioHub Server.r=self.sendToHub(('EXP_DEVICE','DEV_RPC',self.device_class,self.method_name,args,kwargs))ifrisNone:# print("r is None:",('EXP_DEVICE', 'DEV_RPC', self.device_class,# self.method_name, args, kwargs))returnNoner=r[1:]iflen(r)==1:r=r[0]ifself.method_name!='getEvents':returnr# The result of a call to an iohub Device getEvents() method# gets some special handling, converting the returned events# into the desired object type, etc...asType='namedtuple'if'asType'inkwargs:asType=kwargs['asType']elif'as_type'inkwargs:asType=kwargs['as_type']conversionMethod=self._returnargifasType=='dict':conversionMethod=ioHubConnection.eventListToDictelifasType=='object':conversionMethod=ioHubConnection.eventListToObjectelifasType=='namedtuple':conversionMethod=ioHubConnection.eventListToNamedTupleifself.device_class!='Experiment':return[conversionMethod(el)forelinr]EVT_TYPE_IX=DeviceEvent.EVENT_TYPE_ID_INDEXLOG_EVT=LogEvent.EVENT_TYPE_IDtoBeLogged=[elforelinrifel[EVT_TYPE_IX]==LOG_EVT]forlintoBeLogged:r.remove(l)ifpsycho_logging:ltime=l[self._log_time_index]ltext=l[self._log_text_index]llevel=l[self._log_level_index]psycho_logging.log(ltext,llevel,ltime)return[conversionMethod(el)forelinr]# pylint: disable=protected-accessclassioHubDeviceView():""" ioHubDeviceView is used as a client / PsychoPy process side representation of an ioHub device that is actually running on the separate iohub process. An ioHubDeviceView instance allows the PsychoPy script process to call public iohub device methods as if the device method calls were being made locally. The ioHubConnection class creates an ioHubDeviceView instance for each ioHub device being run during the experiment. ioHubDeviceView instances are never created directly by a user script, they are created for you by the ioHubConnection class when it connects to the ioHub Process. """def__init__(self,hubClient,device_class_path,device_class_name,device_config):self.hubClient=hubClientself.name=device_config.get('name',device_class_name.lower())self.device_class=device_class_nameself.device_class_path=device_class_pathrpc_request=('EXP_DEVICE','GET_DEV_INTERFACE',device_class_name)r=self.hubClient._sendToHubServer(rpc_request)self._methods=r[1]def__getattr__(self,name):ifnameinself._methods:r=DeviceRPC(self.hubClient._sendToHubServer,self.device_class,name)returnrraiseAttributeError(self,name)defgetName(self):""" Gets the name given to the device in the ioHub configuration file. ( the device: name: property ) Args: None Returns: (str): the user defined label / name of the device """returnself.namedefgetIOHubDeviceClass(self,full=False):""" Gets the ioHub Device class associated with the oHubDeviceView. This is specified for a device in the ioHub configuration file. ( the device: device_class: property ) :param full: Returns: (class): ioHub Device class associated with this ioHubDeviceView """iffull:returnself.device_class_pathreturnself.device_classdefgetDeviceInterface(self):"""getDeviceInterface returns a list containing the names of all methods that are callable for the ioHubDeviceView object. Only public methods are included in the interface. Any method beginning with a '_' is not included. Args: None Returns: (tuple): list of method names in the ioHubDeviceView interface. """returnself._methods# pylint: enable=protected-accessclassioHubDevices():""" Provides .name access to the ioHub device's created when the ioHub Server is started. Each iohub device is accessible via a dynamically created attribute of this class, the name of which is defined by the device configuration 'name' setting. Each device attribute is an instance of the ioHubDeviceView class. A user script never creates an instance of this class directly, access is provided via the ioHubConnection.devices attribute. """def__init__(self,hubClient):self.hubClient=hubClientself._devicesByName=dict()defaddDevice(self,name,d):setattr(self,name,d)self._devicesByName[name]=ddefgetDevice(self,name):returnself._devicesByName.get(name)defgetAll(self):returnself._devicesByName.values()defgetNames(self):returnself._devicesByName.keys()
[docs]classioHubConnection():"""ioHubConnection is responsible for creating, sending requests to, and reading replies from the ioHub Process. This class is also used to shut down and disconnect the ioHub Server process. The ioHubConnection class is also used as the interface to any ioHub Device instances that have been created so that events from the device can be monitored. These device objects can be accessed via the ioHubConnection .devices attribute, providing 'dot name' access to enabled devices. Alternatively, the .getDevice(name) method can be used and will return None if the device name specified does not exist. Using the .devices attribute is handy if you know the name of the device to be accessed and you are sure it is actually enabled on the ioHub Process. An example of accessing a device using the .devices attribute:: # get the Mouse device, named mouse mouse=hub.devices.mouse mouse_position = mouse.getPosition() print('mouse position: ', mouse_position) # Returns something like: # >> mouse position: [-211.0, 371.0] """ACTIVE_CONNECTION=Nonedef__init__(self,ioHubConfig=None,ioHubConfigAbsPath=None):ifioHubConfig:ifnotisinstance(ioHubConfig,dict):raiseioHubError('The provided ioHub Configuration is not a dictionary.',ioHubConfig)ifioHubConnection.ACTIVE_CONNECTIONisnotNone:raiseRuntimeError('An existing ioHubConnection is already open. Use ''iohub.client.ioHubConnection.getActiveConnection() ''to access it; or use ''iohub.ioHubConnection.getActiveConnection().quit() ''to close it.')Computer.psychopy_process=psutil.Process()# udp port setupself.udp_client=None# the dynamically generated object that contains an attribute for# each device registered for monitoring with the ioHub server so# that devices can be accessed experiment process side by device name.self.devices=ioHubDevices(self)# A circular buffer used to hold events retrieved from self.getEvents()# during self.wait() periods.self.allEvents=[]self.experimentID=Noneself.experimentSessionID=Noneself._experimentMetaData=Noneself._sessionMetaData=Noneself._server_process=Noneself._iohub_server_config=Noneself._shutdown_attempted=Falseself._cv_order=Noneself._message_cache=[]self.iohub_status=self._startServer(ioHubConfig,ioHubConfigAbsPath)ifself.iohub_status!='OK':raiseRuntimeError('Error starting ioHub server: {}'.format(self.iohub_status))@classmethoddefgetActiveConnection(cls):returncls.ACTIVE_CONNECTION
[docs]defgetDevice(self,deviceName):""" Returns the ioHubDeviceView that has a matching name (based on the device : name property specified in the ioHub_config.yaml for the experiment). If no device with the given name is found, None is returned. Example, accessing a Keyboard device that was named 'kb' :: keyboard = self.getDevice('kb') kb_events= keyboard.getEvent() This is the same as using the 'natural naming' approach supported by the .devices attribute, i.e:: keyboard = self.devices.kb kb_events= keyboard.getEvent() However the advantage of using getDevice(device_name) is that an exception is not created if you provide an invalid device name, or if the device is not enabled on the ioHub server; None is returned instead. Args: deviceName (str): Name given to the ioHub Device to be returned Returns: The ioHubDeviceView instance for deviceName. """returnself.devices.getDevice(deviceName)
[docs]defgetEvents(self,device_label=None,as_type='namedtuple'):"""Retrieve any events that have been collected by the ioHub Process from monitored devices since the last call to getEvents() or clearEvents(). By default all events for all monitored devices are returned, with each event being represented as a namedtuple of all event attributes. When events are retrieved from an event buffer, they are removed from that buffer as well. If events are only needed from one device instead of all devices, providing a valid device name as the device_label argument will result in only events from that device being returned. Events can be received in one of several object types by providing the optional as_type property to the method. Valid values for as_type are the following str values: * 'list': Each event is a list of ordered attributes. * 'namedtuple': Each event is converted to a namedtuple object. * 'dict': Each event converted to a dict object. * 'object': Each event is converted to a DeviceEvent subclass based on the event's type. Args: device_label (str): Name of device to retrieve events for. If None ( the default ) returns device events from all devices. as_type (str): Returned event object type. Default: 'namedtuple'. Returns: tuple: List of event objects; object type controlled by 'as_type'. """r=Noneifdevice_labelisNone:events=self._sendToHubServer(('GET_EVENTS',))[1]ifeventsisNone:r=self.allEventselse:self.allEvents.extend(events)r=self.allEventsself.allEvents=[]else:r=self.devices.getDevice(device_label).getEvents()ifr:ifas_type=='list':returnrconversionMethod=Noneifas_type=='namedtuple':conversionMethod=self.eventListToNamedTupleelifas_type=='dict':conversionMethod=self.eventListToDictelifas_type=='object':conversionMethod=self.eventListToObjectifconversionMethod:return[conversionMethod(el)forelinr]returnrreturn[]
[docs]defclearEvents(self,device_label='all'):"""Clears unread events from the ioHub Server's Event Buffer(s) so that unneeded events are not discarded. If device_label is 'all', ( the default ), then events from both the ioHub *Global Event Buffer* and all *Device Event Buffer's* are cleared. If device_label is None then all events in the ioHub *Global Event Buffer* are cleared, but the *Device Event Buffers* are unaffected. If device_label is a str giving a valid device name, then that *Device Event Buffer* is cleared, but the *Global Event Buffer* is not affected. Args: device_label (str): device name, 'all', or None Returns: None """ifdevice_labelandisinstance(device_label,str):device_label=device_label.lower()ifdevice_label=='all':self.allEvents=[]self._sendToHubServer(('RPC','clearEventBuffer',[True,]))try:self.getDevice('keyboard')._clearLocalEvents()except:passelse:d=self.devices.getDevice(device_label)ifd:d.clearEvents()elifdevice_labelin[None,'',False]:self.allEvents=[]self._sendToHubServer(('RPC','clearEventBuffer',[False,]))try:self.getDevice('keyboard')._clearLocalEvents()except:passelse:raiseValueError('Invalid device_label value: {}'.format(device_label))
[docs]defsendMessageEvent(self,text,category='',offset=0.0,sec_time=None):""" Create and send an Experiment MessageEvent to the ioHub Server for storage in the ioDataStore hdf5 file. Args: text (str): The text message for the message event. 128 char max. category (str): A str grouping code for the message. Optional. 32 char max. offset (float): Optional sec.msec offset applied to the message event time stamp. Default 0. sec_time (float): Absolute sec.msec time stamp for the message in. If not provided, or None, then the MessageEvent is time stamped when this method is called using the global timer (core.getTime()). """self.cacheMessageEvent(text,category,offset,sec_time)self._sendToHubServer(('EXP_DEVICE','EVENT_TX',self._message_cache))self._message_cache=[]
[docs]defcacheMessageEvent(self,text,category='',offset=0.0,sec_time=None):""" Create an Experiment MessageEvent and store in local cache. Message must be sent before it is saved to hdf5 file. Args: text (str): The text message for the message event. 128 char max. category (str): A str grouping code for the message. Optional. 32 char max. offset (float): Optional sec.msec offset applied to the message event time stamp. Default 0. sec_time (float): Absolute sec.msec time stamp for the message in. If not provided, or None, then the MessageEvent is time stamped when this method is called using the global timer (core.getTime()). """self._message_cache.append(MessageEvent._createAsList(text,# pylint: disable=protected-accesscategory=category,msg_offset=offset,sec_time=sec_time))
defsendMessageEvents(self,messageList=[]):ifmessageList:self.cacheMessageEvents(messageList)ifself._message_cache:self._sendToHubServer(('EXP_DEVICE','EVENT_TX',self._message_cache))self._message_cache=[]defcacheMessageEvents(self,messageList):forminmessageList:self._message_cache.append(MessageEvent._createAsList(**m))defgetHubServerConfig(self):"""Returns a dict containing the current ioHub Server configuration. Args: None Returns: dict: ioHub Server configuration. """returnself._iohub_server_configdefgetSessionID(self):returnself.experimentSessionIDdefgetSessionMetaData(self):"""Returns a dict representing the experiment session data that is being used for the current ioHub Experiment Session. Changing values in the dict has no effect on the session data that has already been saved to the ioHub DataStore. Args: None Returns: dict: Experiment Session metadata saved to the ioHub DataStore. None if the ioHub DataStore is not enabled. """returnself._sessionMetaDatadefgetExperimentID(self):returnself.experimentIDdefgetExperimentMetaData(self):"""Returns a dict representing the experiment data that is being used for the current ioHub Experiment. Args: None Returns: dict: Experiment metadata saved to the ioHub DataStore. None if the ioHub DataStore is not enabled. """returnself._experimentMetaDatadefwait(self,delay,check_hub_interval=0.02):# TODO: Integrate iohub event collection done in this version of wait# with psychopy wait() and deprecate this method."""Pause the experiment script execution for delay seconds. time.sleep() is used for delays > 0.02 sec (20 msec) During the wait period, events are received from iohub every 'check_hub_interval' seconds, being buffered so they can be accessed after the wait duration. This is done for two reasons: * The iohub server's global and device level event buffers do not start to drop events if one of the (circular) event buffers becomes full during the wait duration. * The number of events in the iohub process event buffers does not becaome too large, which could result in a longer than normal getEvents() call time. Args: delay (float): The sec.msec delay until method returns. check_hub_interval (float): The sec.msec interval between calls to io.getEvents() during the delay period. Returns: float: The actual duration of the delay in sec.msec format. """stime=Computer.getTime()targetEndTime=stime+delayifcheck_hub_interval<0:check_hub_interval=0ifcheck_hub_interval>0:remainingSec=targetEndTime-Computer.getTime()whileremainingSec>check_hub_interval+0.025:time.sleep(check_hub_interval)events=self.getEvents()ifevents:self.allEvents.extend(events)# Call win32MessagePump so PsychoPy Windows do not become# 'unresponsive' if delay is long.win32MessagePump()remainingSec=targetEndTime-Computer.getTime()time.sleep(max(0.0,targetEndTime-Computer.getTime()-0.02))while(targetEndTime-Computer.getTime())>0.0:passreturnComputer.getTime()-stime
[docs]defcreateTrialHandlerRecordTable(self,trials,cv_order=None):""" Create a condition variable table in the ioHub data file based on the a psychopy TrialHandler. By doing so, the iohub data file can contain the DV and IV values used for each trial of an experiment session, along with all the iohub device events recorded by iohub during the session. Example psychopy code usage:: # Load a trial handler and # create an associated table in the iohub data file # from psychopy.data import TrialHandler, importConditions exp_conditions=importConditions('trial_conditions.xlsx') trials = TrialHandler(exp_conditions, 1) # Inform the ioHub server about the TrialHandler # io.createTrialHandlerRecordTable(trials) # Read a row of the trial handler for # each trial of your experiment # for trial in trials: # do whatever... # During the trial, trial variable values can be updated # trial['TRIAL_START']=flip_time # At the end of each trial, before getting # the next trial handler row, send the trial # variable states to iohub so they can be stored for future # reference. # io.addTrialHandlerRecord(trial) """trial=trials.trialList[0]self._cv_order=cv_orderifcv_orderisNone:self._cv_order=trial.keys()trial_condition_types=[]forcond_nameinself._cv_order:cond_val=trial[cond_name]ifisinstance(cond_val,str):numpy_dtype=(cond_name,'S',256)elifisinstance(cond_val,int):numpy_dtype=(cond_name,'i8')elifisinstance(cond_val,float):numpy_dtype=(cond_name,'f8')else:numpy_dtype=(cond_name,'S',256)trial_condition_types.append(numpy_dtype)# pylint: disable=protected-accesscvt_rpc=('RPC','initConditionVariableTable',(self.experimentID,self.experimentSessionID,trial_condition_types))r=self._sendToHubServer(cvt_rpc)returnr[2]
[docs]defaddTrialHandlerRecord(self,cv_row):"""Adds the values from a TriaHandler row / record to the iohub data file for future data analysis use. :param cv_row: :return: None """data=[]ifisinstance(cv_row,(list,tuple)):data=list(cv_row)elifself._cv_order:forcv_nameinself._cv_order:data.append(cv_row[cv_name])else:data=list(cv_row.values())fori,dinenumerate(data):ifisinstance(d,str):data[i]=d.encode('utf-8')cvt_rpc=('RPC','extendConditionVariableTable',(self.experimentID,self.experimentSessionID,data))r=self._sendToHubServer(cvt_rpc)returnr[2]
defregisterWindowHandles(self,*winHandles):""" Sends 1 - n Window handles to iohub so it can determine if kb or mouse events were targeted at a psychopy window or other window. """r=self._sendToHubServer(('RPC','registerWindowHandles',winHandles))returnr[2]defunregisterWindowHandles(self,*winHandles):""" Sends 1 - n Window handles to iohub so it can determine if kb or mouse events were targeted at a psychopy window or other window. """r=self._sendToHubServer(('RPC','unregisterWindowHandles',winHandles))returnr[2]defupdateWindowPos(self,win,x,y):r=self._sendToHubServer(('RPC','updateWindowPos',(win._hw_handle,(x,y))))returnr[2]
[docs]defgetTime(self):""" **Deprecated Method:** Use Computer.getTime instead. Remains here for testing time bases between processes only. """returnself._sendToHubServer(('RPC','getTime'))[2]
[docs]defsyncClock(self,clock):""" Synchronise ioHub's internal clock with a given instance of MonotonicClock. """params={'_timeAtLastReset':clock._timeAtLastReset,'_epochTimeAtLastReset':clock._epochTimeAtLastReset,'format':clock.format,}ifisinstance(params['format'],type):params['format']=params['format'].__name__# sync clock in this processforkey,valueinparams.items():setattr(Computer.global_clock,key,value)# sync clock in server processreturnself._sendToHubServer(('RPC','syncClock',(params,)))
[docs]defsetPriority(self,level='normal',disable_gc=False):"""See Computer.setPriority documentation, where current process will be the iohub process."""returnself._sendToHubServer(('RPC','setPriority',[level,disable_gc]))[2]
[docs]defgetPriority(self):"""See Computer.getPriority documentation, where current process will be the iohub process."""returnself._sendToHubServer(('RPC','getPriority'))[2]
[docs]defgetProcessAffinity(self):""" Returns the current **ioHub Process** affinity setting, as a list of 'processor' id's (from 0 to getSystemProcessorCount()-1). A Process's Affinity determines which CPU's or CPU cores a process can run on. By default the ioHub Process can run on any CPU or CPU core. This method is not supported on OS X at this time. Args: None Returns: list: A list of integer values between 0 and Computer.getSystemProcessorCount()-1, where values in the list indicate processing unit indexes that the ioHub process is able to run on. """r=self._sendToHubServer(('RPC','getProcessAffinity'))returnr[2]
[docs]defsetProcessAffinity(self,processor_list):""" Sets the **ioHub Process** Affinity based on the value of processor_list. A Process's Affinity determines which CPU's or CPU cores a process can run on. By default the ioHub Process can run on any CPU or CPU core. The processor_list argument must be a list of 'processor' id's; integers in the range of 0 to Computer.processing_unit_count-1, representing the processing unit indexes that the ioHub Server should be allowed to run on. If processor_list is given as an empty list, the ioHub Process will be able to run on any processing unit on the computer. This method is not supported on OS X at this time. Args: processor_list (list): A list of integer values between 0 and Computer.processing_unit_count-1, where values in the list indicate processing unit indexes that the ioHub process is able to run on. Returns: None """r=self._sendToHubServer(('RPC','setProcessAffinity',processor_list))returnr[2]
defaddDeviceToMonitor(self,device_class,device_config=None):""" Normally this method should not be used, as all devices should be specified when the iohub server is being started. Adds a device to the ioHub Server for event monitoring during the experiment. Adding a device to the iohub server after it has been started can take 10'2 to 100's of msec to perform on the ioHub server (depending on the device type). When the device is being added, events from existing devices can not be monitored. Args: device_class (str): The iohub class name of the device being added. device_config (dict): The device configuration settings to be set. Device settings not provided in device_config will be set to the default values specified by the device. Returns: DeviceView Instance: The PsychoPy Process's view of the ioHub Device created that was created, as would be returned if a device was accessed using the .devices attribute or the .getDeviceByLabel() method. """ifdevice_configisNone:device_config={}drpc=('EXP_DEVICE','ADD_DEVICE',device_class,device_config)r=self._sendToHubServer(drpc)device_class_name,dev_name,_=r[2]returnself._addDeviceView(dev_name,device_class_name)
[docs]defflushDataStoreFile(self):"""Manually tell the iohub datastore to flush any events it has buffered in memory to disk. Any cached message events are sent to the iohub server before flushing the iohub datastore. Args: None Returns: None """self.sendMessageEvents()r=self._sendToHubServer(('RPC','flushIODataStoreFile'))returnr
[docs]defstartCustomTasklet(self,task_name,task_class_path,**class_kwargs):""" Instruct the iohub server to start running a custom tasklet given by task_class_path. It is important that the custom task does not block for any significant amount of time, or the processing of events by the iohub server will be negatively effected. See the customtask.py demo for an example of how to make a long running task not block the rest of the iohub server. """class_kwargs.setdefault('name',task_name)r=self._sendToHubServer(('CUSTOM_TASK','START',task_name,task_class_path,class_kwargs))returnr
[docs]defstopCustomTasklet(self,task_name):""" Instruct the iohub server to stop the custom task that was previously started by calling self.startCustomTasklet(....). task_name identifies which custom task should be stopped and must match the task_name of a previously started custom task. """r=self._sendToHubServer(('CUSTOM_TASK','STOP',task_name))returnr
[docs]defshutdown(self):"""Tells the ioHub Server to close all ioHub Devices, the ioDataStore, and the connection monitor between the PsychoPy and ioHub Processes. Then end the server process itself. Args: None Returns: None """self._shutDownServer()
[docs]defquit(self):"""Same as the shutdown() method, but has same name as PsychoPy core.quit() so maybe easier to remember."""self.shutdown()
# Private Methods.....
[docs]def_startServer(self,ioHubConfig=None,ioHubConfigAbsPath=None):"""Starts the ioHub Process, storing it's process id, and creating the experiment side device representation for IPC access to public device methods."""experiment_info=Nonesession_info=Nonehub_defaults_config={}rootScriptPath=os.path.dirname(sys.argv[0])iflen(rootScriptPath)<=1:rootScriptPath=os.path.abspath(".")# >>>>> Load / Create / Update iohub config file.....cfpath=os.path.join(IOHUB_DIRECTORY,'default_config.yaml')withopen(cfpath,'r')asconfig_file:hub_defaults_config=yload(config_file,Loader=yLoader)ifioHubConfigAbsPathisNoneandioHubConfigisNone:ioHubConfig=dict(monitor_devices=[dict(Keyboard={}),dict(Display={}),dict(Mouse={})])elifioHubConfigisnotNoneandioHubConfigAbsPathisNone:if'monitor_devices'notinioHubConfig:raiseKeyError("ioHubConfig must be provided with ""'monitor_devices' key:value.")if'data_store'inioHubConfig:iods=ioHubConfig['data_store']if'experiment_info'iniodsand'session_info'iniods:experiment_info=iods['experiment_info']session_info=iods['session_info']else:raiseKeyError("ERROR: ioHubConfig:ioDataStore must ""contain both a 'experiment_info' and a ""'session_info' entry.")elifioHubConfigAbsPathisnotNoneandioHubConfigisNone:withopen(ioHubConfigAbsPath,'r')asconfig_file:ioHubConfig=yload(config_file,Loader=yLoader)else:raiseValueError('Both a ioHubConfig dict object AND a path to an ''ioHubConfig file can not be provided.')ifioHubConfig:updateDict(ioHubConfig,hub_defaults_config)ifioHubConfigandioHubConfigAbsPathisNone:ifisinstance(ioHubConfig.get('monitor_devices'),dict):# short hand device spec is being used. Convert dict of# devices in a list of device dicts.devs=ioHubConfig.get('monitor_devices')devsList=[{dname:dc}fordname,dcindevs.items()]ioHubConfig['monitor_devices']=devsListimporttempfilewithtempfile.NamedTemporaryFile(mode='w',suffix='iohub',delete=False)astfile:tfile.write(json.dumps(ioHubConfig))ioHubConfigAbsPath=os.path.abspath(tfile.name)# <<<<< Finished Load / Create / Update iohub config file.self._iohub_server_config=ioHubConfigifsys.platform=='darwin':self._osxKillAndFreePort()# >>>> Start iohub subprocessrun_script=os.path.join(IOHUB_DIRECTORY,'start_iohub_process.py')subprocessArgList=[sys.executable,run_script,'%.6f'%Computer.global_clock.getLastResetTime(),rootScriptPath,ioHubConfigAbsPath,"{}".format(Computer.current_process.pid)]# To enable coverage in the iohub process, set the iohub\default_config# setting 'coverage_env_var' to the name of the coverage# config file that exists in the psychopy\iohub site-packages folder.# For example:# coverage_env_var: .coveragerc## If coverage_env_var is None or the file is not found,# coverage of ioHub Server process is disabled.coverage_env_var=self._iohub_server_config.get('coverage_env_var')envars=dict(os.environ)ifcoverage_env_varnotin[None,'None']:coverage_env_var="{}".format(coverage_env_var)cov_config_path=os.path.join(IOHUB_DIRECTORY,coverage_env_var)ifos.path.exists(cov_config_path):print("Coverage enabled for ioHub Server Process.")else:print("ioHub Process Coverage conf file not found: %s",cov_config_path)envars['COVERAGE_PROCESS_START']=coverage_env_varself._server_process=subprocess.Popen(subprocessArgList,env=envars,cwd=IOHUB_DIRECTORY,# set sub process stderr to be stdout so PsychoPy Runner# shows errors from iohubstderr=subprocess.STDOUT,)# Get iohub server pid and psutil process object# for affinity and process priority setting.Computer.iohub_process_id=self._server_process.pidComputer.iohub_process=psutil.Process(self._server_process.pid)globalSHUTDOWN_FUNCSSHUTDOWN_FUNCS.append(self._shutDownServer)# >>>>> Create open UDP port to ioHub Serverserver_udp_port=self._iohub_server_config.get('udp_port',9000)from..netimportUDPClientConnection# initially open with a timeout so macOS does not hang. self.udp_client=UDPClientConnection(remote_port=server_udp_port,timeout=0.1)# If ioHub server does not respond correctly,# terminate process and exit the program.ifself._waitForServerInit()isFalse:self._server_process.terminate()return"ioHub startup failed."# close and reopen blocking version of socketself.udp_client.close()self.udp_client=UDPClientConnection(remote_port=server_udp_port)# <<<<< Done Creating open UDP port to ioHub Server# <<<<< Done starting iohub subprocessioHubConnection.ACTIVE_CONNECTION=proxy(self)# Send iohub server any existing open psychopy window handles.try:frompsychopy.visualimportwindowwindow.IOHUB_ACTIVE=Trueifwindow.openWindows:whs=[]# pylint: disable=protected-accessforwinwindow.openWindows:winfo=windowInfoDict(w())whs.append(winfo)w().backend.onMoveCallback=self.updateWindowPosself.registerWindowHandles(*whs)exceptImportError:pass# Sending experiment_info if available.....ifexperiment_info:self._sendExperimentInfo(experiment_info)# Sending session_info if available.....ifsession_info:# print 'Sending session_info: {0}'.format(session_info)self._sendSessionInfo(session_info)# >>>> Creating client side iohub device wrappers...self._createDeviceList(ioHubConfig['monitor_devices'])return'OK'
def_waitForServerInit(self):# >>>> Wait for iohub server ready signal ....hubonline=False# timeout if ioServer does not reply in 30 secondstimeout_duration=self._iohub_server_config.get('start_process_timeout',30.0)timeout_time=Computer.getTime()+timeout_durationwhilehubonlineisFalseandComputer.getTime()<timeout_time:r=self._sendToHubServer(['GET_IOHUB_STATUS',])ifr:hubonline=r[1]=='RUNNING'time.sleep(0.1)returnhubonline# # <<<< Finished wait for iohub server ready signal ....
[docs]def_createDeviceList(self,monitor_devices_config):"""Create client side iohub device views. """# get the list of devices registered with the ioHubfordevice_config_dictinmonitor_devices_config:device_class_name=list(device_config_dict.keys())[0]device_config=list(device_config_dict.values())[0]ifdevice_config.get('enable',True)isTrue:try:self._addDeviceView(device_class_name,device_config)exceptException:# pylint: disable=broad-exceptprint2err('_createDeviceList: Error adding class. ')printExceptionDetailsToStdErr()
[docs]def_addDeviceView(self,dev_cls_name,dev_config):"""Add an iohub device view to self.devices"""try:name=dev_config.get('name',dev_cls_name.lower())dev_cls_name="{}".format(dev_cls_name)dev_name=dev_cls_name.lower()cls_name_start=dev_name.rfind('.')dev_mod_pth='psychopy.iohub.devices.'ifcls_name_start>0:dev_mod_pth2=dev_name[:cls_name_start]dev_mod_pth='{0}{1}'.format(dev_mod_pth,dev_mod_pth2)dev_cls_name=dev_cls_name[cls_name_start+1:]else:dev_mod_pth='{0}{1}'.format(dev_mod_pth,dev_name)# try to import EyeTracker class from given pathtry:dev_import_result=import_device(dev_mod_pth,dev_cls_name)exceptModuleNotFoundError:# if not found, try importing from root (may have entry point)dev_import_result=import_device("psychopy.iohub.devices",dev_cls_name)dev_cls,dev_cls_name,evt_cls_list=dev_import_resultDeviceConstants.addClassMapping(dev_cls)device_event_ids=[]forevinlist(evt_cls_list.values()):ifev.EVENT_TYPE_ID:device_event_ids.append(ev.EVENT_TYPE_ID)EventConstants.addClassMappings(device_event_ids,evt_cls_list)name_start=name.rfind('.')ifname_start>0:name=name[name_start+1:]from..importclientasiohubclientmodlocal_class=Nonelocal_module=getattr(iohubclientmod,dev_cls_name.lower(),False)iflocal_module:# need to touch local_module since it was lazy loaded# pylint: disable=exec-usedexec('import psychopy.iohub.client.{}'.format(dev_cls_name.lower()))local_class=getattr(local_module,dev_cls_name,False)iflocal_class:d=local_class(self,dev_cls_name,dev_config)else:d=ioHubDeviceView(self,dev_mod_pth+"."+dev_cls_name,dev_cls_name,dev_config)self.devices.addDevice(name,d)returndexceptException:# pylint: disable=broad-exceptprint2err('_addDeviceView: Error adding class. ')printExceptionDetailsToStdErr()returnNone
[docs]def_sendToHubServer(self,tx_data):"""General purpose local <-> iohub server process UDP based request - reply code. The method blocks until the request is fulfilled and and a response is received from the ioHub server. Args: tx_data (tuple): data to send to iohub server Return (object): response from the ioHub Server process. """try:# send request to host, return is # bytes sent.#print("SEND:",tx_data)self.udp_client.sendTo(tx_data)exceptExceptionase:# pylint: disable=broad-exceptimporttracebacktraceback.print_exc()self.shutdown()raiseeresult=Nonetry:# wait for response from ioHub server, which will be the# result data and iohub server address (ip4,port).result=self.udp_client.receive()ifresult:result,_=result#print("RESULT:",result)exceptExceptionase:# pylint: disable=broad-exceptimporttracebacktraceback.print_exc()self.shutdown()raisee# check if the reply is an error or not. If it is, raise the error.# TODO: This is not really working as planned, in part because iohub# server does not consistently return error responses when needederrorReply=self._isErrorReply(result)iferrorReply:raiseioHubError(result)# Otherwise return the resultifresultisnotNone:# Use recursive conversion funcs ifisinstance(result,list)orisinstance(result,tuple):result=self._convertList(result)elifisinstance(result,dict):result=self._convertDict(result)returnresult
[docs]def_sendExperimentInfo(self,experimentInfoDict):"""Sends the experiment info from the experiment config file to the ioHub Server, which passes it to the ioDataStore, determines if the experiment already exists in the hdf5 file based on 'experiment_code', and returns a new or existing experiment ID based on that criteria. """fieldOrder=(('experiment_id',0),('code',''),('title',''),('description',''),('version',''))values=[]forkey,defaultValueinfieldOrder:ifkeyinexperimentInfoDict:values.append(experimentInfoDict[key])else:values.append(defaultValue)experimentInfoDict[key]=defaultValuer=self._sendToHubServer(('RPC','setExperimentInfo',(values,)))self.experimentID=r[2]experimentInfoDict['experiment_id']=self.experimentIDself._experimentMetaData=experimentInfoDictreturnr[2]
[docs]def_sendSessionInfo(self,sess_info):"""Sends the experiment session info from the experiment config file and the values entered into the session dialog to the ioHub Server, which passes it to the ioDataStore. The dataStore determines if the session already exists in the experiment file based on 'session_code', and returns a new session ID if session_code is not in use by the experiment. """ifself.experimentIDisNone:raiseRuntimeError("Experiment ID must be set by calling"" _sendExperimentInfo before calling"" _sendSessionInfo.")if'code'notinsess_info:raiseValueError("Code must be provided in sessionInfoDict"" ( StringCol(24) ).")if'name'notinsess_info:sess_info['name']=''if'comments'notinsess_info:sess_info['comments']=''if'user_variables'notinsess_info:sess_info['user_variables']={}org_sess_info=sess_info['user_variables']sess_info['user_variables']=json.dumps(sess_info['user_variables'])r=self._sendToHubServer(('RPC','createExperimentSessionEntry',(sess_info,)))self.experimentSessionID=r[2]sess_info['user_variables']=org_sess_infosess_info['session_id']=self.experimentSessionIDself._sessionMetaData=sess_inforeturnsess_info['session_id']
@staticmethoddefeventListToObject(evt_data):"""Convert an ioHub event currently in list value format into the correct ioHub.devices.DeviceEvent subclass for the given event type."""evt_type=evt_data[DeviceEvent.EVENT_TYPE_ID_INDEX]returnEventConstants.getClass(evt_type).createEventAsClass(evt_data)@staticmethoddefeventListToDict(evt_data):"""Convert an ioHub event currently in list value format into the event as a dictionary of attribute name, attribute values."""ifisinstance(evt_data,dict):returnevt_dataetype=evt_data[DeviceEvent.EVENT_TYPE_ID_INDEX]returnEventConstants.getClass(etype).createEventAsDict(evt_data)@staticmethoddefeventListToNamedTuple(evt_data):"""Convert an ioHub event currently in list value format into the namedtuple format for an event."""ifnotisinstance(evt_data,list):returnevt_dataetype=evt_data[DeviceEvent.EVENT_TYPE_ID_INDEX]returnEventConstants.getClass(etype).createEventAsNamedTuple(evt_data)# client utility methods.def_getDeviceList(self):r=self._sendToHubServer(('EXP_DEVICE','GET_DEVICE_LIST'))returnr[2]def_shutDownServer(self):ifself._shutdown_attemptedisFalse:# send any cached experiment messagesself.sendMessageEvents()try:frompsychopy.visualimportwindowwindow.IOHUB_ACTIVE=FalseexceptImportError:passself._shutdown_attempted=TrueTimeoutError=psutil.TimeoutExpiredtry:ifself.udp_client:# if it isn't already garbage-collectedself.udp_client.sendTo(('STOP_IOHUB_SERVER',))self.udp_client.close()ifComputer.iohub_process:# This wait() used to have timeout=5, removing it to allow# sufficient time for all iohub devices to be closed.r=Computer.iohub_process.wait()print('ioHub Server Process Completed With Code: ',r)exceptTimeoutError:print('Warning: TimeoutExpired, Killing ioHub Server process.')Computer.iohub_process.kill()exceptException:# pylint: disable=broad-exceptprint("Warning: Unhandled Exception. ""Killing ioHub Server process.")ifComputer.iohub_process:Computer.iohub_process.kill()printExceptionDetailsToStdErr()finally:ioHubConnection.ACTIVE_CONNECTION=Noneself._server_process=NoneComputer.iohub_process_id=NoneComputer.iohub_process=NonereturnTrue
[docs]@staticmethoddef_isErrorReply(data):""" Check if an iohub server reply contains an error that should be raised by the local process. """# is it an ioHub error object?ifisinstance(data,ioHubError):returnTrueifisIterable(data)andlen(data)>0:d0=data[0]ifisIterable(d0):returnFalseelse:ifisinstance(d0,str)andd0.find('ERROR')>=0:returndatareturnFalseelse:returndata#'Invalid Response Received from ioHub Server'
def_osxKillAndFreePort(self):server_udp_port=self._iohub_server_config.get('udp_port',9000)p=subprocess.Popen(['lsof','-i:%d'%server_udp_port,'-P'],stdout=subprocess.PIPE,encoding='utf-8')lines=p.communicate()[0]forlineinlines.splitlines():ifline.startswith('Python'):PID,userID=line.split()[1:3]# could verify same userID as current user, probably not neededos.kill(int(PID),signal.SIGKILL)print('Called os.kill(int(PID),signal.SIGKILL): ',PID,userID)def__del__(self):try:self._shutDownServer()ioHubConnection.ACTIVE_CONNECTION=NoneexceptException:# pylint: disable=broad-exceptpass
##############################################################################classioEvent():""" Parent class for all events generated by a psychopy.iohub.client Device wrapper. """_attrib_index=dict()_attrib_index['id']=DeviceEvent.EVENT_ID_INDEX_attrib_index['time']=DeviceEvent.EVENT_HUB_TIME_INDEX_attrib_index['type']=DeviceEvent.EVENT_TYPE_ID_INDEXdef__init__(self,ioe_array,device=None):self._time=ioe_array[ioEvent._attrib_index['time']]self._id=ioe_array[ioEvent._attrib_index['id']]self._type=ioe_array[ioEvent._attrib_index['type']]self._device=device@propertydefdevice(self):""" The ioHubDeviceView that is associated with the event, i.e. the iohub device view for the device that generated the event. :return: ioHubDeviceView """returnself._device@propertydeftime(self):""" The time stamp of the event. Uses the same time base that is used by psychopy.core.getTime() :return: float """returnself._time@propertydefid(self):"""The unique id for the event; in some cases used to track associated events. :return: int """returnself._id@propertydeftype(self):"""The event type string constant. :return: str """returnEventConstants.getName(self._type)@propertydefdict(self):d={}forkinself._attrib_index:d[k]=getattr(self,k)returnddef__str__(self):return'time: %.3f, type: %s, id: %d'%(self.time,self.type,self.id)defshutdownActiveConnections():"""Shutdown any active ioHub connections that are currently running. """activeConnection=ioHubConnection.getActiveConnection()ifactiveConnectionisnotNoneandhasattr(activeConnection,'shutdown'):activeConnection.shutdown()atexit.register(shutdownActiveConnections)_lazyImports="""from psychopy.iohub.client.connect import launchHubServerfrom psychopy.iohub.client import keyboardfrom psychopy.iohub.client import wintab"""try:lazy_import(globals(),_lazyImports)exceptExceptionase:#pylint: disable=broad-exceptprint2err('lazy_import Exception:',e)exec(_lazyImports)#pylint: disable=exec-used