286 lines
11 KiB
Python
286 lines
11 KiB
Python
from flask import Flask, request, jsonify, send_from_directory
|
|
import os
|
|
import json
|
|
import base64
|
|
import numpy as np
|
|
import pickle
|
|
from flask_cors import CORS
|
|
from data_load import DataLoadUtil
|
|
from reconstruction import ReconstructionUtil
|
|
from pts import PtsUtil
|
|
|
|
# Root directory holding every dataset folder this app serves.
ROOT = os.path.join("./static")

print(ROOT)

app = Flask(__name__, static_folder="static")

# Allow cross-origin requests so a separately hosted frontend can call the API.
CORS(app)
|
|
|
|
@app.route("/")
def serve_index():
    """Serve the frontend entry page (index.html) from the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, "index.html")
|
|
|
|
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve any other static asset, addressed relative to the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, filename)
|
|
|
|
|
|
@app.route('/get_scene_list', methods=['POST'])
def get_scene_list():
    """Return the scene sub-directory names of the requested dataset.

    Expects JSON body: {"dataset_name": str}.
    Responds 400 on a missing/unsafe name, 404 when the dataset directory
    does not exist, otherwise {"scene_list": [...], "success": True}.
    """
    data = request.json or {}
    dataset_name = data.get('dataset_name')

    # Bug fix / security: a missing name crashed os.path.join (HTTP 500),
    # and a name containing separators or ".." could escape ROOT.
    if not dataset_name or os.sep in dataset_name or ".." in dataset_name:
        return jsonify({"error": "Invalid dataset name"}), 400

    dataset_path = os.path.join(ROOT, dataset_name)

    if not os.path.isdir(dataset_path):
        print(f"Dataset not found: {dataset_path}")
        return jsonify({"error": "Dataset not found"}), 404

    scene_list = [d for d in os.listdir(dataset_path)
                  if os.path.isdir(os.path.join(dataset_path, d))]
    return jsonify({"scene_list": scene_list, "success": True})
|
|
|
|
@app.route('/get_label_list', methods=['POST'])
def get_label_list():
    """List a scene's label JSON files with per-label and aggregate coverage.

    Expects JSON body: {"dataset_name": str, "scene_name": str}.
    Coverage rates are stored as fractions in the label files and returned
    as percentages rounded to 3 decimals.
    """
    data = request.json or {}
    dataset_name = data.get('dataset_name')
    scene_name = data.get("scene_name")

    # Robustness: missing params previously crashed os.path.join (HTTP 500).
    if not dataset_name or not scene_name:
        return jsonify({"error": "Missing dataset_name/scene_name"}), 400

    scene_dir = os.path.join(ROOT, dataset_name, scene_name)
    label_dir = os.path.join(scene_dir, "label")

    # Bug fix: the original checked only scene_dir, so a scene without a
    # "label" subdirectory crashed os.listdir with an unhandled 500.
    if not os.path.exists(scene_dir) or not os.path.isdir(label_dir):
        print(f"Scene not found: {scene_dir}")
        return jsonify({"error": "Scene not found"}), 404

    label_list = []
    global_min_coverage_rate = 1
    global_max_coverage_rate = 0
    global_total_coverage_rate = 0
    for label_file in os.listdir(label_dir):
        if not label_file.endswith(".json"):
            continue
        label_path = os.path.join(label_dir, label_file)
        with open(label_path, 'r') as f:
            label_data = json.load(f)
        max_coverage_rate = label_data.get('max_coverage_rate')
        if max_coverage_rate is None:
            # Robustness: skip malformed labels instead of raising a
            # TypeError when comparing None against the running extrema.
            continue
        global_max_coverage_rate = max(global_max_coverage_rate, max_coverage_rate)
        global_min_coverage_rate = min(global_min_coverage_rate, max_coverage_rate)
        label_list.append({
            "label_name": label_file,
            "max_coverage_rate": round(max_coverage_rate * 100, 3)
        })
        global_total_coverage_rate += max_coverage_rate

    if label_list:
        global_mean_coverage_rate = global_total_coverage_rate / len(label_list)
    else:
        global_mean_coverage_rate = 0

    return jsonify({"label_list": label_list,
                    "total_max_coverage_rate": round(global_max_coverage_rate * 100, 3),
                    "total_min_coverage_rate": round(global_min_coverage_rate * 100, 3),
                    "total_mean_coverage_rate": round(global_mean_coverage_rate * 100, 3),
                    "success": True})
|
|
|
|
|
|
@app.route('/get_scene_info', methods=['POST'])
def get_scene_info():
    """Return sequence length and best-sequence summary for one scene label.

    Expects JSON body: {"dataset_name", "scene_name", "label_name"}.
    Responds 400 on missing params, 404 when the scene, label file, or
    camera_params directory is missing.
    """
    data = request.json or {}
    dataset_name = data.get('dataset_name')
    scene_name = data.get('scene_name')
    label_name = data.get('label_name')

    # Robustness: missing params previously crashed os.path.join (HTTP 500).
    if not all([dataset_name, scene_name, label_name]):
        return jsonify({"error": "Missing dataset_name/scene_name/label_name"}), 400

    scene_path = os.path.join(ROOT, dataset_name, scene_name)
    camera_params_path = os.path.join(scene_path, 'camera_params')
    label_json_path = os.path.join(scene_path, "label", label_name)

    # Bug fix: also verify camera_params_path — the os.listdir below would
    # otherwise raise FileNotFoundError (HTTP 500) instead of a clean 404.
    if not os.path.exists(scene_path) or not os.path.exists(label_json_path) \
            or not os.path.isdir(camera_params_path):
        return jsonify({"error": "Scene or label.json not found"}), 404

    with open(label_json_path, 'r') as f:
        label_data = json.load(f)

    # Number of captured frames == number of per-frame camera-param files.
    sequence_length = len([f for f in os.listdir(camera_params_path)
                           if os.path.isfile(os.path.join(camera_params_path, f))])

    max_coverage_rate = label_data.get('max_coverage_rate')
    # Robustness: a label file without "best_sequence" previously crashed len().
    best_sequence = label_data.get('best_sequence') or []
    best_sequence_length = len(best_sequence)
    # Each entry is (frame_id, coverage_rate in [0, 1]); report percentages.
    best_sequence_formatted = [
        {
            "frame": frame,
            "coverage_rate": round(rate * 100, 1)
        }
        for frame, rate in best_sequence
    ]
    return jsonify({
        "sequence_length": sequence_length,
        "max_coverage_rate": round(max_coverage_rate * 100, 2),
        "best_sequence_length": best_sequence_length,
        "best_sequence": best_sequence_formatted,
        "success": True
    })
|
|
|
|
def read_image_as_base64(file_path):
    """Return the file's raw bytes base64-encoded as a UTF-8 string.

    Returns None when the file does not exist.
    """
    try:
        with open(file_path, 'rb') as handle:
            raw = handle.read()
    except FileNotFoundError:
        return None
    return base64.b64encode(raw).decode('utf-8')
|
|
|
|
|
|
@app.route('/get_frame_data', methods=['POST'])
def get_frame_data():
    """Build per-frame reconstruction data for a capture sequence.

    Expects JSON body: {"dataset_name", "scene_name", "sequence"} where
    sequence is a list of {"frame": id, "coverage_rate": percent} dicts
    (the shape produced by get_scene_info's best_sequence).

    For each frame it returns the camera pose, base64 depth/mask images,
    the frame's downsampled point cloud, the combined cloud accumulated so
    far, the newly added points, and the coverage-rate delta.
    """
    data = request.json
    dataset_name = data.get('dataset_name')
    scene_name = data.get('scene_name')
    sequence = data.get('sequence')

    scene_path = os.path.join(ROOT, dataset_name, scene_name)
    root = os.path.join(ROOT, dataset_name)
    camera_params_path = os.path.join(scene_path, 'camera_params')
    depth_path = os.path.join(scene_path, 'depth')
    mask_path = os.path.join(scene_path, 'mask')
    # Voxel edge length (world units) used for every downsampling step below.
    voxel_threshold = 0.005
    model_points_normals = DataLoadUtil.load_points_normals(root, scene_name)
    model_points = model_points_normals[:, :3]

    # NOTE(review): these mesh paths are relative (not joined with ROOT) —
    # presumably resolved by the frontend against the static server root; confirm.
    obj_path = os.path.join(dataset_name, scene_name, 'mesh', 'world_target_mesh.obj')
    mtl_path = os.path.join(dataset_name, scene_name, 'mesh', 'material.mtl')

    if not all([os.path.exists(scene_path), os.path.exists(camera_params_path), os.path.exists(depth_path), os.path.exists(mask_path)]):
        return jsonify({"error": "Invalid paths or files not found"}), 404

    result = []
    combined_point_cloud = np.zeros((0, 3))
    last_CR = 0
    for frame_info in sequence:
        frame_id = frame_info.get('frame')
        frame_data = {}
        path = DataLoadUtil.get_path(root, scene_name, frame_id)

        cam_params = DataLoadUtil.load_cam_info(path, binocular=True)

        frame_data['cam_to_world'] = cam_params['cam_to_world'].tolist()

        # Only the left ("_L") image of the binocular pair is sent back.
        depth_file = os.path.join(depth_path, f'{frame_id}_L.png')
        depth_base64 = read_image_as_base64(depth_file)
        frame_data['depth'] = depth_base64 if depth_base64 else None

        mask_file = os.path.join(mask_path, f'{frame_id}_L.png')
        mask_base64 = read_image_as_base64(mask_file)
        frame_data['mask'] = mask_base64 if mask_base64 else None

        # Filter the frame's cloud against the model's normals (theta looks
        # like a view-angle cutoff in degrees — confirm in ReconstructionUtil),
        # then voxel-downsample it.
        point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
        sampled_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_params['cam_to_world'], voxel_size=voxel_threshold, theta=75)
        sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(sampled_point_cloud, voxel_threshold)

        # Snapshot the clouds BEFORE merging so the client can diff old vs. new.
        frame_data['new_point_cloud'] = sampled_point_cloud.tolist()
        frame_data['combined_point_cloud'] = combined_point_cloud.tolist()
        new_added_pts = ReconstructionUtil.get_new_added_points(combined_point_cloud, sampled_point_cloud, threshold=voxel_threshold)
        frame_data["new_added_pts"] = new_added_pts.tolist()

        combined_point_cloud = np.concatenate([combined_point_cloud, sampled_point_cloud], axis=0)

        # Re-downsample after merging to keep the accumulated cloud bounded.
        combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud, voxel_threshold)

        frame_data["coverage_rate"] = frame_info.get('coverage_rate')
        # Coverage gained by this frame relative to the previous one.
        delta_CR = frame_data["coverage_rate"] - last_CR
        frame_data["delta_CR"] = round(delta_CR,2)
        last_CR = frame_data["coverage_rate"]

        result.append({
            "frame_id": frame_id,
            "data": frame_data
        })

    return jsonify({"seq_frame_data": result,"model_pts":model_points.tolist(), "obj_path": obj_path, "mtl_path":mtl_path, "success": True})
|
|
|
|
|
|
def rotation_6d_to_matrix_numpy(d6):
    """Convert a 6D rotation representation to a 3x3 rotation matrix.

    Gram-Schmidt: normalize the first three entries into the first row,
    orthogonalize the last three against it for the second row, and
    complete the right-handed basis with a cross product.
    """
    first, second = d6[:3], d6[3:]
    row0 = first / np.linalg.norm(first)
    row1 = second - np.dot(row0, second) * row0
    row1 = row1 / np.linalg.norm(row1)
    row2 = np.cross(row0, row1)
    return np.stack((row0, row1, row2), axis=-2)
|
|
|
|
def parse_to_frame_data(inference_result):
    """Convert an unpickled inference-result dict into frontend frame records.

    inference_result must contain parallel sequences "pts_seq",
    "pred_pose_9d_seq" (6D rotation + 3D translation per frame),
    "target_pts_seq" (numpy point clouds), and "coverage_rate_seq".

    Returns a list of {"frame_id", "data"} dicts mirroring get_frame_data's
    per-frame payload (minus the depth/mask images).
    """
    result = []
    combined_point_cloud = np.zeros((0, 3))
    last_CR = 0
    for idx in range(len(inference_result["pts_seq"])):
        frame_data ={}
        # Rebuild the 4x4 cam-to-world matrix from the 9D pose:
        # first 6 entries encode the rotation, last 3 the translation.
        pose_9d = inference_result["pred_pose_9d_seq"][idx]
        np_pose_9d = np.array(pose_9d)
        cam_to_world_mat = np.eye(4)
        cam_to_world_mat[:3,:3] = rotation_6d_to_matrix_numpy(np_pose_9d[:6])
        cam_to_world_mat[:3, 3] = np_pose_9d[6:]
        frame_data['cam_to_world'] = cam_to_world_mat.tolist()
        target_pts = inference_result["target_pts_seq"][idx]
        coverage_rate = inference_result["coverage_rate_seq"][idx]
        # Snapshot the clouds BEFORE merging so the client can diff old vs. new.
        frame_data['new_point_cloud'] = target_pts.tolist()
        frame_data['combined_point_cloud'] = combined_point_cloud.tolist()
        # NOTE(review): unlike get_frame_data, no explicit threshold is passed
        # here and the combined cloud is never voxel-downsampled — confirm this
        # is intentional for inference results.
        new_added_pts = ReconstructionUtil.get_new_added_points(combined_point_cloud, target_pts)
        frame_data["new_added_pts"] = new_added_pts.tolist()
        combined_point_cloud = np.concatenate([combined_point_cloud, target_pts], axis=0)
        frame_data["coverage_rate"] = float(coverage_rate)
        # Coverage gained by this frame relative to the previous one.
        delta_CR = frame_data["coverage_rate"] - last_CR
        frame_data["delta_CR"] = round(delta_CR,2)
        last_CR = frame_data["coverage_rate"]

        result.append({
            "frame_id": idx,
            "data": frame_data
        })
    return result
|
|
|
|
|
|
@app.route('/analysis_inference_result', methods=['POST'])
def analysis_inference_result():
    """Accept an uploaded pickle of an inference run and return per-frame data.

    Expects a multipart form with a 'file' field containing a pickled dict
    (keys: pts_seq, pred_pose_9d_seq, target_pts_seq, coverage_rate_seq,
    scene_name, retry_no_pts_pose, retry_duplication_pose,
    max_coverage_rate, pred_max_coverage_rate, best_seq_len).
    Errors are reported as {"success": False, "message": ...} with HTTP 200,
    matching the original API contract.
    """
    res = {"success": True}
    if 'file' not in request.files:
        res["success"] = False
        res["message"] = "No file part"
        return jsonify(res)

    file = request.files['file']
    if file.filename == '':
        res["success"] = False
        res["message"] = "No selected file"
        return jsonify(res)

    try:
        # SECURITY NOTE(review): pickle.load executes arbitrary code from the
        # uploaded file — acceptable only for a trusted/internal tool. If this
        # endpoint is ever exposed, switch to a safe format (e.g. JSON/npz).
        data = pickle.load(file)
    except Exception as e:
        res["success"] = False
        res["message"] = f"File processing error: {e}"
        return jsonify(res)

    try:
        result = parse_to_frame_data(data)

        scene_name = data["scene_name"]
        res["seq_frame_data"] = result
        res["retry_no_pts_pose"] = data["retry_no_pts_pose"]
        res["retry_duplication_pose"] = data["retry_duplication_pose"]
        # Uploaded runs are assumed to belong to the "sample" dataset.
        dataset_name = "sample"
        obj_path = os.path.join(dataset_name, scene_name, 'mesh', 'world_target_mesh.obj')
        mtl_path = os.path.join(dataset_name, scene_name, 'mesh', 'material.mtl')
        res["obj_path"] = obj_path
        res["mtl_path"] = mtl_path
        res["max_coverage_rate"] = round(data["max_coverage_rate"]*100, 3)
        res["scene_name"] = scene_name
        res["pred_max_coverage_rate"] = round(data["pred_max_coverage_rate"]*100, 3)
        res["best_seq_len"] = data["best_seq_len"]
    except KeyError as e:
        # Robustness: a pickle from a different pipeline version previously
        # surfaced as an unhandled KeyError (HTTP 500).
        res["success"] = False
        res["message"] = f"Missing field in uploaded result: {e}"

    return jsonify(res)
|
|
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True plus host="0.0.0.0" exposes the Werkzeug
    # debugger to the whole network — fine for a local tool, unsafe in
    # production deployments.
    app.run(debug=True, port=13333,host="0.0.0.0")
|