...

  1. the on_robot_event method is overridden to print all the events generated by the robot

  2. the run method sends actions to the SIC

Code Block
from social_interaction_cloud.abstract_connector import AbstractSICConnector
from time import sleep


class MyConnector(AbstractSICConnector):
    def __init__(self, server_ip):
        super(MyConnector, self).__init__(server_ip)

    def run(self):
        self.start()
        self.set_language('en-US')
        sleep(1)  # wait for the language to change
        self.say('Hello, world!')
        sleep(3)  # wait for the robot to be done speaking (to see the relevant prints)
        self.stop()

    def on_robot_event(self, event):
        print(event)


# Run the application
my_connector = MyConnector(server_ip='127.0.0.1')
my_connector.run()

...

  • callback functions

    • called when the action is finished or a result becomes available

    • e.g.:

      • for device actions (e.g.: wake_up, say or set_eye_color), the callback function is called only once

      • for touch events (e.g. MiddleTactilTouched), the callback function is called every time the event becomes available

      • for the results of vision operations (e.g. on_face_recognized(identifier)), the callback function is called every time a result becomes available

Example

Code Block
import threading
from social_interaction_cloud.basic_connector import BasicSICConnector
from time import sleep


class Example:

    def __init__(self, server_ip):
        self.sic = BasicSICConnector(server_ip)

        self.awake_lock = threading.Event()

    def run(self):
        # activate the Social Interaction Cloud connection
        self.sic.start()

        # set language to English
        self.sic.set_language('en-US')

        # stand up and wait until this action is done (whenever the callback function self.awake is called)
        self.sic.wake_up(self.awake)
        self.awake_lock.wait()  # see https://docs.python.org/3/library/threading.html#event-objects

        self.sic.say_animated('You can tickle me by touching my head.')
        # Execute that_tickles call each time the middle tactile is touched
        self.sic.subscribe_touch_listener('MiddleTactilTouched', self.that_tickles)

        # You have 10 seconds to tickle the robot
        sleep(10)
        
        # Unsubscribe the listener if you don't need it anymore.
        self.sic.unsubscribe_touch_listener('MiddleTactilTouched')

        # Go to rest mode
        self.sic.rest()

        # close the Social Interaction Cloud connection
        self.sic.stop()

    def awake(self):
        """Callback function for wake_up action. Called only once.
        It lifts the lock, making the program continue from self.awake_lock.wait()"""

        self.awake_lock.set()

    def that_tickles(self):
        """Callback function for touch listener. Every time the MiddleTactilTouched event is generated, this
         callback function is called, making the robot say 'That tickles!'"""
        
        self.sic.say_animated('That tickles!')


example = Example('127.0.0.1')
example.run()

...

Note: you have to explicitly state callback=... and lock=... The following snippet provides an example of how to do so:

Code Block
sic = BasicSICConnector(server_ip)
hello_action_lock = threading.Event()


def hello_action_callback():
    print('Hello Action Done')
    hello_action_lock.set()
    hello_action_lock.clear()  # necessary for reuse


# The callback has to be defined before it is passed to the Action.
hello_action = Action(sic.say, 'Hello, Action!', callback=hello_action_callback,
                      lock=hello_action_lock)
hello_action.perform().wait()  # perform() returns the lock, so you can immediately call wait() on it.
hello_action.perform().wait()  # you can reuse an action.
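
The reason for calling clear() in the callback can be illustrated without a robot. The following is a minimal sketch that uses only Python's threading module; it does not touch the SIC API, and the variable names are made up for illustration. It shows that an Event that was set once keeps letting wait() through until it is cleared.

```python
import threading

lock = threading.Event()

# Simulate the callback of a first action finishing.
lock.set()
first_wait = lock.wait(timeout=0.1)   # True: the flag is set

# Without clear(), the flag stays set, so a second wait() does not block,
# even though no second action has finished yet.
stale_wait = lock.wait(timeout=0.1)   # True: stale signal from the first action

# After clear(), wait() blocks again until the next set(); here it times out.
lock.clear()
fresh_wait = lock.wait(timeout=0.1)   # False: nothing has signalled since clear()

print(first_wait, stale_wait, fresh_wait)
```

In other words, an action that is performed a second time with an uncleared lock would appear to finish immediately.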

...

Code Block
sic = BasicSICConnector(server_ip)
action_factory = ActionFactory(sic)
hello_action_factory = action_factory.build_action('say', 'Hello, Action Factory!')
hello_action_factory.perform()

2. The build_waiting_action method returns a waiting action. The ActionFactory creates a lock with a callback function that releases the lock for you. Optionally, you can add your own callback function, which is embedded in the callback function created by the ActionFactory.

Code Block
def awake():
    print('I am fully awake now')


# The callback has to be defined before it is passed to the factory.
wake_up_action = action_factory.build_waiting_action('wake_up', additional_callback=awake)
wake_up_action.perform().wait()
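
How an additional callback can be embedded in a lock-releasing callback may be easier to see in isolation. The sketch below is an assumption about the general pattern, not the actual ActionFactory code: build_waiting_callback is a hypothetical helper, and a threading.Timer stands in for the robot finishing the action on another thread.

```python
import threading


def build_waiting_callback(additional_callback=None):
    """Hypothetical helper: return a lock and a callback that releases it."""
    lock = threading.Event()

    def callback(*args):
        if additional_callback is not None:
            additional_callback(*args)  # run the user's callback first
        lock.set()                      # then release whoever is blocked in lock.wait()

    return lock, callback


messages = []
lock, callback = build_waiting_callback(lambda: messages.append('I am fully awake now'))

# Stand-in for the robot: invoke the callback from another thread a bit later.
threading.Timer(0.05, callback).start()

finished = lock.wait(timeout=2)  # blocks until the callback has run
print(finished, messages)
```

Because the user's callback runs before the lock is released, its side effects are visible as soon as wait() returns.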

3. The build_touch_listener method lets you build a touch listener that is registered to the BasicSICConnector.

  • You can register to all the robot’s sensor events (e.g. MiddleTactilTouched).

  • You can use it to wait until a button is pressed or to do something when a button is pressed.

  • The listener can be notified only once or every time (until you unregister the listener) the button is pressed.

  • The build_vision_listener method works similarly to the build_touch_listener method. It lets you build a listener for four specific vision events: on_person_detected, on_face_recognized, on_emotion_detected, and on_corona_check_passed.

  • For these events to be generated, you need to turn on the People Detection, Face Recognition, Emotion Detection, and Corona Checker services respectively. To build a vision listener for these four events, use the label ‘people’, ‘face’, ‘emotion’ or ‘corona’ respectively for the vision_type parameter.
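
The difference between a listener that is notified only once and one that is notified every time can be sketched with a small registry. This is a simplified stand-in written for illustration; ListenerRegistry, its methods and the continuous flag are hypothetical names and not part of the SIC API.

```python
class ListenerRegistry:
    """Hypothetical, simplified dispatcher for sensor events."""

    def __init__(self):
        self._listeners = {}  # event name -> (callback, continuous)

    def subscribe(self, event, callback, continuous=True):
        self._listeners[event] = (callback, continuous)

    def unsubscribe(self, event):
        self._listeners.pop(event, None)

    def dispatch(self, event):
        if event not in self._listeners:
            return
        callback, continuous = self._listeners[event]
        if not continuous:
            self.unsubscribe(event)  # a one-shot listener is removed after its first event
        callback()


registry = ListenerRegistry()
calls = []
registry.subscribe('MiddleTactilTouched', lambda: calls.append('every time'))
registry.subscribe('FrontTactilTouched', lambda: calls.append('once'), continuous=False)

# Simulate both sensors being touched twice.
for _ in range(2):
    registry.dispatch('MiddleTactilTouched')
    registry.dispatch('FrontTactilTouched')

print(calls)
```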

8. Finally, the last component of the action package is the ActionRunner.

  • It allows you to run regular and waiting actions right away with run_action and run_waiting_action respectively.

  • It takes the same input as the ActionFactory methods because that is being called under the hood first.

  • ActionRunner also allows you to preload a set of actions, run them in parallel, and wait for all of the waiting actions (not regular actions) to finish before continuing. Use load_action and load_waiting_action to preload actions and run_loaded_actions to run them.

Code Block
sic = BasicSICConnector(server_ip)
action_runner = ActionRunner(sic)


def hello_action_runner_callback():
    print('Hello Action Runner Done')


# run_waiting_action('say', ...) waits for the robot to finish talking before continuing
action_runner.run_waiting_action('say', 'Hello, Action Runner!',
                                 additional_callback=hello_action_runner_callback)

# The following actions will run in parallel.
action_runner.load_waiting_action('wake_up')
action_runner.load_waiting_action('set_eye_color', 'green')
action_runner.load_waiting_action('say', 'I am standing up now')
action_runner.run_loaded_actions()  # If you want to keep an action sequence for reuse, add clear=False as input.

action_runner.run_action('say', 'I will start sitting down as I say this. Because I am not a waiting action')
action_runner.run_action('set_eye_color', 'white')
action_runner.run_waiting_action('rest')
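
The load-then-run pattern can be approximated with plain threads. The sketch below is an assumption about the general idea and is not the ActionRunner implementation: MiniRunner and slow are hypothetical names, and time.sleep stands in for the time the robot needs to complete an action.

```python
import threading
import time


class MiniRunner:
    """Hypothetical sketch of preloading actions and running them in parallel."""

    def __init__(self):
        self._loaded = []  # list of (function, args, lock or None)

    def load_action(self, func, *args):
        self._loaded.append((func, args, None))  # regular action: nobody waits for it

    def load_waiting_action(self, func, *args):
        self._loaded.append((func, args, threading.Event()))

    def run_loaded_actions(self, clear=True):
        # Start every action in its own thread so they run in parallel.
        for func, args, lock in self._loaded:
            if lock is not None:
                lock.clear()  # reset in case the sequence is being reused
            threading.Thread(target=self._run, args=(func, args, lock)).start()
        # Block until all waiting actions have finished.
        for _, _, lock in self._loaded:
            if lock is not None:
                lock.wait()
        if clear:
            self._loaded = []

    @staticmethod
    def _run(func, args, lock):
        func(*args)
        if lock is not None:
            lock.set()


done = []


def slow(name, seconds):
    time.sleep(seconds)  # stand-in for the duration of a robot action
    done.append(name)


runner = MiniRunner()
runner.load_waiting_action(slow, 'wake_up', 0.2)
runner.load_waiting_action(slow, 'say', 0.1)
runner.run_loaded_actions()  # returns only after both actions are done
print(done)
```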

State Machines Interaction Flows

Implementing a social interaction flow is easier when your code has a structure similar to a graph or flowchart. Each step in the interaction moves from one state to another, based on the input from an end-user and the goals of the robot.

To structure your code using states and state transitions, you can use the state machine design pattern. See https://levelup.gitconnected.com/an-example-based-introduction-to-finite-state-machines-f908858e450f, https://medium.datadriveninvestor.com/state-machine-design-pattern-why-how-example-through-spring-state-machine-part-1-f13872d68c2d or this paper for a more extensive explanation of what state machines are.

Using this approach you can create a whole chain of states, neatly separating each interaction step into different states and methods. It does not have to be a linear sequence. You can create branches and cycles, depending on the intended interaction flow.

The most important components of a state machine are the state transitions. For each transition:

  • define what triggers a transition (e.g.: a button press)

  • define prerequisites of a state transition (e.g.: to get from the sleep to the awake state, a robot first needs to stand up)
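
The two ingredients above, a trigger and a prerequisite, can be sketched as a small hand-written state machine. TinyStateMachine and its method names are hypothetical and only serve as an illustration; the prerequisite is modelled here as a check that must pass before the transition is allowed.

```python
class TinyStateMachine:
    """Minimal hand-written finite state machine, for illustration only."""

    def __init__(self, initial):
        self.state = initial
        self._transitions = {}  # (trigger, source) -> (dest, prerequisite)

    def add_transition(self, trigger, source, dest, prerequisite=None):
        self._transitions[(trigger, source)] = (dest, prerequisite)

    def fire(self, trigger):
        """Try to follow a transition; return True if the state changed."""
        key = (trigger, self.state)
        if key not in self._transitions:
            return False  # this trigger is not defined for the current state
        dest, prerequisite = self._transitions[key]
        if prerequisite is not None and not prerequisite():
            return False  # the prerequisite is not fulfilled yet
        self.state = dest
        return True


robot = {'standing': False}
machine = TinyStateMachine(initial='asleep')
machine.add_transition('button_press', 'asleep', 'awake',
                       prerequisite=lambda: robot['standing'])

blocked = machine.fire('button_press')  # the robot has not stood up yet
robot['standing'] = True
allowed = machine.fire('button_press')  # now the transition goes through
print(blocked, allowed, machine.state)
```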

Usage

To facilitate the implementation of state machines, the library pytransitions can be used. It is “a lightweight, object-oriented finite state machine implementation in Python with many extensions”. Read their guide to learn more.

Example

Let’s look at an example of how to use it together with the SIC Python API. The example consists of a basic interaction flow in which the robot starts out asleep, wakes up, introduces itself, gets acquainted with the person, and then says goodbye.

It starts with creating a model class for a robot that has states and linking it to a state machine:

Code Block
from transitions import Machine

class ExampleRobot(object):
   
   states = ['asleep', 'awake', 'introduced', 'got_acquainted', 'goodbye']

   def __init__(self):
      self.machine = Machine(model=self, states=ExampleRobot.states, initial='asleep')
      self.machine.add_transition(trigger='start', source='asleep', dest='awake',
                            before='wake_up', after='introduce')
      self.machine.add_transition(trigger='introduce', source='awake', dest='introduced',
                            before='introduction', after='get_acquainted')
      ...
      
   def wake_up(self):
      self.action_runner.load_waiting_action('set_language', 'en-US')
      self.action_runner.load_waiting_action('wake_up')
      self.action_runner.run_loaded_actions()

robot = ExampleRobot()
robot.start() # causes state transition from asleep to awake

  1. define all the states of the state machine (line 5)

  2. initialise the state machine with the model, states and initial state (line 8)

  3. add transitions between states (line 9) - given an instance of the ExampleRobot class, we can now call the start method (trigger) to cause a transition from the initial asleep state (source) to the awake state (destination):

    • the trigger to a transition is the method that causes the transition

      • in this case, the robot will wake up upon the call of the start method (line 21)

    • the source of a transition is the previous state in which the state machine was

      • in this case, “asleep”

    • the destination of a transition is the state to which the transition directs the state machine

      • in this case, “awake”

    • the before parameter of a transition names a method that is called before the transition happens

      • in this case, method wake_up (line 15)

    • the after parameter of a transition names a method that is called after the transition has happened

      • in this case, introduce, which is itself the trigger of the next transition, to the introduced state (line 11)

      • note: Often there are no external triggers to cause a state transition in the human-robot interaction flow. For example, when the robot is awake and ready, it should automatically move to the next state. Passing the trigger of the next transition as the after parameter addresses this issue.
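
The order in which the before method, the state change and the after method happen, and how after can chain into the next transition, can be traced with a hand-written stand-in. ChainedMachine below is hypothetical and deliberately does not use the pytransitions API; it only mimics the ordering described above.

```python
class ChainedMachine:
    """Hand-written illustration of before/after ordering; not the pytransitions API."""

    def __init__(self, initial):
        self.state = initial
        self.log = []            # records callbacks and reached states in order
        self._transitions = {}   # trigger -> (source, dest, before, after)

    def add_transition(self, trigger, source, dest, before=None, after=None):
        self._transitions[trigger] = (source, dest, before, after)

    def fire(self, trigger):
        source, dest, before, after = self._transitions[trigger]
        if self.state != source:
            raise RuntimeError(f'cannot {trigger} from state {self.state}')
        if before is not None:
            before()             # runs while still in the source state
        self.state = dest
        self.log.append(dest)
        if after is not None:
            after()              # runs once the destination state is reached


machine = ChainedMachine(initial='asleep')
machine.add_transition('start', 'asleep', 'awake',
                       before=lambda: machine.log.append('wake_up'),
                       after=lambda: machine.fire('introduce'))  # chain into the next transition
machine.add_transition('introduce', 'awake', 'introduced',
                       before=lambda: machine.log.append('introduction'))

machine.fire('start')  # a single external trigger walks through both transitions
print(machine.log, machine.state)
```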

For a complete working example see https://bitbucket.org/socialroboticshub/connectors/src/master/python/state_machine_example.py .