Tools
bag2pcd
sudo apt install pcl_ros
roscore
rosrun pcl_ros bag_to_pcd <input_file.bag> <topic> <output_directory>
bag2vicon
In one terminal
rosbag play <input_file.bag>
In another terminal run
./bag2vicon.py
. It will subscribe the designated ros topic and save it into a txt file in the format [time, trans.x, trans.y, trans.z, rot.x, rot.y, rot.z, rot.w].Remember to change ros topic name, bag name, file name accordingly beforehand.
#!/usr/bin/env python
import csv
import rospy
from geometry_msgs.msg import TransformStamped
pose = []
def callback(msg):
secs = msg.header.stamp.secs
nsecs = msg.header.stamp.nsecs
time = float(str(secs) + "." + str(nsecs))
trans = msg.transform.translation
rot = msg.transform.rotation
stamp_float = [time, trans.x, trans.y, trans.z, rot.x, rot.y, rot.z, rot.w]
stamp_string = [format(s, ".4f") for s in stamp_float ]
pose.append(stamp_string)
if __name__ == '__main__':
ros_topic_name = "vicon/turtlebot/turtlebot"
ros_bag_name = "turtlebot_zed_pointcloud_sq2_vicon.bag"
save_to_file = "turtlebot_zed_pointcloud_sq2_vicon_gt.txt"
try:
rospy.init_node('converter', anonymous=True)
rospy.Subscriber(ros_topic_name, TransformStamped, callback)
rospy.spin()
finally:
with open(save_to_file, mode='w') as file:
file.writelines("# ground truth trajectory\n")
file.writelines("# file: " + ros_bag_name + "\n")
file.writelines("# timestamp tx ty tz qx qy qz qw\n")
with open(save_to_file, mode='a') as file:
file = csv.writer(file, delimiter=' ', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for line in pose:
file.writerow(line)
vicon2gt
usage:
./vicon2gt.py <vicon_traj.txt> <pcd_folder>
It will iterate over pcd folder, extract the timestamp from the pcd filename, and look for the closest timestamp in
vicon_traj.txt
and save it into agt.log
file.
#!/usr/bin/python
import argparse
import random
import numpy
import sys
import os
_EPS = numpy.finfo(float).eps * 4.0
class CameraPose:
def __init__(self, meta, mat):
self.metadata = meta
self.pose = mat
def __str__(self):
return 'Metadata : ' + ' '.join(map(str, self.metadata)) + '\n' + \
"Pose : " + "\n" + numpy.array_str(self.pose)
def mr_read_trajectory(filename):
traj = []
with open(filename, 'r') as f:
metastr = f.readline()
while metastr:
metadata = map(int, metastr.split())
mat = numpy.zeros(shape = (4, 4))
for i in range(4):
matstr = f.readline()
mat[i, :] = numpy.fromstring(matstr, dtype = float, sep=' \t')
traj.append(CameraPose(metadata, mat))
metastr = f.readline()
return traj
def mr_write_trajectory(traj, filename):
with open(filename, 'w') as f:
for x in traj:
p = x.pose.tolist()
f.write(' '.join(map(str, x.metadata)) + '\n')
f.write('\n'.join(' '.join(map('{0:.12f}'.format, p[i])) for i in range(4)))
f.write('\n')
def transform44(l):
"""
Generate a 4x4 homogeneous transformation matrix from a 3D point and unit quaternion.
Input:
l -- tuple consisting of (stamp,tx,ty,tz,qx,qy,qz,qw) where
(tx,ty,tz) is the 3D position and (qx,qy,qz,qw) is the unit quaternion.
Output:
matrix -- 4x4 homogeneous transformation matrix
"""
t = l[1:4]
q = numpy.array(l[4:8], dtype=numpy.float64, copy=True)
nq = numpy.dot(q, q)
if nq < _EPS:
return numpy.array((
( 1.0, 0.0, 0.0, t[0])
( 0.0, 1.0, 0.0, t[1])
( 0.0, 0.0, 1.0, t[2])
( 0.0, 0.0, 0.0, 1.0)
), dtype=numpy.float64)
q *= numpy.sqrt(2.0 / nq)
q = numpy.outer(q, q)
return numpy.array((
(1.0-q[1, 1]-q[2, 2], q[0, 1]-q[2, 3], q[0, 2]+q[1, 3], t[0]),
( q[0, 1]+q[2, 3], 1.0-q[0, 0]-q[2, 2], q[1, 2]-q[0, 3], t[1]),
( q[0, 2]-q[1, 3], q[1, 2]+q[0, 3], 1.0-q[0, 0]-q[1, 1], t[2]),
( 0.0, 0.0, 0.0, 1.0)
), dtype=numpy.float64)
def read_trajectory(filename, matrix=True):
"""
Read a trajectory from a text file.
Input:
filename -- file to be read
matrix -- convert poses to 4x4 matrices
Output:
dictionary of stamped 3D poses
"""
file = open(filename)
data = file.read()
lines = data.replace(","," ").replace("\t"," ").split("\n")
list = [[float(v.strip()) for v in line.split(" ") if v.strip()!=""] for line in lines if len(line)>0 and line[0]!="#"]
list_ok = []
for i,l in enumerate(list):
if l[4:8]==[0,0,0,0]:
continue
isnan = False
for v in l:
if numpy.isnan(v):
isnan = True
break
if isnan:
sys.stderr.write("Warning: line %d of file '%s' has NaNs, skipping line\n"%(i,filename))
continue
list_ok.append(l)
if matrix :
traj = dict([(l[0],transform44(l[0:])) for l in list_ok])
else:
traj = dict([(l[0],l[1:8]) for l in list_ok])
return traj
def find_closest_index(L,t):
"""
Find the index of the closest value in a list.
Input:
L -- the list (sorted in ascending order)
t -- value to be found
Output:
index of the closest element
"""
beginning = 0
difference = abs(L[0] - t)
best = 0
end = len(L)
while beginning < end:
middle = int((end+beginning)/2)
if abs(L[middle] - t) < difference:
difference = abs(L[middle] - t)
best = middle
if t == L[middle]:
return middle
elif L[middle] > t:
end = middle
else:
beginning = middle + 1
return best
if __name__ == '__main__':
if len(sys.argv) < 3:
print("usage: ./vicon2gt.py <vicon_traj.txt> <pcd_folder>")
exit()
vicon = read_trajectory(sys.argv[1])
stamps_gt = list(vicon.keys())
stamps_gt.sort()
directory = os.listdir(sys.argv[2])
stamps_file = list()
for filename in directory:
if filename.rsplit('.', 1)[1] == 'pcd':
stamps_file.append(float(filename.rsplit('.', 1)[0]))
stamps_file.sort()
total_length = len(stamps_file)
i = 0
traj = []
for stamp in stamps_file:
idx = find_closest_index(stamps_gt, stamp)
trans = vicon[stamps_gt[idx]]
metadata = [i, i, total_length]
traj.append(CameraPose(metadata, trans))
i += 1
mr_write_trajectory(traj, "gt.log")
ply2pcd
#!/usr/bin/env python3
import sys
import open3d as o3d
def ply2pcd(plyFileName, pcdFileName):
pcd = o3d.io.read_point_cloud(plyFileName)
o3d.io.write_point_cloud(pcdFileName, pcd)
if __name__ == "__main__":
filename = sys.argv[1]
if filename.rsplit('.', 1)[1] == 'ply':
pcdname = filename.rsplit('.', 1)[0] + '.pcd'
ply2pcd(filename, pcdname)
ply2pcd batch
#!/usr/bin/env python3
import sys
import os
import open3d as o3d
def ply2pcd(plyFileName, pcdFileName):
pcd = o3d.io.read_point_cloud(plyFileName)
o3d.io.write_point_cloud(pcdFileName, pcd)
if __name__ == "__main__":
path = sys.argv[1]
listdir = os.listdir(path)
for filename in listdir:
if filename.split('.')[1] == 'ply':
pathin = path + '/' + filename
pathout = path + '/' + filename.split('.')[0] + '.pcd'
ply2pcd(pathin, pathout)
pcd2ply
#!/usr/bin/env python3
import sys
import open3d as o3d
def pcd2ply(pcdFileName, plyFileName):
pcd = o3d.io.read_point_cloud(pcdFileName)
o3d.io.write_point_cloud(plyFileName, pcd)
if __name__ == "__main__":
filename = sys.argv[1]
if filename.rsplit('.', 1)[1] == 'pcd':
newname = filename.rsplit('.', 1)[0] + '.ply'
pcd2ply(filename, newname)
pcd2ply batch
#!/usr/bin/env python3
import sys
import os
import open3d as o3d
def pcd2ply(pcdFileName, plyFileName):
pcd = o3d.io.read_point_cloud(pcdFileName)
o3d.io.write_point_cloud(plyFileName, pcd)
if __name__ == "__main__":
path = sys.argv[1]
listdir = os.listdir(path)
for filename in listdir:
if filename.split('.')[1] == 'pcd':
pathin = path + '/' + filename
pathout = path + '/' + filename.split('.')[0] + '.ply'
pcd2ply(pathin, pathout)
bin2pcd
#!/usr/bin/env python3
import numpy as np
import struct
import sys
import open3d as o3d
def bin_to_pcd(binFileName):
size_float = 4
list_pcd = []
with open(binFileName, "rb") as f:
byte = f.read(size_float * 4)
while byte:
x, y, z, intensity = struct.unpack("ffff", byte)
list_pcd.append([x, y, z])
byte = f.read(size_float * 4)
np_pcd = np.asarray(list_pcd)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np_pcd)
return pcd
def main(binFileName, pcdFileName):
pcd = bin_to_pcd(binFileName)
o3d.io.write_point_cloud(pcdFileName, pcd)
if __name__ == "__main__":
a = sys.argv[1]
b = sys.argv[2]
main(a, b)
Last updated