345 lines
15 KiB
Python
345 lines
15 KiB
Python
|
|
import os
|
|
import random
|
|
import math
|
|
import bpy
|
|
import numpy as np
|
|
import mathutils
|
|
import requests
|
|
from blender.blender_util import BlenderUtils
|
|
from blender.view_sample_util import ViewSampleUtil
|
|
|
|
class DataGenerator:
|
|
def __init__(self, config):
|
|
self.plane_size = config["runner"]["generate"]["plane_size"]
|
|
self.table_model_path = config["runner"]["generate"]["table_model_path"]
|
|
self.output_dir = config["runner"]["generate"]["output_dir"]
|
|
self.random_config = config["runner"]["generate"]["random_config"]
|
|
self.light_and_camera_config = config["runner"]["generate"]["light_and_camera_config"]
|
|
self.obj_dir = config["runner"]["generate"]["object_dir"]
|
|
self.max_views = config["runner"]["generate"]["max_views"]
|
|
self.min_views = config["runner"]["generate"]["min_views"]
|
|
self.min_diag = config["runner"]["generate"]["min_diag"]
|
|
self.max_diag = config["runner"]["generate"]["max_diag"]
|
|
self.binocular_vision = config["runner"]["generate"]["binocular_vision"]
|
|
self.set_status_path = "http://localhost:5000/project/set_status"
|
|
self.log_path = "http://localhost:5000/project/add_log"
|
|
self.obj_name_list = os.listdir(self.obj_dir)
|
|
self.target_obj = None
|
|
self.stopped = False
|
|
self.random_obj_list = []
|
|
self.display_table_config = {}
|
|
BlenderUtils.setup_scene(self.light_and_camera_config, self.table_model_path, self.binocular_vision)
|
|
self.table = BlenderUtils.get_obj(BlenderUtils.TABLE_NAME)
|
|
self.access = self._check_set_status_access(self.set_status_path)
|
|
print(self.access)
|
|
|
|
def _check_set_status_access(self, url):
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
return True
|
|
except requests.RequestException as e:
|
|
print(f"Cannot access {url}: {e}")
|
|
return False
|
|
|
|
def set_status(self, key, value):
|
|
if not self.access:
|
|
return
|
|
request_data = {}
|
|
request_data["status"] = {
|
|
"app_name" : "generate_view",
|
|
"runner_name" : "view_generator",
|
|
"key": key,
|
|
"value": value
|
|
}
|
|
requests.post(self.set_status_path, json=request_data)
|
|
|
|
def set_progress(self, key, curr_value, max_value):
|
|
if not self.access:
|
|
return
|
|
request_data = {}
|
|
request_data["progress"] = {
|
|
"app_name" : "generate_view",
|
|
"runner_name" : "view_generator",
|
|
"key": key,
|
|
"curr_value": curr_value,
|
|
"max_value": max_value
|
|
}
|
|
requests.post(self.set_status_path, json=request_data)
|
|
|
|
def add_log(self, msg, log_type):
|
|
if not self.access:
|
|
return
|
|
request_data = {"log":{}}
|
|
request_data["log"]["message"] = msg
|
|
request_data["log"]["log_type"] = log_type
|
|
requests.post(self.log_path, json=request_data)
|
|
|
|
def generate_display_platform(self):
|
|
config = self.random_config[BlenderUtils.DISPLAY_TABLE_NAME]
|
|
|
|
height = random.uniform(config["min_height"], config["max_height"])
|
|
radius = random.uniform(config["min_radius"], config["max_radius"])
|
|
R = random.uniform(config["min_R"], config["max_R"])
|
|
G = random.uniform(config["min_G"], config["max_G"])
|
|
B = random.uniform(config["min_B"], config["max_B"])
|
|
while height > 0.5 * radius:
|
|
height = random.uniform(config["min_height"], config["max_height"])
|
|
|
|
bpy.ops.mesh.primitive_cylinder_add(radius=radius, depth=height)
|
|
platform = bpy.context.selected_objects[-1]
|
|
platform.name = BlenderUtils.DISPLAY_TABLE_NAME
|
|
|
|
bbox = self.table.bound_box
|
|
bbox_world = [self.table.matrix_world @ mathutils.Vector(corner) for corner in bbox]
|
|
table_top_z = max([v.z for v in bbox_world])
|
|
|
|
platform.location = (0, 0, table_top_z + height / 2)
|
|
|
|
bpy.ops.rigidbody.object_add()
|
|
bpy.context.object.rigid_body.type = 'PASSIVE'
|
|
bpy.ops.object.shade_auto_smooth()
|
|
|
|
mat = bpy.data.materials.new(name="RedMaterial")
|
|
mat.diffuse_color = (1.0, 0.0, 0.0, 1.0) # Red with full alpha (1.0)
|
|
if len(platform.data.materials) > 0:
|
|
platform.data.materials[0] = mat
|
|
else:
|
|
platform.data.materials.append(mat)
|
|
|
|
self.display_table_config = {
|
|
"height": height,
|
|
"radius": radius,
|
|
"R": R,
|
|
"G": G,
|
|
"B": B,
|
|
"location": list(platform.location)
|
|
}
|
|
return platform
|
|
|
|
def put_display_object(self, name):
|
|
config = self.random_config["display_object"]
|
|
|
|
x = random.uniform(config["min_x"], config["max_x"])
|
|
y = random.uniform(config["min_y"], config["max_y"])
|
|
z = random.uniform(config["min_z"], config["max_z"])
|
|
|
|
if random.random() <= config["random_rotation_ratio"]:
|
|
rotation = (
|
|
random.uniform(0, 2*np.pi),
|
|
random.uniform(0, 2*np.pi),
|
|
random.uniform(0, 2*np.pi)
|
|
)
|
|
else:
|
|
rotation = (0, 0, 0)
|
|
z=0.05
|
|
|
|
platform_bbox = self.platform.bound_box
|
|
platform_bbox_world = [self.platform.matrix_world @ mathutils.Vector(corner) for corner in platform_bbox]
|
|
platform_top_z = max([v.z for v in platform_bbox_world])
|
|
|
|
obj_mesh_path = BlenderUtils.get_obj_path(self.obj_dir,name)
|
|
obj = BlenderUtils.load_obj(name, obj_mesh_path)
|
|
|
|
obj_bottom_z = BlenderUtils.get_object_bottom_z(obj)
|
|
offset_z = obj_bottom_z
|
|
|
|
obj.rotation_euler = rotation
|
|
obj.location = (x, y, platform_top_z - offset_z + z)
|
|
|
|
bpy.ops.rigidbody.object_add()
|
|
bpy.context.object.rigid_body.type = 'ACTIVE'
|
|
mat = bpy.data.materials.new(name="GreenMaterial")
|
|
mat.diffuse_color = (0.0, 1.0, 0.0, 1.0) # Green with full alpha (1.0)
|
|
if len(obj.data.materials) > 0:
|
|
obj.data.materials[0] = mat
|
|
else:
|
|
obj.data.materials.append(mat)
|
|
self.target_obj = obj
|
|
|
|
|
|
def put_random_objects_on_table(self):
|
|
num_objects = self.random_config["random_objects"]["num"]
|
|
cluster = self.random_config["random_objects"]["cluster"]
|
|
for _ in range(num_objects):
|
|
obj_name = random.choice(self.obj_name_list)
|
|
print(obj_name)
|
|
obj_mesh_path = BlenderUtils.get_obj_path(self.obj_dir, obj_name)
|
|
obj = BlenderUtils.load_obj(obj_name, obj_mesh_path)
|
|
|
|
bbox = self.table.bound_box
|
|
bbox_world = [self.table.matrix_world @ mathutils.Vector(corner) for corner in bbox]
|
|
table_top_z = max([v.z for v in bbox_world])
|
|
|
|
platform_radius = self.platform.dimensions.x / 2.0
|
|
|
|
try_times = 0
|
|
while True:
|
|
x = random.uniform(bbox_world[0].x*cluster, bbox_world[6].x*cluster)
|
|
y = random.uniform(bbox_world[0].y*cluster, bbox_world[6].y*cluster)
|
|
if math.sqrt(x**2 + y**2) > platform_radius*2 or try_times > 10:
|
|
break
|
|
try_times += 1
|
|
if try_times > 10:
|
|
continue
|
|
|
|
rotation = (
|
|
random.uniform(0, 2 * np.pi),
|
|
random.uniform(0, 2 * np.pi),
|
|
random.uniform(0, 2 * np.pi)
|
|
)
|
|
|
|
obj_bottom_z = BlenderUtils.get_object_bottom_z(obj)
|
|
offset_z = obj_bottom_z
|
|
|
|
obj.rotation_euler = rotation
|
|
obj.location = (x, y, table_top_z - offset_z)
|
|
|
|
bpy.ops.rigidbody.object_add()
|
|
bpy.context.object.rigid_body.type = 'ACTIVE'
|
|
self.random_obj_list.append(obj)
|
|
|
|
def reset(self):
|
|
self.target_obj = None
|
|
self.random_obj_list = []
|
|
BlenderUtils.reset_objects_and_platform()
|
|
|
|
def check_moving_objects(self, previous_locations):
|
|
threshold = 0.01
|
|
moving_objects = False
|
|
target_checking_object = [self.target_obj] + self.random_obj_list
|
|
for obj in target_checking_object:
|
|
if obj.rigid_body:
|
|
current_location = obj.location
|
|
location_diff = (current_location - previous_locations[obj.name]).length
|
|
if location_diff > threshold:
|
|
moving_objects = True
|
|
break
|
|
return moving_objects
|
|
|
|
def check_and_adjust_target(self):
|
|
target_position = self.target_obj.matrix_world.translation
|
|
msg = "success"
|
|
if abs(target_position[0]) > self.random_config["display_object"]["max_x"]:
|
|
target_position[0] = np.sign(target_position[0]) * self.random_config["display_object"]["max_x"]*random.uniform(-0.5,0.5)
|
|
msg = "adjusted"
|
|
if abs(target_position[1]) > self.random_config["display_object"]["max_y"]:
|
|
target_position[1] = np.sign(target_position[1]) * self.random_config["display_object"]["max_y"]*random.uniform(-0.5,0.5)
|
|
msg = "adjusted"
|
|
if target_position[2] < 0.85:
|
|
target_position[2] = target_position[2] + 0.1
|
|
msg = "adjusted"
|
|
self.target_obj.location = target_position
|
|
return msg
|
|
|
|
def start_render(self, diag=0):
|
|
object_name = self.target_obj.name
|
|
if "." in object_name:
|
|
object_name = object_name.split(".")[0]
|
|
scene_dir = os.path.join(self.output_dir, object_name)
|
|
if not os.path.exists(scene_dir):
|
|
os.makedirs(scene_dir)
|
|
view_num = int(self.min_views + (diag - self.min_diag)/(self.max_diag - self.min_diag) * (self.max_views - self.min_views))
|
|
view_data = ViewSampleUtil.sample_view_data_world_space(self.target_obj, distance_range=(0.2,0.4), voxel_size=0.005, max_views=view_num)
|
|
object_points = np.array(view_data["voxel_down_sampled_points"])
|
|
normals = np.array(view_data["normals"])
|
|
points_normals = np.concatenate((object_points, normals), axis=1)
|
|
|
|
np.savetxt(os.path.join(scene_dir, "points_and_normals.txt"), points_normals)
|
|
for i, cam_pose in enumerate(view_data["cam_poses"]):
|
|
BlenderUtils.set_camera_at(cam_pose)
|
|
BlenderUtils.render_and_save(scene_dir, f"{i}", binocular_vision=self.binocular_vision)
|
|
BlenderUtils.save_cam_params(scene_dir, i, binocular_vision=self.binocular_vision)
|
|
self.set_progress("render frame", i, len(view_data["cam_poses"]))
|
|
self.set_progress("render frame", len(view_data["cam_poses"]), len(view_data["cam_poses"]))
|
|
BlenderUtils.save_scene_info(scene_dir, self.display_table_config, object_name)
|
|
depth_dir = os.path.join(scene_dir, "depth")
|
|
for depth_file in os.listdir(depth_dir):
|
|
if not depth_file.endswith(".png"):
|
|
name, _ = os.path.splitext(depth_file)
|
|
file_path = os.path.join(depth_dir, depth_file)
|
|
new_file_path = os.path.join(depth_dir, f"{name}.png")
|
|
os.rename(file_path,new_file_path)
|
|
return True
|
|
|
|
def simulate_scene(self, frame_limit=120, depth = 0, diag = 0):
|
|
bpy.context.view_layer.update()
|
|
bpy.ops.screen.animation_play()
|
|
previous_locations = {obj.name: obj.matrix_world.translation.copy() for obj in bpy.context.scene.objects if obj.rigid_body}
|
|
frame_count = 1
|
|
moving_objects = True
|
|
while frame_count < frame_limit:
|
|
bpy.context.view_layer.update()
|
|
if frame_count%10 == 0:
|
|
moving_objects = self.check_moving_objects(previous_locations)
|
|
if not moving_objects:
|
|
break
|
|
frame_count += 1
|
|
bpy.context.scene.frame_set(bpy.context.scene.frame_current + 1)
|
|
|
|
previous_locations = {obj.name: obj.matrix_world.translation.copy() for obj in bpy.context.scene.objects if obj.rigid_body}
|
|
|
|
bpy.ops.screen.animation_cancel(restore_frame=False)
|
|
|
|
msg = self.check_and_adjust_target()
|
|
|
|
if msg == "adjusted" and depth < 3:
|
|
bpy.context.view_layer.update()
|
|
bpy.context.scene.frame_set(0)
|
|
return self.simulate_scene(depth = depth + 1, diag=diag)
|
|
elif msg == "success":
|
|
print("Scene generation completed.")
|
|
result = self.start_render(diag=diag)
|
|
if not result:
|
|
msg = "fail"
|
|
return msg
|
|
return "retry"
|
|
|
|
def gen_scene_data(self, object_name):
|
|
|
|
bpy.context.scene.frame_set(0)
|
|
self.platform = self.generate_display_platform()
|
|
self.put_display_object(object_name)
|
|
diag = BlenderUtils.get_obj_diag(self.target_obj.name)
|
|
self.set_status("target_diagonal", diag)
|
|
if diag > self.max_diag or diag < self.min_diag:
|
|
self.add_log(f"The diagonal size of the object <{object_name}>(size: {round(diag,3)}) does not meet the requirements.", "error")
|
|
return "diag_error"
|
|
self.put_random_objects_on_table()
|
|
|
|
return self.simulate_scene(diag=diag)
|
|
|
|
|
|
def gen_all_scene_data(self):
|
|
max_retry_times = 3
|
|
total = len(self.obj_name_list)
|
|
count = 0
|
|
count_success = 0
|
|
self.set_progress("generate scene", 0, total)
|
|
result = "retry"
|
|
for target_obj_name in self.obj_name_list:
|
|
self.add_log(f"Generating scene for object <{target_obj_name}>", "info")
|
|
retry_times = 0
|
|
self.set_status("target_object", target_obj_name)
|
|
while retry_times < 3 and result == "retry":
|
|
self.reset()
|
|
try:
|
|
result = self.gen_scene_data(target_obj_name)
|
|
except Exception as e:
|
|
self.add_log(f"Uknown error: {e}", "error")
|
|
result = "unknown_error"
|
|
if result == "retry":
|
|
retry_times += 1
|
|
self.add_log(f"Maximum adjust times, retrying <{target_obj_name}>. ({retry_times}/{max_retry_times}) ", "warning")
|
|
count += 1
|
|
if result == "success":
|
|
count_success += 1
|
|
self.add_log(f"Scene for object <{target_obj_name}> generated successfully", "success")
|
|
if result == "retry" and retry_times >= max_retry_times:
|
|
self.add_log(f"Maximum retries, failed to generate scene for object <{target_obj_name}>", "error")
|
|
self.set_status("success", count_success)
|
|
self.set_status("fail", count - count_success)
|
|
self.set_progress("generate scene", count, total)
|
|
result = "retry"
|
|
|
|
|