Tools

bag2pcd

sudo apt install pcl_ros
roscore
rosrun pcl_ros bag_to_pcd <input_file.bag> <topic> <output_directory>

bag2vicon

  • In one terminal rosbag play <input_file.bag>

  • In another terminal run ./bag2vicon.py. It will subscribe the designated ros topic and save it into a txt file in the format [time, trans.x, trans.y, trans.z, rot.x, rot.y, rot.z, rot.w].

  • Remember to change ros topic name, bag name, file name accordingly beforehand.

#!/usr/bin/env python
import csv
import rospy
from geometry_msgs.msg import TransformStamped

pose = []

def callback(msg):
    secs = msg.header.stamp.secs
    nsecs = msg.header.stamp.nsecs
    time = float(str(secs) + "." + str(nsecs))
    trans = msg.transform.translation
    rot = msg.transform.rotation
    stamp_float = [time, trans.x, trans.y, trans.z, rot.x, rot.y, rot.z, rot.w]
    stamp_string = [format(s, ".4f") for s in stamp_float ]
    pose.append(stamp_string)

if __name__ == '__main__':
    ros_topic_name = "vicon/turtlebot/turtlebot"
    ros_bag_name = "turtlebot_zed_pointcloud_sq2_vicon.bag"
    save_to_file = "turtlebot_zed_pointcloud_sq2_vicon_gt.txt"
    try:
        rospy.init_node('converter', anonymous=True)
        rospy.Subscriber(ros_topic_name, TransformStamped, callback)
        rospy.spin()
    finally:
        with open(save_to_file, mode='w') as file:
            file.writelines("# ground truth trajectory\n")
            file.writelines("# file: " + ros_bag_name + "\n")
            file.writelines("# timestamp tx ty tz qx qy qz qw\n")
        with open(save_to_file, mode='a') as file:
            file = csv.writer(file, delimiter=' ', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            for line in pose:
                file.writerow(line)

vicon2gt

  • usage: ./vicon2gt.py <vicon_traj.txt> <pcd_folder>

  • It will iterate over pcd folder, extract the timestamp from the pcd filename, and look for the closest timestamp in vicon_traj.txt and save it into a gt.log file.

#!/usr/bin/python
import argparse
import random
import numpy
import sys
import os
_EPS = numpy.finfo(float).eps * 4.0

class CameraPose:
    def __init__(self, meta, mat):
        self.metadata = meta
        self.pose = mat
    def __str__(self):
        return 'Metadata : ' + ' '.join(map(str, self.metadata)) + '\n' + \
            "Pose : " + "\n" + numpy.array_str(self.pose)

def mr_read_trajectory(filename):
    traj = []
    with open(filename, 'r') as f:
        metastr = f.readline()
        while metastr:
            metadata = map(int, metastr.split())
            mat = numpy.zeros(shape = (4, 4))
            for i in range(4):
                matstr = f.readline()
                mat[i, :] = numpy.fromstring(matstr, dtype = float, sep=' \t')
            traj.append(CameraPose(metadata, mat))
            metastr = f.readline()
    return traj

def mr_write_trajectory(traj, filename):
    with open(filename, 'w') as f:
        for x in traj:
            p = x.pose.tolist()
            f.write(' '.join(map(str, x.metadata)) + '\n')
            f.write('\n'.join(' '.join(map('{0:.12f}'.format, p[i])) for i in range(4)))
            f.write('\n')

def transform44(l):
    """
    Generate a 4x4 homogeneous transformation matrix from a 3D point and unit quaternion.
    
    Input:
    l -- tuple consisting of (stamp,tx,ty,tz,qx,qy,qz,qw) where
         (tx,ty,tz) is the 3D position and (qx,qy,qz,qw) is the unit quaternion.
         
    Output:
    matrix -- 4x4 homogeneous transformation matrix
    """
    t = l[1:4]
    q = numpy.array(l[4:8], dtype=numpy.float64, copy=True)
    nq = numpy.dot(q, q)
    if nq < _EPS:
        return numpy.array((
        (                1.0,                 0.0,                 0.0, t[0])
        (                0.0,                 1.0,                 0.0, t[1])
        (                0.0,                 0.0,                 1.0, t[2])
        (                0.0,                 0.0,                 0.0, 1.0)
        ), dtype=numpy.float64)
    q *= numpy.sqrt(2.0 / nq)
    q = numpy.outer(q, q)
    return numpy.array((
        (1.0-q[1, 1]-q[2, 2],     q[0, 1]-q[2, 3],     q[0, 2]+q[1, 3], t[0]),
        (    q[0, 1]+q[2, 3], 1.0-q[0, 0]-q[2, 2],     q[1, 2]-q[0, 3], t[1]),
        (    q[0, 2]-q[1, 3],     q[1, 2]+q[0, 3], 1.0-q[0, 0]-q[1, 1], t[2]),
        (                0.0,                 0.0,                 0.0, 1.0)
        ), dtype=numpy.float64)

def read_trajectory(filename, matrix=True):
    """
    Read a trajectory from a text file. 
    
    Input:
    filename -- file to be read
    matrix -- convert poses to 4x4 matrices
    
    Output:
    dictionary of stamped 3D poses
    """
    file = open(filename)
    data = file.read()
    lines = data.replace(","," ").replace("\t"," ").split("\n") 
    list = [[float(v.strip()) for v in line.split(" ") if v.strip()!=""] for line in lines if len(line)>0 and line[0]!="#"]
    list_ok = []
    for i,l in enumerate(list):
        if l[4:8]==[0,0,0,0]:
            continue
        isnan = False
        for v in l:
            if numpy.isnan(v): 
                isnan = True
                break
        if isnan:
            sys.stderr.write("Warning: line %d of file '%s' has NaNs, skipping line\n"%(i,filename))
            continue
        list_ok.append(l)
    if matrix :
      traj = dict([(l[0],transform44(l[0:])) for l in list_ok])
    else:
      traj = dict([(l[0],l[1:8]) for l in list_ok])
    return traj

def find_closest_index(L,t):
    """
    Find the index of the closest value in a list.
    
    Input:
    L -- the list (sorted in ascending order)
    t -- value to be found
    
    Output:
    index of the closest element
    """
    beginning = 0
    difference = abs(L[0] - t)
    best = 0
    end = len(L)
    while beginning < end:
        middle = int((end+beginning)/2)
        if abs(L[middle] - t) < difference:
            difference = abs(L[middle] - t)
            best = middle
        if t == L[middle]:
            return middle
        elif L[middle] > t:
            end = middle
        else:
            beginning = middle + 1
    return best

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("usage: ./vicon2gt.py <vicon_traj.txt> <pcd_folder>")
        exit()

    vicon = read_trajectory(sys.argv[1])
    stamps_gt = list(vicon.keys())
    stamps_gt.sort()

    directory = os.listdir(sys.argv[2])
    stamps_file = list()
    for filename in directory:
        if filename.rsplit('.', 1)[1] == 'pcd':
            stamps_file.append(float(filename.rsplit('.', 1)[0]))
    stamps_file.sort()

    total_length = len(stamps_file)
    i = 0
    traj = []
    for stamp in stamps_file:
        idx = find_closest_index(stamps_gt, stamp)
        trans = vicon[stamps_gt[idx]]
        metadata = [i, i, total_length]
        traj.append(CameraPose(metadata, trans))
        i += 1
    mr_write_trajectory(traj, "gt.log")

ply2pcd

#!/usr/bin/env python3
import sys
import open3d as o3d

def ply2pcd(plyFileName, pcdFileName):
    pcd = o3d.io.read_point_cloud(plyFileName)
    o3d.io.write_point_cloud(pcdFileName, pcd)

if __name__ == "__main__":
    filename = sys.argv[1]
    if filename.rsplit('.', 1)[1] == 'ply':
        pcdname = filename.rsplit('.', 1)[0] + '.pcd'
        ply2pcd(filename, pcdname)

ply2pcd batch

#!/usr/bin/env python3
import sys
import os
import open3d as o3d

def ply2pcd(plyFileName, pcdFileName):
    pcd = o3d.io.read_point_cloud(plyFileName)
    o3d.io.write_point_cloud(pcdFileName, pcd)

if __name__ == "__main__":
    path = sys.argv[1]
    listdir = os.listdir(path)
    for filename in listdir:
        if filename.split('.')[1] == 'ply':
            pathin = path + '/' + filename
        pathout = path + '/' + filename.split('.')[0] + '.pcd'
        ply2pcd(pathin, pathout)

pcd2ply

#!/usr/bin/env python3
import sys
import open3d as o3d

def pcd2ply(pcdFileName, plyFileName):
    pcd = o3d.io.read_point_cloud(pcdFileName)
    o3d.io.write_point_cloud(plyFileName, pcd)

if __name__ == "__main__":
    filename = sys.argv[1]
    if filename.rsplit('.', 1)[1] == 'pcd':
        newname = filename.rsplit('.', 1)[0] + '.ply'
        pcd2ply(filename, newname)

pcd2ply batch

#!/usr/bin/env python3
import sys
import os
import open3d as o3d

def pcd2ply(pcdFileName, plyFileName):
    pcd = o3d.io.read_point_cloud(pcdFileName)
    o3d.io.write_point_cloud(plyFileName, pcd)

if __name__ == "__main__":
    path = sys.argv[1]
    listdir = os.listdir(path)
    for filename in listdir:
        if filename.split('.')[1] == 'pcd':
            pathin = path + '/' + filename
        pathout = path + '/' + filename.split('.')[0] + '.ply'
        pcd2ply(pathin, pathout)

bin2pcd

#!/usr/bin/env python3
import numpy as np
import struct
import sys
import open3d as o3d

def bin_to_pcd(binFileName):
    size_float = 4
    list_pcd = []
    with open(binFileName, "rb") as f:
        byte = f.read(size_float * 4)
        while byte:
            x, y, z, intensity = struct.unpack("ffff", byte)
            list_pcd.append([x, y, z])
            byte = f.read(size_float * 4)
    np_pcd = np.asarray(list_pcd)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(np_pcd)
    return pcd

def main(binFileName, pcdFileName):
    pcd = bin_to_pcd(binFileName)
    o3d.io.write_point_cloud(pcdFileName, pcd)

if __name__ == "__main__":
    a = sys.argv[1]
    b = sys.argv[2]
    main(a, b)

Last updated