hofee 2024-09-08 19:43:01 +08:00
parent b5d44b153a
commit 38f7f8df18
10 changed files with 142 additions and 847 deletions


@@ -1,322 +0,0 @@
import os
import json
import bpy
import gc
import numpy as np
import mathutils
class BlenderUtils:
    TABLE_NAME: str = "table"
    CAMERA_NAME: str = "Camera"
    CAMERA_RIGHT_NAME: str = "CameraRight"
    CAMERA_OBJECT_NAME: str = "CameraObject"
    LIGHT_NAME: str = "Light"
    DISPLAY_TABLE_NAME: str = "display_table"
    MESH_FILE_NAME: str = "mesh.obj"

    @staticmethod
    def get_obj_path(obj_dir, name):
        return os.path.join(obj_dir, name, BlenderUtils.MESH_FILE_NAME)

    @staticmethod
    def load_obj(name, mesh_path, scale=1):
        bpy.ops.wm.obj_import(filepath=mesh_path)
        loaded_object = bpy.context.selected_objects[-1]
        loaded_object.name = name
        loaded_object.data.name = name
        loaded_object.scale = (scale, scale, scale)
        bpy.ops.rigidbody.object_add()
        return loaded_object

    @staticmethod
    def get_obj(name):
        return bpy.data.objects.get(name)

    @staticmethod
    def set_obj_at(name, pose):
        pass

    @staticmethod
    def get_obj_pose(name):
        obj = BlenderUtils.get_obj(name)
        return np.asarray(obj.matrix_world)

    @staticmethod
    def add_plane(name, location, orientation, size=10):
        bpy.ops.mesh.primitive_plane_add(size=size, location=location)
        plane = bpy.context.selected_objects[-1]
        plane.name = name
        plane.rotation_euler = orientation
        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'PASSIVE'

    @staticmethod
    def add_table(table_model_path):
        table = BlenderUtils.load_obj(BlenderUtils.TABLE_NAME, table_model_path, scale=0.01)
        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'PASSIVE'
        mat = bpy.data.materials.new(name="TableYellowMaterial")
        mat.diffuse_color = (1.0, 1.0, 0.0, 1.0)
        if len(table.data.materials) > 0:
            table.data.materials[0] = mat
        else:
            table.data.materials.append(mat)

    @staticmethod
    def setup_scene(init_light_and_camera_config, table_model_path, binocular_vision):
        BlenderUtils.init_light_and_camera(init_light_and_camera_config, binocular_vision)
        BlenderUtils.add_plane("plane_floor", location=(0, 0, 0), orientation=(0, 0, 0))
        BlenderUtils.add_plane("plane_ceil", location=(0, 0, 10), orientation=(0, 0, 0))
        BlenderUtils.add_plane("plane_wall_1", location=(5, 0, 5), orientation=(0, np.pi / 2, 0))
        BlenderUtils.add_plane("plane_wall_2", location=(-5, 0, 5), orientation=(0, np.pi / 2, 0))
        BlenderUtils.add_plane("plane_wall_3", location=(0, 5, 5), orientation=(np.pi / 2, 0, 0))
        BlenderUtils.add_plane("plane_wall_4", location=(0, -5, 5), orientation=(np.pi / 2, 0, 0))
        BlenderUtils.add_table(table_model_path)

    @staticmethod
    def set_light_params(light, config):
        light.location = config["location"]
        light.rotation_euler = config["orientation"]
        if light.type == 'SUN':
            light.data.energy = config["power"]
        elif light.type == 'POINT':
            light.data.energy = config["power"]

    @staticmethod
    def set_camera_params(camera, config, binocular_vision):
        camera_object = bpy.data.objects.new(BlenderUtils.CAMERA_OBJECT_NAME, None)
        bpy.context.collection.objects.link(camera_object)
        cameras = [bpy.data.objects.get("Camera")]
        camera.location = [0, 0, 0]
        camera.rotation_euler = [0, 0, 0]
        camera.parent = camera_object
        if binocular_vision:
            left_camera = cameras[0]
            right_camera = left_camera.copy()
            right_camera.name = BlenderUtils.CAMERA_RIGHT_NAME
            right_camera.data = left_camera.data.copy()
            right_camera.data.name = BlenderUtils.CAMERA_RIGHT_NAME
            bpy.context.collection.objects.link(right_camera)
            right_camera.parent = camera_object
            right_camera.location = [config["eye_distance"] / 2, 0, 0]
            left_camera.location = [-config["eye_distance"] / 2, 0, 0]
            cameras.append(right_camera)
        for camera in cameras:
            camera.data.clip_start = config["near_plane"]
            camera.data.clip_end = config["far_plane"]
            bpy.context.scene.render.resolution_x = config["resolution"][0]
            bpy.context.scene.render.resolution_y = config["resolution"][1]
            sensor_height = 24.0
            focal_length = sensor_height / (2 * np.tan(np.radians(config["fov_vertical"]) / 2))
            camera.data.lens = focal_length
            camera.data.sensor_width = sensor_height * config["resolution"][0] / config["resolution"][1]
            camera.data.sensor_height = sensor_height

    @staticmethod
    def init_light_and_camera(init_light_and_camera_config, binocular_vision):
        camera = BlenderUtils.get_obj(BlenderUtils.CAMERA_NAME)
        light = BlenderUtils.get_obj(BlenderUtils.LIGHT_NAME)
        BlenderUtils.set_camera_params(camera, init_light_and_camera_config[BlenderUtils.CAMERA_NAME], binocular_vision)
        BlenderUtils.set_light_params(light, init_light_and_camera_config[BlenderUtils.LIGHT_NAME])

    @staticmethod
    def get_obj_diag(name):
        obj = BlenderUtils.get_obj(name)
        return np.linalg.norm(obj.dimensions)

    @staticmethod
    def matrix_to_blender_pose(matrix):
        location = matrix[:3, 3]
        rotation_matrix = matrix[:3, :3]
        rotation_matrix_blender = mathutils.Matrix(rotation_matrix.tolist())
        rotation_euler = rotation_matrix_blender.to_euler()
        return location, rotation_euler

    @staticmethod
    def set_camera_at(pose):
        camera = BlenderUtils.get_obj(BlenderUtils.CAMERA_OBJECT_NAME)
        location, rotation_euler = BlenderUtils.matrix_to_blender_pose(pose)
        camera.location = location
        camera.rotation_euler = rotation_euler

    @staticmethod
    def get_object_bottom_z(obj):
        vertices = [v.co for v in obj.data.vertices]
        vertices_world = [obj.matrix_world @ v for v in vertices]
        min_z = min([v.z for v in vertices_world])
        return min_z

    @staticmethod
    def render_and_save(output_dir, file_name, target_name, frame_num="0120", binocular_vision=False, render_rgb=False):
        target_cameras = [BlenderUtils.CAMERA_NAME]
        if binocular_vision:
            target_cameras.append(BlenderUtils.CAMERA_RIGHT_NAME)
        for cam_name in target_cameras:
            # Set the current camera
            bpy.context.scene.camera = BlenderUtils.get_obj(cam_name)
            bpy.context.scene.view_layers["ViewLayer"].use_pass_z = True
            cam_suffix = "L" if cam_name == BlenderUtils.CAMERA_NAME else "R"
            scene = bpy.context.scene
            scene.render.filepath = ""
            if render_rgb:
                rgb_dir = os.path.join(output_dir, "rgb")
                if not os.path.exists(rgb_dir):
                    os.makedirs(rgb_dir)
                # Modify the file name based on the camera
                scene.render.filepath = os.path.join(rgb_dir, f"{file_name}_{cam_suffix}.png")
            scene.render.image_settings.color_depth = '16'
            scene.render.resolution_percentage = 100
            scene.render.use_overwrite = False
            scene.render.use_file_extension = False
            scene.render.use_placeholder = False
            scene.use_nodes = True
            tree = scene.node_tree
            for node in tree.nodes:
                tree.nodes.remove(node)
            rl = tree.nodes.new('CompositorNodeRLayers')
            map_range = tree.nodes.new('CompositorNodeMapRange')
            map_range.inputs['From Min'].default_value = 0.01
            map_range.inputs['From Max'].default_value = 5
            map_range.inputs['To Min'].default_value = 0
            map_range.inputs['To Max'].default_value = 1
            tree.links.new(rl.outputs['Depth'], map_range.inputs[0])
            output_depth = tree.nodes.new('CompositorNodeOutputFile')
            depth_dir = os.path.join(output_dir, "depth")
            if not os.path.exists(depth_dir):
                os.makedirs(depth_dir)
            output_depth.base_path = depth_dir
            output_depth.file_slots[0].path = f"{file_name}_{cam_suffix}.####"
            output_depth.format.file_format = 'PNG'
            output_depth.format.color_mode = 'BW'
            output_depth.format.color_depth = '16'
            tree.links.new(map_range.outputs[0], output_depth.inputs[0])
            bpy.context.scene.view_layers["ViewLayer"].use_pass_cryptomatte_object = True
            crypto_node = scene.node_tree.nodes.new("CompositorNodeCryptomatteV2")
            crypto_node.matte_id = target_name
            output_mask = scene.node_tree.nodes.new("CompositorNodeOutputFile")
            mask_dir = os.path.join(output_dir, "mask")
            if not os.path.exists(mask_dir):
                os.makedirs(mask_dir)
            output_mask.base_path = mask_dir
            output_mask.file_slots[0].path = f"{file_name}_{cam_suffix}.####"
            output_mask.format.file_format = 'PNG'
            output_mask.format.color_mode = 'RGB'
            output_mask.format.color_depth = '8'
            scene.node_tree.links.new(crypto_node.outputs[1], output_mask.inputs[0])
            bpy.ops.render.render(write_still=True)
            os.rename(os.path.join(depth_dir, f"{file_name}_{cam_suffix}.{frame_num}.png"), os.path.join(depth_dir, f"{file_name}_{cam_suffix}.png"))
            os.rename(os.path.join(mask_dir, f"{file_name}_{cam_suffix}.{frame_num}.png"), os.path.join(mask_dir, f"{file_name}_{cam_suffix}.png"))

    @staticmethod
    def save_cam_params(scene_dir, idx, binocular_vision=False):
        camera = BlenderUtils.get_obj(BlenderUtils.CAMERA_NAME)
        extrinsic = np.array(camera.matrix_world @ camera.matrix_local)
        cam_data = camera.data
        focal_length = cam_data.lens
        sensor_width = cam_data.sensor_width
        sensor_height = cam_data.sensor_height
        resolution_x = bpy.context.scene.render.resolution_x
        resolution_y = bpy.context.scene.render.resolution_y
        intrinsic = np.zeros((3, 3))
        intrinsic[0, 0] = focal_length * resolution_x / sensor_width  # fx
        intrinsic[1, 1] = focal_length * resolution_y / sensor_height  # fy
        intrinsic[0, 2] = resolution_x / 2.0  # cx
        intrinsic[1, 2] = resolution_y / 2.0  # cy
        intrinsic[2, 2] = 1.0
        cam_object = BlenderUtils.get_obj(BlenderUtils.CAMERA_OBJECT_NAME)
        extrinsic_cam_object = np.array(cam_object.matrix_world)
        data = {
            "extrinsic": extrinsic.tolist(),
            "extrinsic_cam_object": extrinsic_cam_object.tolist(),
            "intrinsic": intrinsic.tolist(),
            "far_plane": camera.data.clip_end,
            "near_plane": camera.data.clip_start,
        }
        if binocular_vision:
            right_camera = BlenderUtils.get_obj(BlenderUtils.CAMERA_RIGHT_NAME)
            extrinsic_right = np.array(right_camera.matrix_world @ right_camera.matrix_local)
            data["extrinsic_R"] = extrinsic_right.tolist()
        cam_params_dir = os.path.join(scene_dir, "camera_params")
        if not os.path.exists(cam_params_dir):
            os.makedirs(cam_params_dir)
        cam_params_path = os.path.join(cam_params_dir, f"{idx}.json")
        with open(cam_params_path, "w") as f:
            json.dump(data, f, indent=4)

    @staticmethod
    def reset_objects_and_platform():
        all_objects = bpy.data.objects
        keep_objects = {"plane_floor", "plane_ceil", "plane_wall_1", "plane_wall_2", "plane_wall_3", "plane_wall_4"}
        keep_objects.add(BlenderUtils.CAMERA_OBJECT_NAME)
        keep_objects.add(BlenderUtils.CAMERA_NAME)
        keep_objects.add(BlenderUtils.CAMERA_RIGHT_NAME)
        keep_objects.add(BlenderUtils.LIGHT_NAME)
        keep_objects.add(BlenderUtils.TABLE_NAME)
        for obj in all_objects:
            if obj.name not in keep_objects:
                bpy.data.objects.remove(obj, do_unlink=True)
        for block in bpy.data.meshes:
            if block.users == 0:
                bpy.data.meshes.remove(block)
        for block in bpy.data.materials:
            if block.users == 0:
                bpy.data.materials.remove(block)
        for block in bpy.data.images:
            if block.users == 0:
                bpy.data.images.remove(block)
        gc.collect()
        bpy.context.scene.frame_set(0)

    @staticmethod
    def save_scene_info(scene_root_dir, display_table_config, target_name):
        all_objects = bpy.data.objects
        no_save_objects = {"plane_floor", "plane_ceil", "plane_wall_1", "plane_wall_2", "plane_wall_3", "plane_wall_4"}
        no_save_objects.add(BlenderUtils.CAMERA_OBJECT_NAME)
        no_save_objects.add(BlenderUtils.CAMERA_NAME)
        no_save_objects.add(BlenderUtils.CAMERA_RIGHT_NAME)
        no_save_objects.add(BlenderUtils.LIGHT_NAME)
        no_save_objects.add(BlenderUtils.TABLE_NAME)
        scene_info = {}
        for obj in all_objects:
            if obj.name not in no_save_objects and obj.name != BlenderUtils.DISPLAY_TABLE_NAME:
                obj_info = {
                    "location": list(obj.location),
                    "rotation_euler": list(obj.rotation_euler),
                    "scale": list(obj.scale)
                }
                scene_info[obj.name] = obj_info
        scene_info[BlenderUtils.DISPLAY_TABLE_NAME] = display_table_config
        scene_info["target_name"] = target_name
        scene_info_path = os.path.join(scene_root_dir, "scene_info.json")
        with open(scene_info_path, "w") as outfile:
            json.dump(scene_info, outfile)
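
As a sanity check on the camera model in this file, here is a numpy-only sketch (no bpy required); fov_vertical, resolution, and the fixed 24.0 sensor height mirror set_camera_params, and the fx/fy/cx/cy expressions mirror save_cam_params:

import numpy as np

fov_vertical = 25.0        # degrees, as in the light_and_camera_config
resolution = (1280, 800)   # (width, height) in pixels
sensor_height = 24.0       # mm, hard-coded in set_camera_params

# set_camera_params: focal length (mm) from the vertical field of view
focal_length = sensor_height / (2 * np.tan(np.radians(fov_vertical) / 2))
sensor_width = sensor_height * resolution[0] / resolution[1]

# save_cam_params: intrinsics in pixels
fx = focal_length * resolution[0] / sensor_width
fy = focal_length * resolution[1] / sensor_height
cx, cy = resolution[0] / 2.0, resolution[1] / 2.0
intrinsic = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]])

# sensor_width is derived from the aspect ratio, so pixels are square: fx == fy
assert np.isclose(fx, fy)
print(intrinsic)  # fx = fy = 800 / (2 * tan(12.5 deg)), roughly 1804 px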


@@ -1,320 +0,0 @@
import os
import random
import math
import bpy
import numpy as np
import mathutils
import requests
from blender.blender_util import BlenderUtils
from blender.view_sample_util import ViewSampleUtil
class DataGenerator:
    def __init__(self, config):
        self.plane_size = config["runner"]["generate"]["plane_size"]
        self.table_model_path = config["runner"]["generate"]["table_model_path"]
        self.output_dir = config["runner"]["generate"]["output_dir"]
        self.random_config = config["runner"]["generate"]["random_config"]
        self.light_and_camera_config = config["runner"]["generate"]["light_and_camera_config"]
        self.obj_dir = config["runner"]["generate"]["object_dir"]
        self.max_views = config["runner"]["generate"]["max_views"]
        self.binocular_vision = config["runner"]["generate"]["binocular_vision"]
        self.set_status_path = "http://localhost:5000/project/set_status"
        self.log_path = "http://localhost:5000/project/add_log"
        self.obj_name_list = os.listdir(self.obj_dir)
        self.target_obj = None
        self.stopped = False
        self.random_obj_list = []
        self.display_table_config = {}
        BlenderUtils.setup_scene(self.light_and_camera_config, self.table_model_path, self.binocular_vision)
        self.table = BlenderUtils.get_obj(BlenderUtils.TABLE_NAME)
        self.access = self._check_set_status_access(self.set_status_path)
        print(self.access)

    def _check_set_status_access(self, url):
        try:
            response = requests.get(url, timeout=5)
            return True
        except requests.RequestException as e:
            print(f"Cannot access {url}: {e}")
            return False

    def set_status(self, key, value):
        if not self.access:
            return
        request_data = {}
        request_data["status"] = {
            "app_name": "generate_view",
            "runner_name": "view_generator",
            "key": key,
            "value": value
        }
        requests.post(self.set_status_path, json=request_data)

    def set_progress(self, key, curr_value, max_value):
        if not self.access:
            return
        request_data = {}
        request_data["progress"] = {
            "app_name": "generate_view",
            "runner_name": "view_generator",
            "key": key,
            "curr_value": curr_value,
            "max_value": max_value
        }
        requests.post(self.set_status_path, json=request_data)

    def add_log(self, msg, log_type):
        if not self.access:
            return
        request_data = {"log": {}}
        request_data["log"]["message"] = msg
        request_data["log"]["log_type"] = log_type
        requests.post(self.log_path, json=request_data)

    def generate_display_platform(self):
        config = self.random_config[BlenderUtils.DISPLAY_TABLE_NAME]
        height = random.uniform(config["min_height"], config["max_height"])
        radius = random.uniform(config["min_radius"], config["max_radius"])
        R = random.uniform(config["min_R"], config["max_R"])
        G = random.uniform(config["min_G"], config["max_G"])
        B = random.uniform(config["min_B"], config["max_B"])
        while height > 0.5 * radius:
            height = random.uniform(config["min_height"], config["max_height"])
        bpy.ops.mesh.primitive_cylinder_add(radius=radius, depth=height)
        platform = bpy.context.selected_objects[-1]
        platform.name = BlenderUtils.DISPLAY_TABLE_NAME
        bbox = self.table.bound_box
        bbox_world = [self.table.matrix_world @ mathutils.Vector(corner) for corner in bbox]
        table_top_z = max([v.z for v in bbox_world])
        platform.location = (0, 0, table_top_z + height / 2)
        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'PASSIVE'
        bpy.ops.object.shade_auto_smooth()
        mat = bpy.data.materials.new(name="DarkGrayMaterial")
        mat.diffuse_color = (R, G, B, 1.0)
        if len(platform.data.materials) > 0:
            platform.data.materials[0] = mat
        else:
            platform.data.materials.append(mat)
        self.display_table_config = {
            "height": height,
            "radius": radius,
            "R": R,
            "G": G,
            "B": B,
            "location": list(platform.location)
        }
        return platform

    def put_display_object(self, name):
        config = self.random_config["display_object"]
        x = random.uniform(config["min_x"], config["max_x"])
        y = random.uniform(config["min_y"], config["max_y"])
        z = random.uniform(config["min_z"], config["max_z"])
        if random.random() <= config["random_rotation_ratio"]:
            rotation = (
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi)
            )
        else:
            rotation = (0, 0, 0)
        z = 0.05
        platform_bbox = self.platform.bound_box
        platform_bbox_world = [self.platform.matrix_world @ mathutils.Vector(corner) for corner in platform_bbox]
        platform_top_z = max([v.z for v in platform_bbox_world])
        obj_mesh_path = BlenderUtils.get_obj_path(self.obj_dir, name)
        obj = BlenderUtils.load_obj(name, obj_mesh_path)
        obj_bottom_z = BlenderUtils.get_object_bottom_z(obj)
        offset_z = obj_bottom_z
        obj.rotation_euler = rotation
        obj.location = (x, y, platform_top_z - offset_z + z)
        bpy.ops.rigidbody.object_add()
        bpy.context.object.rigid_body.type = 'ACTIVE'
        self.target_obj = obj

    def put_random_objects_on_table(self):
        num_objects = self.random_config["random_objects"]["num"]
        cluster = self.random_config["random_objects"]["cluster"]
        for _ in range(num_objects):
            obj_name = random.choice(self.obj_name_list)
            obj_mesh_path = BlenderUtils.get_obj_path(self.obj_dir, obj_name)
            obj = BlenderUtils.load_obj(obj_name, obj_mesh_path)
            bbox = self.table.bound_box
            bbox_world = [self.table.matrix_world @ mathutils.Vector(corner) for corner in bbox]
            table_top_z = max([v.z for v in bbox_world])
            platform_radius = self.platform.dimensions.x / 2.0
            while True:
                x = random.uniform(bbox_world[0].x * cluster, bbox_world[6].x * cluster)
                y = random.uniform(bbox_world[0].y * cluster, bbox_world[6].y * cluster)
                if math.sqrt(x**2 + y**2) > platform_radius * 4:
                    break
            rotation = (
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi),
                random.uniform(0, 2 * np.pi)
            )
            obj_bottom_z = BlenderUtils.get_object_bottom_z(obj)
            offset_z = obj_bottom_z
            obj.rotation_euler = rotation
            obj.location = (x, y, table_top_z - offset_z)
            bpy.ops.rigidbody.object_add()
            bpy.context.object.rigid_body.type = 'ACTIVE'
            self.random_obj_list.append(obj)

    def reset(self):
        self.target_obj = None
        self.random_obj_list = []
        BlenderUtils.reset_objects_and_platform()

    def check_moving_objects(self, previous_locations):
        threshold = 0.01
        moving_objects = False
        target_checking_object = [self.target_obj] + self.random_obj_list
        for obj in target_checking_object:
            if obj.rigid_body:
                current_location = obj.location
                location_diff = (current_location - previous_locations[obj.name]).length
                if location_diff > threshold:
                    moving_objects = True
                    break
        return moving_objects

    def check_and_adjust_target(self):
        target_position = self.target_obj.matrix_world.translation
        msg = "success"
        if abs(target_position[0]) > self.random_config["display_object"]["max_x"]:
            target_position[0] = np.sign(target_position[0]) * self.random_config["display_object"]["max_x"] * random.uniform(-0.5, 0.5)
            msg = "adjusted"
        if abs(target_position[1]) > self.random_config["display_object"]["max_y"]:
            target_position[1] = np.sign(target_position[1]) * self.random_config["display_object"]["max_y"] * random.uniform(-0.5, 0.5)
            msg = "adjusted"
        if target_position[2] < 0.85:
            target_position[2] = target_position[2] + 0.1
            msg = "adjusted"
        self.target_obj.location = target_position
        return msg

    def start_render(self):
        object_name = self.target_obj.name
        if "." in object_name:
            object_name = object_name.split(".")[0]
        scene_dir = os.path.join(self.output_dir, object_name)
        if not os.path.exists(scene_dir):
            os.makedirs(scene_dir)
        view_data = ViewSampleUtil.sample_view_data_world_space(self.target_obj, distance_range=(0.3, 0.5), voxel_size=0.005, max_views=self.max_views)
        object_points = np.array(view_data["voxel_down_sampled_points"])
        normals = np.array(view_data["normals"])
        points_normals = np.concatenate((object_points, normals), axis=1)
        np.savetxt(os.path.join(scene_dir, "points_and_normals.txt"), points_normals)
        for i, cam_pose in enumerate(view_data["cam_poses"]):
            BlenderUtils.set_camera_at(cam_pose)
            BlenderUtils.render_and_save(scene_dir, f"{i}", object_name, binocular_vision=self.binocular_vision)
            BlenderUtils.save_cam_params(scene_dir, i, binocular_vision=self.binocular_vision)
            self.set_progress("render frame", i, len(view_data["cam_poses"]))
        self.set_progress("render frame", len(view_data["cam_poses"]), len(view_data["cam_poses"]))
        BlenderUtils.save_scene_info(scene_dir, self.display_table_config, object_name)

    def simulate_scene(self, frame_limit=120, depth=0):
        bpy.context.view_layer.update()
        bpy.ops.screen.animation_play()
        previous_locations = {obj.name: obj.matrix_world.translation.copy() for obj in bpy.context.scene.objects if obj.rigid_body}
        frame_count = 1
        moving_objects = True
        while frame_count < frame_limit:
            bpy.context.view_layer.update()
            if frame_count % 10 == 0:
                moving_objects = self.check_moving_objects(previous_locations)
            if not moving_objects:
                break
            frame_count += 1
            bpy.context.scene.frame_set(bpy.context.scene.frame_current + 1)
            previous_locations = {obj.name: obj.matrix_world.translation.copy() for obj in bpy.context.scene.objects if obj.rigid_body}
        bpy.ops.screen.animation_cancel(restore_frame=False)
        msg = self.check_and_adjust_target()
        if msg == "adjusted" and depth < 3:
            bpy.context.view_layer.update()
            bpy.context.scene.frame_set(0)
            return self.simulate_scene(depth=depth + 1)
        elif msg == "success":
            print("Scene generation completed.")
            self.start_render()
            return msg
        return "retry"

    def gen_scene_data(self, object_name):
        bpy.context.scene.frame_set(0)
        self.platform = self.generate_display_platform()
        self.put_display_object(object_name)
        diag = BlenderUtils.get_obj_diag(self.target_obj.name)
        self.set_status("target_diagonal", diag)
        if diag > 0.7 or diag < 0.1:
            self.add_log(f"The diagonal size of the object <{object_name}> (size: {round(diag, 3)}) does not meet the requirements.", "error")
            return "diag_error"
        self.put_random_objects_on_table()
        return self.simulate_scene()

    def gen_all_scene_data(self):
        max_retry_times = 3
        total = len(self.obj_name_list)
        count = 0
        count_success = 0
        self.set_progress("generate scene", 0, total)
        result = "retry"
        for target_obj_name in self.obj_name_list:
            self.add_log(f"Generating scene for object <{target_obj_name}>", "info")
            retry_times = 0
            self.set_status("target_object", target_obj_name)
            while retry_times < 3 and result == "retry":
                self.reset()
                try:
                    result = self.gen_scene_data(target_obj_name)
                except Exception as e:
self.add_log(f"Uknown error: {e}", "error")
result = "unknown_error"
if result == "retry":
retry_times += 1
self.add_log(f"Maximum adjust times, retrying <{target_obj_name}>. ({retry_times}/{max_retry_times}) ", "warning")
count += 1
if result == "success":
count_success += 1
self.add_log(f"Scene for object <{target_obj_name}> generated successfully", "success")
if result == "retry" and retry_times >= max_retry_times:
self.add_log(f"Maximum retries, failed to generate scene for object <{target_obj_name}>", "error")
self.set_status("success", count_success)
self.set_status("fail", count - count_success)
self.set_progress("generate scene", count, total)
result = "retry"


@@ -1,14 +0,0 @@
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import yaml

from blender.data_generator import DataGenerator

if __name__ == "__main__":
    config_path = sys.argv[sys.argv.index('--') + 1]
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    dg = DataGenerator(config)
    dg.gen_all_scene_data()


@@ -1,130 +0,0 @@
import numpy as np
import bmesh
from collections import defaultdict
class ViewSampleUtil:
    @staticmethod
    def voxel_downsample(points, voxel_size):
        voxel_grid = defaultdict(list)
        for i, point in enumerate(points):
            voxel_index = tuple((point // voxel_size).astype(int))
            voxel_grid[voxel_index].append(i)
        downsampled_points = []
        downsampled_indices = []
        for indices in voxel_grid.values():
            selected_index = indices[0]
            downsampled_points.append(points[selected_index])
            downsampled_indices.append(selected_index)
        return np.array(downsampled_points), downsampled_indices

    @staticmethod
    def sample_view_data(obj, distance_range: tuple = (0.3, 0.5), voxel_size: float = 0.005, max_views: int = 1) -> dict:
        view_data = {
            "look_at_points": [],
            "cam_positions": [],
        }
        mesh = obj.data
        bm = bmesh.new()
        bm.from_mesh(mesh)
        bm.verts.ensure_lookup_table()
        bm.faces.ensure_lookup_table()
        bm.normal_update()
        look_at_points = []
        cam_positions = []
        normals = []
        for v in bm.verts:
            look_at_point = np.array(v.co)
            view_data["look_at_points"].append(look_at_point)
            normal = np.zeros(3)
            for loop in v.link_loops:
                normal += np.array(loop.calc_normal())
            normal /= len(v.link_loops)
            normal = normal / np.linalg.norm(normal)
            if np.isnan(normal).any():
                continue
            if np.dot(normal, look_at_point) < 0:
                normal = -normal
            distance = np.random.uniform(*distance_range)
            cam_position = look_at_point + distance * normal
            look_at_points.append(look_at_point)
            cam_positions.append(cam_position)
            normals.append(normal)
        bm.free()
        look_at_points = np.array(look_at_points)
        cam_positions = np.array(cam_positions)
        voxel_downsampled_look_at_points, selected_indices = ViewSampleUtil.voxel_downsample(look_at_points, voxel_size)
        voxel_downsampled_cam_positions = cam_positions[selected_indices]
        voxel_downsampled_normals = np.array(normals)[selected_indices]
        if len(voxel_downsampled_look_at_points) > max_views * 2:
            indices = np.random.choice(len(voxel_downsampled_look_at_points), max_views * 2, replace=False)
            downsampled_look_at_points = voxel_downsampled_look_at_points[indices]
            downsampled_cam_positions = voxel_downsampled_cam_positions[indices]
            view_data["look_at_points"] = downsampled_look_at_points.tolist()
            view_data["cam_positions"] = downsampled_cam_positions.tolist()
        view_data["normals"] = voxel_downsampled_normals
        view_data["voxel_down_sampled_points"] = voxel_downsampled_look_at_points
        return view_data

    @staticmethod
    def get_world_points_and_normals(view_data: dict, obj_world_pose: np.ndarray) -> tuple:
        world_points = []
        world_normals = []
        for voxel_down_sampled_points, normal in zip(view_data["voxel_down_sampled_points"], view_data["normals"]):
            voxel_down_sampled_points_world = obj_world_pose @ np.append(voxel_down_sampled_points, 1.0)
            normal_world = obj_world_pose[:3, :3] @ normal
            world_points.append(voxel_down_sampled_points_world[:3])
            world_normals.append(normal_world)
        return np.array(world_points), np.array(world_normals)

    @staticmethod
    def get_cam_pose(view_data: dict, obj_world_pose: np.ndarray, max_views: int) -> np.ndarray:
        cam_poses = []
        min_height_z = 1000
        for look_at_point, cam_position in zip(view_data["look_at_points"], view_data["cam_positions"]):
            look_at_point_world = obj_world_pose @ np.append(look_at_point, 1.0)
            cam_position_world = obj_world_pose @ np.append(cam_position, 1.0)
            if look_at_point_world[2] < min_height_z:
                min_height_z = look_at_point_world[2]
            look_at_point_world = look_at_point_world[:3]
            cam_position_world = cam_position_world[:3]
            forward_vector = cam_position_world - look_at_point_world
            forward_vector /= np.linalg.norm(forward_vector)
            up_vector = np.array([0, 0, 1])
            right_vector = np.cross(up_vector, forward_vector)
            right_vector /= np.linalg.norm(right_vector)
            corrected_up_vector = np.cross(forward_vector, right_vector)
            rotation_matrix = np.array([right_vector, corrected_up_vector, forward_vector]).T
            cam_pose = np.eye(4)
            cam_pose[:3, :3] = rotation_matrix
            cam_pose[:3, 3] = cam_position_world
            cam_poses.append(cam_pose)
        filtered_cam_poses = []
        for cam_pose in cam_poses:
            if cam_pose[2, 3] > min_height_z:
                filtered_cam_poses.append(cam_pose)
        if len(filtered_cam_poses) > max_views:
            indices = np.random.choice(len(filtered_cam_poses), max_views, replace=False)
            filtered_cam_poses = [filtered_cam_poses[i] for i in indices]
        return np.array(filtered_cam_poses)

    @staticmethod
    def sample_view_data_world_space(obj, distance_range: tuple = (0.3, 0.5), voxel_size: float = 0.005, max_views: int = 1) -> dict:
        obj_world_pose = np.asarray(obj.matrix_world)
        view_data = ViewSampleUtil.sample_view_data(obj, distance_range, voxel_size, max_views)
        view_data["cam_poses"] = ViewSampleUtil.get_cam_pose(view_data, obj_world_pose, max_views)
        view_data["voxel_down_sampled_points"], view_data["normals"] = ViewSampleUtil.get_world_points_and_normals(view_data, obj_world_pose)
        return view_data
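
For reference, the look-at construction in get_cam_pose can be exercised on its own. In this minimal numpy sketch (look_at_pose is an illustrative helper name, not part of the file above), the rotation columns are (right, up, forward) with forward = camera - target, so the camera looks along its local -Z axis:

import numpy as np

def look_at_pose(cam_position, look_at_point, world_up=np.array([0.0, 0.0, 1.0])):
    # forward points from the target to the camera, as in get_cam_pose
    forward = cam_position - look_at_point
    forward /= np.linalg.norm(forward)
    right = np.cross(world_up, forward)
    right /= np.linalg.norm(right)  # degenerate if forward is parallel to world_up
    up = np.cross(forward, right)
    pose = np.eye(4)
    pose[:3, :3] = np.stack([right, up, forward], axis=1)  # columns: right, up, forward
    pose[:3, 3] = cam_position
    return pose

cam, target = np.array([0.4, 0.0, 0.4]), np.zeros(3)
pose = look_at_pose(cam, target)
view_dir = -pose[:3, 2]  # camera's local -Z expressed in world coordinates
print(np.allclose(view_dir, (target - cam) / np.linalg.norm(target - cam)))  # True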


@@ -13,13 +13,14 @@ runner:
   generate:
     voxel_threshold: 0.005
     overlap_threshold: 0.5
-    save_points: False
+    to_specified_dir: True # if True, output_dir is used; otherwise, root_dir is used
+    save_points: True
     dataset_list:
       - OmniObject3d
 datasets:
   OmniObject3d:
     model_dir: "H:\\AI\\Datasets\\scaled_object_meshes"
-    root_dir: "C:\\Document\\Local Project\\nbv_rec\\data\\sample"
+    root_dir: "/media/hofee/data/data/temp_output"
+    output_dir: "/media/hofee/data/data/label_output"


@@ -7,18 +7,18 @@ runner:
     name: debug
     root_dir: experiments
   generate:
-    object_dir: H:\AI\Datasets\scaled_object_meshes
-    table_model_path: C:\Users\hofee\Desktop\blender\table.obj
-    output_dir: C:\Document\Local Project\nbv_rec\nbv_reconstruction\temp
+    object_dir: /media/hofee/data/data/scaled_object_meshes
+    table_model_path: /media/hofee/data/data/others/table.obj
+    output_dir: /media/hofee/data/data/temp_output
     binocular_vision: true
     plane_size: 10
-    max_views: 10
+    max_views: 100
     random_config:
       display_table:
         min_height: 0.05
         max_height: 0.15
-        min_radius: 0.1
-        max_radius: 0.2
+        min_radius: 0.3
+        max_radius: 0.5
         min_R: 0.05
         max_R: 0.3
         min_G: 0.05
@@ -42,7 +42,8 @@ runner:
       far_plane: 5
       fov_vertical: 25
       resolution: [1280,800]
-      eye_distance: 0.06
+      eye_distance: 0.15
+      eye_angle: 25
     Light:
       location: [0,0,3.5]
       orientation: [0,0,0]


@@ -22,6 +22,8 @@ class StrategyGenerator(Runner):
             "app_name": "generate",
             "runner_name": "strategy_generator"
         }
+        self.to_specified_dir = ConfigManager.get("runner", "generate", "to_specified_dir")
+
     def run(self):
         dataset_name_list = ConfigManager.get("runner", "generate", "dataset_list")
@@ -31,15 +33,13 @@ class StrategyGenerator(Runner):
             dataset_name = dataset_name_list[dataset_idx]
             status_manager.set_progress("generate", "strategy_generator", "dataset", dataset_idx, len(dataset_name_list))
             root_dir = ConfigManager.get("datasets", dataset_name, "root_dir")
-            model_dir = ConfigManager.get("datasets", dataset_name, "model_dir")
-            scene_name_list = os.listdir(root_dir)
+            scene_name_list = os.listdir(root_dir)[:10]
             cnt = 0
             total = len(scene_name_list)
             for scene_name in scene_name_list:
                 Log.info(f"({dataset_name})Processing [{cnt}/{total}]: {scene_name}")
                 status_manager.set_progress("generate", "strategy_generator", "scene", cnt, total)
-                self.generate_sequence(root_dir, model_dir, scene_name, voxel_threshold, overlap_threshold)
+                self.generate_sequence(root_dir, dataset_name, scene_name, voxel_threshold, overlap_threshold)
                 cnt += 1
             status_manager.set_progress("generate", "strategy_generator", "scene", total, total)
         status_manager.set_progress("generate", "strategy_generator", "dataset", len(dataset_name_list), len(dataset_name_list))
@@ -52,20 +52,21 @@ class StrategyGenerator(Runner):
     def load_experiment(self, backup_name=None):
         super().load_experiment(backup_name)

-    def generate_sequence(self, root, model_dir, scene_name, voxel_threshold, overlap_threshold):
+    def generate_sequence(self, root, dataset_name, scene_name, voxel_threshold, overlap_threshold):
         status_manager.set_status("generate", "strategy_generator", "scene", scene_name)
         frame_num = DataLoadUtil.get_scene_seq_length(root, scene_name)
-        model_pts = DataLoadUtil.load_original_model_points(model_dir, scene_name)
+        model_points_normals = DataLoadUtil.load_points_normals(root, scene_name)
+        model_pts = model_points_normals[:, :3]
         down_sampled_model_pts = PtsUtil.voxel_downsample_point_cloud(model_pts, voxel_threshold)
-        obj_pose = DataLoadUtil.load_target_object_pose(root, scene_name)
-        down_sampled_transformed_model_pts = PtsUtil.transform_point_cloud(down_sampled_model_pts, obj_pose)
         pts_list = []
         for frame_idx in range(frame_num):
             path = DataLoadUtil.get_path(root, scene_name, frame_idx)
+            cam_params = DataLoadUtil.load_cam_info(path, binocular=True)
             status_manager.set_progress("generate", "strategy_generator", "loading frame", frame_idx, frame_num)
-            point_cloud = DataLoadUtil.get_point_cloud_world_from_path(path)
-            sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, voxel_threshold)
+            point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
+            sampled_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=45)
             if self.save_pts:
                 pts_dir = os.path.join(root, scene_name, "pts")
                 if not os.path.exists(pts_dir):
@@ -74,7 +75,7 @@ class StrategyGenerator(Runner):
             pts_list.append(sampled_point_cloud)
         status_manager.set_progress("generate", "strategy_generator", "loading frame", frame_num, frame_num)
-        limited_useful_view, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(down_sampled_transformed_model_pts, pts_list, threshold=voxel_threshold, overlap_threshold=overlap_threshold, status_info=self.status_info)
+        limited_useful_view, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(down_sampled_model_pts, pts_list, threshold=voxel_threshold, overlap_threshold=overlap_threshold, status_info=self.status_info)
         data_pairs = self.generate_data_pairs(limited_useful_view)
         seq_save_data = {
             "data_pairs": data_pairs,
@@ -84,12 +85,18 @@ class StrategyGenerator(Runner):
         status_manager.set_status("generate", "strategy_generator", "max_coverage_rate", limited_useful_view[-1][1])
         Log.success(f"Scene <{scene_name}> Finished, Max Coverage Rate: {limited_useful_view[-1][1]}, Best Sequence length: {len(limited_useful_view)}")
+        if self.to_specified_dir:
+            output_dir = ConfigManager.get("datasets", dataset_name, "output_dir")
+            output_label_path = os.path.join(output_dir, f"{scene_name}.json")
+            if not os.path.exists(output_dir):
+                os.makedirs(output_dir)
+        else:
+            output_label_path = DataLoadUtil.get_label_path(root, scene_name)
         with open(output_label_path, 'w') as f:
             json.dump(seq_save_data, f)
-        DataLoadUtil.save_downsampled_world_model_points(root, scene_name, down_sampled_transformed_model_pts)
+        DataLoadUtil.save_downsampled_world_model_points(root, scene_name, down_sampled_model_pts)

     def generate_data_pairs(self, useful_view):
         data_pairs = []


@@ -9,7 +9,7 @@ class ViewGenerator(Runner):
         self.config_path = config_path

     def run(self):
-        subprocess.run(['blender', '-b', '-P', './blender/run_blender.py', '--', self.config_path])
+        subprocess.run(['blender', '-b', '-P', '../blender/run_blender.py', '--', self.config_path])

     def create_experiment(self, backup_name=None):
         return super().create_experiment(backup_name)


@@ -62,15 +62,47 @@ class DataLoadUtil:
         return pose_mat

     @staticmethod
-    def load_depth(path):
-        depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png")
-        depth = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED)
+    def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False):
+        def load_depth_from_real_path(real_path, min_depth, max_depth):
+            depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
             depth = depth.astype(np.float32) / 65535.0
-        min_depth = 0.01
-        max_depth = 5.0
+            min_depth = min_depth
+            max_depth = max_depth
             depth_meters = min_depth + (max_depth - min_depth) * depth
             return depth_meters
+        if binocular:
+            depth_path_L = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_L.png")
+            depth_path_R = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_R.png")
+            depth_meters_L = load_depth_from_real_path(depth_path_L, min_depth, max_depth)
+            depth_meters_R = load_depth_from_real_path(depth_path_R, min_depth, max_depth)
+            return depth_meters_L, depth_meters_R
+        else:
+            depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png")
+            depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
+            return depth_meters

     @staticmethod
+    def load_seg(path, binocular=False):
+        if binocular:
+            def clean_mask(mask_image):
+                green = [0, 255, 0, 255]
+                red = [255, 0, 0, 255]
+                threshold = 2
+                mask_image = np.where(np.abs(mask_image - green) <= threshold, green, mask_image)
+                mask_image = np.where(np.abs(mask_image - red) <= threshold, red, mask_image)
+                return mask_image
+            mask_path_L = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_L.png")
+            mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
+            mask_path_R = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_R.png")
+            mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
+            return mask_image_L, mask_image_R
+        else:
+            mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png")
+            mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
+            return mask_image
+
+    @staticmethod
     def load_label(path):
         with open(path, 'r') as f:
@@ -83,12 +115,6 @@
         rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
         return rgb_image

     @staticmethod
-    def load_seg(path):
-        mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png")
-        mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
-        return mask_image
-
-    @staticmethod
     def cam_pose_transformation(cam_pose_before):
         offset = np.asarray([
@@ -100,20 +126,27 @@
         return cam_pose_after

     @staticmethod
-    def load_cam_info(path):
+    def load_cam_info(path, binocular=False):
         camera_params_path = os.path.join(os.path.dirname(path), "camera_params", os.path.basename(path) + ".json")
         with open(camera_params_path, 'r') as f:
             label_data = json.load(f)
         cam_to_world = np.asarray(label_data["extrinsic"])
         cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
         cam_intrinsic = np.asarray(label_data["intrinsic"])
-        return {
+        cam_info = {
             "cam_to_world": cam_to_world,
-            "cam_intrinsic": cam_intrinsic
+            "cam_intrinsic": cam_intrinsic,
+            "far_plane": label_data["far_plane"],
+            "near_plane": label_data["near_plane"]
         }
+        if binocular:
+            cam_to_world_R = np.asarray(label_data["extrinsic_R"])
+            cam_to_world_R = DataLoadUtil.cam_pose_transformation(cam_to_world_R)
+            cam_info["cam_to_world_R"] = cam_to_world_R
+        return cam_info

     @staticmethod
-    def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=255):
+    def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255)):
         h, w = depth.shape
         i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy')
@@ -122,9 +155,10 @@
         y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
         points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
-        mask = mask.reshape(-1)
+        mask = mask.reshape(-1, 4)
+        target_mask = (mask == target_mask_label).all(axis=-1)
-        target_mask = mask == target_mask_label
         target_points_camera = points_camera[target_mask]
         target_points_camera_aug = np.concatenate([target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1)
@@ -134,21 +168,44 @@
             "points_camera": target_points_camera
         }

     @staticmethod
-    def get_point_cloud_world_from_path(path):
-        cam_info = DataLoadUtil.load_cam_info(path)
-        depth = DataLoadUtil.load_depth(path)
+    def get_target_point_cloud_world_from_path(path, binocular=False):
+        cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
+        if binocular:
+            voxel_size = 0.0005
+            depth_L, depth_R = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'], binocular=True)
+            mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
+            point_cloud_L = DataLoadUtil.get_target_point_cloud(depth_L, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask_L)['points_world']
+            point_cloud_R = DataLoadUtil.get_target_point_cloud(depth_R, cam_info['cam_intrinsic'], cam_info['cam_to_world_R'], mask_R)['points_world']
+            overlap_points = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size)
+            return overlap_points
+        else:
+            depth = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'])
             mask = DataLoadUtil.load_seg(path)
-        point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask)
-        return point_cloud['points_world']
+            point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask)['points_world']
+            return point_cloud

     @staticmethod
-    def get_point_cloud_list_from_seq(root, scene_name, num_frames):
-        point_cloud_list = []
-        for frame_idx in range(num_frames):
-            path = DataLoadUtil.get_path(root, scene_name, frame_idx)
-            point_cloud = DataLoadUtil.get_point_cloud_world_from_path(path)
-            point_cloud_list.append(point_cloud)
-        return point_cloud_list
+    def voxelize_points(points, voxel_size):
+        voxel_indices = np.floor(points / voxel_size).astype(np.int32)
+        unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True)
+        return unique_voxels
+
+    @staticmethod
+    def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005):
+        voxels_L, indices_L = DataLoadUtil.voxelize_points(point_cloud_L, voxel_size)
+        voxels_R, _ = DataLoadUtil.voxelize_points(point_cloud_R, voxel_size)
+        voxel_indices_L = voxels_L.view([('', voxels_L.dtype)] * 3)
+        voxel_indices_R = voxels_R.view([('', voxels_R.dtype)] * 3)
+        overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
+        mask_L = np.isin(indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0])
+        overlapping_points = point_cloud_L[mask_L]
+        return overlapping_points
+
+    @staticmethod
+    def load_points_normals(root, scene_name):
+        points_path = os.path.join(root, scene_name, "points_and_normals.txt")
+        points_normals = np.loadtxt(points_path)
+        return points_normals
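
The overlap test in voxelize_points/get_overlapping_points works by quantizing both clouds to integer voxel coordinates and viewing each (i, j, k) row as a single structured record so that np.intersect1d can match rows. A slightly simplified standalone sketch of the same idiom (operating on per-point voxels instead of the unique/inverse pair):

import numpy as np

def overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005):
    vox_L = np.floor(point_cloud_L / voxel_size).astype(np.int32)
    vox_R = np.floor(point_cloud_R / voxel_size).astype(np.int32)
    # View each row as one composite record so 1-D set operations work on rows.
    rec_L = np.ascontiguousarray(vox_L).view([('', vox_L.dtype)] * 3).ravel()
    rec_R = np.ascontiguousarray(vox_R).view([('', vox_R.dtype)] * 3).ravel()
    shared = np.intersect1d(rec_L, rec_R)
    # Keep left-cloud points whose voxel also appears in the right cloud.
    return point_cloud_L[np.isin(rec_L, shared)]

rng = np.random.default_rng(0)
pc_L = rng.uniform(0.0, 0.05, (1000, 3))
pc_R = pc_L + rng.normal(0.0, 2e-4, (1000, 3))  # second view with slight noise
print(overlapping_points(pc_L, pc_R, voxel_size=0.0005).shape)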


@@ -102,4 +102,19 @@ class ReconstructionUtil:
         sm.set_progress(app_name, runner_name, "processed view", len(point_cloud_list), len(point_cloud_list))
         return view_sequence, remaining_views

+    @staticmethod
+    def filter_points(points, points_normals, cam_pose, voxel_size=0.005, theta=45):
+        sampled_points = PtsUtil.voxel_downsample_point_cloud(points, voxel_size)
+        kdtree = cKDTree(points_normals[:, :3])
+        _, indices = kdtree.query(sampled_points)
+        nearest_points = points_normals[indices]
+        normals = nearest_points[:, 3:]
+        camera_axis = -cam_pose[:3, 2]
+        normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
+        cos_theta = np.dot(normals_normalized, camera_axis)
+        theta_rad = np.deg2rad(theta)
+        filtered_sampled_points = sampled_points[cos_theta > np.cos(theta_rad)]
+        return filtered_sampled_points[:, :3]
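
A toy check of the new filter_points logic, assuming cKDTree is scipy.spatial.cKDTree (which the body above appears to rely on) and that cam_pose follows the OpenCV-style convention produced by cam_pose_transformation, where -cam_pose[:3, 2] points from the surface back toward the camera. Points on a unit sphere carry their own outward normals, and the theta=45 cut keeps roughly the camera-facing cap:

import numpy as np
from scipy.spatial import cKDTree

rng = np.random.default_rng(1)
pts = rng.normal(size=(2000, 3))
pts /= np.linalg.norm(pts, axis=1, keepdims=True)  # points on the unit sphere
points_normals = np.hstack([pts, pts])             # outward normal == position

cam_pose = np.eye(4)
cam_pose[:3, :3] = np.diag([1.0, -1.0, -1.0])      # camera looks along world -Z
cam_pose[:3, 3] = [0.0, 0.0, 3.0]                  # placed above the sphere
camera_axis = -cam_pose[:3, 2]                     # (0, 0, 1), toward the camera

kdtree = cKDTree(points_normals[:, :3])
_, indices = kdtree.query(pts)                     # nearest reference point
normals = points_normals[indices, 3:]
normals /= np.linalg.norm(normals, axis=1, keepdims=True)
cos_theta = normals @ camera_axis
kept = pts[cos_theta > np.cos(np.deg2rad(45))]
# A spherical cap within 45 degrees of +Z covers (1 - cos 45) / 2, about 14.6%.
print(len(kept) / len(pts))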