To show the remote access to Oncilla API Level 2 we provide an example written in Python that performs an simple sine movement, by sending commands to the Simulation over the Robotics Service Bus (RSB), which is available in Python, C++, Java and Common Lisp.
In order to start the example, usew Webots to open the world file Example3.wbt which is installed by the Oncilla Project Wizard. This world file will start a controller for Oncilla, that listens to commands send over RSB.
You find the example Python script at /usr/share/cca-oncilla/examples/OncillaRemoteSimpleSineMovement.py. Start the python script:
python /usr/share/cca-oncilla/examples/OncillaRemoteSimpleSineMovement.py
The script does:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | import math
import time
import rsb
import rst
import rstsandbox
steps = 0
from rsb.converter import registerGlobalConverter, ProtocolBufferConverter
from rst.geometry.Translation_pb2 import Translation
from rst.geometry.Pose_pb2 import Pose
from rst.kinematics.JointAngles_pb2 import JointAngles
from rst.generic.Value_pb2 import Value
robotPose = Pose()
LFPos = Translation()
def reg(klass):
registerGlobalConverter(ProtocolBufferConverter(messageClass=klass))
reg(Translation)
reg(Pose)
reg(JointAngles)
reg(Value)
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
def hip_position(phase, amplitude, offset):
return amplitude * math.cos(phase) + offset
def knee_position(phase, phase_lag, amplitude, offset):
if ((phase + phase_lag) > math.pi):
return amplitude * 1 + offset
else:
return amplitude * 0 + offset
if __name__ == '__main__':
frequency = 1.6
time_s = 0.0
phase = 0.0
antiphase = 0.0
def rememberRobotPose(e):
global robotPose
robotPose = e.data
def rememberFootPos(e):
global LFPos
LFPos = e.data
def sendCommand(e):
global time_s
global steps
global frequency
global phase
global antiphase
hip_amplitude = 10
fore_hip_offset = 0
hind_hip_offset = 0
fore_knee_amplitude = 0.1
fore_knee_offset = 0.2
hind_knee_amplitude = 0.1
hind_knee_offset = 0.2
steps = steps + 1
time_s = e.metaData.createTime
phase = (time_s * frequency * 2 * math.pi) % (2 * math.pi)
antiphase = (phase + math.pi) % (2 * math.pi)
# Only print every 50th time
if (steps%50 == 0):
print "\x1b[1;1f\x1b[J" # Blanks the screen
print "\033[01;40m" # Blanks the screen
print "------ Step:", steps
print "------ Time:", time_s
print "\n------ Robot Pose ----\n", robotPose
print "------ Robot Angles --\n", e.data
print "------ LeftFore Pos --\n", LFPos
# generate the eight desired joint angles
a = JointAngles()
a.angles.append(hip_position(phase, hip_amplitude, fore_hip_offset) / 180.0 * math.pi) # lf hip
a.angles.append(hip_position(antiphase, hip_amplitude, fore_hip_offset) / 180.0 * math.pi) # rf hip
a.angles.append(hip_position(antiphase, hip_amplitude, hind_hip_offset) / 180.0 * math.pi) # lh hip
a.angles.append(hip_position(phase, hip_amplitude, hind_hip_offset) / 180.0 * math.pi) # rh hip
a.angles.append(knee_position(phase, fore_hip_knee_phase_lag, fore_knee_amplitude, fore_knee_offset)) # lf knee
a.angles.append(knee_position(antiphase, fore_hip_knee_phase_lag, fore_knee_amplitude, fore_knee_offset)) # rf knee
a.angles.append(knee_position(antiphase, hind_hip_knee_phase_lag, hind_knee_amplitude, hind_knee_offset)) # lh knee
a.angles.append(knee_position(phase, hind_hip_knee_phase_lag, hind_knee_amplitude, hind_knee_offset)) # rh knee
send_angles.publishData(a)
# after 500 steps, reset simulation
if (steps%1000 == 0):
print "\x1b[1;1f\x1b[J" # Blanks the screen
print "\033[01;40m" # Blanks the screen
print "RESET!"
reset = Value()
reset.type = 2
reset.int = 32 # Corresponds to 'component reset'
send_state.publishData(reset)
exe = Value()
exe.type = 2
exe.int = 12 # Corresponds to 'component execute'
send_state.publishData(exe)
try:
send_angles = rsb.createInformer("/oncilla/cmd/pos/all",
dataType=JointAngles)
send_state = rsb.createInformer("/cca/comp/OncillaWorld/statechange/",
dataType=Value)
currentangles = rsb.createListener("/oncilla/status/pos/all")
currentangles.addHandler(sendCommand)
footpos = rsb.createListener("/oncilla/supervisor/pose/rf/l4/")
footpos.addHandler(rememberFootPos)
robotpose = rsb.createListener("/oncilla/status/pose/trunk/")
robotpose.addHandler(rememberRobotPose)
while (True):
time.sleep(1)
finally:
currentangles.deactivate()
footpos.deactivate()
robotpose.deactivate()
send_angles.deactivate() # We always have to deactivate the informer
send_state.deactivate() # We always have to deactivate the informer
|