init project

This commit is contained in:
hofee 2024-09-19 13:26:58 -05:00
parent 4e3ab91bf1
commit 85f606a2a3
4 changed files with 402 additions and 0 deletions

151
app.py Normal file
View File

@ -0,0 +1,151 @@
from flask import Flask, request, jsonify
import os
import json
import base64
import pickle
import numpy as np
from flask_cors import CORS
from data_load import DataLoadUtil
from pts import PtsUtil
app = Flask(__name__)
CORS(app)
ROOT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data')
print(ROOT)
@app.route('/get_scene_list', methods=['POST'])
def get_scene_list():
data = request.json
dataset_name = data.get('dataset_name')
dataset_path = os.path.join(ROOT, dataset_name)
if not os.path.exists(dataset_path):
return jsonify({"error": "Dataset not found"}), 404
scene_list = [d for d in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, d))]
return jsonify({"scene_list": scene_list, "success": True})
@app.route('/get_scene_info', methods=['POST'])
def get_scene_info():
data = request.json
dataset_name = data.get('dataset_name')
scene_name = data.get('scene_name')
scene_path = os.path.join(ROOT, dataset_name, scene_name)
camera_params_path = os.path.join(scene_path, 'camera_params')
label_json_path = os.path.join(scene_path, 'label.json')
if not os.path.exists(scene_path) or not os.path.exists(label_json_path):
return jsonify({"error": "Scene or label.json not found"}), 404
with open(label_json_path, 'r') as f:
label_data = json.load(f)
sequence_length = len([f for f in os.listdir(camera_params_path) if os.path.isfile(os.path.join(camera_params_path, f))])
max_coverage_rate = label_data.get('max_coverage_rate')
best_sequence = label_data.get('best_sequence')
best_sequence_length = len(best_sequence)
best_sequence_formatted = []
for i in range(best_sequence_length):
best_sequence_formatted.append(
{
"frame": best_sequence[i][0],
"coverage_rate": round(best_sequence[i][1]*100,1)
}
)
return jsonify({
"sequence_length": sequence_length,
"max_coverage_rate": round(max_coverage_rate*100,2),
"best_sequence_length": best_sequence_length,
"best_sequence": best_sequence_formatted,
"success": True
})
def read_image_as_base64(file_path):
try:
with open(file_path, 'rb') as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return encoded_string
except FileNotFoundError:
return None
@app.route('/get_frame_data', methods=['POST'])
def get_frame_data():
data = request.json
dataset_name = data.get('dataset_name')
scene_name = data.get('scene_name')
sequence = data.get('sequence')
scene_path = os.path.join(ROOT, dataset_name, scene_name)
root = os.path.join(ROOT, dataset_name)
camera_params_path = os.path.join(scene_path, 'camera_params')
depth_path = os.path.join(scene_path, 'depth')
mask_path = os.path.join(scene_path, 'mask')
points_and_normals_path = os.path.join(scene_path, 'points_and_normals.txt')
points_and_normals = np.loadtxt(points_and_normals_path)
model_points = points_and_normals[:, :3]
if not all([os.path.exists(scene_path), os.path.exists(camera_params_path), os.path.exists(depth_path), os.path.exists(mask_path)]):
return jsonify({"error": "Invalid paths or files not found"}), 404
result = []
combined_point_cloud = np.zeros((0, 3))
last_CR = 0
for frame_info in sequence:
frame_id = frame_info.get('frame')
frame_data = {}
camera_params_file = os.path.join(camera_params_path, f'{frame_id}.json')
if os.path.exists(camera_params_file):
with open(camera_params_file, 'r') as f:
camera_params = json.load(f)
frame_data['camera_params'] = camera_params
else:
frame_data['camera_params'] = None
depth_file = os.path.join(depth_path, f'{frame_id}_L.png')
depth_base64 = read_image_as_base64(depth_file)
frame_data['depth'] = depth_base64 if depth_base64 else None
mask_file = os.path.join(mask_path, f'{frame_id}_L.png')
mask_base64 = read_image_as_base64(mask_file)
frame_data['mask'] = mask_base64 if mask_base64 else None
path = DataLoadUtil.get_path(root, scene_name, frame_id)
point_cloud = DataLoadUtil.get_point_cloud_world_from_path(path)
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, 0.01)
frame_data['new_point_cloud'] = sampled_point_cloud.tolist()
frame_data['combined_point_cloud'] = combined_point_cloud.tolist()
combined_point_cloud = np.concatenate([combined_point_cloud, sampled_point_cloud], axis=0)
combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud, 0.01)
frame_data["coverage_rate"] = frame_info.get('coverage_rate')
delta_CR = frame_data["coverage_rate"] - last_CR
frame_data["delta_CR"] = round(delta_CR,2)
last_CR = frame_data["coverage_rate"]
result.append({
"frame_id": frame_id,
"data": frame_data
})
return jsonify({"seq_frame_data": result,"model_pts":model_points, "success": True})
@app.route('/analysis_inference_result', methods=['POST'])
def analysis_inference_result():
res = {"success": True}
if 'file' not in request.files:
res["success"] = False
res["message"] = "No file part"
return jsonify(res)
file = request.files['file']
if file.filename == '':
res["success"] = False
res["message"] = "No selected file"
return jsonify(res)
try:
data = pickle.load(file)
except Exception as e:
res["success"] = False
res["message"] = f"File processing error: {e}"
return jsonify(res)
print(data)
return jsonify(res)
if __name__ == '__main__':
app.run(debug=True, port=13333)

210
data_load.py Normal file
View File

@ -0,0 +1,210 @@
import os
import numpy as np
import json
import cv2
import trimesh
from pts import PtsUtil
class DataLoadUtil:
@staticmethod
def get_path(root, scene_name, frame_idx):
path = os.path.join(root, scene_name, f"{frame_idx}")
return path
@staticmethod
def get_label_path(root, scene_name):
path = os.path.join(root,scene_name, f"label.json")
return path
@staticmethod
def get_sampled_model_points_path(root, scene_name):
path = os.path.join(root,scene_name, f"sampled_model_points.txt")
return path
@staticmethod
def get_scene_seq_length(root, scene_name):
camera_params_path = os.path.join(root, scene_name, "camera_params")
return len(os.listdir(camera_params_path))
@staticmethod
def load_downsampled_world_model_points(root, scene_name):
model_path = DataLoadUtil.get_sampled_model_points_path(root, scene_name)
model_points = np.loadtxt(model_path)
return model_points
@staticmethod
def save_downsampled_world_model_points(root, scene_name, model_points):
model_path = DataLoadUtil.get_sampled_model_points_path(root, scene_name)
np.savetxt(model_path, model_points)
@staticmethod
def load_original_model_points(model_dir, object_name):
model_path = os.path.join(model_dir, object_name, "mesh.obj")
mesh = trimesh.load(model_path)
return mesh.vertices
@staticmethod
def load_scene_info(root, scene_name):
scene_info_path = os.path.join(root, scene_name, "scene_info.json")
with open(scene_info_path, "r") as f:
scene_info = json.load(f)
return scene_info
@staticmethod
def load_target_object_pose(root, scene_name):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
target_name = scene_info["target_name"]
transformation = scene_info[target_name]
location = transformation["location"]
rotation_euler = transformation["rotation_euler"]
pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
pose_mat[:3, 3] = location
return pose_mat
@staticmethod
def load_depth(path, min_depth=0.01,max_depth=5.0,binocular=True):
def load_depth_from_real_path(real_path, min_depth, max_depth):
depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
depth = depth.astype(np.float32) / 65535.0
min_depth = min_depth
max_depth = max_depth
depth_meters = min_depth + (max_depth - min_depth) * depth
return depth_meters
if binocular:
depth_path_L = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_L.png")
depth_path_R = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_R.png")
depth_meters_L = load_depth_from_real_path(depth_path_L, min_depth, max_depth)
depth_meters_R = load_depth_from_real_path(depth_path_R, min_depth, max_depth)
return depth_meters_L, depth_meters_R
else:
depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png")
depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
return depth_meters
@staticmethod
def load_seg(path, binocular=True):
if binocular:
def clean_mask(mask_image):
green = [0, 255, 0, 255]
red = [255, 0, 0, 255]
threshold = 2
mask_image = np.where(np.abs(mask_image - green) <= threshold, green, mask_image)
mask_image = np.where(np.abs(mask_image - red) <= threshold, red, mask_image)
return mask_image
mask_path_L = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_L.png")
mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
mask_path_R = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_R.png")
mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
return mask_image_L, mask_image_R
else:
mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png")
mask_image = cv2.imread(mask_path)
return mask_image
@staticmethod
def load_label(path):
with open(path, 'r') as f:
label_data = json.load(f)
return label_data
@staticmethod
def load_rgb(path):
rgb_path = os.path.join(os.path.dirname(path), "rgb", os.path.basename(path) + ".png")
rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
return rgb_image
@staticmethod
def cam_pose_transformation(cam_pose_before):
offset = np.asarray([
[1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, -1, 0],
[0, 0, 0, 1]])
cam_pose_after = cam_pose_before @ offset
return cam_pose_after
@staticmethod
def load_cam_info(path, binocular=False):
camera_params_path = os.path.join(os.path.dirname(path), "camera_params", os.path.basename(path) + ".json")
with open(camera_params_path, 'r') as f:
label_data = json.load(f)
cam_to_world = np.asarray(label_data["extrinsic"])
cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
cam_intrinsic = np.asarray(label_data["intrinsic"])
cam_info = {
"cam_to_world": cam_to_world,
"cam_intrinsic": cam_intrinsic,
"far_plane": label_data["far_plane"],
"near_plane": label_data["near_plane"]
}
if binocular:
cam_to_world_R = np.asarray(label_data["extrinsic_R"])
cam_to_world_R = DataLoadUtil.cam_pose_transformation(cam_to_world_R)
cam_info["cam_to_world_R"] = cam_to_world_R
return cam_info
@staticmethod
def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0,255,0,255)):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy')
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
mask = mask.reshape(-1,4)
target_mask = (mask == target_mask_label).all(axis=-1)
target_points_camera = points_camera[target_mask]
target_points_camera_aug = np.concatenate([target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1)
target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3]
return {
"points_world": target_points_world,
"points_camera": target_points_camera
}
@staticmethod
def get_point_cloud_world_from_path(path, binocular=True):
cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
if binocular:
voxel_size = 0.005
depth_L, depth_R = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'], binocular=True)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
point_cloud_L = DataLoadUtil.get_target_point_cloud(depth_L, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask_L)['points_world']
point_cloud_R = DataLoadUtil.get_target_point_cloud(depth_R, cam_info['cam_intrinsic'], cam_info['cam_to_world_R'], mask_R)['points_world']
point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, 16384)
point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, 16384)
voxel_indices_L = np.floor(point_cloud_L / voxel_size).astype(np.int32)
voxel_indices_R = np.floor(point_cloud_R / voxel_size).astype(np.int32)
voxels_L = set(map(tuple, voxel_indices_L))
voxels_R = set(map(tuple, voxel_indices_R))
overlap_voxels = voxels_L.intersection(voxels_R)
overlap_points = point_cloud_L[np.array([tuple(v) in overlap_voxels for v in voxel_indices_L])]
return overlap_points
else:
depth = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'])
mask = DataLoadUtil.load_seg(path)
point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask)['points_world']
return point_cloud
@staticmethod
def get_point_cloud_list_from_seq(root, scene_name, num_frames, binocular=False):
point_cloud_list = []
for frame_idx in range(num_frames):
path = DataLoadUtil.get_path(root, scene_name, frame_idx)
point_cloud = DataLoadUtil.get_point_cloud_world_from_path(path, binocular)
point_cloud_list.append(point_cloud)
return point_cloud_list

22
pts.py Normal file
View File

@ -0,0 +1,22 @@
import numpy as np
import open3d as o3d
class PtsUtil:
@staticmethod
def voxel_downsample_point_cloud(point_cloud, voxel_size=0.005):
o3d_pc = o3d.geometry.PointCloud()
o3d_pc.points = o3d.utility.Vector3dVector(point_cloud)
downsampled_pc = o3d_pc.voxel_down_sample(voxel_size)
return np.asarray(downsampled_pc.points)
@staticmethod
def transform_point_cloud(points, pose_mat):
points_h = np.concatenate([points, np.ones((points.shape[0], 1))], axis=1)
points_h = np.dot(pose_mat, points_h.T).T
return points_h[:, :3]
@staticmethod
def random_downsample_point_cloud(point_cloud, num_points):
idx = np.random.choice(len(point_cloud), num_points, replace=False)
return point_cloud[idx]

19
reconstruction.py Normal file
View File

@ -0,0 +1,19 @@
import numpy as np
import open3d as o3d
class ReconstructionUtil:
@staticmethod
def reconstruct_with_pts(pts):
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pts)
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.001, max_nn=30))
mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd, depth=9)
densities = np.asarray(densities)
vertices_to_remove = densities < np.quantile(densities, 0.03)
mesh.remove_vertices_by_mask(vertices_to_remove)
return mesh
if __name__ == "__main__":
path = r"C:\Document\Local Project\nbv_rec_visualize\mis\sampled_model_points.txt"
test_pts = np.loadtxt(path)
mesh = ReconstructionUtil.reconstruct_with_pts(test_pts)
o3d.io.write_triangle_mesh("output_mesh.obj", mesh)