"""
Introduction
------------
panda-py is a Python library for the Franka Emika Robot System
that allows you to program and control the robot in real-time.
"""
import base64
import configparser
import dataclasses
import hashlib
import json as json_module
import logging
import os
import ssl
import threading
import typing
from urllib import parse
import requests
from requests.packages import urllib3
from websockets.sync.client import connect
# pylint: disable=no-name-in-module
from ._core import Panda, PandaContext, fk, ik, ik_full
__all__ = [
'Panda', 'PandaContext', 'constants', 'controllers', 'libfranka', 'motion',
'fk', 'ik', 'ik_full', 'Desk', 'TOKEN_PATH'
]
__version__ = '0.8.1'
_logger = logging.getLogger('desk')
TOKEN_PATH = '~/.panda_py/token.conf'
"""
Path to the configuration file holding known control tokens.
If :py:class:`Desk` is used to connect to a control unit's
web interface and takes control, the generated token is stored
in this file under the unit's IP address or hostname.
"""
@dataclasses.dataclass
class Token:
"""
Represents a Desk token owned by a user.
"""
id: str = ''
owned_by: str = ''
token: str = ''
[docs]
class Desk:
"""
Connects to the control unit running the web-based Desk interface
to manage the robot. Use this class to interact with the Desk
from Python, e.g. if you use a headless setup. This interface
supports common tasks such as unlocking the brakes, activating
the FCI etc.
Newer versions of the system software use role-based access
management to allow only one user to be in control of the Desk
at a time. The controlling user is authenticated using a token.
The :py:class:`Desk` class saves those token in :py:obj:`TOKEN_PATH`
and will use them when reconnecting to the Desk, retaking control.
Without a token, control of a Desk can only be taken, if there is
no active claim or the controlling user explicitly relinquishes control.
If the controlling user's token is lost, a user can take control
forcefully (cf. :py:func:`Desk.take_control`) but needs to confirm
physical access to the robot by pressing the circle button on the
robot's Pilot interface.
"""
def __init__(self,
hostname: str,
username: str,
password: str,
platform: str = 'panda') -> None:
urllib3.disable_warnings()
self._session = requests.Session()
self._session.verify = False
self._hostname = hostname
self._username = username
self._password = password
self._logged_in = False
self._token = self._load_token()
self._listening = False
self._listen_thread = None
self.login()
self._legacy = False
if platform.lower() in [
'panda', 'fer', 'franka_emika_robot', 'frankaemikarobot'
]:
self._platform = 'panda'
elif platform.lower() in ['fr3', 'frankaresearch3', 'franka_research_3']:
self._platform = 'fr3'
else:
raise ValueError("Unknown platform! Must be either 'panda' or 'fr3'!")
try:
self.take_control()
except ConnectionError as error:
if 'File not found' in str(error):
_logger.info('Legacy desk detected.')
self._legacy = True
else:
raise error
[docs]
def lock(self, force: bool = True) -> None:
"""
Locks the brakes. API call blocks until the brakes are locked.
"""
if self._platform == 'panda':
url = '/desk/api/robot/close-brakes'
elif self._platform == 'fr3':
url = '/desk/api/joints/lock'
self._request('post', url, files={'force': force})
[docs]
def unlock(self, force: bool = True) -> None:
"""
Unlocks the brakes. API call blocks until the brakes are unlocked.
"""
if self._platform == 'panda':
url = '/desk/api/robot/open-brakes'
elif self._platform == 'fr3':
url = '/desk/api/joints/unlock'
self._request('post',
url,
files={'force': force},
headers={'X-Control-Token': self._token.token})
[docs]
def reboot(self) -> None:
"""
Reboots the robot hardware (this will close open connections).
"""
self._request('post',
'/admin/api/reboot',
headers={'X-Control-Token': self._token.token})
[docs]
def activate_fci(self) -> None:
"""
Activates the Franka Research Interface (FCI). Note that the
brakes must be unlocked first. For older Desk versions, this
function does nothing.
"""
if not self._legacy:
self._request('post',
'/admin/api/control-token/fci',
json={'token': self._token.token})
[docs]
def deactivate_fci(self) -> None:
"""
Deactivates the Franka Research Interface (FCI). For older
Desk versions, this function does nothing.
"""
if not self._legacy:
self._request('delete',
'/admin/api/control-token/fci',
json={'token': self._token.token})
def _load_token(self) -> Token:
config_path = os.path.expanduser(TOKEN_PATH)
config = configparser.ConfigParser()
token = Token()
if os.path.exists(config_path):
config.read(config_path)
if config.has_section(self._hostname):
token.id = config.get(self._hostname, 'id')
token.owned_by = config.get(self._hostname, 'owned_by')
token.token = config.get(self._hostname, 'token')
return token
def _save_token(self, token: Token) -> None:
config_path = os.path.expanduser(TOKEN_PATH)
config = configparser.ConfigParser()
if os.path.exists(config_path):
config.read(config_path)
config[self._hostname] = {
'id': token.id,
'owned_by': token.owned_by,
'token': token.token
}
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, 'w') as config_file:
config.write(config_file)
self._token = token
[docs]
def take_control(self, force: bool = False) -> bool:
"""
Takes control of the Desk, generating a new control token and saving it.
If `force` is set to True, control can be taken forcefully even if another
user is already in control. However, the user will have to press the circle
button on the robot's Pilot within an alotted amount of time to confirm
physical access.
For legacy versions of the Desk, this function does nothing.
"""
if self._legacy:
return True
active = self._get_active_token()
if active.id != '' and self._token.id == active.id:
_logger.info('Retaken control.')
return True
if active.id != '' and not force:
_logger.warning('Cannot take control. User %s is in control.',
active.owned_by)
return False
response = self._request(
'post',
f'/admin/api/control-token/request{"?force" if force else ""}',
json={
'requestedBy': self._username
}).json()
if force:
timeout = self._request('get',
'/admin/api/safety').json()['tokenForceTimeout']
_logger.warning(
'You have %d seconds to confirm control by pressing circle button on robot.',
timeout)
with connect(f'wss://{self._hostname}/desk/api/navigation/events',
server_hostname='robot.franka.de',
additional_headers={
'authorization':
self._session.cookies.get('authorization')
}) as websocket:
while True:
event: typing.Dict = json_module.loads(websocket.recv(timeout))
if 'circle' in event.keys():
if event['circle']:
break
self._save_token(
Token(str(response['id']), self._username, response['token']))
_logger.info('Taken control.')
return True
[docs]
def release_control(self) -> None:
"""
Explicitly relinquish control of the Desk. This will allow
other users to take control or transfer control to the next
user if there is an active queue of control requests.
"""
if self._legacy:
return
_logger.info('Releasing control.')
try:
self._request('delete',
'/admin/api/control-token',
json={'token': self._token.token})
except ConnectionError as err:
if 'ControlTokenUnknown' in str(err):
_logger.warning('Control release failed. Not in control.')
else:
raise err
self._token = Token()
[docs]
@staticmethod
def encode_password(username: str, password: str) -> bytes:
"""
Encodes the password into the form needed to log into the Desk interface.
"""
bytes_str = ','.join([
str(b) for b in hashlib.sha256((
f'{password}#{username}@franka').encode('utf-8')).digest()
])
return base64.encodebytes(bytes_str.encode('utf-8')).decode('utf-8')
[docs]
def login(self) -> None:
"""
Uses the object's instance parameters to log into the Desk.
The :py:class`Desk` class's constructor will try to connect
and login automatically.
"""
login = self._request(
'post',
'/admin/api/login',
json={
'login': self._username,
'password': self.encode_password(self._username, self._password)
})
self._session.cookies.set('authorization', login.text)
self._logged_in = True
_logger.info('Login succesful.')
[docs]
def logout(self) -> None:
"""
Logs the current user out of the Desk. API calls will no longer
be possible.
"""
self._request('post', '/admin/api/logout')
self._session.cookies.clear()
self._logged_in = False
_logger.info('Logout successful.')
def _get_active_token(self) -> Token:
token = Token()
if self._legacy:
return Token()
response = self._request('get', '/admin/api/control-token').json()
if response['activeToken'] is not None:
token.id = str(response['activeToken']['id'])
token.owned_by = response['activeToken']['ownedBy']
return token
[docs]
def has_control(self) -> bool:
"""
Returns:
bool: True if this instance is in control of the Desk.
"""
if self._legacy:
return True
return self._token.id == self._get_active_token().id
def _request(self,
method: typing.Literal['post', 'get', 'delete'],
url: str,
json: typing.Dict[str, str] = None,
headers: typing.Dict[str, str] = None,
files: typing.Dict[str, str] = None) -> requests.Response:
fun = getattr(self._session, method)
response: requests.Response = fun(parse.urljoin(f'https://{self._hostname}',
url),
json=json,
headers=headers,
files=files)
if response.status_code != 200:
raise ConnectionError(response.text)
return response
def _listen(self, cb, timeout):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with connect(
f'wss://{self._hostname}/desk/api/navigation/events',
# server_hostname='robot.franka.de',
ssl_context=ctx,
additional_headers={
'authorization': self._session.cookies.get('authorization')
}) as websocket:
self._listening = True
while self._listening:
try:
event: typing.Dict = json_module.loads(websocket.recv(timeout))
cb(event)
except TimeoutError:
pass
[docs]
def listen(self, cb: typing.Callable[[typing.Dict], None]) -> None:
"""
Starts a thread listening to Pilot button events. All the Pilot buttons,
except for the `Pilot Mode` button can be captured. Make sure Pilot Mode is
set to Desk instead of End-Effector to receive direction key events. You can
change the Pilot mode by pressing the `Pilot Mode` button or changing the mode
in the Desk. Events will be triggered while buttons are pressed down or released.
Args:
cb: Callback fucntion that is called whenever a button event is received from the
Desk. The callback receives a dict argument that contains the triggered buttons
as keys. The values of those keys will depend on the kind of event, either True
for a button pressed down or False when released.
The possible buttons are: `circle`, `cross`, `check`, `left`, `right`, `down`,
and `up`.
"""
self._listen_thread = threading.Thread(target=self._listen, args=(cb, 1.0))
self._listen_thread.start()
[docs]
def stop_listen(self) -> None:
"""
Stop listener thread (cf. :py:func:`panda_py.Desk.listen`).
"""
self._listening = False
if self._listen_thread is not None:
self._listen_thread.join()