import math
import os
import random

import bpy
import mathutils
import numpy as np
import requests

from utils.blender_util import BlenderUtils
from utils.view_sample_util import ViewSampleUtil
from utils.material_util import MaterialUtil


class DataGenerator:
    """Build a Blender scene per object, let it settle, and render views of it.

    For every object name the generator creates a cylindrical display
    platform on top of the table, drops the object onto it, steps the scene
    forward until the rigid bodies stop moving (or a frame limit is hit), and
    then renders mask / normal / depth images from sampled camera poses.
    Status, progress and log messages are optionally pushed to a local HTTP
    service when it is reachable.
    """

    # Timeout (seconds) applied to every HTTP request sent to the status server.
    _REQUEST_TIMEOUT = 5

    def __init__(self, config):
        """Read the ``config["runner"]["generate"]`` section and set up the scene.

        Args:
            config: Nested dict; all settings are read from
                ``config["runner"]["generate"]``.
        """
        generate_config = config["runner"]["generate"]
        self.plane_size = generate_config["plane_size"]
        self.table_model_path = generate_config["table_model_path"]
        self.output_dir = generate_config["output_dir"]
        self.random_config = generate_config["random_config"]
        self.light_and_camera_config = generate_config["light_and_camera_config"]
        self.obj_dir = generate_config["object_dir"]
        self.use_list = generate_config["use_list"]
        self.object_list_path = generate_config["object_list_path"]
        self.max_views = generate_config["max_views"]
        self.min_views = generate_config["min_views"]
        self.min_diag = generate_config["min_diag"]
        self.max_diag = generate_config["max_diag"]
        self.min_cam_table_included_degree = generate_config["min_cam_table_included_degree"]
        self.random_view_ratio = generate_config["random_view_ratio"]
        self.binocular_vision = generate_config["binocular_vision"]
        self.port = generate_config["port"]
        self.from_idx = generate_config["from"]
        self.to_idx = generate_config["to"]
        self.set_status_path = f"http://localhost:{self.port}/project/set_status"
        self.log_path = f"http://localhost:{self.port}/project/add_log"

        # os.listdir() returns entries in arbitrary order; sort so that the
        # [from_idx:to_idx] window selects the same objects on every run.
        self.origin_obj_name_list = sorted(os.listdir(self.obj_dir))[self.from_idx:self.to_idx]
        os.makedirs(self.output_dir, exist_ok=True)

        if self.use_list:
            # Keep only the listed names that are present in the selected window.
            with open(self.object_list_path) as list_file:
                self.target_name_list = [line.strip() for line in list_file]
            available_names = set(self.origin_obj_name_list)
            self.obj_name_list = [
                obj_name for obj_name in self.target_name_list if obj_name in available_names
            ]
        else:
            self.obj_name_list = self.origin_obj_name_list

        self.target_obj = None
        self.stopped = False
        self.random_obj_list = []
        self.display_table_config = {}

        BlenderUtils.setup_scene(self.light_and_camera_config, self.table_model_path, self.binocular_vision)
        self.table = BlenderUtils.get_obj(BlenderUtils.TABLE_NAME)
        self.access = self._check_set_status_access(self.set_status_path)
        print(self.access)

    def _check_set_status_access(self, url):
        """Return True if a GET request to ``url`` completes, False otherwise."""
        try:
            requests.get(url, timeout=self._REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"Cannot access {url}: {e}")
            return False
        return True

    def _post(self, url, payload):
        """POST ``payload`` as JSON; report failures instead of raising.

        Returns:
            True if the request was sent without a ``requests`` error.
        """
        try:
            requests.post(url, json=payload, timeout=self._REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Reporting is auxiliary: a dropped server must not abort generation.
            print(f"Cannot post to {url}: {e}")
            return False
        return True

    @staticmethod
    def _top_z(obj):
        """Return the highest world-space z among ``obj``'s bounding-box corners."""
        return max((obj.matrix_world @ mathutils.Vector(corner)).z for corner in obj.bound_box)

    @staticmethod
    def _snapshot_rigid_body_locations():
        """Map name -> copied world translation for every rigid body in the scene."""
        return {
            obj.name: obj.matrix_world.translation.copy()
            for obj in bpy.context.scene.objects
            if obj.rigid_body
        }

    def set_status(self, key, value):
        """Send a ``status`` key/value pair to the status server (no-op if unreachable)."""
        if not self.access:
            return
        request_data = {
            "status": {
                "app_name": "generate_view",
                "runner_name": "view_generator",
                "key": key,
                "value": value,
            }
        }
        self._post(self.set_status_path, request_data)

    def set_progress(self, key, curr_value, max_value):
        """Send a ``progress`` update to the status server (no-op if unreachable)."""
        if not self.access:
            return
        request_data = {
            "progress": {
                "app_name": "generate_view",
                "runner_name": "view_generator",
                "key": key,
                "curr_value": curr_value,
                "max_value": max_value,
            }
        }
        self._post(self.set_status_path, request_data)

    def add_log(self, msg, log_type):
        """Send a log entry to the server, or print it when the server is unavailable."""
        if not self.access:
            print(f"[{log_type}] {msg}")
            return
        request_data = {"log": {"message": msg, "log_type": log_type}}
        if not self._post(self.log_path, request_data):
            # Do not lose the message when delivery fails.
            print(f"[{log_type}] {msg}")

    def generate_display_platform(self):
        """Create a passive cylindrical platform resting on top of the table.

        Radius and height are drawn from the configured ranges with the extra
        constraint ``height <= 0.5 * radius``. The chosen geometry is stored in
        ``self.display_table_config``.

        Returns:
            The newly created platform object.
        """
        config = self.random_config[BlenderUtils.DISPLAY_TABLE_NAME]
        radius = random.uniform(config["min_radius"], config["max_radius"])

        # Sample the height directly from the admissible interval. This yields
        # the same distribution as rejection sampling but cannot loop forever
        # when the configured height range is incompatible with the radius.
        height_limit = 0.5 * radius
        if config["min_height"] > height_limit:
            self.add_log(
                f"min_height ({config['min_height']}) exceeds 0.5 * radius ({height_limit}); "
                "clamping platform height.",
                "warning",
            )
            height = height_limit
        else:
            height = random.uniform(config["min_height"], min(config["max_height"], height_limit))

        bpy.ops.mesh.primitive_cylinder_add(radius=radius, depth=height)
        platform = bpy.context.selected_objects[-1]
        platform.name = BlenderUtils.DISPLAY_TABLE_NAME

        # The cylinder origin is at its centre, so lift it by half its height.
        table_top_z = self._top_z(self.table)
        platform.location = (0, 0, table_top_z + height / 2)

        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'PASSIVE'
        MaterialUtil.change_object_material(platform, MaterialUtil.create_mask_material(color=(1.0, 0, 0)))

        self.display_table_config = {
            "height": height,
            "radius": radius,
            "location": list(platform.location),
        }
        return platform

    def put_display_object(self, name):
        """Load object ``name`` and place it above the platform as an active rigid body.

        Requires ``self.platform`` to have been set (done by ``gen_scene_data``).
        The loaded object is stored in ``self.target_obj``.
        """
        config = self.random_config["display_object"]
        x = random.uniform(config["min_x"], config["max_x"])
        y = random.uniform(config["min_y"], config["max_y"])
        z = random.uniform(config["min_z"], config["max_z"])

        if random.random() <= config["random_rotation_ratio"]:
            rotation = (
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi),
            )
        else:
            rotation = (0, 0, 0)
            # NOTE(review): the fixed 0.05 drop height is applied to the
            # unrotated case only — confirm against the intended behaviour.
            z = 0.05

        platform_top_z = self._top_z(self.platform)

        obj_mesh_path = BlenderUtils.get_obj_path(self.obj_dir, name)
        obj = BlenderUtils.load_obj(name, obj_mesh_path)
        # Measured before the rotation is applied.
        offset_z = BlenderUtils.get_object_bottom_z(obj)

        obj.rotation_euler = rotation
        obj.location = (x, y, platform_top_z - offset_z + z)

        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'ACTIVE'
        MaterialUtil.change_object_material(obj, MaterialUtil.create_mask_material(color=(0, 1.0, 0)))
        self.target_obj = obj

    def reset(self):
        """Forget the current target/random objects and clear them from the scene."""
        self.target_obj = None
        self.random_obj_list = []
        BlenderUtils.reset_objects_and_platform()

    def check_moving_objects(self, previous_locations):
        """Return True if any tracked rigid body moved since the last snapshot.

        Args:
            previous_locations: Mapping of object name to its previous world
                translation, as produced by the snapshot in ``simulate_scene``.

        Returns:
            True when at least one tracked object moved farther than the
            threshold (0.01, in scene units), otherwise False.
        """
        threshold = 0.01
        tracked_objects = [self.target_obj] + self.random_obj_list
        for obj in tracked_objects:
            if not obj.rigid_body:
                continue
            # Compare world-space translations: the snapshot is taken from
            # matrix_world, and rigid-body simulation moves matrix_world
            # rather than the object's `location` property.
            displacement = obj.matrix_world.translation - previous_locations[obj.name]
            if displacement.length > threshold:
                return True
        return False

    def check_and_adjust_target(self):
        """Nudge the target back if it left the allowed region.

        Returns:
            ``"success"`` if no coordinate needed changing, ``"adjusted"``
            otherwise. In both cases the target's ``location`` is set to the
            (possibly modified) world translation.
        """
        display_config = self.random_config["display_object"]
        target_position = self.target_obj.matrix_world.translation
        msg = "success"

        # Axes 0 and 1: re-sample inside +/- half of the configured maximum.
        for axis, limit_key in ((0, "max_x"), (1, "max_y")):
            limit = display_config[limit_key]
            if abs(target_position[axis]) > limit:
                target_position[axis] = np.sign(target_position[axis]) * limit * random.uniform(-0.5, 0.5)
                msg = "adjusted"

        # Presumably 0.85 is the minimum acceptable height (object fell off
        # the platform) — TODO confirm the meaning of this constant.
        if target_position[2] < 0.85:
            target_position[2] = target_position[2] + 0.1
            msg = "adjusted"

        self.target_obj.location = target_position
        return msg

    def start_render(self, diag=0):
        """Sample camera poses around the target and render all outputs.

        Writes into ``<output_dir>/<object name>``: a ``points_and_normals.txt``
        file, mask renders, camera parameters, scene info, and normal/depth
        renders. Files in the ``depth`` sub-directory that do not end in
        ``.png`` are renamed to carry that extension.

        Args:
            diag: Diagonal size of the target; linearly interpolates the number
                of views between ``min_views`` and ``max_views``.

        Returns:
            True once rendering has finished.
        """
        # Drop the suffix after the first "." in the Blender object name.
        object_name = self.target_obj.name.split(".")[0]
        scene_dir = os.path.join(self.output_dir, object_name)
        os.makedirs(scene_dir, exist_ok=True)

        # Interpolate the view count; guard a zero-width diagonal range and
        # clamp so the result always stays within [min_views, max_views].
        diag_span = self.max_diag - self.min_diag
        ratio = (diag - self.min_diag) / diag_span if diag_span else 0.0
        ratio = min(max(ratio, 0.0), 1.0)
        view_num = int(self.min_views + ratio * (self.max_views - self.min_views))

        view_data = ViewSampleUtil.sample_view_data_world_space(
            self.target_obj,
            distance_range=(0.25, 0.5),
            voxel_size=0.005,
            max_views=view_num,
            min_cam_table_included_degree=self.min_cam_table_included_degree,
            random_view_ratio=self.random_view_ratio,
        )
        object_points = np.array(view_data["voxel_down_sampled_points"])
        normals = np.array(view_data["normals"])
        points_normals = np.concatenate((object_points, normals), axis=1)
        np.savetxt(os.path.join(scene_dir, "points_and_normals.txt"), points_normals)

        cam_poses = view_data["cam_poses"]
        total_views = len(cam_poses)

        # Pass 1: mask renders.
        for i, cam_pose in enumerate(cam_poses):
            BlenderUtils.set_camera_at(cam_pose)
            BlenderUtils.render_mask(
                scene_dir, f"{i}", binocular_vision=self.binocular_vision, target_object=self.target_obj
            )
            BlenderUtils.save_cam_params(scene_dir, i, binocular_vision=self.binocular_vision)
            self.set_progress("render frame", i, total_views)
        self.set_progress("render frame", total_views, total_views)

        BlenderUtils.save_scene_info(scene_dir, self.display_table_config, object_name)

        # Pass 2: normal and depth renders with the normal material applied.
        MaterialUtil.change_object_material(self.target_obj, MaterialUtil.create_normal_material())
        for i, cam_pose in enumerate(cam_poses):
            BlenderUtils.set_camera_at(cam_pose)
            BlenderUtils.render_normal_and_depth(
                scene_dir, f"{i}", binocular_vision=self.binocular_vision, target_object=self.target_obj
            )
            BlenderUtils.save_cam_params(scene_dir, i, binocular_vision=self.binocular_vision)
            self.set_progress("render normal frame", i, total_views)
        self.set_progress("render normal frame", total_views, total_views)

        # Give every depth output a ".png" extension.
        depth_dir = os.path.join(scene_dir, "depth")
        for depth_file in os.listdir(depth_dir):
            if depth_file.endswith(".png"):
                continue
            name, _ = os.path.splitext(depth_file)
            os.rename(os.path.join(depth_dir, depth_file), os.path.join(depth_dir, f"{name}.png"))
        return True

    def simulate_scene(self, frame_limit=120, depth=0, diag=0):
        """Advance the scene until objects settle, then validate and render.

        Args:
            frame_limit: Upper bound on the frame counter used while stepping.
            depth: Unused; kept for interface compatibility.
            diag: Target diagonal size, forwarded to ``start_render``.

        Returns:
            ``"success"`` when the scene was rendered, ``"fail"`` when
            rendering reported failure, ``"retry"`` when the target had to be
            adjusted.
        """
        bpy.context.view_layer.update()
        bpy.ops.screen.animation_play()
        previous_locations = self._snapshot_rigid_body_locations()

        frame_count = 1
        while frame_count < frame_limit:
            bpy.context.view_layer.update()
            # Motion is only evaluated every 10th frame.
            if frame_count % 10 == 0 and not self.check_moving_objects(previous_locations):
                break
            frame_count += 1
            bpy.context.scene.frame_set(bpy.context.scene.frame_current + 1)
            previous_locations = self._snapshot_rigid_body_locations()

        bpy.ops.screen.animation_cancel(restore_frame=False)

        msg = self.check_and_adjust_target()
        if msg != "success":
            return "retry"

        print("Scene generation completed.")
        if not self.start_render(diag=diag):
            return "fail"
        return msg

    def gen_scene_data(self, object_name):
        """Build, simulate and render the scene for a single object.

        Returns:
            ``"diag_error"`` if the object's diagonal is outside
            ``[min_diag, max_diag]``; otherwise the result of
            ``simulate_scene``.
        """
        bpy.context.scene.frame_set(0)
        self.platform = self.generate_display_platform()
        self.put_display_object(object_name)

        diag = BlenderUtils.get_obj_diag(self.target_obj.name)
        self.set_status("target_diagonal", diag)
        if diag > self.max_diag or diag < self.min_diag:
            self.add_log(f"The diagonal size of the object <{object_name}>(size: {round(diag,3)}) does not meet the requirements.", "error")
            return "diag_error"
        return self.simulate_scene(diag=diag)

    def gen_all_scene_data(self):
        """Generate scenes for every object in ``self.obj_name_list``.

        Objects whose ``scene_info.json`` already exists in the output
        directory are skipped and counted as successes. Each remaining object
        is attempted until it stops returning ``"retry"`` or the retry budget
        is exhausted. Success/fail counters and progress are reported after
        every object.
        """
        max_retry_times = 5
        total = len(self.obj_name_list)
        count = 0
        count_success = 0
        self.set_progress("generate scene", 0, total)
        print(f"Generating scene for {total} objects")

        for target_obj_name in self.obj_name_list:
            self.add_log(f"Generating scene for object <{target_obj_name}>", "info")

            scene_info_path = os.path.join(self.output_dir, target_obj_name, "scene_info.json")
            if os.path.exists(scene_info_path):
                self.add_log(f"Scene for object <{target_obj_name}> already exists, skipping", "warning")
                count += 1
                count_success += 1
                # Keep the reported progress in step with skipped objects too.
                self.set_progress("generate scene", count, total)
                continue

            self.set_status("target_object", target_obj_name)
            retry_times = 0
            result = "retry"
            while retry_times < max_retry_times and result == "retry":
                self.reset()
                try:
                    result = self.gen_scene_data(target_obj_name)
                except Exception as e:
                    # Top-level boundary: log and move on to the next object.
                    self.add_log(f"Uknown error: {e}", "error")
                    result = "unknown_error"
                if result == "retry":
                    retry_times += 1
                    self.add_log(f"Maximum adjust times, retrying <{target_obj_name}>. ({retry_times}/{max_retry_times}) ", "warning")

            count += 1
            if result == "success":
                count_success += 1
                self.add_log(f"Scene for object <{target_obj_name}> generated successfully", "success")
            if result == "retry" and retry_times >= max_retry_times:
                self.add_log(f"Maximum retries, failed to generate scene for object <{target_obj_name}>", "error")

            self.set_status("success", count_success)
            self.set_status("fail", count - count_success)
            self.set_progress("generate scene", count, total)