test_comm.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import roslib; roslib.load_manifest('ur_driver')
3 import time, sys, threading, math
4 import copy
5 import datetime
6 import socket, select
7 import struct
8 import traceback, code
9 import SocketServer
10 
11 import rospy
12 
13 from sensor_msgs.msg import JointState
14 from ur_driver.deserialize import RobotState
15 from ur_msgs.msg import *
16 
17 # renaming classes
18 DigitalIn = Digital
19 DigitalOut = Digital
20 Flag = Digital
21 
22 MULT_analog = 1000000.0
23 MULT_analog_robotstate = 0.1
24 
25 joint_names = ['shoulder_pan_joint', 'shoulder_lift_joint', 'elbow_joint',
26  'wrist_1_joint', 'wrist_2_joint', 'wrist_3_joint']
27 joint_offsets = {}
28 
29 
30 def __on_packet(buf):
31  state = RobotState.unpack(buf)
32 
33  mb_msg = MasterboardDataMsg()
34  mb_msg.digital_input_bits = state.masterboard_data.digital_input_bits
35  mb_msg.digital_output_bits = state.masterboard_data.digital_output_bits
36  mb_msg.analog_input_range0 = state.masterboard_data.analog_input_range0
37  mb_msg.analog_input_range1 = state.masterboard_data.analog_input_range1
38  mb_msg.analog_input0 = state.masterboard_data.analog_input0
39  mb_msg.analog_input1 = state.masterboard_data.analog_input1
40  mb_msg.analog_output_domain0 = state.masterboard_data.analog_output_domain0
41  mb_msg.analog_output_domain1 = state.masterboard_data.analog_output_domain1
42  mb_msg.analog_output0 = state.masterboard_data.analog_output0
43  mb_msg.analog_output1 = state.masterboard_data.analog_output1
44  mb_msg.masterboard_temperature = state.masterboard_data.masterboard_temperature
45  mb_msg.robot_voltage_48V = state.masterboard_data.robot_voltage_48V
46  mb_msg.robot_current = state.masterboard_data.robot_current
47  mb_msg.master_io_current = state.masterboard_data.master_io_current
48  mb_msg.master_safety_state = state.masterboard_data.master_safety_state
49  mb_msg.master_onoff_state = state.masterboard_data.master_onoff_state
50  pub_masterboard_state.publish(mb_msg)
51 
52 
53  # Use information from the robot state packet to publish IOStates
54  msg = IOStates()
55  #gets digital in states
56  for i in range(0, 10):
57  msg.digital_in_states.append(DigitalIn(i, (state.masterboard_data.digital_input_bits & (1<<i))>>i))
58  #gets digital out states
59  for i in range(0, 10):
60  msg.digital_out_states.append(DigitalOut(i, (state.masterboard_data.digital_output_bits & (1<<i))>>i))
61  #gets analog_in[0] state
62  inp = state.masterboard_data.analog_input0 / MULT_analog_robotstate
63  msg.analog_in_states.append(Analog(0, inp))
64  #gets analog_in[1] state
65  inp = state.masterboard_data.analog_input1 / MULT_analog_robotstate
66  msg.analog_in_states.append(Analog(1, inp))
67  #gets analog_out[0] state
68  inp = state.masterboard_data.analog_output0 / MULT_analog_robotstate
69  msg.analog_out_states.append(Analog(0, inp))
70  #gets analog_out[1] state
71  inp = state.masterboard_data.analog_output1 / MULT_analog_robotstate
72  msg.analog_out_states.append(Analog(1, inp))
73  #print "Publish IO-Data from robot state data"
74  pub_io_states.publish(msg)
75 
76  # Notes:
77  # - Where are the flags coming from? Do we need flags? No, as 'prog' does not use them and other scripts are not running!
78  # - analog_input2 and analog_input3 are within ToolData
79  # - What to do with the different analog_input/output_range/domain?
80  # - Shall we have appropriate ur_msgs definitions in order to reflect MasterboardData, ToolData,...?
81 
82 
83 
84 
85 
86 def main():
87  rospy.init_node('test_comm', disable_signals=True)
88 
89  global pub_masterboard_state
90  pub_masterboard_state = rospy.Publisher('masterboard_state', MasterboardDataMsg, queue_size=1)
91  global pub_io_states
92  pub_io_states = rospy.Publisher('io_states', IOStates, queue_size=1)
93 
94 
95  robot_hostname = '192.168.0.42'
96  port = 30002
97 
98  test_socket = socket.create_connection((robot_hostname, port))
99  buf = ""
100 
101  while not rospy.is_shutdown():
102  more = test_socket.recv(4096)
103  if more:
104  buf = buf + more
105 
106  # Attempts to extract a packet
107  packet_length = struct.unpack_from("!i", buf)[0]
108  print("PacketLength: ", packet_length, "; BufferSize: ", len(buf))
109  if len(buf) >= packet_length:
110  packet, buf = buf[:packet_length], buf[packet_length:]
111  __on_packet(packet)
112  else:
113  print("There is no more...")
114 
115  test_socket.close()
116 
117 
118 
119 
120 if __name__ == '__main__': main()
def __on_packet(buf)
Definition: test_comm.py:30


ur_driver
Author(s): Stuart Glaser, Shaun Edwards, Felix Messmer
autogenerated on Sun Nov 24 2019 03:36:29