Abstract SIC Connector
Introduction
The first main component is the AbstractSICConnector class. It allows you to send action commands to the robot and receive data generated by either the robot or the SIC itself.
...
action commands
use the methods available in the SIC connector
allow you to create your application using the SIC framework
data generated
...
e.g.: when a button is pressed (LeftBumperPressed) or when an action is finished (WakeUpDone)
the results of certain ...
Input
IP address of the SIC server to use (usually localhost)
Usage
The AbstractSICConnector class is abstract, meaning that it does not itself do anything with the incoming data. To process the incoming data you can implement your own concrete SIC Connector class by inheriting from the AbstractSICConnector class and overriding the empty event handlers.
Example
In the MyConnector example below, you see that it uses the AbstractSICConnector class as a parent, inheriting all its methods. Two things have been added:
...
the on_robot_event method is overridden to print all the events generated by the robot
Code Block | ||||
---|---|---|---|---|
| ||||
from social_interaction_cloud.abstract_connector import AbstractSICConnector
from time import sleep


class MyConnector(AbstractSICConnector):

    def __init__(self, server_ip):
        super(MyConnector, self).__init__(server_ip)

    def run(self):
        self.start()
        self.set_language('en-US')
        sleep(1)  # wait for the language to change
        self.say('Hello, world!')
        sleep(3)  # wait for the robot to be done speaking (to see the relevant prints)
        self.stop()

    def on_robot_event(self, event):
        print(event)


# Run the application
my_connector = MyConnector(server_ip='127.0.0.1')
my_connector.run() |
The working of the example is as follows:
run
self.start() activates the connection. Under the hood, a thread is started allowing the connector to receive actions.
self.set_language('en-US') and self.say('Hello, world!') are the two actions sent to the robot to make it say ‘Hello, world!’ in English.
self.stop() gracefully closes the connection.
The sleep statements prevent the program from stopping before all the events are generated. See what happens when you remove the sleep statements. Most of the time you do not know how long you have to wait for an action to finish. Therefore, sleep statements are not the way to go. Ideally, you want the device to wait until it has received the necessary data and select its next action based on the available data.
on_robot_event
The on_robot_event method will print all incoming events, which are: LanguageChanged, TextStarted, and TextDone. If you, for example, touch a robot's head sensors (while the program is running), the events FrontTactilTouched, MiddleTactilTouched, and/or RearTactilTouched will also be printed.
Note: These methods, like all actions, are asynchronous. This means that they do not wait for a result before continuing. If the connected device(s) support it, this also allows actions to be executed in parallel (e.g. simultaneously speaking and gesturing).
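To make the asynchronous behaviour concrete without a robot, the following is a minimal sketch that uses only the Python standard library. FakeConnector and its timer-based say method are hypothetical stand-ins, not part of the SIC API. The sketch shows that the call returns before the event arrives, and that waiting on a threading.Event replaces guessing a sleep duration.

```python
import threading


class FakeConnector:
    """Hypothetical stand-in for a connector; it is not part of the SIC API."""

    def __init__(self):
        self.events = []
        self.text_done = threading.Event()

    def say(self, text, duration=0.05):
        # Returns immediately; 'TextDone' is delivered later from a timer thread.
        threading.Timer(duration, self.on_robot_event, args=('TextDone',)).start()

    def on_robot_event(self, event):
        self.events.append(event)
        if event == 'TextDone':
            self.text_done.set()


connector = FakeConnector()
connector.say('Hello, world!')
events_right_after_call = list(connector.events)  # taken before the timer fires
finished = connector.text_done.wait(timeout=1.0)  # block until the event arrives
events_after_waiting = list(connector.events)
```

Waiting on the event ends as soon as the result is available, so the program does not depend on how long the action happens to take.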
You will find extensive documentation for each available action and on_* trigger on
Basic SIC Connector
Introduction
The Python API also provides its own concrete implementation of the AbstractSICConnector class, called the BasicSICConnector. It allows you to register callback functions for each action you send.
callback functions
called when the action is finished or a result becomes available, e.g.:
for device actions (e.g. wake_up, say or set_eye_color), the callback function is called only once
for touch events (e.g. MiddleTactilTouched), the callback function is called every time the event becomes available
for the result of vision operations (e.g. on_face_recognized(identifier)), the callback function is called every time the result becomes available
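The difference between "called only once" and "called every time" can be sketched with a small registry. CallbackRegistry and its method names are hypothetical and only illustrate the two behaviours; the real BasicSICConnector may be organised differently.

```python
class CallbackRegistry:
    """Hypothetical sketch of one-shot callbacks versus persistent listeners."""

    def __init__(self):
        self._one_shot = {}   # event name -> callback, removed after the first call
        self._listeners = {}  # event name -> callback, kept until unsubscribed

    def register_action_callback(self, done_event, callback):
        self._one_shot[done_event] = callback

    def subscribe_listener(self, event, callback):
        self._listeners[event] = callback

    def unsubscribe_listener(self, event):
        self._listeners.pop(event, None)

    def dispatch(self, event):
        one_shot = self._one_shot.pop(event, None)  # pop: fires at most once
        if one_shot is not None:
            one_shot()
        listener = self._listeners.get(event)       # get: stays registered
        if listener is not None:
            listener()


calls = []
registry = CallbackRegistry()
registry.register_action_callback('WakeUpDone', lambda: calls.append('awake'))
registry.subscribe_listener('MiddleTactilTouched', lambda: calls.append('tickle'))

for event in ['WakeUpDone', 'WakeUpDone', 'MiddleTactilTouched', 'MiddleTactilTouched']:
    registry.dispatch(event)

registry.unsubscribe_listener('MiddleTactilTouched')
registry.dispatch('MiddleTactilTouched')  # ignored: the listener is gone
```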
Example
Code Block | ||||
---|---|---|---|---|
| ||||
import threading
from social_interaction_cloud.basic_connector import BasicSICConnector
from time import sleep


class Example:

    def __init__(self, server_ip):
        self.sic = BasicSICConnector(server_ip)
        self.awake_lock = threading.Event()

    def run(self):
        # activate the Social Interaction Cloud connection
        self.sic.start()

        # set language to English
        self.sic.set_language('en-US')

        # stand up and wait until this action is done (whenever the callback function self.awake is called)
        self.sic.wake_up(self.awake)
        self.awake_lock.wait()  # see https://docs.python.org/3/library/threading.html#event-objects

        self.sic.say_animated('You can tickle me by touching my head.')
        # Execute that_tickles call each time the middle tactile is touched
        self.sic.subscribe_touch_listener('MiddleTactilTouched', self.that_tickles)
        # You have 10 seconds to tickle the robot
        sleep(10)
        # Unsubscribe the listener if you don't need it anymore.
        self.sic.unsubscribe_touch_listener('MiddleTactilTouched')

        # Go to rest mode
        self.sic.rest()

        # close the Social Interaction Cloud connection
        self.sic.stop()

    def awake(self):
        """Callback function for wake_up action. Called only once.
        It lifts the lock, making the program continue from self.awake_lock.wait()"""
        self.awake_lock.set()

    def that_tickles(self):
        """Callback function for touch listener. Every time the MiddleTactilTouched event is generated, this
        callback function is called, making the robot say 'That tickles!'"""
        self.sic.say_animated('That tickles!')


example = Example('127.0.0.1')
example.run() |
In the example above:
A connected Nao robot will stand up and say “You can tickle me by touching my head”.
To wait until the Nao has finished standing up, the program is locked by the self.awake_lock.wait() statement. awake_lock is a threading.Event() object that blocks the main thread until the threading.Event() is set by calling self.awake_lock.set(). This is done in the awake callback function. This callback function is added to the wake_up action.
Once the robot is finished standing up, awake is called, and the “lock is lifted”, allowing the program to continue.
For 10 seconds the robot will say “That tickles!” every time you touch the sensor on the middle of its head.
After 10 seconds, the Nao will sit down again.
A different callback function is that_tickles. It is subscribed to the MiddleTactilTouched event. While the program is running, that_tickles is called each time the middle head sensor is touched.
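One practical detail of the lock pattern: threading.Event.wait() accepts an optional timeout and returns False when the event was not set in time, which guards against a callback that never arrives. The sketch below uses a hypothetical fake_wake_up function (a timer, not an SIC call) to show both outcomes.

```python
import threading


def fake_wake_up(callback, delay=0.05):
    """Hypothetical stand-in for an asynchronous action that calls back when done."""
    threading.Timer(delay, callback).start()


# Normal case: the callback sets the event and wait() returns True.
awake_lock = threading.Event()
fake_wake_up(awake_lock.set)
woke_up_in_time = awake_lock.wait(timeout=1.0)

# Failure case: nobody ever sets the event, so wait() gives up and returns False.
never_set = threading.Event()
timed_out_result = never_set.wait(timeout=0.05)
```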
Action, ActionFactory, and ActionRunner
Introduction
To help define behaviors, the Python API offers some additional facilities related to actions. An Action allows you to prepare an action and (re)use it when necessary.
Usage
It requires a reference to a method of BasicSICConnector and the input arguments for that method. Optionally you can give it a callback function and a threading.Event() object as lock.
Note: you have to explicitly state callback=... and lock=...
The following snippet provides an example of how to do so:
Code Block | ||||
---|---|---|---|---|
| ||||
sic = BasicSICConnector(server_ip)
hello_action_lock = threading.Event()


def hello_action_callback():
    print('Hello Action Done')
    hello_action_lock.set()


hello_action = Action(sic.say, 'Hello, Action!', callback=hello_action_callback,
                      lock=hello_action_lock)
hello_action.perform().wait()  # perform() returns the lock, so you can immediately call wait() on it.
hello_action_lock.clear()  # necessary for reuse
hello_action.perform().wait()  # you can reuse an action. |
The ActionFactory helps build actions and, in particular, can save you the trouble of managing all the different locks you might need. The following methods define the core functionality of ActionFactory:
1. The build_action method returns a regular action. Instead of providing a reference to the BasicSICConnector method, you can use its name.
Code Block | ||
---|---|---|
| ||
sic = BasicSICConnector(server_ip)
action_factory = ActionFactory(sic)
hello_action_factory = action_factory.build_action('say', 'Hello, Action Factory!')
hello_action_factory.perform() |
2. The build_waiting_action method returns a waiting action. The ActionFactory creates a lock with a callback function that releases the lock for you. Optionally you can also add your own callback function, which will be embedded in the callback function created by the ActionFactory.
Code Block | ||||
---|---|---|---|---|
| ||||
def awake():
    print('I am fully awake now')


wake_up_action = action_factory.build_waiting_action('wake_up', additional_callback=awake)
wake_up_action.perform().wait() |
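How a lock and an embedded callback can fit together is sketched below in plain Python. build_waiting_call and fake_wake_up are hypothetical names used for illustration only; this is an assumption about the general pattern, not the ActionFactory implementation.

```python
import threading


def build_waiting_call(action, *args, additional_callback=None):
    """Hypothetical helper: wraps an asynchronous action so that performing it returns a lock."""
    lock = threading.Event()

    def release_lock():
        # The user's callback is embedded in the callback that releases the lock.
        if additional_callback is not None:
            additional_callback()
        lock.set()

    def perform():
        lock.clear()                  # reset so the same action can be reused
        action(*args, release_lock)   # start the action; it calls back when done
        return lock

    return perform


def fake_wake_up(callback):
    """Hypothetical asynchronous action that finishes after 50 ms."""
    threading.Timer(0.05, callback).start()


log = []
wake_up = build_waiting_call(fake_wake_up, additional_callback=lambda: log.append('awake'))
first = wake_up().wait(timeout=1.0)
second = wake_up().wait(timeout=1.0)  # reuse works because perform() clears the lock
```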
3. The build_touch_listener method lets you build a touch listener that is registered to the BasicSICConnector.
You can register to all the robot’s sensor events (e.g. MiddleTactilTouched).
You can use it to wait until a button is pressed or to do something when a button is pressed.
The listener can be notified only once or every time (until you unregister the listener) the button is pressed.
4. The build_vision_listener method works similarly to the build_touch_listener method. It lets you build a listener for four specific vision events: on_person_detected, on_face_recognized, on_emotion_detected, and on_corona_check_passed.
For these events to be generated you need to turn on the People Detection, Face Recognition, Emotion Detection, and Corona Checker services respectively. To build a vision listener for these four events, you use the label ‘people’, ‘face’, ‘emotion’ or ‘corona’ respectively for the vision_type parameter.
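The correspondence between labels and events described above can be written down as a lookup table. The dictionary and the helper function are illustrative names only; they restate the mapping from the text and are not part of the library.

```python
# vision_type label -> vision event, as listed in the text above
VISION_EVENTS = {
    'people': 'on_person_detected',
    'face': 'on_face_recognized',
    'emotion': 'on_emotion_detected',
    'corona': 'on_corona_check_passed',
}


def event_for_vision_type(vision_type):
    """Hypothetical helper: return the event name for a label, or raise ValueError."""
    try:
        return VISION_EVENTS[vision_type]
    except KeyError:
        raise ValueError(f'unknown vision_type: {vision_type!r}') from None
```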
Finally, the last component of the action package is the ActionRunner.
It allows you to run regular and waiting actions right away with run_action and run_waiting_action respectively.
It takes the same input as the ActionFactory methods, because those are called under the hood first.
ActionRunner also allows you to preload a set of actions, run them in parallel, and wait for all of the waiting actions (not regular actions) to finish before continuing. Use load_action and load_waiting_action to preload actions and run_loaded_actions to run them.
Code Block | ||||
---|---|---|---|---|
| ||||
sic = BasicSICConnector(server_ip)
action_runner = ActionRunner(sic)


def hello_action_runner_callback():
    print('Hello Action Runner Done')


# run_waiting_action('say', ...) waits to finish talking before continuing
action_runner.run_waiting_action('say', 'Hello, Action Runner!',
                                 additional_callback=hello_action_runner_callback)

# The following actions will run in parallel.
action_runner.load_waiting_action('wake_up')
action_runner.load_waiting_action('set_eye_color', 'green')
action_runner.load_waiting_action('say', 'I am standing up now')
action_runner.run_loaded_actions()  # If you want to keep an action sequence for reuse, add clear=False as input.

action_runner.run_action('say', 'I will start sitting down as I say this. Because I am not a waiting action')
action_runner.run_action('set_eye_color', 'white')
action_runner.run_waiting_action('rest') |
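The idea of preloading actions, starting them together and waiting for all of them can be sketched with standard-library threading. SketchRunner and fake_action are hypothetical and only illustrate the pattern under that assumption; they are not the library's ActionRunner.

```python
import threading


class SketchRunner:
    """Hypothetical illustration of load-then-run; not the library's ActionRunner."""

    def __init__(self):
        self._loaded = []

    def load_waiting_action(self, action, *args):
        self._loaded.append((action, args))

    def run_loaded_actions(self, timeout=2.0):
        locks = []
        for action, args in self._loaded:
            lock = threading.Event()
            action(*args, lock.set)  # start every action first ...
            locks.append(lock)
        self._loaded = []
        for lock in locks:           # ... and only then wait for all of them
            lock.wait(timeout)


def fake_action(name, duration, finished, callback):
    """Hypothetical asynchronous action that records its name when it completes."""
    def done():
        finished.append(name)
        callback()
    threading.Timer(duration, done).start()


finished = []
runner = SketchRunner()
runner.load_waiting_action(fake_action, 'wake_up', 0.2, finished)
runner.load_waiting_action(fake_action, 'say', 0.05, finished)
runner.run_loaded_actions()
```

Because the shorter action was loaded second but completes first, the order of finished shows that the two actions overlapped in time rather than running one after the other.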
State Machines Interaction Flows
Implementing a social interaction flow is more efficient if your code has a structure similar to a graph or flowchart. Each step in the interaction goes from one state to another, based on the input from an end-user and the goals of the robot.
To structure your code using states and state transitions you can use the state machine design pattern. See Gkasdrogkas (2020), Nath (2019) or Shalyto et al. for a more extensive explanation of what state machines are (https://levelup.gitconnected.com/an-example-based-introduction-to-finite-state-machines-f908858e450f, https://medium.datadriveninvestor.com/state-machine-design-pattern-why-how-example-through-spring-state-machine-part-1-f13872d68c2d).
...
define what triggers a transition (e.g.: a button press)
define prerequisites of a state transition (e.g.: to get from the sleep to the awake state, a robot first needs to stand up)
Usage
To facilitate the implementation of state machines, the library pytransitions can be used. It is “a lightweight, object-oriented finite state machine implementation in Python with many extensions”. Read their guide to learn more.
Example
Let’s look at an example of how to use it together with the SIC Python API. The example consists of a basic interaction flow. In this interaction flow, the robot starts by being asleep, wakes up, introduces itself, gets acquainted with the person, and then says goodbye.
...
Code Block | ||||
---|---|---|---|---|
| ||||
from transitions import Machine


class ExampleRobot(object):
    states = ['asleep', 'awake', 'introduced', 'got_acquainted', 'goodbye']

    def __init__(self):
        self.machine = Machine(model=self, states=ExampleRobot.states, initial='asleep')
        self.machine.add_transition(trigger='start', source='asleep', dest='awake',
                                    before='wake_up', after='introduce')
        self.machine.add_transition(trigger='introduce', source='awake', dest='introduced',
                                    before='introduction', after='get_acquainted')
        ...

    def wake_up(self) -> None:
        self.action_runner.load_waiting_action('set_language', 'en-US')
        self.action_runner.load_waiting_action('wake_up')
        self.action_runner.run_loaded_actions()

robot = ExampleRobot()
robot.start()  # causes state transition from asleep to awake |
define all the states of the state machine (line 5)
initialise the state machine with the model, states and initial state (line 8)
add transitions between states (line 9): if we have an instance of the ExampleRobot class, we can now call the start method (trigger) to cause a transition from the initial asleep state (source) to the awake state (destination):
the trigger of a transition is the method that causes the transition; in this case, the robot will wake up upon the call of the start method (line 21)
the source of a transition is the state the state machine was in before the transition; in this case, “asleep”
the destination of a transition is the state to which the transition directs the state machine; in this case, “awake”
the before parameter of a transition names a method to call before the transition happens; in this case, the method wake_up (line 15)
the after parameter of a transition names a method to call after the transition happens; in this case, “introduce” becomes the trigger of the next transition, to “introduced” (line 11)
note: Often there are no external triggers to trigger a state transition in the human-robot interaction flow. For example, when the robot is awake and ready it should automatically move to a next state. Passing the trigger of the next transition as the after parameter addresses this issue.
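To see the terms trigger, source, destination, before and after at work without installing anything, here is a hand-rolled sketch. TinyMachine is a hypothetical, greatly simplified stand-in written for this illustration; it is not pytransitions and only mimics the behaviour described above.

```python
class TinyMachine:
    """Hypothetical, minimal state machine; pytransitions offers far more."""

    def __init__(self, model, initial):
        self.model = model
        model.state = initial

    def add_transition(self, trigger, source, dest, before=None, after=None):
        model = self.model

        def fire():
            if model.state != source:
                raise RuntimeError(f'{trigger!r} is not allowed in state {model.state!r}')
            if before is not None:
                getattr(model, before)()  # runs while still in the source state
            model.state = dest
            if after is not None:
                getattr(model, after)()   # runs once the destination state is reached

        setattr(model, trigger, fire)     # the trigger becomes a method on the model


class Robot:
    def __init__(self):
        self.log = []
        machine = TinyMachine(self, initial='asleep')
        machine.add_transition('start', 'asleep', 'awake', before='wake_up', after='introduce')
        machine.add_transition('introduce', 'awake', 'introduced', before='introduction')

    def wake_up(self):
        self.log.append('wake_up')

    def introduction(self):
        self.log.append('introduction')


robot = Robot()
robot.start()  # asleep -> awake, then 'introduce' fires automatically: awake -> introduced
```

A single call to start walks through both transitions, because the after parameter of the first transition names the trigger of the second.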
For a complete working example see https://bitbucket.org/socialroboticshub/connectors/src/main/python/4_state_machine_example.py.