import torch import time from torch import nn import PytorchBoot.namespace as namespace import PytorchBoot.stereotype as stereotype from PytorchBoot.factory.component_factory import ComponentFactory from PytorchBoot.utils import Log @stereotype.pipeline("nbv_reconstruction_pipeline") class NBVReconstructionPipeline(nn.Module): def __init__(self, config): super(NBVReconstructionPipeline, self).__init__() self.config = config self.module_config = config["modules"] self.pts_encoder = ComponentFactory.create( namespace.Stereotype.MODULE, self.module_config["pts_encoder"] ) self.pose_encoder = ComponentFactory.create( namespace.Stereotype.MODULE, self.module_config["pose_encoder"] ) self.seq_encoder = ComponentFactory.create( namespace.Stereotype.MODULE, self.module_config["seq_encoder"] ) self.view_finder = ComponentFactory.create( namespace.Stereotype.MODULE, self.module_config["view_finder"] ) self.eps = float(self.config["eps"]) def forward(self, data): mode = data["mode"] if mode == namespace.Mode.TRAIN: return self.forward_train(data) elif mode == namespace.Mode.TEST: return self.forward_test(data) else: Log.error("Unknown mode: {}".format(mode), True) def pertube_data(self, gt_delta_9d): bs = gt_delta_9d.shape[0] random_t = ( torch.rand(bs, device=gt_delta_9d.device) * (1.0 - self.eps) + self.eps ) random_t = random_t.unsqueeze(-1) mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t) std = std.view(-1, 1) z = torch.randn_like(gt_delta_9d) perturbed_x = mu + z * std target_score = -z * std / (std**2) return perturbed_x, random_t, target_score, std def forward_train(self, data): main_feat = self.get_main_feat(data) """ get std """ best_to_world_pose_9d_batch = data["best_to_world_pose_9d"] perturbed_x, random_t, target_score, std = self.pertube_data( best_to_world_pose_9d_batch ) input_data = { "sampled_pose": perturbed_x, "t": random_t, "main_feat": main_feat, } estimated_score = self.view_finder(input_data) output = { "estimated_score": estimated_score, "target_score": target_score, "std": std, } return output def forward_test(self, data): main_feat = self.get_main_feat(data) repeat_num = data.get("repeat_num", 1) main_feat = main_feat.repeat(repeat_num, 1) estimated_delta_rot_9d, in_process_sample = self.view_finder.next_best_view( main_feat ) result = { "pred_pose_9d": estimated_delta_rot_9d, "in_process_sample": in_process_sample, } return result def get_main_feat(self, data): scanned_n_to_world_pose_9d_batch = data[ "scanned_n_to_world_pose_9d" ] # List(B): Tensor(S x 9) scanned_pts_mask_batch = data["scanned_pts_mask"] # List(B): Tensor(S x N) scanned_pts_mask_batch = data["scanned_pts_mask"] # List(B): Tensor(N) device = next(self.parameters()).device embedding_list_batch = [] combined_scanned_pts_batch = data["combined_scanned_pts"] # Tensor(B x N x 3) global_scanned_feat, per_point_feat_batch = self.pts_encoder.encode_points( combined_scanned_pts_batch, require_per_point_feat=True ) # global_scanned_feat: Tensor(B x Dg) batch_size = len(scanned_n_to_world_pose_9d_batch) for i in range(batch_size): seq_len = len(scanned_n_to_world_pose_9d_batch[i]) scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d_batch[i].to(device) # Tensor(S x 9) scanned_pts_mask = scanned_pts_mask_batch[i] # Tensor(S x N) per_point_feat = per_point_feat_batch[i] # Tensor(N x Dp) partial_point_feat_seq = [] for j in range(seq_len): partial_per_point_feat = per_point_feat[scanned_pts_mask[j]] if partial_per_point_feat.shape[0] == 0: partial_point_feat = torch.zeros(per_point_feat.shape[1], device=device) else: partial_point_feat = torch.mean(partial_per_point_feat, dim=0) # Tensor(Dp) partial_point_feat_seq.append(partial_point_feat) partial_point_feat_seq = torch.stack(partial_point_feat_seq, dim=0) # Tensor(S x Dp) pose_feat_seq = self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d) # Tensor(S x Dp) seq_embedding = torch.cat([partial_point_feat_seq, pose_feat_seq], dim=-1) embedding_list_batch.append(seq_embedding) # List(B): Tensor(S x (Dp)) seq_feat = self.seq_encoder.encode_sequence(embedding_list_batch) # Tensor(B x Ds) main_feat = torch.cat([seq_feat, global_scanned_feat], dim=-1) # Tensor(B x (Ds+Dg)) if torch.isnan(main_feat).any(): for i in range(len(main_feat)): if torch.isnan(main_feat[i]).any(): scanned_pts_mask = scanned_pts_mask_batch[i] Log.info(f"scanned_pts_mask shape: {scanned_pts_mask.shape}") Log.info(f"scanned_pts_mask sum: {scanned_pts_mask.sum()}") import ipdb ipdb.set_trace() Log.error("nan in main_feat", True) return main_feat