dialogue_configuration.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #coding=utf-8
3 
4 import json
5 import os
6 import codecs
7 import rospy
8 import subprocess
9 from std_msgs.msg import Bool, UInt8, String
10 from xbot_talker.msg import actionmode, actionmode_ros, action, action_ros, dialogue_ros, dialogue
11 from xbot_talker.srv import *
12 import chardet
13 import datetime
14 import csv
15 
16 import sys
17 reload(sys)
18 sys.setdefaultencoding('utf-8')
19 
20 
22  def __init__(self):
23  rospy.init_node('dialogue_config',anonymous=False)
24  self.get_actionmode_srv = rospy.Service('/app/get_actionmode',call_actionmode,self.call_actionmodes)
25  self.get_action_srv = rospy.Service('/app/get_action',call_action,self.call_actions)
26  self.add_dialogue_srv = rospy.Service('/app/add_dialogue',add_dialogue,self.add_dialogues)
27  self.get_history_srv = rospy.Service('/app/history_record',history_record,self.get_history)
28  self.roolback_srv = rospy.Service('/app/roolback_dialogue',roolback_dialogue,self.roolback_dialogue)
29 
30  rospy.spin()
31 
32 
33  # 配置nlp对话库csv文件并生效
34  def nlp_config(self, keyword, answer,action,action_mode):
35  rospy.wait_for_service('/xbot_talker/nlp_dialog_config')
36  nlp_dialog_config = rospy.ServiceProxy('/xbot_talker/nlp_dialog_config', nlpdialog_config)
37  res=nlp_dialog_config(keyword,answer,action,action_mode)
38  return res.error_code
39 
40 
41  # 配置asr重置语法参数
42  def asr_config(self,keyword):
43  rospy.wait_for_service('/xbot_talker/asr_keyword_config')
44  asr_keyword_config = rospy.ServiceProxy('/xbot_talker/asr_keyword_config', keyword_config)
45  res=asr_keyword_config(keyword)
46  return res.error_code
47 
48 
49  # 获取动作模式字典信息
50  def call_actionmodes(self, req):
51  filename = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/json/action_mode.json'
52  if req.get_actionmode_req:
53  with open(filename, 'r') as f:
54  data = json.load(f)
55  actionmodes = data['ActionMode']
56  actionmode_list = actionmode_ros()
57  for i in range(len(actionmodes)):
58  tem = actionmode()
59  tem.actionmode_desc = actionmodes[i]['description']
60  tem.actionmode_name = actionmodes[i]['name']
61  tem.actionmode_flag = actionmodes[i]['flag']
62  actionmode_list.actionmode_ros.append(tem)
63  print(actionmode_list)
64  return call_actionmodeResponse(actionmode_list)
65  else:
66  rospy.logwarn("error: If you want to get the actionmodes, please set the get_actionmode_req True.")
67 
68  # 获取动作字典信息
69  def call_actions(self, req):
70  filename = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/json/action.json'
71  if req.get_action_req:
72  with open(filename, 'r') as f:
73  data = json.load(f)
74  actions = data['Action']
75  action_list = action_ros()
76  for i in range(len(actions)):
77  tem = action()
78  tem.action_desc = actions[i]['description']
79  tem.action_name = actions[i]['name']
80  tem.action_flag = actions[i]['flag']
81  action_list.action_ros.append(tem)
82  rospy.loginfo(action_list)
83  return call_actionResponse(action_list)
84  else:
85  rospy.logwarn("error: If you want to get the actions, please set the get_action_req True.")
86 
87  # 保存对话配置并生效
88  def add_dialogues(self, req):
89  filename = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/json/dialogue.json'
90  with open(filename, 'r') as f:
91  data = json.load(f)
92  dialogues = data['Dialogue']
93  print(req)
94  # 判断keyword是否已存在
95  for i in range(len(dialogues)):
96  if req.keyword == dialogues[i]['keyword']:
97  rospy.logwarn("False: This keyword already exist!")
98  return add_dialogueResponse(1)
99  # 获取时间信息
100  now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
101  # 添加对话配置到json文件
102  new_dialogue = {'keyword':req.keyword,'answer':req.answer,'action':req.action,'action_mode':req.action_mode,'update_time':now_time}
103  dialogues.append(new_dialogue)
104  data['Dialogue'] = dialogues
105  with codecs.open(filename, 'w', 'utf-8') as file:
106  json.dump(data, file, indent=4, ensure_ascii=False)
107  # 配置keyword到bnf文件
108  print(chardet.detect(req.keyword))
109  filename_bnf = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/grammar.bnf'
110  with open(filename_bnf,'r') as f:
111  data_bnf = f.read()
112  print(chardet.detect(data_bnf))
113  content = data_bnf.rfind(';')
114  if content != -1:
115  data_bnf = data_bnf[:content]+'|'+req.keyword+data_bnf[content:]
116  with open(filename_bnf,'w') as f:
117  f.write(data_bnf)
118  add_dialogue_enable = True
119  rospy.loginfo("Dialogue config【" + req.keyword + "】successful!")
120  # 启动build_grammar.launch使新加keyword生效
121  home_dir = "/home"
122  build_grammar_command = 'roslaunch xbot_talker build_grammar.launch'
123  build_grammar = subprocess.Popen(build_grammar_command, shell=True,
124  stdout=subprocess.PIPE,
125  stderr=subprocess.STDOUT,
126  cwd=home_dir)
127  rospy.sleep(1)
128 
129  errcorde = self.nlp_config(req.keyword,req.answer,req.action,req.action_mode)
130  if errcorde != 0:
131  rospy.logwarn("False: This keyword already exist in answer_dic!")
132  return add_dialogueResponse(1)
133 
134  errcorde = self.asr_config(req.keyword)
135  if errcorde != 0:
136  rospy.logwarn("False: reset keyword failed!")
137  return add_dialogueResponse(2)
138 
139  return add_dialogueResponse(0)
140 
141 
142  # 获取对话配置最近10条历史记录功能,查询并返回所有的配置项,为回滚功能提供数据
143  def get_history(self,req):
144  if req.get_history_record:
145  filename = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/json/dialogue.json'
146  with open(filename, 'r') as f:
147  data = json.load(f)
148  dialogues = data['Dialogue']
149  dialogue_list = dialogue_ros()
150  for i in range(10):
151  tem = dialogue()
152  tem.keyword = dialogues[-10+i]['keyword']
153  tem.answer = dialogues[-10+i]['answer']
154  tem.update_time = dialogues[-10+i]['update_time']
155  tem.action = dialogues[-10+i]['action']
156  tem.action_mode = dialogues[-10+i]['action_mode']
157  dialogue_list.dialogue_ros.append(tem)
158  rospy.loginfo(dialogue_list)
159  return history_recordResponse(dialogue_list)
160  else:
161  rospy.logwarn("error: If you want to get the dialogue history record, please set the get_history_record True.")
162 
163 
164  # 对话配置回滚功能,根据配置时间可选择回滚到之前的设定
165  # 返回值0:成功;1:失败
166  def roolback_dialogue(self,req):
167  filename = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/json/dialogue.json'
168  with open(filename, 'r') as f:
169  data = json.load(f)
170  dialogues = data['Dialogue']
171  config_status = 1
172  for i in range(len(dialogues)):
173  if req.version_info == dialogues[i]['update_time']:
174  config_status = 0
175  break
176  if config_status == 1:
177  rospy.logwarn("False: roolback_dialogue failed!")
178  return roolback_dialogueResponse(config_status)
179  # 回退dialogue.json
180  del dialogues[i+1:]
181  data['Dialogue'] = dialogues
182  with codecs.open(filename,'w','utf-8') as file:
183  json.dump(data, file, indent=4, ensure_ascii=False)
184  # 回退grammar.bnf
185  filename_bnf = os.popen('rospack find xbot_talker').read().strip() + '/userconfig/grammar.bnf'
186  with open(filename_bnf,'r') as f:
187  data_bnf = f.read()
188  roolback_keyword = dialogues[i]['keyword'].encode('utf-8')
189  content_bnf = data_bnf.rfind(roolback_keyword)
190  if content_bnf != -1:
191  data_bnf = data_bnf[:content_bnf+len(roolback_keyword)]+';'
192  print(data_bnf)
193  with open(filename_bnf,'w') as f:
194  f.write(data_bnf)
195 
196  # 重新构建语法使回滚操作生效
197  home_dir = "/home"
198  build_grammar_command = 'roslaunch xbot_talker build_grammar.launch'
199  build_grammar = subprocess.Popen(build_grammar_command, shell=True,
200  stdout=subprocess.PIPE,
201  stderr=subprocess.STDOUT,
202  cwd=home_dir)
203  rospy.sleep(1)
204 
205  # 回退answer_dic.csv
206  filename_csv = os.popen('rospack find xbot_talker').read().strip() + '/defaultconfig/answer_dic.csv'
207  with open(filename_csv,'r') as f:
208  data_csv = csv.reader(f)
209  new_data_csv = []
210  for item in data_csv:
211  new_data_csv.append(item)
212  if item[0]==roolback_keyword:
213  # roolback_flag = data_csv.line_num # 可用来查找行
214  break
215  with open(filename_csv,'w') as f:
216  writer = csv.writer(f)
217  writer.writerows(new_data_csv)
218 
219  # 重新加载answer_dic.csv使回滚操作生效
220  self.nlp_config("back_config","back_config",0,0)
221  rospy.loginfo("Successful: roolback_dialogue succeed!")
222  return roolback_dialogueResponse(config_status)
223 
224 
225  def shutdown(self):
226  while rospy.is_shutdown:
227  rospy.loginfo("Stopping ...")
228  rospy.sleep(1)
229  os._exit(0)
230 
231 
232 if __name__ == '__main__':
def nlp_config(self, keyword, answer, action, action_mode)


xbot_talker
Author(s): wangxiaoyun
autogenerated on Sat Oct 10 2020 03:27:53