from flask import Flask, request, jsonify, send_from_directory
import os
import json
import base64
import numpy as np
import pickle
from flask_cors import CORS
from data_load import DataLoadUtil
from reconstruction import ReconstructionUtil
from pts import PtsUtil

ROOT = "./static"  # dataset root, served as Flask static files
print(ROOT)
app = Flask(__name__, static_folder="static")
CORS(app)


@app.route("/")
def serve_index():
    return send_from_directory(app.static_folder, "index.html")


@app.route('/<path:filename>')
def serve_static(filename):
    return send_from_directory(app.static_folder, filename)


@app.route('/get_scene_list', methods=['POST'])
def get_scene_list():
    """List the scene directories under the requested dataset."""
    data = request.json
    dataset_name = data.get('dataset_name')
    dataset_path = os.path.join(ROOT, dataset_name)
    if not os.path.exists(dataset_path):
        print(f"Dataset not found: {dataset_path}")
        return jsonify({"error": "Dataset not found"}), 404
    scene_list = [d for d in os.listdir(dataset_path)
                  if os.path.isdir(os.path.join(dataset_path, d))]
    return jsonify({"scene_list": scene_list, "success": True})


@app.route('/get_label_list', methods=['POST'])
def get_label_list():
    """Collect per-label coverage statistics (min/max/mean) for a scene."""
    data = request.json
    dataset_name = data.get('dataset_name')
    scene_name = data.get("scene_name")
    scene_dir = os.path.join(ROOT, dataset_name, scene_name)
    label_dir = os.path.join(scene_dir, "label")
    if not os.path.exists(scene_dir):
        print(f"Scene not found: {scene_dir}")
        return jsonify({"error": "Scene not found"}), 404
    label_list = []
    global_min_coverage_rate = 1
    global_max_coverage_rate = 0
    global_total_coverage_rate = 0
    for label_file in os.listdir(label_dir):
        if label_file.endswith(".json"):
            label_path = os.path.join(label_dir, label_file)
            with open(label_path, 'r') as f:
                label_data = json.load(f)
            max_coverage_rate = label_data.get('max_coverage_rate')
            if max_coverage_rate > global_max_coverage_rate:
                global_max_coverage_rate = max_coverage_rate
            if max_coverage_rate < global_min_coverage_rate:
                global_min_coverage_rate = max_coverage_rate
            label_list.append({
                "label_name": label_file,
                "max_coverage_rate": round(max_coverage_rate * 100, 3)
            })
            global_total_coverage_rate += max_coverage_rate
    if len(label_list) == 0:
        global_mean_coverage_rate = 0
    else:
        global_mean_coverage_rate = global_total_coverage_rate / len(label_list)
    return jsonify({
        "label_list": label_list,
        "total_max_coverage_rate": round(global_max_coverage_rate * 100, 3),
        "total_min_coverage_rate": round(global_min_coverage_rate * 100, 3),
        "total_mean_coverage_rate": round(global_mean_coverage_rate * 100, 3),
        "success": True
    })


@app.route('/get_scene_info', methods=['POST'])
def get_scene_info():
    """Return the sequence length and best-sequence metadata for one label."""
    data = request.json
    dataset_name = data.get('dataset_name')
    scene_name = data.get('scene_name')
    label_name = data.get('label_name')
    scene_path = os.path.join(ROOT, dataset_name, scene_name)
    camera_params_path = os.path.join(scene_path, 'camera_params')
    label_json_path = os.path.join(scene_path, "label", label_name)
    if not os.path.exists(scene_path) or not os.path.exists(label_json_path):
        return jsonify({"error": "Scene or label.json not found"}), 404
    with open(label_json_path, 'r') as f:
        label_data = json.load(f)
    sequence_length = len([f for f in os.listdir(camera_params_path)
                           if os.path.isfile(os.path.join(camera_params_path, f))])
    max_coverage_rate = label_data.get('max_coverage_rate')
    best_sequence = label_data.get('best_sequence')
    best_sequence_length = len(best_sequence)
    best_sequence_formatted = []
    for i in range(best_sequence_length):
        best_sequence_formatted.append({
            "frame": best_sequence[i][0],
            "coverage_rate": round(best_sequence[i][1] * 100, 1)
        })
    return jsonify({
        "sequence_length": sequence_length,
        "max_coverage_rate": round(max_coverage_rate * 100, 2),
        "best_sequence_length": best_sequence_length,
        "best_sequence": best_sequence_formatted,
        "success": True
    })
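
# Expected shape of a label JSON file, inferred from the fields read above;
# the concrete numbers are illustrative, not taken from a real dataset:
#
#   {
#       "max_coverage_rate": 0.987,
#       "best_sequence": [[12, 0.35], [4, 0.62], [27, 0.987]]
#   }
#
# Each best_sequence entry is a [frame_id, cumulative_coverage_rate] pair;
# rates are stored in [0, 1] and scaled to percentages before returning.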

def read_image_as_base64(file_path):
    """Read an image file and return it base64-encoded, or None if missing."""
    try:
        with open(file_path, 'rb') as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
        return encoded_string
    except FileNotFoundError:
        return None


@app.route('/get_frame_data', methods=['POST'])
def get_frame_data():
    """Build per-frame reconstruction data (camera pose, depth/mask images,
    incremental point clouds, and coverage deltas) for a view sequence."""
    data = request.json
    dataset_name = data.get('dataset_name')
    scene_name = data.get('scene_name')
    sequence = data.get('sequence')
    scene_path = os.path.join(ROOT, dataset_name, scene_name)
    root = os.path.join(ROOT, dataset_name)
    camera_params_path = os.path.join(scene_path, 'camera_params')
    depth_path = os.path.join(scene_path, 'depth')
    mask_path = os.path.join(scene_path, 'mask')
    voxel_threshold = 0.005
    model_points_normals = DataLoadUtil.load_points_normals(root, scene_name)
    model_points = model_points_normals[:, :3]
    obj_path = os.path.join(dataset_name, scene_name, 'mesh', 'world_target_mesh.obj')
    mtl_path = os.path.join(dataset_name, scene_name, 'mesh', 'material.mtl')
    if not all([os.path.exists(scene_path), os.path.exists(camera_params_path),
                os.path.exists(depth_path), os.path.exists(mask_path)]):
        return jsonify({"error": "Invalid paths or files not found"}), 404
    result = []
    combined_point_cloud = np.zeros((0, 3))
    last_CR = 0
    for frame_info in sequence:
        frame_id = frame_info.get('frame')
        frame_data = {}
        path = DataLoadUtil.get_path(root, scene_name, frame_id)
        cam_params = DataLoadUtil.load_cam_info(path, binocular=True)
        frame_data['cam_to_world'] = cam_params['cam_to_world'].tolist()
        # Left-eye depth and mask images, base64-encoded for the frontend.
        depth_file = os.path.join(depth_path, f'{frame_id}_L.png')
        depth_base64 = read_image_as_base64(depth_file)
        frame_data['depth'] = depth_base64 if depth_base64 else None
        mask_file = os.path.join(mask_path, f'{frame_id}_L.png')
        mask_base64 = read_image_as_base64(mask_file)
        frame_data['mask'] = mask_base64 if mask_base64 else None
        # Filter the frame's point cloud against the model normals, then
        # voxel-downsample before merging into the running reconstruction.
        point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
        sampled_point_cloud = ReconstructionUtil.filter_points(
            point_cloud, model_points_normals, cam_params['cam_to_world'],
            voxel_size=voxel_threshold, theta=75)
        sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(sampled_point_cloud, voxel_threshold)
        frame_data['new_point_cloud'] = sampled_point_cloud.tolist()
        frame_data['combined_point_cloud'] = combined_point_cloud.tolist()
        new_added_pts = ReconstructionUtil.get_new_added_points(
            combined_point_cloud, sampled_point_cloud, threshold=voxel_threshold)
        frame_data["new_added_pts"] = new_added_pts.tolist()
        combined_point_cloud = np.concatenate([combined_point_cloud, sampled_point_cloud], axis=0)
        combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud, voxel_threshold)
        frame_data["coverage_rate"] = frame_info.get('coverage_rate')
        delta_CR = frame_data["coverage_rate"] - last_CR
        frame_data["delta_CR"] = round(delta_CR, 2)
        last_CR = frame_data["coverage_rate"]
        result.append({"frame_id": frame_id, "data": frame_data})
    return jsonify({
        "seq_frame_data": result,
        "model_pts": model_points.tolist(),
        "obj_path": obj_path,
        "mtl_path": mtl_path,
        "success": True
    })


def rotation_6d_to_matrix_numpy(d6):
    """Convert a 6D rotation representation (the first two rows of the
    rotation matrix) into a full 3x3 rotation via Gram-Schmidt
    orthonormalization."""
    a1, a2 = d6[:3], d6[3:]
    b1 = a1 / np.linalg.norm(a1)
    b2 = a2 - np.dot(b1, a2) * b1
    b2 = b2 / np.linalg.norm(b2)
    b3 = np.cross(b1, b2)
    return np.stack((b1, b2, b3), axis=-2)
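
# A minimal sanity check for rotation_6d_to_matrix_numpy (an illustrative
# sketch, not called by the server; the input values are arbitrary): the two
# 3D vectors in the 6D representation are orthonormalized into the rotation's
# first two rows, so the result should be orthonormal with determinant +1.
#
#   d6 = np.array([1.0, 0.1, 0.0, 0.0, 1.0, 0.2])
#   R = rotation_6d_to_matrix_numpy(d6)
#   assert np.allclose(R @ R.T, np.eye(3), atol=1e-8)
#   assert np.isclose(np.linalg.det(R), 1.0)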

def parse_to_frame_data(inference_result):
    """Convert a pickled inference result into the same per-frame structure
    that /get_frame_data returns."""
    result = []
    combined_point_cloud = np.zeros((0, 3))
    last_CR = 0
    for idx in range(len(inference_result["pts_seq"])):
        frame_data = {}
        # The 9D pose is a 6D rotation followed by a 3D translation.
        pose_9d = inference_result["pred_pose_9d_seq"][idx]
        np_pose_9d = np.array(pose_9d)
        cam_to_world_mat = np.eye(4)
        cam_to_world_mat[:3, :3] = rotation_6d_to_matrix_numpy(np_pose_9d[:6])
        cam_to_world_mat[:3, 3] = np_pose_9d[6:]
        frame_data['cam_to_world'] = cam_to_world_mat.tolist()
        target_pts = inference_result["target_pts_seq"][idx]
        coverage_rate = inference_result["coverage_rate_seq"][idx]
        frame_data['new_point_cloud'] = target_pts.tolist()
        frame_data['combined_point_cloud'] = combined_point_cloud.tolist()
        new_added_pts = ReconstructionUtil.get_new_added_points(combined_point_cloud, target_pts)
        frame_data["new_added_pts"] = new_added_pts.tolist()
        combined_point_cloud = np.concatenate([combined_point_cloud, target_pts], axis=0)
        frame_data["coverage_rate"] = float(coverage_rate)
        delta_CR = frame_data["coverage_rate"] - last_CR
        frame_data["delta_CR"] = round(delta_CR, 2)
        last_CR = frame_data["coverage_rate"]
        result.append({"frame_id": idx, "data": frame_data})
    return result


@app.route('/analysis_inference_result', methods=['POST'])
def analysis_inference_result():
    """Parse an uploaded pickle of inference results into frame data."""
    res = {"success": True}
    if 'file' not in request.files:
        res["success"] = False
        res["message"] = "No file part"
        return jsonify(res)
    file = request.files['file']
    if file.filename == '':
        res["success"] = False
        res["message"] = "No selected file"
        return jsonify(res)
    try:
        data = pickle.load(file)
    except Exception as e:
        res["success"] = False
        res["message"] = f"File processing error: {e}"
        return jsonify(res)
    result = parse_to_frame_data(data)
    scene_name = data["scene_name"]
    res["seq_frame_data"] = result
    res["retry_no_pts_pose"] = data["retry_no_pts_pose"]
    res["retry_duplication_pose"] = data["retry_duplication_pose"]
    dataset_name = "sample"
    obj_path = os.path.join(dataset_name, scene_name, 'mesh', 'world_target_mesh.obj')
    mtl_path = os.path.join(dataset_name, scene_name, 'mesh', 'material.mtl')
    res["obj_path"] = obj_path
    res["mtl_path"] = mtl_path
    res["max_coverage_rate"] = round(data["max_coverage_rate"] * 100, 3)
    res["scene_name"] = scene_name
    res["pred_max_coverage_rate"] = round(data["pred_max_coverage_rate"] * 100, 3)
    res["best_seq_len"] = data["best_seq_len"]
    return jsonify(res)


if __name__ == '__main__':
    app.run(debug=True, port=13333, host="0.0.0.0")
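
# Example requests against the endpoints above (a sketch; assumes the server
# is running locally on the default port 13333 configured above, and that a
# dataset directory named "sample" exists under ./static — the pickle
# filename is illustrative):
#
#   curl -X POST http://localhost:13333/get_scene_list \
#        -H "Content-Type: application/json" \
#        -d '{"dataset_name": "sample"}'
#
#   curl -X POST http://localhost:13333/analysis_inference_result \
#        -F "file=@inference_result.pkl"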