Source code for psychopy.iohub.client.keyboard

# -*- coding: utf-8 -*-
# Part of the PsychoPy library
# Copyright (C) 2012-2020 iSolver Software Solutions (C) 2021 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

from collections import deque
import time
from ..client import ioHubDeviceView, ioEvent, DeviceRPC
from ..devices import DeviceEvent, Computer
from ..util import win32MessagePump
from ..devices.keyboard import KeyboardInputEvent
from ..constants import EventConstants, KeyboardConstants

# pylint: disable=protected-access

# Module-level shortcut to the Computer.getTime callable; the Keyboard
# wait loops in this module use it to read the current time.
getTime = Computer.getTime
# Attribute-name sequence of KeyboardInputEvent. The event classes below call
# .index() on it to find the position of each field in a raw event array.
kb_cls_attr_names = KeyboardInputEvent.CLASS_ATTRIBUTE_NAMES
# Callable applied to an event's raw 'modifiers' value in
# KeyboardEvent.__init__ (presumably maps modifier codes to label strings --
# confirm against KeyboardConstants).
kb_mod_codes2labels = KeyboardConstants._modifierCodes2Labels


class KeyboardEvent(ioEvent):
    """
    Common parent class of the KeyboardPress and KeyboardRelease events.

    Equality is defined on the ``key`` field only: an event is equal to
    another KeyboardEvent with the same ``key``, and to any plain value that
    equals its ``key``. This allows code such as::

        kb_evts = keyboard.getKeys(['a','b','c'])
        for event in kb_evts:
            if event.key == 'b' or event.char == 'b':
                return True
        return False

    to be shortened to::

        return 'b' in keyboard.getKeys()
    """
    # Position of each keyboard specific field inside the raw event array.
    _attrib_index = {
        'key': kb_cls_attr_names.index('key'),
        'char': kb_cls_attr_names.index('char'),
        'modifiers': kb_cls_attr_names.index('modifiers'),
    }

    def __init__(self, ioe_array):
        super(KeyboardEvent, self).__init__(ioe_array)
        # Copy every indexed field into a private '_<name>' attribute.
        # self._attrib_index resolves to the subclass mapping when one is
        # defined, so subclasses get their extra fields populated here too.
        for field, position in self._attrib_index.items():
            setattr(self, '_' + field, ioe_array[position])
        # Replace the raw modifiers value with its converted form.
        self._modifiers = kb_mod_codes2labels(self._modifiers)

    @property
    def key(self):
        """The value of the event's 'key' field."""
        return self._key

    @property
    def char(self):
        """The unicode value of the keyboard event, if available.

        Only populated when the keyboard event results in a character that
        could be printable.

        :return: unicode, '' if no char value is available for the event.
        """
        return self._char

    @property
    def modifiers(self):
        """
        The modifier keys that were active when this keyboard event occurred,
        as produced by the module's modifier code conversion. Each element is
        a keyboard modifier string constant. Possible values are:

        * 'lctrl', 'rctrl'
        * 'lshift', 'rshift'
        * 'lalt', 'ralt' (labelled as 'option' keys on Apple Keyboards)
        * 'lcmd', 'rcmd' (map to the 'windows' key(s) on Windows keyboards)
        * 'menu'
        * 'capslock'
        * 'numlock'
        * 'function' (OS X only)
        * 'modhelp' (OS X only)

        If no modifiers were active when the event occurred, the returned
        sequence is empty.

        :return: sequence of modifier label strings
        """
        return self._modifiers

    def __str__(self):
        base_text = ioEvent.__str__(self)
        template = '{}, key: {} char: {}, modifiers: {}'
        return template.format(base_text, self.key, self.char, self.modifiers)

    def __eq__(self, v):
        # Compare against the other event's key, or against v itself when v
        # is not a keyboard event (e.g. a plain key string).
        other_key = v.key if isinstance(v, KeyboardEvent) else v
        return self.key == other_key

    def __ne__(self, v):
        is_equal = self.__eq__(v)
        return not is_equal


class KeyboardPress(KeyboardEvent):
    """A key press event reported by the iohub Keyboard device.

    Adds no fields or behaviour beyond those of KeyboardEvent.
    """

    def __init__(self, ioe_array):
        super(KeyboardPress, self).__init__(ioe_array)
class KeyboardRelease(KeyboardEvent):
    """A key release event reported by the iohub Keyboard device.

    Extends KeyboardEvent with the ``duration`` and ``press_event_id``
    fields of the raw event array.
    """
    # Base field positions plus the two release specific fields. The
    # inherited KeyboardEvent.__init__ stores them as self._duration and
    # self._press_event_id.
    _attrib_index = dict(
        KeyboardEvent._attrib_index,
        duration=kb_cls_attr_names.index('duration'),
        press_event_id=kb_cls_attr_names.index('press_event_id'),
    )

    def __init__(self, ioe_array):
        super(KeyboardRelease, self).__init__(ioe_array)

    @property
    def duration(self):
        """The duration (in seconds) of the key press.

        This is calculated by subtracting the current event.time from the
        associated keypress.time.

        If no matching keypress event was reported prior to this event, then
        0.0 is returned. This can happen, for example, when the key was
        pressed prior to psychopy starting to monitor the device. This
        condition can also happen when keyboard.reset() method is called
        between the press and release event times.

        :return: float
        """
        return self._duration

    @property
    def pressEventID(self):
        """The event.id of the associated press event.

        The key press id is 0 if no associated KeyboardPress event was found.
        See the duration property documentation for details on when this can
        occur.

        :return: unsigned int
        """
        return self._press_event_id

    def __str__(self):
        base_text = KeyboardEvent.__str__(self)
        return '%s, duration: %.3f press_event_id: %d' % (
            base_text, self.duration, self.pressEventID)
class Keyboard(ioHubDeviceView):
    """The Keyboard device provides access to KeyboardPress and
    KeyboardRelease events as well as the current keyboard state.

    Examples:

        A. Print all keyboard events received for 5 seconds::

            from psychopy.iohub import launchHubServer
            from psychopy.core import getTime

            # Start the ioHub process. 'io' can now be used during the
            # experiment to access iohub devices and read iohub device events.
            io = launchHubServer()

            keyboard = io.devices.keyboard

            # Check for and print any Keyboard events received for 5 seconds.
            stime = getTime()
            while getTime()-stime < 5.0:
                for e in keyboard.getEvents():
                    print(e)

            # Stop the ioHub Server
            io.quit()

        B. Wait for a keyboard press event (max of 5 seconds)::

            from psychopy.iohub import launchHubServer
            from psychopy.core import getTime

            # Start the ioHub process. 'io' can now be used during the
            # experiment to access iohub devices and read iohub device events.
            io = launchHubServer()

            keyboard = io.devices.keyboard

            # Wait for a key keypress event ( max wait of 5 seconds )
            presses = keyboard.waitForPresses(maxWait=5.0)

            print(presses)

            # Stop the ioHub Server
            io.quit()
    """
    KEY_PRESS = EventConstants.KEYBOARD_PRESS
    KEY_RELEASE = EventConstants.KEYBOARD_RELEASE
    # Maps an event type constant to the client side class that wraps it.
    _type2class = {KEY_PRESS: KeyboardPress, KEY_RELEASE: KeyboardRelease}

    def __init__(self, ioclient, dev_cls_name, dev_config):
        super(Keyboard, self).__init__(ioclient, 'client.Keyboard',
                                       dev_cls_name, dev_config)
        self.clock = Computer.global_clock
        # Local event buffers: event type constant -> deque of event objects.
        self._events = dict()
        self._reporting = self.isReportingEvents()
        # key -> press time, refreshed on every _syncDeviceState() call.
        self._pressed_keys = {}
        self._device_config = dev_config
        # Used as the maxlen of each local event deque.
        self._event_buffer_length = dev_config.get('event_buffer_length')

        self._clearEventsRPC = DeviceRPC(self.hubClient._sendToHubServer,
                                         self.device_class,
                                         'clearEvents')

    def _clearLocalEvents(self, event_type=None):
        """Empty the locally buffered events.

        :param event_type: If None, every local buffer is emptied; otherwise
            only the buffer stored under this event type.
        """
        for etype, elist in self._events.items():
            if event_type is None or event_type == etype:
                elist.clear()

    def _syncDeviceState(self):
        """An optimized iohub server request that receives all device state
        and event information in one response.

        Refreshes the reporting flag and the pressed-key mapping, and appends
        any newly received events to the local per-type buffers.

        :return: None
        """
        kb_state = self.getCurrentDeviceState()

        # Both sections are optional in the response; a missing one is
        # treated as empty instead of raising on None.
        raw_events = kb_state.get('events') or {}
        raw_pressed = kb_state.get('pressed_keys') or {}
        events = {int(k): v for k, v in raw_events.items()}
        pressed_keys = {int(k): v for k, v in raw_pressed.items()}

        self._reporting = kb_state.get('reporting_events')

        # Rebuild the pressed-key mapping in place from the fresh state.
        self._pressed_keys.clear()
        akeyix = KeyboardEvent._attrib_index['key']
        iotimeix = DeviceEvent.EVENT_HUB_TIME_INDEX
        for key_array, _ in pressed_keys.values():
            self._pressed_keys[key_array[akeyix]] = key_array[iotimeix]

        for etype, event_arrays in events.items():
            event_cls = self._type2class[etype]
            evts = [event_cls(e) for e in event_arrays]
            # Create the bounded buffer for this type only on first use.
            if etype not in self._events:
                self._events[etype] = deque(maxlen=self._event_buffer_length)
            self._events[etype].extend(evts)

    @property
    def state(self):
        """
        Returns all currently pressed keys as a dictionary of key : time
        values. The key is taken from the originating press event .key field.
        The time value is time of the key press event.

        Note that any pressed, or active, modifier keys are included in the
        return value.

        The returned dict is a snapshot: it is not modified by later calls
        that re-synchronise the device state.

        :return: dict
        """
        self._syncDeviceState()
        # Return a copy so the next sync (which clears the internal dict in
        # place) cannot empty a dict the caller is still holding.
        return dict(self._pressed_keys)

    @property
    def reporting(self):
        """Specifies if the keyboard device is reporting / recording events.

          * True:  keyboard events are being reported.
          * False: keyboard events are not being reported.

        By default, the Keyboard starts reporting events automatically when
        the ioHub process is started and continues to do so until the process
        is stopped.

        This property can be used to read or set the device reporting state::

          # Read the reporting state of the keyboard.
          is_reporting_keyboard_event = keyboard.reporting

          # Stop the keyboard from reporting any new events.
          keyboard.reporting = False
        """
        return self._reporting

    @reporting.setter
    def reporting(self, r):
        """Sets the state of keyboard event reporting / recording."""
        self._reporting = self.enableEventReporting(r)
        return self._reporting

    def clearEvents(self, event_type=None, filter_id=None):
        """Clear buffered keyboard events locally and on the iohub server.

        :param event_type: Event type to clear, or None for all types.
        :param filter_id: Passed through unchanged to the server request.
        :return: The result of the server side clearEvents request.
        """
        self._clearLocalEvents(event_type)
        return self._clearEventsRPC(event_type=event_type,
                                    filter_id=filter_id)

    def getKeys(self, keys=None, chars=None, ignoreKeys=None, mods=None,
                duration=None, etype=None, clear=True):
        """
        Return a list of any KeyboardPress or KeyboardRelease events that have
        occurred since the last time either:

        * this method was called with the kwarg clear=True (default)
        * the keyboard.clearEvents() method was called.

        Other than the 'clear' kwarg, any kwargs that are not None or an
        empty list are used to filter the possible events that can be
        returned. If multiple filter criteria are provided, only events that
        match **all** specified criteria are returned.

        If no KeyboardEvent's are found that match the filtering criteria,
        an empty list is returned.

        Returned events are sorted by time.

        :param keys: Include events where .key in keys.
        :param chars: Include events where .char in chars.
        :param ignoreKeys: Ignore events where .key in ignoreKeys.
        :param mods: Include events where .modifiers include >=1 mods element.
        :param duration: Include KeyboardRelease events where
            .duration >= duration (duration >= 0) or
            .duration <= -(duration) (duration < 0).
        :param etype: Include events that match etype of Keyboard.KEY_PRESS
            or Keyboard.KEY_RELEASE.
        :param clear: True (default) = clear returned events from event
            buffer, False = leave the keyboard event buffer unchanged.
        :return: list of KeyboardEvent instances, or []
        """
        self._syncDeviceState()

        # Nothing buffered at all: skip filtering.
        if not any(self._events.values()):
            return []

        def filterEvent(e):
            if keys is not None and e.key not in keys:
                return False
            if ignoreKeys is not None and e.key in ignoreKeys:
                return False
            if chars is not None and e.char not in chars:
                return False
            if duration is not None:
                # The sign of `duration` selects the comparison direction.
                # Computed without division so duration == 0 is valid.
                sign = -1.0 if duration < 0 else 1.0
                if not (sign * e.duration) >= duration:
                    return False
            if mods and not any(m in e.modifiers for m in mods):
                return False
            return True

        press_evt = []
        release_evt = []
        if etype is None or etype == Keyboard.KEY_PRESS:
            press_evt = [e for e in self._events.get(Keyboard.KEY_PRESS, [])
                         if filterEvent(e)]
        if etype is None or etype == Keyboard.KEY_RELEASE:
            release_evt = [e for e in self._events.get(Keyboard.KEY_RELEASE, [])
                           if filterEvent(e)]

        return_events = sorted(press_evt + release_evt, key=lambda x: x.time)

        if clear is True and return_events:
            # Remove exactly the returned objects. deque.remove() would use
            # KeyboardEvent.__eq__, which compares only .key and could drop an
            # earlier, non-matching event that has the same key.
            returned_ids = {id(e) for e in return_events}
            for elist in self._events.values():
                remaining = [e for e in elist if id(e) not in returned_ids]
                if len(remaining) != len(elist):
                    elist.clear()
                    elist.extend(remaining)
        return return_events

    def getPresses(self, keys=None, chars=None, ignoreKeys=None, mods=None,
                   clear=True):
        """See the getKeys() method documentation.

        This method is identical, but only returns KeyboardPress events.
        """
        return self.getKeys(
            keys=keys,
            chars=chars,
            ignoreKeys=ignoreKeys,
            mods=mods,
            duration=None,
            etype=self.KEY_PRESS,
            clear=clear
        )

    def getReleases(self, keys=None, chars=None, ignoreKeys=None, mods=None,
                    duration=None, clear=True):
        """See the getKeys() method documentation.

        This method is identical, but only returns KeyboardRelease events.
        """
        return self.getKeys(
            keys=keys,
            chars=chars,
            ignoreKeys=ignoreKeys,
            mods=mods,
            duration=duration,
            etype=self.KEY_RELEASE,
            clear=clear
        )

    def waitForKeys(self, maxWait=None, keys=None, chars=None, mods=None,
                    duration=None, etype=None, clear=True,
                    checkInterval=0.002):
        """Blocks experiment execution until at least one matching
        KeyboardEvent occurs, or until maxWait seconds has passed since the
        method was called.

        Keyboard events are filtered the same way as in the getKeys() method.

        As soon as at least one matching KeyboardEvent occurs prior to
        maxWait, the matching events are returned as a list.

        Returned events are sorted by time.

        :param maxWait: Maximum seconds method waits for >=1 matching event.
            If <=0.0, method functions the same as getKeys().
            If None, the method blocks indefinitely.
        :param keys: Include events where .key in keys.
        :param chars: Include events where .char in chars.
        :param mods: Include events where .modifiers include >=1 mods element.
        :param duration: Include KeyboardRelease events where
            .duration >= duration (duration >= 0) or
            .duration <= -(duration) (duration < 0).
        :param etype: Include events that match etype of Keyboard.KEY_PRESS
            or Keyboard.KEY_RELEASE.
        :param clear: True (default) = clear returned events from event
            buffer, False = leave the keyboard event buffer unchanged.
        :param checkInterval: The time between getKeys() calls while waiting.
            The method sleeps between getKeys() calls, up until
            checkInterval*2.0 sec prior to the maxWait. After that time,
            keyboard events are constantly checked until the method times
            out.
        :return: list of KeyboardEvent instances, or []
        """
        start_time = getTime()
        if maxWait is None:
            # No timeout requested: wait until a matching event arrives.
            maxWait = float('inf')

        timeout = start_time + maxWait
        key = []

        def pumpKeys():
            key = self.getKeys(
                keys=keys,
                chars=chars,
                mods=mods,
                duration=duration,
                etype=etype,
                clear=clear
            )
            if key:
                return key
            # Nothing matched; run the win32 message pump before the caller
            # polls again.
            win32MessagePump()
            return key

        # Don't wait if maxWait is <= 0
        if maxWait <= 0:
            return pumpKeys()

        # Phase 1: poll, sleeping between polls, until close to the timeout.
        while getTime() < (timeout - checkInterval * 2):
            ltime = getTime()
            key = pumpKeys()
            if key:
                return key
            sleep_dur = max(checkInterval - (getTime() - ltime), 0.0001)
            time.sleep(sleep_dur)

        # Phase 2: poll continuously, without sleeping, until the timeout.
        while getTime() < timeout:
            key = pumpKeys()
            if key:
                return key
        return key

    def waitForPresses(self, maxWait=None, keys=None, chars=None, mods=None,
                       duration=None, clear=True, checkInterval=0.002):
        """See the waitForKeys() method documentation.

        This method is identical, but only returns KeyboardPress events.
        """
        return self.waitForKeys(
            maxWait=maxWait,
            keys=keys,
            chars=chars,
            mods=mods,
            duration=duration,
            etype=Keyboard.KEY_PRESS,
            clear=clear,
            checkInterval=checkInterval
        )

    def waitForReleases(self, maxWait=None, keys=None, chars=None, mods=None,
                        duration=None, clear=True, checkInterval=0.002):
        """See the waitForKeys() method documentation.

        This method is identical, but only returns KeyboardRelease events.
        """
        return self.waitForKeys(
            maxWait=maxWait,
            keys=keys,
            chars=chars,
            mods=mods,
            duration=duration,
            etype=Keyboard.KEY_RELEASE,
            clear=clear,
            checkInterval=checkInterval
        )

Back to top