1. add Mission class

2. Improve ground control station protocol analysis

Signed-off-by: STT
This commit is contained in:
STT 2021-12-02 18:35:32 +08:00
parent 7b2b2900fb
commit d59dcae74e
7 changed files with 86 additions and 16 deletions

View File

@ -1 +1 @@
usv.py
settings.py

View File

@ -1,6 +1,6 @@
# USVControl
2021.12.1
2021.12.2
#### 介绍
@ -30,6 +30,11 @@ python版船控也可控两轮无人车作者初学python练手。
3. GroundControlStation类修复参数名错误和serial read字节数BUG
4. 打印数据供调试使用暂时放弃共享内存方法更改主函数名和Timer
2021.12.2
1. add Mission class
2. Improve ground control station protocol analysis
#### 开发计划
1. 完善地面站控制相关功能(任务、路点等功能)

View File

@ -12,3 +12,21 @@ def to_signed_number(number, max_bytes):
return number
else:
return number - 2 ** (8 * max_bytes)
def pretty_xml(element, indent, newline, level=0): # elemnt为传进来的Elment类参数indent用于缩进newline用于换行
"""为ElementTree处理的XML添加缩进和换行"""
if element: # 判断element是否有子元素
if (element.text is None) or element.text.isspace(): # 如果element的text没有内容
element.text = newline + indent * (level + 1)
else:
element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * (level + 1)
# else: # 此处两行如果把注释去掉Element的text也会另起一行
# element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * level
temp = list(element) # 将element转成list
for subelement in temp:
if temp.index(subelement) < (len(temp) - 1): # 如果不是list的最后一个元素说明下一个行是同级别元素的起始缩进应一致
subelement.tail = newline + indent * (level + 1)
else: # 如果是list的最后一个元素 说明下一行是母元素的结束,缩进应该少一个
subelement.tail = newline + indent * level
pretty_xml(subelement, indent, newline, level=level + 1) # 对子元素进行递归操作

View File

@ -1,6 +1,9 @@
import math
import time
from navigation import Navigation
from SelfBuiltModul.func import degrees_to_radians
class Control:
def __init__(self):
@ -32,7 +35,7 @@ class Control:
self.speed(usv)
elif self.data['mode'] == 'heading_speed':
self.heading_speed(usv)
elif self.data['mode'] == 'waypoint':
elif self.data['mode'] == 'waypoints':
self.waypoint(usv)
elif self.data['mode'] == 'waypoint_speed':
self.waypoint_speed(usv)
@ -62,7 +65,7 @@ class Control:
elif usv.gcs.command['setting'] == 4:
self.data['mode'] = 'heading_speed'
elif usv.gcs.command['setting'] == 5:
self.data['mode'] = 'waypoint'
self.data['mode'] = 'waypoints'
elif usv.gcs.command['setting'] == 6:
self.data['mode'] = 'waypoint_speed'
elif usv.gcs.command['setting'] == 7:
@ -100,32 +103,32 @@ class Control:
int(self.rudder_pid.calculate_pid(err, self.pid['heading_p'], self.pid['heading_i'],
self.pid['heading_d'])))
def gcs(self, usv):
def gcs(self, usv): # 1
self.data['rudder'] = limit_1000(int(usv.gcs.command['desired_rudder']))
self.data['thrust'] = limit_1000(int(usv.gcs.command['desired_thrust']))
def heading(self, usv):
def heading(self, usv): # 2
self.data['rudder'] = limit_1000(int(self.control_heading(usv)))
self.data['thrust'] = limit_1000(int(usv.gcs.command['desired_thrust']))
def speed(self, usv):
def speed(self, usv): # 3
self.data['rudder'] = limit_1000(int(usv.gcs.command['desired_rudder']))
self.data['thrust'] = limit_1000(int(self.control_speed(usv)))
def heading_speed(self, usv):
def heading_speed(self, usv): # 4
self.data['rudder'] = limit_1000(int(self.control_heading(usv)))
self.data['thrust'] = limit_1000(int(self.control_speed(usv)))
def waypoint(self, usv):
def waypoint(self, usv): # 5
pass
def waypoint_speed(self, usv):
def waypoint_speed(self, usv): # 6
pass
def trajectory_point(self, usv):
def trajectory_point(self, usv): # 7
pass
def mission(self, usv):
def mission(self, usv): # 8
pass
def control_heading(self, usv):

View File

@ -11,6 +11,7 @@ class GroundControlStation:
self.command = {'timestamp': 0.0, 'setting': 0, 'desired_heading': 0.0, 'desired_speed': 0.0,
'desired_latitude': 0.0, 'desired_longitude': 0.0, 'desired_rudder': 0, 'desired_thrust': 0,
'ignition': 0, 'buffer_err': 0}
self.waypoints = []
self.gcs = serial.Serial(com, 57600, timeout=0, write_timeout=0)
self.buffer = []
self.errors = 0
@ -100,14 +101,21 @@ class GroundControlStation:
del self.buffer[:packet_length]
continue
elif packet_id == 4:
elif packet_id == 4: # 任务上传请求
data_type = packet_data[10]
if packet_data_length == 11 and data_type == 0:
pass
self.waypoints.clear()
if packet_data_length == 31 and data_type == 1:
pass
way_point = struct.unpack('<ddf', bytes(packet_data[11:packet_data_length]))
is_same = False
if len(self.waypoints) != 0:
if self.waypoints[0] != way_point[0] and self.waypoints[1] != way_point[1] and self.waypoints[
2] != way_point[2]:
is_same = True
if not is_same:
self.waypoints.append(way_point)
if packet_data_length == 11 and data_type == 2:
pass
usv.mission.write(self.waypoints)
# 返回成功信号
data_bytes = struct.pack('<HdB', usv.control.pid['id'], time.time(), 0xff)
crc16 = calculate_crc16_ccitt(data_bytes, len(data_bytes))

34
mission.py Normal file
View File

@ -0,0 +1,34 @@
from xml.etree import ElementTree
from SelfBuiltModul.func import pretty_xml
class Mission:
"""任务相关类"""
def __init__(self, id):
self.file_name = f"AppData/{id}.xml"
def read(self):
"""去读xml文件返回路点列表"""
waypoints = []
with open(self.file_name) as wp:
tree = ElementTree.parse(wp)
root = tree.getroot()
for i in root:
if i.tag == 'waypoint':
waypoints.append(
(float(i.attrib['latitude']), float(i.attrib['longitude']), float(i.attrib['tolerance'])))
return waypoints
def write(self, waypoints):
"""将路点写入XML文件"""
root = ElementTree.Element('mission')
for way_point in waypoints:
sub = ElementTree.SubElement(root, 'waypoint',
{'latitude': str(way_point[0]), 'longitude': str(way_point[1]),
'tolerance': str(way_point[2])})
pretty_xml(root, '\t', '\n')
tree = ElementTree.ElementTree(root)
with open(self.file_name):
pass
tree.write(self.file_name, 'UTF-8', True)

2
usv.py
View File

@ -5,6 +5,7 @@ from navigation import Navigation
from control import Control
from settings import Settings
from ground_control_station import GroundControlStation
from mission import Mission
class UsvControl:
@ -17,6 +18,7 @@ class UsvControl:
self.settings.navigation_baudrate)
self.control = Control()
self.gcs = GroundControlStation(self.settings.gcs_com)
self.mission = Mission(self.settings.usv_id)
def ms10_run(self):
self.futaba.rcu_run(self)