first commit

This commit is contained in:
2025-11-01 06:00:52 +05:30
parent 6a49b604a7
commit ae7194a9f2
2110 changed files with 382375 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
# --------------------------------------------------------------------------
# Gather everything into a single, convenient namespace.
# --------------------------------------------------------------------------
# The superfluous "import name as name" syntax is here to satisfy mypy's attrs-defined rule.
# Alternatively all exported objects can be listed in __all__.
from . import (
ecodes as ecodes,
ff as ff,
)
from .device import (
AbsInfo as AbsInfo,
DeviceInfo as DeviceInfo,
EvdevError as EvdevError,
InputDevice as InputDevice,
)
from .events import (
AbsEvent as AbsEvent,
InputEvent as InputEvent,
KeyEvent as KeyEvent,
RelEvent as RelEvent,
SynEvent as SynEvent,
event_factory as event_factory,
)
from .uinput import (
UInput as UInput,
UInputError as UInputError,
)
from .util import (
categorize as categorize,
list_devices as list_devices,
resolve_ecodes as resolve_ecodes,
resolve_ecodes_dict as resolve_ecodes_dict,
)

View File

@@ -0,0 +1,440 @@
import contextlib
import os
from typing import Dict, Generic, Iterator, List, Literal, NamedTuple, Tuple, TypeVar, Union, overload
from . import _input, ecodes, util
try:
from .eventio_async import EvdevError, EventIO
except ImportError:
from .eventio import EvdevError, EventIO
_AnyStr = TypeVar("_AnyStr", str, bytes)
class AbsInfo(NamedTuple):
"""Absolute axis information.
A ``namedtuple`` with absolute axis information -
corresponds to the ``input_absinfo`` struct:
Attributes
---------
value
Latest reported value for the axis.
min
Specifies minimum value for the axis.
max
Specifies maximum value for the axis.
fuzz
Specifies fuzz value that is used to filter noise from the
event stream.
flat
Values that are within this value will be discarded by joydev
interface and reported as 0 instead.
resolution
Specifies resolution for the values reported for the axis.
Resolution for main axes (``ABS_X, ABS_Y, ABS_Z``) is reported
in units per millimeter (units/mm), resolution for rotational
axes (``ABS_RX, ABS_RY, ABS_RZ``) is reported in units per
radian.
Note
----
The input core does not clamp reported values to the ``[minimum,
maximum]`` limits, such task is left to userspace.
"""
value: int
min: int
max: int
fuzz: int
flat: int
resolution: int
def __str__(self):
return "value {}, min {}, max {}, fuzz {}, flat {}, res {}".format(*self) # pylint: disable=not-an-iterable
class KbdInfo(NamedTuple):
"""Keyboard repeat rate.
Attributes
----------
delay
Amount of time that a key must be depressed before it will start
to repeat (in milliseconds).
repeat
Keyboard repeat rate in characters per second.
"""
delay: int
repeat: int
def __str__(self):
return "delay {}, repeat {}".format(self.delay, self.repeat)
class DeviceInfo(NamedTuple):
"""
Attributes
----------
bustype
vendor
product
version
"""
bustype: int
vendor: int
product: int
version: int
def __str__(self) -> str:
msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}"
return msg.format(*self) # pylint: disable=not-an-iterable
class InputDevice(EventIO, Generic[_AnyStr]):
"""
A linux input device from which input events can be read.
"""
__slots__ = ("path", "fd", "info", "name", "phys", "uniq", "_rawcapabilities", "version", "ff_effects_count")
def __init__(self, dev: Union[_AnyStr, "os.PathLike[_AnyStr]"]):
"""
Arguments
---------
dev : str|bytes|PathLike
Path to input device
"""
#: Path to input device.
self.path: _AnyStr = dev if not hasattr(dev, "__fspath__") else dev.__fspath__()
# Certain operations are possible only when the device is opened in read-write mode.
try:
fd = os.open(dev, os.O_RDWR | os.O_NONBLOCK)
except OSError:
fd = os.open(dev, os.O_RDONLY | os.O_NONBLOCK)
#: A non-blocking file descriptor to the device file.
self.fd: int = fd
# Returns (bustype, vendor, product, version, name, phys, capabilities).
info_res = _input.ioctl_devinfo(self.fd)
#: A :class:`DeviceInfo <evdev.device.DeviceInfo>` instance.
self.info = DeviceInfo(*info_res[:4])
#: The name of the event device.
self.name: str = info_res[4]
#: The physical topology of the device.
self.phys: str = info_res[5]
#: The unique identifier of the device.
self.uniq: str = info_res[6]
#: The evdev protocol version.
self.version: int = _input.ioctl_EVIOCGVERSION(self.fd)
#: The raw dictionary of device capabilities - see `:func:capabilities()`.
self._rawcapabilities = _input.ioctl_capabilities(self.fd)
#: The number of force feedback effects the device can keep in its memory.
self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd)
def __del__(self) -> None:
if hasattr(self, "fd") and self.fd is not None:
try:
self.close()
except (OSError, ImportError, AttributeError):
pass
def _capabilities(self, absinfo: bool = True):
res = {}
for etype, _ecodes in self._rawcapabilities.items():
for code in _ecodes:
l = res.setdefault(etype, [])
if isinstance(code, tuple):
if absinfo:
a = code[1] # (0, 0, 0, 255, 0, 0)
i = AbsInfo(*a)
l.append((code[0], i))
else:
l.append(code[0])
else:
l.append(code)
return res
@overload
def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]:
...
@overload
def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]:
...
def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]:
"""
Return the event types that this device supports as a mapping of
supported event types to lists of handled event codes.
Example
--------
>>> device.capabilities()
{ 1: [272, 273, 274],
2: [0, 1, 6, 8] }
If ``verbose`` is ``True``, event codes and types will be resolved
to their names.
::
{ ('EV_KEY', 1): [('BTN_MOUSE', 272),
('BTN_RIGHT', 273),
('BTN_MIDDLE', 273)],
('EV_REL', 2): [('REL_X', 0),
('REL_Y', 1),
('REL_HWHEEL', 6),
('REL_WHEEL', 8)] }
Unknown codes or types will be resolved to ``'?'``.
If ``absinfo`` is ``True``, the list of capabilities will also
include absolute axis information in the form of
:class:`AbsInfo` instances::
{ 3: [ (0, AbsInfo(min=0, max=255, fuzz=0, flat=0)),
(1, AbsInfo(min=0, max=255, fuzz=0, flat=0)) ]}
Combined with ``verbose`` the above becomes::
{ ('EV_ABS', 3): [ (('ABS_X', 0), AbsInfo(min=0, max=255, fuzz=0, flat=0)),
(('ABS_Y', 1), AbsInfo(min=0, max=255, fuzz=0, flat=0)) ]}
"""
if verbose:
return dict(util.resolve_ecodes_dict(self._capabilities(absinfo)))
else:
return self._capabilities(absinfo)
def input_props(self, verbose: bool = False):
"""
Get device properties and quirks.
Example
-------
>>> device.input_props()
[0, 5]
If ``verbose`` is ``True``, input properties are resolved to their
names. Unknown codes are resolved to ``'?'``::
[('INPUT_PROP_POINTER', 0), ('INPUT_PROP_POINTING_STICK', 5)]
"""
props = _input.ioctl_EVIOCGPROP(self.fd)
if verbose:
return util.resolve_ecodes(ecodes.INPUT_PROP, props)
return props
def leds(self, verbose: bool = False):
"""
Return currently set LED keys.
Example
-------
>>> device.leds()
[0, 1, 8, 9]
If ``verbose`` is ``True``, event codes are resolved to their
names. Unknown codes are resolved to ``'?'``::
[('LED_NUML', 0), ('LED_CAPSL', 1), ('LED_MISC', 8), ('LED_MAIL', 9)]
"""
leds = _input.ioctl_EVIOCG_bits(self.fd, ecodes.EV_LED)
if verbose:
return util.resolve_ecodes(ecodes.LED, leds)
return leds
def set_led(self, led_num: int, value: int) -> None:
"""
Set the state of the selected LED.
Example
-------
>>> device.set_led(ecodes.LED_NUML, 1)
"""
self.write(ecodes.EV_LED, led_num, value)
def __eq__(self, other):
"""
Two devices are equal if their :data:`info` attributes are equal.
"""
return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path
def __str__(self) -> str:
msg = 'device {}, name "{}", phys "{}", uniq "{}"'
return msg.format(self.path, self.name, self.phys, self.uniq or "")
def __repr__(self) -> str:
msg = (self.__class__.__name__, self.path)
return "{}({!r})".format(*msg)
def __fspath__(self):
return self.path
def close(self) -> None:
if self.fd > -1:
try:
super().close()
os.close(self.fd)
finally:
self.fd = -1
def grab(self) -> None:
"""
Grab input device using ``EVIOCGRAB`` - other applications will
be unable to receive events until the device is released. Only
one process can hold a ``EVIOCGRAB`` on a device.
Warning
-------
Grabbing an already grabbed device will raise an ``OSError``.
"""
_input.ioctl_EVIOCGRAB(self.fd, 1)
def ungrab(self) -> None:
"""
Release device if it has been already grabbed (uses `EVIOCGRAB`).
Warning
-------
Releasing an already released device will raise an
``OSError('Invalid argument')``.
"""
_input.ioctl_EVIOCGRAB(self.fd, 0)
@contextlib.contextmanager
def grab_context(self) -> Iterator[None]:
"""
A context manager for the duration of which only the current
process will be able to receive events from the device.
"""
self.grab()
yield
self.ungrab()
def upload_effect(self, effect: "ff.Effect"):
"""
Upload a force feedback effect to a force feedback device.
"""
data = memoryview(effect).tobytes()
ff_id = _input.upload_effect(self.fd, data)
return ff_id
def erase_effect(self, ff_id) -> None:
"""
Erase a force effect from a force feedback device. This also
stops the effect.
"""
_input.erase_effect(self.fd, ff_id)
@property
def repeat(self):
"""
Get or set the keyboard repeat rate (in characters per
minute) and delay (in milliseconds).
"""
return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd))
@repeat.setter
def repeat(self, value: Tuple[int, int]):
return _input.ioctl_EVIOCSREP(self.fd, *value)
def active_keys(self, verbose: bool = False):
"""
Return currently active keys.
Example
-------
>>> device.active_keys()
[1, 42]
If ``verbose`` is ``True``, key codes are resolved to their
verbose names. Unknown codes are resolved to ``'?'``. For
example::
[('KEY_ESC', 1), ('KEY_LEFTSHIFT', 42)]
"""
active_keys = _input.ioctl_EVIOCG_bits(self.fd, ecodes.EV_KEY)
if verbose:
return util.resolve_ecodes(ecodes.KEY, active_keys)
return active_keys
def absinfo(self, axis_num: int):
"""
Return current :class:`AbsInfo` for input device axis
Arguments
---------
axis_num : int
EV_ABS keycode (example :attr:`ecodes.ABS_X`)
Example
-------
>>> device.absinfo(ecodes.ABS_X)
AbsInfo(value=1501, min=-32768, max=32767, fuzz=0, flat=128, resolution=0)
"""
return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num))
def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None:
"""
Update :class:`AbsInfo` values. Only specified values will be overwritten.
Arguments
---------
axis_num : int
EV_ABS keycode (example :attr:`ecodes.ABS_X`)
Example
-------
>>> device.set_absinfo(ecodes.ABS_X, min=-2000, max=2000)
You can also unpack AbsInfo tuple that will overwrite all values
>>> device.set_absinfo(ecodes.ABS_Y, *AbsInfo(0, -2000, 2000, 0, 15, 0))
"""
cur_absinfo = self.absinfo(axis_num)
new_absinfo = AbsInfo(
value if value is not None else cur_absinfo.value,
min if min is not None else cur_absinfo.min,
max if max is not None else cur_absinfo.max,
fuzz if fuzz is not None else cur_absinfo.fuzz,
flat if flat is not None else cur_absinfo.flat,
resolution if resolution is not None else cur_absinfo.resolution,
)
_input.ioctl_EVIOCSABS(self.fd, axis_num, new_absinfo)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,111 @@
# pylint: disable=undefined-variable
"""
This modules exposes the integer constants defined in ``linux/input.h`` and
``linux/input-event-codes.h``.
Exposed constants::
KEY, ABS, REL, SW, MSC, LED, BTN, REP, SND, ID, EV,
BUS, SYN, FF, FF_STATUS, INPUT_PROP
This module also provides reverse and forward mappings of the names and values
of the above mentioned constants::
>>> evdev.ecodes.KEY_A
30
>>> evdev.ecodes.ecodes['KEY_A']
30
>>> evdev.ecodes.KEY[30]
'KEY_A'
>>> evdev.ecodes.REL[0]
'REL_X'
>>> evdev.ecodes.EV[evdev.ecodes.EV_KEY]
'EV_KEY'
>>> evdev.ecodes.bytype[evdev.ecodes.EV_REL][0]
'REL_X'
Keep in mind that values in reverse mappings may point to one or more event
codes. For example::
>>> evdev.ecodes.FF[80]
('FF_EFFECT_MIN', 'FF_RUMBLE')
>>> evdev.ecodes.FF[81]
'FF_PERIODIC'
"""
from inspect import getmembers
from . import _ecodes
#: Mapping of names to values.
ecodes = {}
prefixes = "KEY ABS REL SW MSC LED BTN REP SND ID EV BUS SYN FF_STATUS FF INPUT_PROP UI_FF".split()
prev_prefix = ""
g = globals()
# eg. code: 'REL_Z', val: 2
for code, val in getmembers(_ecodes):
for prefix in prefixes: # eg. 'REL'
if code.startswith(prefix):
ecodes[code] = val
# FF_STATUS codes should not appear in the FF reverse mapping
if not code.startswith(prev_prefix):
d = g.setdefault(prefix, {})
# codes that share the same value will be added to a list. eg:
# >>> ecodes.FF_STATUS
# {0: 'FF_STATUS_STOPPED', 1: ['FF_STATUS_MAX', 'FF_STATUS_PLAYING']}
if val in d:
if isinstance(d[val], list):
d[val].append(code)
else:
d[val] = [d[val], code]
else:
d[val] = code
prev_prefix = prefix
# Convert lists to tuples.
k, v = None, None
for prefix in prefixes:
for k, v in g[prefix].items():
if isinstance(v, list):
g[prefix][k] = tuple(v)
#: Keys are a combination of all BTN and KEY codes.
keys = {}
keys.update(BTN)
keys.update(KEY)
# make keys safe to use for the default list of uinput device
# capabilities
del keys[_ecodes.KEY_MAX]
del keys[_ecodes.KEY_CNT]
#: Mapping of event types to other value/name mappings.
bytype = {
_ecodes.EV_KEY: keys,
_ecodes.EV_ABS: ABS,
_ecodes.EV_REL: REL,
_ecodes.EV_SW: SW,
_ecodes.EV_MSC: MSC,
_ecodes.EV_LED: LED,
_ecodes.EV_REP: REP,
_ecodes.EV_SND: SND,
_ecodes.EV_SYN: SYN,
_ecodes.EV_FF: FF,
_ecodes.EV_FF_STATUS: FF_STATUS,
}
from evdev._ecodes import *
# cheaper than whitelisting in an __all__
del code, val, prefix, getmembers, g, d, k, v, prefixes, prev_prefix

View File

@@ -0,0 +1,152 @@
import fcntl
import functools
import os
import select
from typing import Iterator, Union
from . import _input, _uinput, ecodes
from .events import InputEvent
# --------------------------------------------------------------------------
class EvdevError(Exception):
pass
class EventIO:
"""
Base class for reading and writing input events.
This class is used by :class:`InputDevice` and :class:`UInput`.
- On, :class:`InputDevice` it used for reading user-generated events (e.g.
key presses, mouse movements) and writing feedback events (e.g. leds,
beeps).
- On, :class:`UInput` it used for writing user-generated events (e.g.
key presses, mouse movements) and reading feedback events (e.g. leds,
beeps).
"""
def fileno(self):
"""
Return the file descriptor to the open event device. This makes
it possible to pass instances directly to :func:`select.select()` and
:class:`asyncore.file_dispatcher`.
"""
return self.fd
def read_loop(self) -> Iterator[InputEvent]:
"""
Enter an endless :func:`select.select()` loop that yields input events.
"""
while True:
r, w, x = select.select([self.fd], [], [])
for event in self.read():
yield event
def read_one(self) -> Union[InputEvent, None]:
"""
Read and return a single input event as an instance of
:class:`InputEvent <evdev.events.InputEvent>`.
Return ``None`` if there are no pending input events.
"""
# event -> (sec, usec, type, code, val)
event = _input.device_read(self.fd)
if event:
return InputEvent(*event)
def read(self) -> Iterator[InputEvent]:
"""
Read multiple input events from device. Return a generator object that
yields :class:`InputEvent <evdev.events.InputEvent>` instances. Raises
`BlockingIOError` if there are no available events at the moment.
"""
# events -> ((sec, usec, type, code, val), ...)
events = _input.device_read_many(self.fd)
for event in events:
yield InputEvent(*event)
# pylint: disable=no-self-argument
def need_write(func):
"""
Decorator that raises :class:`EvdevError` if there is no write access to the
input device.
"""
@functools.wraps(func)
def wrapper(*args):
fd = args[0].fd
if fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_RDWR:
# pylint: disable=not-callable
return func(*args)
msg = 'no write access to device "%s"' % args[0].path
raise EvdevError(msg)
return wrapper
def write_event(self, event):
"""
Inject an input event into the input subsystem. Events are
queued until a synchronization event is received.
Arguments
---------
event: InputEvent
InputEvent instance or an object with an ``event`` attribute
(:class:`KeyEvent <evdev.events.KeyEvent>`, :class:`RelEvent
<evdev.events.RelEvent>` etc).
Example
-------
>>> ev = InputEvent(1334414993, 274296, ecodes.EV_KEY, ecodes.KEY_A, 1)
>>> ui.write_event(ev)
"""
if hasattr(event, "event"):
event = event.event
self.write(event.type, event.code, event.value)
@need_write
def write(self, etype: int, code: int, value: int):
"""
Inject an input event into the input subsystem. Events are
queued until a synchronization event is received.
Arguments
---------
etype
event type (e.g. ``EV_KEY``).
code
event code (e.g. ``KEY_A``).
value
event value (e.g. 0 1 2 - depends on event type).
Example
---------
>>> ui.write(e.EV_KEY, e.KEY_A, 1) # key A - down
>>> ui.write(e.EV_KEY, e.KEY_A, 0) # key A - up
"""
_uinput.write(self.fd, etype, code, value)
def syn(self):
"""
Inject a ``SYN_REPORT`` event into the input subsystem. Events
queued by :func:`write()` will be fired. If possible, events
will be merged into an 'atomic' event.
"""
self.write(ecodes.EV_SYN, ecodes.SYN_REPORT, 0)
def close(self):
pass

View File

@@ -0,0 +1,106 @@
import asyncio
import select
import sys
from . import eventio
from .events import InputEvent
# needed for compatibility
from .eventio import EvdevError
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing import Any as Self
class ReadIterator:
def __init__(self, device):
self.current_batch = iter(())
self.device = device
# Standard iterator protocol.
def __iter__(self) -> Self:
return self
def __next__(self) -> InputEvent:
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)
def __aiter__(self) -> Self:
return self
def __anext__(self) -> "asyncio.Future[InputEvent]":
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:
def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)
self.device.async_read().add_done_callback(next_batch_ready)
return future
class EventIO(eventio.EventIO):
def _do_when_readable(self, callback):
loop = asyncio.get_event_loop()
def ready():
loop.remove_reader(self.fileno())
callback()
loop.add_reader(self.fileno(), ready)
def _set_result(self, future, cb):
try:
future.set_result(cb())
except Exception as error:
future.set_exception(error)
def async_read_one(self):
"""
Asyncio coroutine to read and return a single input event as
an instance of :class:`InputEvent <evdev.events.InputEvent>`.
"""
future = asyncio.Future()
self._do_when_readable(lambda: self._set_result(future, self.read_one))
return future
def async_read(self):
"""
Asyncio coroutine to read multiple input events from device. Return
a generator object that yields :class:`InputEvent <evdev.events.InputEvent>`
instances.
"""
future = asyncio.Future()
self._do_when_readable(lambda: self._set_result(future, self.read))
return future
def async_read_loop(self) -> ReadIterator:
"""
Return an iterator that yields input events. This iterator is
compatible with the ``async for`` syntax.
"""
return ReadIterator(self)
def close(self):
try:
loop = asyncio.get_event_loop()
loop.remove_reader(self.fileno())
except RuntimeError:
# no event loop present, so there is nothing to
# remove the reader from. Ignore
pass

View File

@@ -0,0 +1,192 @@
"""
This module provides the :class:`InputEvent` class, which closely
resembles the ``input_event`` struct defined in ``linux/input.h``:
.. code-block:: c
struct input_event {
struct timeval time;
__u16 type;
__u16 code;
__s32 value;
};
This module also defines several :class:`InputEvent` sub-classes that
know more about the different types of events (key, abs, rel etc). The
:data:`event_factory` dictionary maps event types to these classes.
Assuming you use the :func:`evdev.util.categorize()` function to
categorize events according to their type, adding or replacing a class
for a specific event type becomes a matter of modifying
:data:`event_factory`.
All classes in this module have reasonable ``str()`` and ``repr()``
methods::
>>> print(event)
event at 1337197425.477827, code 04, type 04, val 458792
>>> print(repr(event))
InputEvent(1337197425L, 477827L, 4, 4, 458792L)
>>> print(key_event)
key event at 1337197425.477835, 28 (KEY_ENTER), up
>>> print(repr(key_event))
KeyEvent(InputEvent(1337197425L, 477835L, 1, 28, 0L))
"""
# event type descriptions have been taken mot-a-mot from:
# http://www.kernel.org/doc/Documentation/input/event-codes.txt
# pylint: disable=no-name-in-module
from typing import Final
from .ecodes import ABS, EV_ABS, EV_KEY, EV_REL, EV_SYN, KEY, REL, SYN, keys
class InputEvent:
"""A generic input event."""
__slots__ = "sec", "usec", "type", "code", "value"
def __init__(self, sec, usec, type, code, value):
#: Time in seconds since epoch at which event occurred.
self.sec: int = sec
#: Microsecond portion of the timestamp.
self.usec: int = usec
#: Event type - one of ``ecodes.EV_*``.
self.type: int = type
#: Event code related to the event type.
self.code: int = code
#: Event value related to the event type.
self.value: int = value
def timestamp(self) -> float:
"""Return event timestamp as a float."""
return self.sec + (self.usec / 1000000.0)
def __str__(self):
msg = "event at {:f}, code {:02d}, type {:02d}, val {:02d}"
return msg.format(self.timestamp(), self.code, self.type, self.value)
def __repr__(self):
msg = "{}({!r}, {!r}, {!r}, {!r}, {!r})"
return msg.format(self.__class__.__name__, self.sec, self.usec, self.type, self.code, self.value)
class KeyEvent:
"""An event generated by a keyboard, button or other key-like devices."""
key_up: Final[int] = 0x0
key_down: Final[int] = 0x1
key_hold: Final[int] = 0x2
__slots__ = "scancode", "keycode", "keystate", "event"
def __init__(self, event: InputEvent, allow_unknown: bool = False):
"""
The ``allow_unknown`` argument determines what to do in the event of an event code
for which a key code cannot be found. If ``False`` a ``KeyError`` will be raised.
If ``True`` the keycode will be set to the hex value of the event code.
"""
self.scancode: int = event.code
if event.value == 0:
self.keystate = KeyEvent.key_up
elif event.value == 2:
self.keystate = KeyEvent.key_hold
elif event.value == 1:
self.keystate = KeyEvent.key_down
try:
self.keycode = keys[event.code]
except KeyError:
if allow_unknown:
self.keycode = "0x{:02X}".format(event.code)
else:
raise
#: Reference to an :class:`InputEvent` instance.
self.event: InputEvent = event
def __str__(self):
try:
ks = ("up", "down", "hold")[self.keystate]
except IndexError:
ks = "unknown"
msg = "key event at {:f}, {} ({}), {}"
return msg.format(self.event.timestamp(), self.scancode, self.keycode, ks)
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.event)
class RelEvent:
"""A relative axis event (e.g moving the mouse 5 units to the left)."""
__slots__ = "event"
def __init__(self, event: InputEvent):
#: Reference to an :class:`InputEvent` instance.
self.event: InputEvent = event
def __str__(self):
msg = "relative axis event at {:f}, {}"
return msg.format(self.event.timestamp(), REL[self.event.code])
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.event)
class AbsEvent:
"""An absolute axis event (e.g the coordinates of a tap on a touchscreen)."""
__slots__ = "event"
def __init__(self, event: InputEvent):
#: Reference to an :class:`InputEvent` instance.
self.event: InputEvent = event
def __str__(self):
msg = "absolute axis event at {:f}, {}"
return msg.format(self.event.timestamp(), ABS[self.event.code])
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.event)
class SynEvent:
"""
A synchronization event. Used as markers to separate events. Events may be
separated in time or in space, such as with the multitouch protocol.
"""
__slots__ = "event"
def __init__(self, event: InputEvent):
#: Reference to an :class:`InputEvent` instance.
self.event: InputEvent = event
def __str__(self):
msg = "synchronization event at {:f}, {}"
return msg.format(self.event.timestamp(), SYN[self.event.code])
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.event)
#: A mapping of event types to :class:`InputEvent` sub-classes. Used
#: by :func:`evdev.util.categorize()`
event_factory = {
EV_KEY: KeyEvent,
EV_REL: RelEvent,
EV_ABS: AbsEvent,
EV_SYN: SynEvent,
}
__all__ = ("InputEvent", "KeyEvent", "RelEvent", "SynEvent", "AbsEvent", "event_factory")

View File

@@ -0,0 +1,181 @@
"""
Usage: evtest [options] [<device>, ...]
Input device enumerator and event monitor.
Running evtest without any arguments will let you select
from a list of all readable input devices.
Options:
-h, --help Show this help message and exit.
-c, --capabilities List device capabilities and exit.
-g, --grab Other applications will not receive events from
the selected devices while evtest is running.
Examples:
evtest /dev/input/event0 /dev/input/event1
"""
import atexit
import optparse
import re
import select
import sys
import termios
from . import AbsInfo, InputDevice, ecodes, list_devices
def parseopt():
parser = optparse.OptionParser(add_help_option=False)
parser.add_option("-h", "--help", action="store_true")
parser.add_option("-g", "--grab", action="store_true")
parser.add_option("-c", "--capabilities", action="store_true")
return parser.parse_args()
def main():
opts, devices = parseopt()
if opts.help:
print(__doc__.strip())
return 0
if not devices:
devices = select_devices()
else:
devices = [InputDevice(path) for path in devices]
if opts.capabilities:
for device in devices:
print_capabilities(device)
return 0
if opts.grab:
for device in devices:
device.grab()
# Disable tty echoing if stdin is a tty.
if sys.stdin.isatty():
toggle_tty_echo(sys.stdin, enable=False)
atexit.register(toggle_tty_echo, sys.stdin, enable=True)
print("Listening for events (press ctrl-c to exit) ...")
fd_to_device = {dev.fd: dev for dev in devices}
while True:
r, w, e = select.select(fd_to_device, [], [])
for fd in r:
for event in fd_to_device[fd].read():
print_event(event)
def select_devices(device_dir="/dev/input"):
"""
Select one or more devices from a list of accessible input devices.
"""
def devicenum(device_path):
digits = re.findall(r"\d+$", device_path)
return [int(i) for i in digits]
devices = sorted(list_devices(device_dir), key=devicenum)
devices = [InputDevice(path) for path in devices]
if not devices:
msg = "error: no input devices found (do you have rw permission on %s/*?)"
print(msg % device_dir, file=sys.stderr)
sys.exit(1)
dev_format = "{0:<3} {1.path:<20} {1.name:<35} {1.phys:<35} {1.uniq:<4}"
dev_lines = [dev_format.format(num, dev) for num, dev in enumerate(devices)]
print("ID {:<20} {:<35} {:<35} {}".format("Device", "Name", "Phys", "Uniq"))
print("-" * len(max(dev_lines, key=len)))
print("\n".join(dev_lines))
print()
choices = input("Select devices [0-%s]: " % (len(dev_lines) - 1))
try:
choices = choices.split()
choices = [devices[int(num)] for num in choices]
except ValueError:
choices = None
if not choices:
msg = "error: invalid input - please enter one or more numbers separated by spaces"
print(msg, file=sys.stderr)
sys.exit(1)
return choices
def print_capabilities(device):
capabilities = device.capabilities(verbose=True)
input_props = device.input_props(verbose=True)
print("Device name: {.name}".format(device))
print("Device info: {.info}".format(device))
print("Repeat settings: {}\n".format(device.repeat))
if ("EV_LED", ecodes.EV_LED) in capabilities:
leds = ",".join(i[0] for i in device.leds(True))
print("Active LEDs: %s" % leds)
active_keys = ",".join(k[0] for k in device.active_keys(True))
print("Active keys: %s\n" % active_keys)
if input_props:
print("Input properties:")
for type, code in input_props:
print(" %s %s" % (type, code))
print()
print("Device capabilities:")
for type, codes in capabilities.items():
print(" Type {} {}:".format(*type))
for code in codes:
# code <- ('BTN_RIGHT', 273) or (['BTN_LEFT', 'BTN_MOUSE'], 272)
if isinstance(code[1], AbsInfo):
print(" Code {:<4} {}:".format(*code[0]))
print(" {}".format(code[1]))
else:
# Multiple names may resolve to one value.
s = ", ".join(code[0]) if isinstance(code[0], list) else code[0]
print(" Code {:<4} {}".format(s, code[1]))
print("")
def print_event(e):
if e.type == ecodes.EV_SYN:
if e.code == ecodes.SYN_MT_REPORT:
msg = "time {:<17} +++++++++++++ {} +++++++++++++"
elif e.code == ecodes.SYN_DROPPED:
msg = "time {:<17} !!!!!!!!!!!!! {} !!!!!!!!!!!!!"
else:
msg = "time {:<17} ------------- {} -------------"
print(msg.format(e.timestamp(), ecodes.SYN[e.code]))
else:
if e.type in ecodes.bytype:
codename = ecodes.bytype[e.type][e.code]
else:
codename = "?"
evfmt = "time {:<17} type {} ({}), code {:<4} ({}), value {}"
print(evfmt.format(e.timestamp(), e.type, ecodes.EV[e.type], e.code, codename, e.value))
def toggle_tty_echo(fh, enable=True):
flags = termios.tcgetattr(fh.fileno())
if enable:
flags[3] |= termios.ECHO
else:
flags[3] &= ~termios.ECHO
termios.tcsetattr(fh.fileno(), termios.TCSANOW, flags)
if __name__ == "__main__":
try:
ret = main()
except (KeyboardInterrupt, EOFError):
ret = 0
sys.exit(ret)

View File

@@ -0,0 +1,198 @@
import ctypes
from . import ecodes
_u8 = ctypes.c_uint8
_u16 = ctypes.c_uint16
_u32 = ctypes.c_uint32
_s16 = ctypes.c_int16
_s32 = ctypes.c_int32
class Replay(ctypes.Structure):
"""
Defines scheduling of the force-feedback effect
@length: duration of the effect
@delay: delay before effect should start playing
"""
_fields_ = [
("length", _u16),
("delay", _u16),
]
class Trigger(ctypes.Structure):
"""
Defines what triggers the force-feedback effect
@button: number of the button triggering the effect
@interval: controls how soon the effect can be re-triggered
"""
_fields_ = [
("button", _u16),
("interval", _u16),
]
class Envelope(ctypes.Structure):
"""
Generic force-feedback effect envelope
@attack_length: duration of the attack (ms)
@attack_level: level at the beginning of the attack
@fade_length: duration of fade (ms)
@fade_level: level at the end of fade
The @attack_level and @fade_level are absolute values; when applying
envelope force-feedback core will convert to positive/negative
value based on polarity of the default level of the effect.
Valid range for the attack and fade levels is 0x0000 - 0x7fff
"""
_fields_ = [
("attack_length", _u16),
("attack_level", _u16),
("fade_length", _u16),
("fade_level", _u16),
]
class Constant(ctypes.Structure):
"""
Defines parameters of a constant force-feedback effect
@level: strength of the effect; may be negative
@envelope: envelope data
"""
_fields_ = [
("level", _s16),
("ff_envelope", Envelope),
]
class Ramp(ctypes.Structure):
"""
Defines parameters of a ramp force-feedback effect
@start_level: beginning strength of the effect; may be negative
@end_level: final strength of the effect; may be negative
@envelope: envelope data
"""
_fields_ = [
("start_level", _s16),
("end_level", _s16),
("ff_envelope", Envelope),
]
class Condition(ctypes.Structure):
"""
Defines a spring or friction force-feedback effect
@right_saturation: maximum level when joystick moved all way to the right
@left_saturation: same for the left side
@right_coeff: controls how fast the force grows when the joystick moves to the right
@left_coeff: same for the left side
@deadband: size of the dead zone, where no force is produced
@center: position of the dead zone
"""
_fields_ = [
("right_saturation", _u16),
("left_saturation", _u16),
("right_coeff", _s16),
("left_coeff", _s16),
("deadband", _u16),
("center", _s16),
]
class Periodic(ctypes.Structure):
"""
Defines parameters of a periodic force-feedback effect
@waveform: kind of the effect (wave)
@period: period of the wave (ms)
@magnitude: peak value
@offset: mean value of the wave (roughly)
@phase: 'horizontal' shift
@envelope: envelope data
@custom_len: number of samples (FF_CUSTOM only)
@custom_data: buffer of samples (FF_CUSTOM only)
"""
_fields_ = [
("waveform", _u16),
("period", _u16),
("magnitude", _s16),
("offset", _s16),
("phase", _u16),
("envelope", Envelope),
("custom_len", _u32),
("custom_data", ctypes.POINTER(_s16)),
]
class Rumble(ctypes.Structure):
"""
Defines parameters of a periodic force-feedback effect
@strong_magnitude: magnitude of the heavy motor
@weak_magnitude: magnitude of the light one
Some rumble pads have two motors of different weight. Strong_magnitude
represents the magnitude of the vibration generated by the heavy one.
"""
_fields_ = [
("strong_magnitude", _u16),
("weak_magnitude", _u16),
]
class EffectType(ctypes.Union):
_fields_ = [
("ff_constant_effect", Constant),
("ff_ramp_effect", Ramp),
("ff_periodic_effect", Periodic),
("ff_condition_effect", Condition * 2), # one for each axis
("ff_rumble_effect", Rumble),
]
class Effect(ctypes.Structure):
_fields_ = [
("type", _u16),
("id", _s16),
("direction", _u16),
("ff_trigger", Trigger),
("ff_replay", Replay),
("u", EffectType),
]
class UInputUpload(ctypes.Structure):
_fields_ = [
("request_id", _u32),
("retval", _s32),
("effect", Effect),
("old", Effect),
]
class UInputErase(ctypes.Structure):
_fields_ = [
("request_id", _u32),
("retval", _s32),
("effect_id", _u32),
]
# ff_types = {
# ecodes.FF_CONSTANT,
# ecodes.FF_PERIODIC,
# ecodes.FF_RAMP,
# ecodes.FF_SPRING,
# ecodes.FF_FRICTION,
# ecodes.FF_DAMPER,
# ecodes.FF_RUMBLE,
# ecodes.FF_INERTIA,
# ecodes.FF_CUSTOM,
# }

View File

@@ -0,0 +1,147 @@
"""
Generate a Python extension module with the constants defined in linux/input.h.
"""
import getopt
import os
import re
import sys
# -----------------------------------------------------------------------------
# The default header file locations to try.
headers = [
"/usr/include/linux/input.h",
"/usr/include/linux/input-event-codes.h",
"/usr/include/linux/uinput.h",
]
opts, args = getopt.getopt(sys.argv[1:], "", ["ecodes", "stubs", "reproducible"])
if not opts:
print("usage: genecodes.py [--ecodes|--stubs] [--reproducible] <headers>")
exit(2)
if args:
headers = args
reproducible = ("--reproducible", "") in opts
# -----------------------------------------------------------------------------
macro_regex = r"#define\s+((?:KEY|ABS|REL|SW|MSC|LED|BTN|REP|SND|ID|EV|BUS|SYN|FF|UI_FF|INPUT_PROP)_\w+)"
macro_regex = re.compile(macro_regex)
if reproducible:
uname = "hidden for reproducibility"
else:
# Uname without hostname.
uname = list(os.uname())
uname = " ".join((uname[0], *uname[2:]))
# -----------------------------------------------------------------------------
template_ecodes = r"""
#include <Python.h>
#ifdef __FreeBSD__
#include <dev/evdev/input.h>
#include <dev/evdev/uinput.h>
#else
#include <linux/input.h>
#include <linux/uinput.h>
#endif
/* Automatically generated by evdev.genecodes */
/* Generated on %s */
/* Generated from %s */
#define MODULE_NAME "_ecodes"
#define MODULE_HELP "linux/input.h macros"
static PyMethodDef MethodTable[] = {
{ NULL, NULL, 0, NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
MODULE_NAME,
MODULE_HELP,
-1, /* m_size */
MethodTable, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};
PyMODINIT_FUNC
PyInit__ecodes(void)
{
PyObject* m = PyModule_Create(&moduledef);
if (m == NULL) return NULL;
%s
return m;
}
"""
template_stubs = r"""
# Automatically generated by evdev.genecodes
# Generated on %s
# Generated from %s
# pylint: skip-file
ecodes: dict[str, int]
keys: dict[int, str|list[str]]
bytype: dict[int, dict[int, str|list[str]]]
KEY: dict[int, str|list[str]]
ABS: dict[int, str|list[str]]
REL: dict[int, str|list[str]]
SW: dict[int, str|list[str]]
MSC: dict[int, str|list[str]]
LED: dict[int, str|list[str]]
BTN: dict[int, str|list[str]]
REP: dict[int, str|list[str]]
SND: dict[int, str|list[str]]
ID: dict[int, str|list[str]]
EV: dict[int, str|list[str]]
BUS: dict[int, str|list[str]]
SYN: dict[int, str|list[str]]
FF_STATUS: dict[int, str|list[str]]
FF_INPUT_PROP: dict[int, str|list[str]]
%s
"""
def parse_headers(headers=headers):
for header in headers:
try:
fh = open(header)
except (IOError, OSError):
continue
for line in fh:
macro = macro_regex.search(line)
if macro:
yield macro.group(1)
all_macros = list(parse_headers())
if not all_macros:
print("no input macros found in: %s" % " ".join(headers), file=sys.stderr)
sys.exit(1)
# pylint: disable=possibly-used-before-assignment, used-before-assignment
if ("--ecodes", "") in opts:
body = (" PyModule_AddIntMacro(m, %s);" % macro for macro in all_macros)
template = template_ecodes
elif ("--stubs", "") in opts:
body = ("%s: int" % macro for macro in all_macros)
template = template_stubs
body = os.linesep.join(body)
text = template % (uname, headers if not reproducible else ["hidden for reproducibility"], body)
print(text.strip())

View File

@@ -0,0 +1,54 @@
import sys
from unittest import mock
from pprint import PrettyPrinter
sys.modules["evdev.ecodes"] = mock.Mock()
from evdev import ecodes_runtime as ecodes
pprint = PrettyPrinter(indent=2, sort_dicts=True, width=120).pprint
print("# Automatically generated by evdev.genecodes_py")
print()
print('"""')
print(ecodes.__doc__.strip())
print('"""')
print()
print("from typing import Final, Dict, Tuple, Union")
print()
for name, value in ecodes.ecodes.items():
print(f"{name}: Final[int] = {value}")
print()
entries = [
("ecodes", "Dict[str, int]", "#: Mapping of names to values."),
("bytype", "Dict[int, Dict[int, Union[str, Tuple[str]]]]", "#: Mapping of event types to other value/name mappings."),
("keys", "Dict[int, Union[str, Tuple[str]]]", "#: Keys are a combination of all BTN and KEY codes."),
("KEY", "Dict[int, Union[str, Tuple[str]]]", None),
("ABS", "Dict[int, Union[str, Tuple[str]]]", None),
("REL", "Dict[int, Union[str, Tuple[str]]]", None),
("SW", "Dict[int, Union[str, Tuple[str]]]", None),
("MSC", "Dict[int, Union[str, Tuple[str]]]", None),
("LED", "Dict[int, Union[str, Tuple[str]]]", None),
("BTN", "Dict[int, Union[str, Tuple[str]]]", None),
("REP", "Dict[int, Union[str, Tuple[str]]]", None),
("SND", "Dict[int, Union[str, Tuple[str]]]", None),
("ID", "Dict[int, Union[str, Tuple[str]]]", None),
("EV", "Dict[int, Union[str, Tuple[str]]]", None),
("BUS", "Dict[int, Union[str, Tuple[str]]]", None),
("SYN", "Dict[int, Union[str, Tuple[str]]]", None),
("FF", "Dict[int, Union[str, Tuple[str]]]", None),
("UI_FF", "Dict[int, Union[str, Tuple[str]]]", None),
("FF_STATUS", "Dict[int, Union[str, Tuple[str]]]", None),
("INPUT_PROP", "Dict[int, Union[str, Tuple[str]]]", None)
]
for key, annotation, doc in entries:
if doc:
print(doc)
print(f"{key}: {annotation} = ", end="")
pprint(getattr(ecodes, key))
print()

View File

@@ -0,0 +1,579 @@
/*
* Python bindings to certain linux input subsystem functions.
*
* While everything here can be implemented in pure Python with struct and
* fcntl.ioctl, imho, it is much more straightforward to do so in C.
*
*/
#include <Python.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#ifdef __FreeBSD__
#include <dev/evdev/input.h>
#else
#include <linux/input.h>
#endif
#ifndef input_event_sec
#define input_event_sec time.tv_sec
#define input_event_usec time.tv_usec
#endif
#define MAX_NAME_SIZE 256
extern char* EV_NAME[EV_CNT];
extern int EV_TYPE_MAX[EV_CNT];
extern char** EV_TYPE_NAME[EV_CNT];
extern char* BUS_NAME[];
int test_bit(const char* bitmask, int bit) {
return bitmask[bit/8] & (1 << (bit % 8));
}
// Read input event from a device and return a tuple that mimics input_event
static PyObject *
device_read(PyObject *self, PyObject *args)
{
struct input_event event;
// get device file descriptor (O_RDONLY|O_NONBLOCK)
int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0));
int n = read(fd, &event, sizeof(event));
if (n < 0) {
if (errno == EAGAIN) {
Py_INCREF(Py_None);
return Py_None;
}
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
PyObject* sec = PyLong_FromLong(event.input_event_sec);
PyObject* usec = PyLong_FromLong(event.input_event_usec);
PyObject* val = PyLong_FromLong(event.value);
PyObject* type = PyLong_FromLong(event.type);
PyObject* code = PyLong_FromLong(event.code);
PyObject* py_input_event = PyTuple_Pack(5, sec, usec, type, code, val);
return py_input_event;
}
// Read multiple input events from a device and return a list of tuples
static PyObject *
device_read_many(PyObject *self, PyObject *args)
{
// get device file descriptor (O_RDONLY|O_NONBLOCK)
int fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(args, 0));
PyObject* py_input_event = NULL;
PyObject* events = NULL;
PyObject* sec = NULL;
PyObject* usec = NULL;
PyObject* val = NULL;
PyObject* type = NULL;
PyObject* code = NULL;
struct input_event event[64];
size_t event_size = sizeof(struct input_event);
ssize_t nread = read(fd, event, event_size*64);
if (nread < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
// Construct a tuple of event tuples. Each tuple is the arguments to InputEvent.
size_t num_events = nread / event_size;
events = PyTuple_New(num_events);
for (size_t i = 0 ; i < num_events; i++) {
sec = PyLong_FromLong(event[i].input_event_sec);
usec = PyLong_FromLong(event[i].input_event_usec);
val = PyLong_FromLong(event[i].value);
type = PyLong_FromLong(event[i].type);
code = PyLong_FromLong(event[i].code);
py_input_event = PyTuple_Pack(5, sec, usec, type, code, val);
PyTuple_SET_ITEM(events, i, py_input_event);
}
return events;
}
// Get the event types and event codes that the input device supports
static PyObject *
ioctl_capabilities(PyObject *self, PyObject *args)
{
int fd, ev_type, ev_code;
char ev_bits[EV_MAX/8 + 1], code_bits[KEY_MAX/8 + 1];
struct input_absinfo absinfo;
int ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
// @todo: figure out why fd gets zeroed on an ioctl after the
// refactoring and get rid of this workaround
const int _fd = fd;
// Capabilities is a mapping of supported event types to lists of handled
// events e.g: {1: [272, 273, 274, 275], 2: [0, 1, 6, 8]}
PyObject* capabilities = PyDict_New();
PyObject* eventcodes = NULL;
PyObject* evlong = NULL;
PyObject* capability = NULL;
PyObject* py_absinfo = NULL;
PyObject* absitem = NULL;
memset(&ev_bits, 0, sizeof(ev_bits));
if (ioctl(_fd, EVIOCGBIT(0, sizeof(ev_bits)), ev_bits) < 0)
goto on_err;
// Build a dictionary of the device's capabilities
for (ev_type=0 ; ev_type<EV_MAX ; ev_type++) {
if (test_bit(ev_bits, ev_type)) {
capability = PyLong_FromLong(ev_type);
eventcodes = PyList_New(0);
memset(&code_bits, 0, sizeof(code_bits));
ioctl(_fd, EVIOCGBIT(ev_type, sizeof(code_bits)), code_bits);
for (ev_code = 0; ev_code < KEY_MAX; ev_code++) {
if (test_bit(code_bits, ev_code)) {
// Get abs{min,max,fuzz,flat} values for ABS_* event codes
if (ev_type == EV_ABS) {
memset(&absinfo, 0, sizeof(absinfo));
ioctl(_fd, EVIOCGABS(ev_code), &absinfo);
py_absinfo = Py_BuildValue("(iiiiii)",
absinfo.value,
absinfo.minimum,
absinfo.maximum,
absinfo.fuzz,
absinfo.flat,
absinfo.resolution);
evlong = PyLong_FromLong(ev_code);
absitem = Py_BuildValue("(OO)", evlong, py_absinfo);
// absitem -> tuple(ABS_X, (0, 255, 0, 0))
PyList_Append(eventcodes, absitem);
Py_DECREF(absitem);
Py_DECREF(py_absinfo);
}
else {
evlong = PyLong_FromLong(ev_code);
PyList_Append(eventcodes, evlong);
}
Py_DECREF(evlong);
}
}
// capabilities[EV_KEY] = [KEY_A, KEY_B, KEY_C, ...]
// capabilities[EV_ABS] = [(ABS_X, (0, 255, 0, 0)), ...]
PyDict_SetItem(capabilities, capability, eventcodes);
Py_DECREF(capability);
Py_DECREF(eventcodes);
}
}
return capabilities;
on_err:
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
// An all-in-one function for describing an input device
static PyObject *
ioctl_devinfo(PyObject *self, PyObject *args)
{
int fd;
struct input_id iid;
char name[MAX_NAME_SIZE];
char phys[MAX_NAME_SIZE] = {0};
char uniq[MAX_NAME_SIZE] = {0};
int ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
memset(&iid, 0, sizeof(iid));
if (ioctl(fd, EVIOCGID, &iid) < 0) goto on_err;
if (ioctl(fd, EVIOCGNAME(sizeof(name)), name) < 0) goto on_err;
// Some devices do not have a physical topology associated with them
ioctl(fd, EVIOCGPHYS(sizeof(phys)), phys);
// Some kernels have started reporting bluetooth controller MACs as phys.
// This lets us get the real physical address. As with phys, it may be blank.
ioctl(fd, EVIOCGUNIQ(sizeof(uniq)), uniq);
return Py_BuildValue("hhhhsss", iid.bustype, iid.vendor, iid.product, iid.version,
name, phys, uniq);
on_err:
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
static PyObject *
ioctl_EVIOCGABS(PyObject *self, PyObject *args)
{
int fd, ev_code;
struct input_absinfo absinfo;
PyObject* py_absinfo = NULL;
int ret = PyArg_ParseTuple(args, "ii", &fd, &ev_code);
if (!ret) return NULL;
memset(&absinfo, 0, sizeof(absinfo));
ret = ioctl(fd, EVIOCGABS(ev_code), &absinfo);
if (ret == -1) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
py_absinfo = Py_BuildValue("(iiiiii)",
absinfo.value,
absinfo.minimum,
absinfo.maximum,
absinfo.fuzz,
absinfo.flat,
absinfo.resolution);
return py_absinfo;
}
static PyObject *
ioctl_EVIOCSABS(PyObject *self, PyObject *args)
{
int fd, ev_code;
struct input_absinfo absinfo;
int ret = PyArg_ParseTuple(args,
"ii(iiiiii)",
&fd,
&ev_code,
&absinfo.value,
&absinfo.minimum,
&absinfo.maximum,
&absinfo.fuzz,
&absinfo.flat,
&absinfo.resolution);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCSABS(ev_code), &absinfo);
if (ret == -1) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
static PyObject *
ioctl_EVIOCGREP(PyObject *self, PyObject *args)
{
int fd, ret;
unsigned int rep[REP_CNT] = {0};
ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCGREP, &rep);
if (ret == -1)
return NULL;
return Py_BuildValue("(ii)", rep[REP_DELAY], rep[REP_PERIOD]);
}
static PyObject *
ioctl_EVIOCSREP(PyObject *self, PyObject *args)
{
int fd, ret;
unsigned int rep[REP_CNT] = {0};
ret = PyArg_ParseTuple(args, "iii", &fd, &rep[0], &rep[1]);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCSREP, &rep);
if (ret == -1)
return NULL;
return Py_BuildValue("i", ret);
}
static PyObject *
ioctl_EVIOCGVERSION(PyObject *self, PyObject *args)
{
int fd, ret, res;
ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCGVERSION, &res);
if (ret == -1)
return NULL;
return Py_BuildValue("i", res);
}
static PyObject *
ioctl_EVIOCGRAB(PyObject *self, PyObject *args)
{
int fd, ret, flag;
ret = PyArg_ParseTuple(args, "ii", &fd, &flag);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCGRAB, (intptr_t)flag);
if (ret != 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
static PyObject *
ioctl_EVIOCG_bits(PyObject *self, PyObject *args)
{
int max, fd, evtype, ret;
ret = PyArg_ParseTuple(args, "ii", &fd, &evtype);
if (!ret) return NULL;
switch (evtype) {
case EV_LED:
max = LED_MAX; break;
case EV_SND:
max = SND_MAX; break;
case EV_KEY:
max = KEY_MAX; break;
case EV_SW:
max = SW_MAX; break;
default:
return NULL;
}
char bytes[(max+7)/8];
memset(bytes, 0, sizeof bytes);
switch (evtype) {
case EV_LED:
ret = ioctl(fd, EVIOCGLED(sizeof(bytes)), &bytes);
break;
case EV_SND:
ret = ioctl(fd, EVIOCGSND(sizeof(bytes)), &bytes);
break;
case EV_KEY:
ret = ioctl(fd, EVIOCGKEY(sizeof(bytes)), &bytes);
break;
case EV_SW:
ret = ioctl(fd, EVIOCGSW(sizeof(bytes)), &bytes);
break;
}
if (ret == -1)
return NULL;
PyObject* res = PyList_New(0);
for (int i=0; i<=max; i++) {
if (test_bit(bytes, i)) {
PyList_Append(res, Py_BuildValue("i", i));
}
}
return res;
}
static PyObject *
ioctl_EVIOCGEFFECTS(PyObject *self, PyObject *args)
{
int fd, ret, res;
ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
ret = ioctl(fd, EVIOCGEFFECTS, &res);
if (ret == -1)
return NULL;
return Py_BuildValue("i", res);
}
void print_ff_effect(struct ff_effect* effect) {
fprintf(stderr,
"ff_effect:\n"
" type: %d \n"
" id: %d \n"
" direction: %d\n"
" trigger: (%d, %d)\n"
" replay: (%d, %d)\n",
effect->type, effect->id, effect->direction,
effect->trigger.button, effect->trigger.interval,
effect->replay.length, effect->replay.delay
);
switch (effect->type) {
case FF_CONSTANT:
fprintf(stderr, " constant: (%d, (%d, %d, %d, %d))\n", effect->u.constant.level,
effect->u.constant.envelope.attack_length,
effect->u.constant.envelope.attack_level,
effect->u.constant.envelope.fade_length,
effect->u.constant.envelope.fade_level);
break;
case FF_RUMBLE:
fprintf(stderr, " rumble: (%d, %d)\n",
effect->u.rumble.strong_magnitude,
effect->u.rumble.weak_magnitude);
break;
}
}
static PyObject *
upload_effect(PyObject *self, PyObject *args)
{
int fd, ret;
PyObject* effect_data;
ret = PyArg_ParseTuple(args, "iO", &fd, &effect_data);
if (!ret) return NULL;
void* data = PyBytes_AsString(effect_data);
struct ff_effect effect = {};
memmove(&effect, data, sizeof(struct ff_effect));
// print_ff_effect(&effect);
ret = ioctl(fd, EVIOCSFF, &effect);
if (ret != 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
return Py_BuildValue("i", effect.id);
}
static PyObject *
erase_effect(PyObject *self, PyObject *args)
{
int fd, ret;
PyObject* ff_id_obj;
ret = PyArg_ParseTuple(args, "iO", &fd, &ff_id_obj);
if (!ret) return NULL;
long ff_id = PyLong_AsLong(ff_id_obj);
ret = ioctl(fd, EVIOCRMFF, ff_id);
if (ret != 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
static PyObject *
ioctl_EVIOCGPROP(PyObject *self, PyObject *args)
{
int fd, ret;
ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
char bytes[(INPUT_PROP_MAX+7)/8];
memset(bytes, 0, sizeof bytes);
ret = ioctl(fd, EVIOCGPROP(sizeof(bytes)), &bytes);
if (ret == -1)
return NULL;
PyObject* res = PyList_New(0);
for (int i=0; i<INPUT_PROP_MAX; i++) {
if (test_bit(bytes, i)) {
PyList_Append(res, Py_BuildValue("i", i));
}
}
return res;
}
static PyMethodDef MethodTable[] = {
{ "ioctl_devinfo", ioctl_devinfo, METH_VARARGS, "fetch input device info" },
{ "ioctl_capabilities", ioctl_capabilities, METH_VARARGS, "fetch input device capabilities" },
{ "ioctl_EVIOCGABS", ioctl_EVIOCGABS, METH_VARARGS, "get input device absinfo"},
{ "ioctl_EVIOCSABS", ioctl_EVIOCSABS, METH_VARARGS, "set input device absinfo"},
{ "ioctl_EVIOCGREP", ioctl_EVIOCGREP, METH_VARARGS},
{ "ioctl_EVIOCSREP", ioctl_EVIOCSREP, METH_VARARGS},
{ "ioctl_EVIOCGVERSION", ioctl_EVIOCGVERSION, METH_VARARGS},
{ "ioctl_EVIOCGRAB", ioctl_EVIOCGRAB, METH_VARARGS},
{ "ioctl_EVIOCGEFFECTS", ioctl_EVIOCGEFFECTS, METH_VARARGS, "fetch the number of effects the device can keep in its memory." },
{ "ioctl_EVIOCG_bits", ioctl_EVIOCG_bits, METH_VARARGS, "get state of KEY|LED|SND|SW"},
{ "ioctl_EVIOCGPROP", ioctl_EVIOCGPROP, METH_VARARGS, "get device properties"},
{ "device_read", device_read, METH_VARARGS, "read an input event from a device" },
{ "device_read_many", device_read_many, METH_VARARGS, "read all available input events from a device" },
{ "upload_effect", upload_effect, METH_VARARGS, "" },
{ "erase_effect", erase_effect, METH_VARARGS, "" },
{ NULL, NULL, 0, NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"_input",
"Python bindings to certain linux input subsystem functions",
-1, /* m_size */
MethodTable, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};
static PyObject *
moduleinit(void)
{
PyObject* m = PyModule_Create(&moduledef);
if (m == NULL) return NULL;
return m;
}
PyMODINIT_FUNC
PyInit__input(void)
{
return moduleinit();
}

View File

@@ -0,0 +1,417 @@
#include <Python.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#ifdef __FreeBSD__
#include <dev/evdev/input.h>
#include <dev/evdev/uinput.h>
#else
#include <linux/input.h>
#include <linux/uinput.h>
#endif
#ifndef input_event_sec
#define input_event_sec time.tv_sec
#define input_event_usec time.tv_usec
#endif
// Workaround for installing on kernels newer than 4.4.
#ifndef FF_MAX_EFFECTS
#define FF_MAX_EFFECTS FF_GAIN;
#endif
int _uinput_close(int fd)
{
if (ioctl(fd, UI_DEV_DESTROY) < 0) {
int oerrno = errno;
close(fd);
errno = oerrno;
return -1;
}
return close(fd);
}
static PyObject *
uinput_open(PyObject *self, PyObject *args)
{
const char* devnode;
int ret = PyArg_ParseTuple(args, "s", &devnode);
if (!ret) return NULL;
int fd = open(devnode, O_RDWR | O_NONBLOCK);
if (fd < 0) {
PyErr_SetString(PyExc_OSError, "could not open uinput device in write mode");
return NULL;
}
return Py_BuildValue("i", fd);
}
static PyObject *
uinput_set_phys(PyObject *self, PyObject *args)
{
int fd;
const char* phys;
int ret = PyArg_ParseTuple(args, "is", &fd, &phys);
if (!ret) return NULL;
if (ioctl(fd, UI_SET_PHYS, phys) < 0)
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
static PyObject *
uinput_set_prop(PyObject *self, PyObject *args)
{
int fd;
uint16_t prop;
int ret = PyArg_ParseTuple(args, "ih", &fd, &prop);
if (!ret) return NULL;
if (ioctl(fd, UI_SET_PROPBIT, prop) < 0)
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
static PyObject *
uinput_get_sysname(PyObject *self, PyObject *args)
{
int fd;
char sysname[64];
int ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
#ifdef UI_GET_SYSNAME
if (ioctl(fd, UI_GET_SYSNAME(sizeof(sysname)), &sysname) < 0)
goto on_err;
return Py_BuildValue("s", &sysname);
#endif
on_err:
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
// Different kernel versions have different device setup methods. You can read
// more about it here:
// https://github.com/torvalds/linux/commit/052876f8e5aec887d22c4d06e54aa5531ffcec75
// Setup function for kernel >= v4.5
#if defined(UI_DEV_SETUP) && defined(UI_ABS_SETUP)
static PyObject *
uinput_setup(PyObject *self, PyObject *args) {
int fd, len, i;
uint16_t vendor, product, version, bustype;
uint32_t max_effects;
PyObject *absinfo = NULL, *item = NULL;
struct uinput_abs_setup abs_setup;
const char* name;
int ret = PyArg_ParseTuple(args, "isHHHHOI", &fd, &name, &vendor,
&product, &version, &bustype, &absinfo, &max_effects);
if (!ret) return NULL;
// Setup absinfo:
len = PyList_Size(absinfo);
for (i=0; i<len; i++) {
// item -> (ABS_X, 0, 255, 0, 0, 0, 0)
item = PyList_GetItem(absinfo, i);
memset(&abs_setup, 0, sizeof(abs_setup)); // Clear struct
abs_setup.code = PyLong_AsLong(PyList_GetItem(item, 0));
abs_setup.absinfo.value = PyLong_AsLong(PyList_GetItem(item, 1));
abs_setup.absinfo.minimum = PyLong_AsLong(PyList_GetItem(item, 2));
abs_setup.absinfo.maximum = PyLong_AsLong(PyList_GetItem(item, 3));
abs_setup.absinfo.fuzz = PyLong_AsLong(PyList_GetItem(item, 4));
abs_setup.absinfo.flat = PyLong_AsLong(PyList_GetItem(item, 5));
abs_setup.absinfo.resolution = PyLong_AsLong(PyList_GetItem(item, 6));
if(ioctl(fd, UI_ABS_SETUP, &abs_setup) < 0)
goto on_err;
}
// Setup evdev:
struct uinput_setup usetup;
memset(&usetup, 0, sizeof(usetup));
strncpy(usetup.name, name, sizeof(usetup.name) - 1);
usetup.id.vendor = vendor;
usetup.id.product = product;
usetup.id.version = version;
usetup.id.bustype = bustype;
usetup.ff_effects_max = max_effects;
if(ioctl(fd, UI_DEV_SETUP, &usetup) < 0)
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
// Fallback setup function (Linux <= 4.5 and FreeBSD).
#else
static PyObject *
uinput_setup(PyObject *self, PyObject *args) {
int fd, len, i, abscode;
uint16_t vendor, product, version, bustype;
uint32_t max_effects;
PyObject *absinfo = NULL, *item = NULL;
struct uinput_user_dev uidev;
const char* name;
int ret = PyArg_ParseTuple(args, "isHHHHOI", &fd, &name, &vendor,
&product, &version, &bustype, &absinfo, &max_effects);
if (!ret) return NULL;
memset(&uidev, 0, sizeof(uidev));
strncpy(uidev.name, name, sizeof(uidev.name) - 1);
uidev.id.vendor = vendor;
uidev.id.product = product;
uidev.id.version = version;
uidev.id.bustype = bustype;
uidev.ff_effects_max = max_effects;
len = PyList_Size(absinfo);
for (i=0; i<len; i++) {
// item -> (ABS_X, 0, 255, 0, 0, 0, 0)
item = PyList_GetItem(absinfo, i);
abscode = (int)PyLong_AsLong(PyList_GetItem(item, 0));
/* min/max/fuzz/flat start from index 2 because index 1 is value */
uidev.absmin[abscode] = PyLong_AsLong(PyList_GetItem(item, 2));
uidev.absmax[abscode] = PyLong_AsLong(PyList_GetItem(item, 3));
uidev.absfuzz[abscode] = PyLong_AsLong(PyList_GetItem(item, 4));
uidev.absflat[abscode] = PyLong_AsLong(PyList_GetItem(item, 5));
}
if (write(fd, &uidev, sizeof(uidev)) != sizeof(uidev))
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
#endif
static PyObject *
uinput_create(PyObject *self, PyObject *args)
{
int fd;
int ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
if (ioctl(fd, UI_DEV_CREATE) < 0)
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
static PyObject *
uinput_close(PyObject *self, PyObject *args)
{
int fd;
int ret = PyArg_ParseTuple(args, "i", &fd);
if (!ret) return NULL;
if (_uinput_close(fd) < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
Py_RETURN_NONE;
}
static PyObject *
uinput_write(PyObject *self, PyObject *args)
{
int fd, type, code, value;
int ret = PyArg_ParseTuple(args, "iiii", &fd, &type, &code, &value);
if (!ret) return NULL;
struct input_event event;
struct timeval tval;
memset(&event, 0, sizeof(event));
gettimeofday(&tval, 0);
event.input_event_usec = tval.tv_usec;
event.input_event_sec = tval.tv_sec;
event.type = type;
event.code = code;
event.value = value;
if (write(fd, &event, sizeof(event)) != sizeof(event)) {
// @todo: elaborate
// PyErr_SetString(PyExc_OSError, "error writing event to uinput device");
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
Py_RETURN_NONE;
}
static PyObject *
uinput_enable_event(PyObject *self, PyObject *args)
{
int fd;
uint16_t type, code;
unsigned long req;
int ret = PyArg_ParseTuple(args, "ihh", &fd, &type, &code);
if (!ret) return NULL;
switch (type) {
case EV_KEY: req = UI_SET_KEYBIT; break;
case EV_ABS: req = UI_SET_ABSBIT; break;
case EV_REL: req = UI_SET_RELBIT; break;
case EV_MSC: req = UI_SET_MSCBIT; break;
case EV_SW: req = UI_SET_SWBIT; break;
case EV_LED: req = UI_SET_LEDBIT; break;
case EV_FF: req = UI_SET_FFBIT; break;
case EV_SND: req = UI_SET_SNDBIT; break;
default:
errno = EINVAL;
goto on_err;
}
if (ioctl(fd, UI_SET_EVBIT, type) < 0)
goto on_err;
if (ioctl(fd, req, code) < 0)
goto on_err;
Py_RETURN_NONE;
on_err:
_uinput_close(fd);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
int _uinput_begin_upload(int fd, struct uinput_ff_upload *upload)
{
return ioctl(fd, UI_BEGIN_FF_UPLOAD, upload);
}
int _uinput_end_upload(int fd, struct uinput_ff_upload *upload)
{
return ioctl(fd, UI_END_FF_UPLOAD, upload);
}
int _uinput_begin_erase(int fd, struct uinput_ff_erase *upload)
{
return ioctl(fd, UI_BEGIN_FF_ERASE, upload);
}
int _uinput_end_erase(int fd, struct uinput_ff_erase *upload)
{
return ioctl(fd, UI_END_FF_ERASE, upload);
}
static PyMethodDef MethodTable[] = {
{ "open", uinput_open, METH_VARARGS,
"Open uinput device node."},
{ "setup", uinput_setup, METH_VARARGS,
"Set an uinput device up."},
{ "create", uinput_create, METH_VARARGS,
"Create an uinput device."},
{ "close", uinput_close, METH_VARARGS,
"Destroy uinput device."},
{ "write", uinput_write, METH_VARARGS,
"Write event to uinput device."},
{ "enable", uinput_enable_event, METH_VARARGS,
"Enable a type of event."},
{ "set_phys", uinput_set_phys, METH_VARARGS,
"Set physical path"},
{ "get_sysname", uinput_get_sysname, METH_VARARGS,
"Obtain the sysname of the uinput device."},
{ "set_prop", uinput_set_prop, METH_VARARGS,
"Set device input property"},
{ NULL, NULL, 0, NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"_uinput",
"Python bindings for parts of linux/uinput.c",
-1, /* m_size */
MethodTable, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};
static PyObject *
moduleinit(void)
{
PyObject* m = PyModule_Create(&moduledef);
if (m == NULL) return NULL;
PyModule_AddIntConstant(m, "maxnamelen", UINPUT_MAX_NAME_SIZE);
return m;
}
PyMODINIT_FUNC
PyInit__uinput(void)
{
return moduleinit();
}

View File

@@ -0,0 +1,375 @@
import ctypes
import os
import platform
import re
import stat
import time
from collections import defaultdict
from typing import Union, Tuple, Dict, Sequence, Optional
from . import _uinput, ecodes, ff, util
from .device import InputDevice, AbsInfo
from .events import InputEvent
try:
from evdev.eventio_async import EventIO
except ImportError:
from evdev.eventio import EventIO
class UInputError(Exception):
pass
class UInput(EventIO):
"""
A userland input device and that can inject input events into the
linux input subsystem.
"""
__slots__ = (
"name",
"vendor",
"product",
"version",
"bustype",
"events",
"devnode",
"fd",
"device",
)
@classmethod
def from_device(
cls,
*devices: Union[InputDevice, Union[str, bytes, os.PathLike]],
filtered_types: Tuple[int] = (ecodes.EV_SYN, ecodes.EV_FF),
**kwargs,
):
"""
Create an UInput device with the capabilities of one or more input
devices.
Arguments
---------
devices : InputDevice|str
Varargs of InputDevice instances or paths to input devices.
filtered_types : Tuple[event type codes]
Event types to exclude from the capabilities of the uinput device.
**kwargs
Keyword arguments to UInput constructor (i.e. name, vendor etc.).
"""
device_instances = []
for dev in devices:
if not isinstance(dev, InputDevice):
dev = InputDevice(str(dev))
device_instances.append(dev)
all_capabilities = defaultdict(set)
if "max_effects" not in kwargs:
kwargs["max_effects"] = min([dev.ff_effects_count for dev in device_instances])
# Merge the capabilities of all devices into one dictionary.
for dev in device_instances:
for ev_type, ev_codes in dev.capabilities().items():
all_capabilities[ev_type].update(ev_codes)
for evtype in filtered_types:
if evtype in all_capabilities:
del all_capabilities[evtype]
return cls(events=all_capabilities, **kwargs)
def __init__(
self,
events: Optional[Dict[int, Sequence[int]]] = None,
name: str = "py-evdev-uinput",
vendor: int = 0x1,
product: int = 0x1,
version: int = 0x1,
bustype: int = 0x3,
devnode: str = "/dev/uinput",
phys: str = "py-evdev-uinput",
input_props=None,
# CentOS 7 has sufficiently old headers that FF_MAX_EFFECTS is not defined there,
# which causes the whole module to fail loading. Fallback on a hardcoded value of
# FF_MAX_EFFECTS if it is not defined in the ecodes.
max_effects=ecodes.ecodes.get("FF_MAX_EFFECTS", 96),
):
"""
Arguments
---------
events : dict
Dictionary of event types mapping to lists of event codes. The
event types and codes that the uinput device will be able to
inject - defaults to all key codes.
name
The name of the input device.
vendor
Vendor identifier.
product
Product identifier.
version
Version identifier.
bustype
Bustype identifier.
phys
Physical path.
input_props
Input properties and quirks.
max_effects
Maximum simultaneous force-feedback effects.
Note
----
If you do not specify any events, the uinput device will be able
to inject only ``KEY_*`` and ``BTN_*`` event codes.
"""
self.name: str = name #: Uinput device name.
self.vendor: int = vendor #: Device vendor identifier.
self.product: int = product #: Device product identifier.
self.version: int = version #: Device version identifier.
self.bustype: int = bustype #: Device bustype - e.g. ``BUS_USB``.
self.phys: str = phys #: Uinput device physical path.
self.devnode: str = devnode #: Uinput device node - e.g. ``/dev/uinput/``.
if not events:
events = {ecodes.EV_KEY: ecodes.keys.keys()}
self._verify()
#: Write-only, non-blocking file descriptor to the uinput device node.
self.fd = _uinput.open(devnode)
# Prepare the list of events for passing to _uinput.enable and _uinput.setup.
absinfo, prepared_events = self._prepare_events(events)
# Set phys name
_uinput.set_phys(self.fd, phys)
# Set properties
input_props = input_props or []
for prop in input_props:
_uinput.set_prop(self.fd, prop)
for etype, code in prepared_events:
_uinput.enable(self.fd, etype, code)
_uinput.setup(self.fd, name, vendor, product, version, bustype, absinfo, max_effects)
# Create the uinput device.
_uinput.create(self.fd)
self.dll = ctypes.CDLL(_uinput.__file__)
self.dll._uinput_begin_upload.restype = ctypes.c_int
self.dll._uinput_end_upload.restype = ctypes.c_int
#: An :class:`InputDevice <evdev.device.InputDevice>` instance
#: for the fake input device. ``None`` if the device cannot be
#: opened for reading and writing.
self.device: InputDevice = self._find_device(self.fd)
def _prepare_events(self, events):
"""Prepare events for passing to _uinput.enable and _uinput.setup"""
absinfo, prepared_events = [], []
for etype, codes in events.items():
for code in codes:
# Handle max, min, fuzz, flat.
if isinstance(code, (tuple, list, AbsInfo)):
# Flatten (ABS_Y, (0, 255, 0, 0, 0, 0)) to (ABS_Y, 0, 255, 0, 0, 0, 0).
f = [code[0]]
f.extend(code[1])
# Ensure the tuple is always 6 ints long, since uinput.c:uinput_create
# does little in the way of checking the length.
f.extend([0] * (6 - len(code[1])))
absinfo.append(f)
code = code[0]
prepared_events.append((etype, code))
return absinfo, prepared_events
def __enter__(self):
return self
def __exit__(self, type, value, tb):
if hasattr(self, "fd"):
self.close()
def __repr__(self):
# TODO:
v = (repr(getattr(self, i)) for i in ("name", "bustype", "vendor", "product", "version", "phys"))
return "{}({})".format(self.__class__.__name__, ", ".join(v))
def __str__(self):
msg = 'name "{}", bus "{}", vendor "{:04x}", product "{:04x}", version "{:04x}", phys "{}"\nevent types: {}'
evtypes = [i[0] for i in self.capabilities(True).keys()]
msg = msg.format(
self.name, ecodes.BUS[self.bustype], self.vendor, self.product, self.version, self.phys, " ".join(evtypes)
)
return msg
def close(self):
# Close the associated InputDevice, if it was previously opened.
if self.device is not None:
self.device.close()
# Destroy the uinput device.
if self.fd > -1:
_uinput.close(self.fd)
self.fd = -1
def capabilities(self, verbose: bool = False, absinfo: bool = True):
"""See :func:`capabilities <evdev.device.InputDevice.capabilities>`."""
if self.device is None:
raise UInputError("input device not opened - cannot read capabilities")
return self.device.capabilities(verbose, absinfo)
def begin_upload(self, effect_id):
upload = ff.UInputUpload()
upload.effect_id = effect_id
ret = self.dll._uinput_begin_upload(self.fd, ctypes.byref(upload))
if ret:
raise UInputError("Failed to begin uinput upload: " + os.strerror(ret))
return upload
def end_upload(self, upload):
ret = self.dll._uinput_end_upload(self.fd, ctypes.byref(upload))
if ret:
raise UInputError("Failed to end uinput upload: " + os.strerror(ret))
def begin_erase(self, effect_id):
erase = ff.UInputErase()
erase.effect_id = effect_id
ret = self.dll._uinput_begin_erase(self.fd, ctypes.byref(erase))
if ret:
raise UInputError("Failed to begin uinput erase: " + os.strerror(ret))
return erase
def end_erase(self, erase):
ret = self.dll._uinput_end_erase(self.fd, ctypes.byref(erase))
if ret:
raise UInputError("Failed to end uinput erase: " + os.strerror(ret))
def _verify(self):
"""
Verify that an uinput device exists and is readable and writable
by the current process.
"""
try:
m = os.stat(self.devnode)[stat.ST_MODE]
assert stat.S_ISCHR(m)
except (IndexError, OSError, AssertionError):
msg = '"{}" does not exist or is not a character device file - verify that the uinput module is loaded'
raise UInputError(msg.format(self.devnode))
if not os.access(self.devnode, os.W_OK):
msg = '"{}" cannot be opened for writing'
raise UInputError(msg.format(self.devnode))
if len(self.name) > _uinput.maxnamelen:
msg = "uinput device name must not be longer than {} characters"
raise UInputError(msg.format(_uinput.maxnamelen))
def _find_device(self, fd: int) -> InputDevice:
"""
Tries to find the device node. Will delegate this task to one of
several platform-specific functions.
"""
if platform.system() == "Linux":
try:
sysname = _uinput.get_sysname(fd)
return self._find_device_linux(sysname)
except OSError:
# UI_GET_SYSNAME returned an error code. We're likely dealing with
# an old kernel. Guess the device based on the filesystem.
pass
# If we're not running or Linux or the above method fails for any reason,
# use the generic fallback method.
return self._find_device_fallback()
def _find_device_linux(self, sysname: str) -> InputDevice:
"""
Tries to find the device node when running on Linux.
"""
syspath = f"/sys/devices/virtual/input/{sysname}"
# The sysfs entry for event devices should contain exactly one folder
# whose name matches the format "event[0-9]+". It is then assumed that
# the device node in /dev/input uses the same name.
regex = re.compile("event[0-9]+")
for entry in os.listdir(syspath):
if regex.fullmatch(entry):
device_path = f"/dev/input/{entry}"
break
else: # no break
raise FileNotFoundError()
# It is possible that there is some delay before /dev/input/event* shows
# up on old systems that do not use devtmpfs, so if the device cannot be
# found, wait for a short amount and then try again once.
#
# Furthermore, even if devtmpfs is in use, it is possible that the device
# does show up immediately, but without the correct permissions that
# still need to be set by udev. Wait for up to two seconds for either the
# device to show up or the permissions to be set.
for attempt in range(19):
try:
return InputDevice(device_path)
except (FileNotFoundError, PermissionError):
time.sleep(0.1)
# Last attempt. If this fails, whatever exception the last attempt raises
# shall be the exception that this function raises.
return InputDevice(device_path)
def _find_device_fallback(self) -> Union[InputDevice, None]:
"""
Tries to find the device node when UI_GET_SYSNAME is not available or
we're running on a system sufficiently exotic that we do not know how
to interpret its return value.
"""
#:bug: the device node might not be immediately available
time.sleep(0.1)
# There could also be another device with the same name already present,
# make sure to select the newest one.
# Strictly speaking, we cannot be certain that everything returned by list_devices()
# ends at event[0-9]+: it might return something like "/dev/input/events_all". Find
# the devices that have the expected structure and extract their device number.
path_number_pairs = []
regex = re.compile("/dev/input/event([0-9]+)")
for path in util.list_devices("/dev/input/"):
regex_match = regex.fullmatch(path)
if not regex_match:
continue
device_number = int(regex_match[1])
path_number_pairs.append((path, device_number))
# The modification date of the devnode is not reliable unfortunately, so we
# are sorting by the number in the name
path_number_pairs.sort(key=lambda pair: pair[1], reverse=True)
for path, _ in path_number_pairs:
d = InputDevice(path)
if d.name == self.name:
return d

View File

@@ -0,0 +1,146 @@
import collections
import glob
import os
import re
import stat
from typing import Union, List
from . import ecodes
from .events import InputEvent, event_factory, KeyEvent, RelEvent, AbsEvent, SynEvent
def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]:
"""List readable character devices in ``input_device_dir``."""
fns = glob.glob("{}/event*".format(input_device_dir))
return list(filter(is_device, fns))
def is_device(fn: Union[str, bytes, os.PathLike]) -> bool:
"""Check if ``fn`` is a readable and writable character device."""
if not os.path.exists(fn):
return False
m = os.stat(fn)[stat.ST_MODE]
if not stat.S_ISCHR(m):
return False
if not os.access(fn, os.R_OK | os.W_OK):
return False
return True
def categorize(event: InputEvent) -> Union[InputEvent, KeyEvent, RelEvent, AbsEvent, SynEvent]:
"""
Categorize an event according to its type.
The :data:`event_factory <evdev.events.event_factory>` dictionary
maps event types to sub-classes of :class:`InputEvent
<evdev.events.InputEvent>`. If the event cannot be categorized, it
is returned unmodified."""
if event.type in event_factory:
return event_factory[event.type](event)
else:
return event
def resolve_ecodes_dict(typecodemap, unknown="?"):
"""
Resolve event codes and types to their verbose names.
:param typecodemap: mapping of event types to lists of event codes.
:param unknown: symbol to which unknown types or codes will be resolved.
Example
-------
>>> resolve_ecodes_dict({ 1: [272, 273, 274] })
{ ('EV_KEY', 1): [('BTN_MOUSE', 272),
('BTN_RIGHT', 273),
('BTN_MIDDLE', 274)] }
If ``typecodemap`` contains absolute axis info (instances of
:class:`AbsInfo <evdev.device.AbsInfo>` ) the result would look
like:
>>> resolve_ecodes_dict({ 3: [(0, AbsInfo(...))] })
{ ('EV_ABS', 3L): [(('ABS_X', 0L), AbsInfo(...))] }
"""
for etype, codes in typecodemap.items():
type_name = ecodes.EV[etype]
# ecodes.keys are a combination of KEY_ and BTN_ codes
if etype == ecodes.EV_KEY:
ecode_dict = ecodes.keys
else:
ecode_dict = getattr(ecodes, type_name.split("_")[-1])
resolved = resolve_ecodes(ecode_dict, codes, unknown)
yield (type_name, etype), resolved
def resolve_ecodes(ecode_dict, ecode_list, unknown="?"):
"""
Resolve event codes and types to their verbose names.
Example
-------
>>> resolve_ecodes(ecodes.BTN, [272, 273, 274])
[(['BTN_LEFT', 'BTN_MOUSE'], 272), ('BTN_RIGHT', 273), ('BTN_MIDDLE', 274)]
"""
res = []
for ecode in ecode_list:
# elements with AbsInfo(), eg { 3 : [(0, AbsInfo(...)), (1, AbsInfo(...))] }
if isinstance(ecode, tuple):
if ecode[0] in ecode_dict:
l = ((ecode_dict[ecode[0]], ecode[0]), ecode[1])
else:
l = ((unknown, ecode[0]), ecode[1])
# just ecodes, e.g: { 0 : [0, 1, 3], 1 : [30, 48] }
else:
if ecode in ecode_dict:
l = (ecode_dict[ecode], ecode)
else:
l = (unknown, ecode)
res.append(l)
return res
def find_ecodes_by_regex(regex):
"""
Find ecodes matching a regex and return a mapping of event type to event codes.
regex can be a pattern string or a compiled regular expression object.
Example
-------
>>> find_ecodes_by_regex(r'(ABS|KEY)_BR(AKE|EAK)')
{1: [411], 3: [10]}
>>> res = find_ecodes_by_regex(r'(ABS|KEY)_BR(AKE|EAK)')
>>> resolve_ecodes_dict(res)
{
('EV_KEY', 1): [('KEY_BREAK', 411)],
('EV_ABS', 3): [('ABS_BRAKE', 10)]
}
"""
regex = re.compile(regex) # re.compile is idempotent
result = collections.defaultdict(list)
for type_code, codes in ecodes.bytype.items():
for code, names in codes.items():
names = (names,) if isinstance(names, str) else names
for name in names:
if regex.match(name):
result[type_code].append(code)
break
return dict(result)
__all__ = ("list_devices", "is_device", "categorize", "resolve_ecodes", "resolve_ecodes_dict", "find_ecodes_by_regex")