diff --git a/active_grasp/__init__.py b/active_grasp/__init__.py index 35334ab..f13612f 100644 --- a/active_grasp/__init__.py +++ b/active_grasp/__init__.py @@ -1,10 +1,6 @@ from .policy import register from .baselines import * -from .nbv import * -register("single-view", SingleView) -register("top", TopView) -register("random", RandomView) -register("fixed-trajectory", FixedTrajectory) -register("alignment", AlignmentView) -register("nbv", NextBestView) +register("initial-view", InitialView) +register("front-view", FrontView) +register("top-view", TopView) diff --git a/active_grasp/baselines.py b/active_grasp/baselines.py index 81a7f07..033bd92 100644 --- a/active_grasp/baselines.py +++ b/active_grasp/baselines.py @@ -1,123 +1,32 @@ import numpy as np -import scipy.interpolate -import rospy -from .policy import BasePolicy + +from .policy import SingleViewPolicy from vgn.utils import look_at -class SingleView(BasePolicy): - """ - Process a single image from the initial viewpoint. - """ - +class InitialView(SingleViewPolicy): def update(self, img, extrinsic): - self.integrate_img(img, extrinsic) - self.best_grasp = self.compute_best_grasp() - self.done = True + self.target = extrinsic + super().update(img, extrinsic) -class TopView(BasePolicy): - """ - Move the camera to a top-down view of the target object. - """ - +class FrontView(SingleViewPolicy): def activate(self, bbox): super().activate(bbox) - eye = np.r_[self.center[:2], self.center[2] + 0.3] + l, theta = 0.25, np.deg2rad(30) + eye = np.r_[ + self.center[0] - l * np.sin(theta), + self.center[1], + self.center[2] + l * np.cos(theta), + ] up = np.r_[1.0, 0.0, 0.0] self.target = look_at(eye, self.center, up) - def update(self, img, extrinsic): - self.integrate_img(img, extrinsic) - error = extrinsic.translation - self.target.translation - if np.linalg.norm(error) < 0.01: - self.best_grasp = self.compute_best_grasp() - self.done = True - return self.target - - -class RandomView(BasePolicy): - """ - Move the camera to a random viewpoint on a circle centered above the target. - """ - - def __init__(self, rate, filter_grasps): - super().__init__(rate, filter_grasps) - self.r = 0.06 # radius of the circle - self.h = 0.3 # distance above bbox center +class TopView(SingleViewPolicy): def activate(self, bbox): super().activate(bbox) - t = np.random.uniform(np.pi, 3.0 * np.pi) - eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h] + eye = np.r_[self.center[:2], self.center[2] + 0.25] up = np.r_[1.0, 0.0, 0.0] self.target = look_at(eye, self.center, up) - - def update(self, img, extrinsic): - self.integrate_img(img, extrinsic) - error = extrinsic.translation - self.target.translation - if np.linalg.norm(error) < 0.01: - self.best_grasp = self.compute_best_grasp() - self.done = True - return self.target - - -class FixedTrajectory(BasePolicy): - """ - Follow a pre-defined circular trajectory centered above the target object. - """ - - def __init__(self, rate, filter_grasps): - super().__init__(rate, filter_grasps) - self.r = 0.08 - self.h = 0.3 - self.duration = 6.0 - self.m = scipy.interpolate.interp1d([0, self.duration], [np.pi, 3.0 * np.pi]) - - def activate(self, bbox): - super().activate(bbox) - self.tic = rospy.Time.now() - - def update(self, img, extrinsic): - self.integrate_img(img, extrinsic) - elapsed_time = (rospy.Time.now() - self.tic).to_sec() - if elapsed_time > self.duration: - self.best_grasp = self.compute_best_grasp() - self.done = True - else: - t = self.m(elapsed_time) - eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h] - up = np.r_[1.0, 0.0, 0.0] - target = look_at(eye, self.center, up) - return target - - -class AlignmentView(BasePolicy): - """ - Align the camera with an initial grasp prediction as proposed in (Gualtieri, 2017). - """ - - def activate(self, bbox): - super().activate(bbox) - self.target = None - - def update(self, img, extrinsic): - self.integrate_img(img, extrinsic) - - if not self.target: - grasp = self.compute_best_grasp() - if not grasp: - self.done = True - return - R, t = grasp.pose.rotation, grasp.pose.translation - eye = R.apply([0.0, 0.0, -0.16]) + t - center = t - up = np.r_[1.0, 0.0, 0.0] - self.target = look_at(eye, center, up) - - error = extrinsic.translation - self.target.translation - if np.linalg.norm(error) < 0.01: - self.best_grasp = self.compute_best_grasp() - self.done = True - return self.target diff --git a/active_grasp/controller.py b/active_grasp/controller.py index dd8a80a..42a7dc1 100644 --- a/active_grasp/controller.py +++ b/active_grasp/controller.py @@ -117,11 +117,11 @@ class GraspController: return T_base_grasp def collect_info(self, result): - points = [p.translation for p in self.policy.viewpoints] + points = [p.translation for p in self.policy.views] d = np.sum([np.linalg.norm(p2 - p1) for p1, p2 in zip(points, points[1:])]) info = { "result": result, - "viewpoint_count": len(points), + "view_count": len(points), "distance": d, } info.update(Timer.timers) diff --git a/active_grasp/policy.py b/active_grasp/policy.py index b0b16e7..acbf1c3 100644 --- a/active_grasp/policy.py +++ b/active_grasp/policy.py @@ -1,4 +1,5 @@ import numpy as np +from numpy.core.fromnumeric import sort from sensor_msgs.msg import CameraInfo from pathlib import Path import rospy @@ -12,17 +13,8 @@ from vgn.utils import * class Policy: - def activate(self, bbox): - raise NotImplementedError - - def update(self, img, extrinsic): - raise NotImplementedError - - -class BasePolicy(Policy): - def __init__(self, rate=5, filter_grasps=False): + def __init__(self, rate=5): self.rate = rate - self.filter_grasps = filter_grasps self.load_parameters() self.init_visualizer() @@ -32,8 +24,6 @@ class BasePolicy(Policy): info_topic = rospy.get_param("active_grasp/camera/info_topic") msg = rospy.wait_for_message(info_topic, CameraInfo, rospy.Duration(2.0)) self.intrinsic = from_camera_info_msg(msg) - self.vgn = VGN(Path(rospy.get_param("vgn/model"))) - self.score_fn = lambda g: g.pose.translation[2] # TODO def init_visualizer(self): self.vis = Visualizer() @@ -41,91 +31,74 @@ class BasePolicy(Policy): def activate(self, bbox): self.bbox = bbox - self.center = 0.5 * (bbox.min + bbox.max) - self.T_base_task = Transform.translation(self.center - np.full(3, 0.15)) - self.T_task_base = self.T_base_task.inv() - tf.broadcast(self.T_base_task, self.base_frame, self.task_frame) - rospy.sleep(1.0) # wait for the transform to be published - - N, self.T = 40, 10 - grid_shape = (N,) * 3 - - self.tsdf = UniformTSDFVolume(0.3, N) - - self.qual_hist = np.zeros((self.T,) + grid_shape, np.float32) - self.rot_hist = np.zeros((self.T, 4) + grid_shape, np.float32) - self.width_hist = np.zeros((self.T,) + grid_shape, np.float32) - - self.viewpoints = [] - self.done = False - self.best_grasp = None + self.calibrate_task_frame() self.vis.clear() self.vis.bbox(self.base_frame, bbox) - def integrate_img(self, img, extrinsic): - self.viewpoints.append(extrinsic.inv()) - self.tsdf.integrate(img, self.intrinsic, extrinsic * self.T_base_task) - tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size + self.tsdf = UniformTSDFVolume(0.3, 40) + self.vgn = VGN(Path(rospy.get_param("vgn/model"))) - if self.filter_grasps: - out = self.vgn.predict(self.tsdf.get_grid()) - t = (len(self.viewpoints) - 1) % self.T - self.qual_hist[t, ...] = out.qual - self.rot_hist[t, ...] = out.rot - self.width_hist[t, ...] = out.width + self.best_grasp = None + self.done = False - mean_qual = self.compute_mean_quality() - self.vis.quality(self.task_frame, voxel_size, mean_qual) + def calibrate_task_frame(self): + self.center = 0.5 * (self.bbox.min + self.bbox.max) + self.T_base_task = Transform.translation(self.center - np.full(3, 0.15)) + self.T_task_base = self.T_base_task.inv() + tf.broadcast(self.T_base_task, self.base_frame, self.task_frame) + rospy.sleep(0.1) - self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud()) - self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud()) - self.vis.path(self.base_frame, self.viewpoints) + def sort_grasps(self, in_grasps): + grasps, scores = [], [] - def compute_best_grasp(self): - if self.filter_grasps: - qual = self.compute_mean_quality() - index_list = select_local_maxima(qual, 0.9, 3) - grasps = [g for i in index_list if (g := self.select_mean_at(i))] - else: - out = self.vgn.predict(self.tsdf.get_grid()) - qual = out.qual - index_list = select_local_maxima(qual, 0.9, 3) - grasps = [select_at(out, i) for i in index_list] - - grasps = [from_voxel_coordinates(g, self.tsdf.voxel_size) for g in grasps] - grasps = self.transform_and_reject(grasps) - grasps = sort_grasps(grasps, self.score_fn) - - self.vis.quality(self.task_frame, self.tsdf.voxel_size, qual) - self.vis.grasps(self.base_frame, grasps) - - return grasps[0] if len(grasps) > 0 else None - - def compute_mean_quality(self): - qual = np.mean(self.qual_hist, axis=0, where=self.qual_hist > 0.0) - return np.nan_to_num(qual, copy=False) # mean of empty slices returns nan - - def select_mean_at(self, index): - i, j, k = index - ts = np.flatnonzero(self.qual_hist[:, i, j, k]) - if len(ts) < 3: - return - ori = Rotation.from_quat([self.rot_hist[t, :, i, j, k] for t in ts]) - pos = np.array([i, j, k], dtype=np.float64) - width = self.width_hist[ts, i, j, k].mean() - qual = self.qual_hist[ts, i, j, k].mean() - return Grasp(Transform(ori.mean(), pos), width, qual) - - def transform_and_reject(self, grasps): - result = [] - for grasp in grasps: + for grasp in in_grasps: pose = self.T_base_task * grasp.pose tip = pose.rotation.apply([0, 0, 0.05]) + pose.translation if self.bbox.is_inside(tip): grasp.pose = pose - result.append(grasp) - return result + grasps.append(grasp) + scores.append(self.score_fn(grasp)) + + grasps, scores = np.asarray(grasps), np.asarray(scores) + indices = np.argsort(-scores) + return grasps[indices], scores[indices] + + def score_fn(self, grasp): + # return grasp.quality + return grasp.pose.translation[2] + + def update(sekf, img, extrinsic): + raise NotImplementedError + + +class SingleViewPolicy(Policy): + """Plan grasps from a single view of the target object.""" + + def update(self, img, extrinsic): + error = extrinsic.translation - self.target.translation + + if np.linalg.norm(error) < 0.01: + self.views = [extrinsic.inv()] + + self.tsdf.integrate(img, self.intrinsic, extrinsic * self.T_base_task) + tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size + self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud()) + self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud()) + + out = self.vgn.predict(tsdf_grid) + self.vis.quality(self.task_frame, voxel_size, out.qual) + + grasps = select_grid(voxel_size, out, threshold=0.95) + grasps, scores = self.sort_grasps(grasps) + + self.vis.grasps(self.base_frame, grasps, scores) + rospy.sleep(1.0) + + self.best_grasp = grasps[0] if len(grasps) > 0 else None + self.done = True + else: + return self.target registry = {} diff --git a/active_grasp/visualization.py b/active_grasp/visualization.py index 098d4b5..e5cd70f 100644 --- a/active_grasp/visualization.py +++ b/active_grasp/visualization.py @@ -33,6 +33,7 @@ class Visualizer: self.scene_cloud_pub.publish(msg) self.map_cloud_pub.publish(msg) self.quality_pub.publish(msg) + rospy.sleep(0.1) def draw(self, markers): self.marker_pub.publish(MarkerArray(markers=markers)) @@ -44,10 +45,14 @@ class Visualizer: marker = create_cube_marker(frame, pose, scale, color, ns="bbox") self.draw([marker]) - def grasps(self, frame, grasps): + def grasps(self, frame, grasps, scores): + if len(grasps) == 0: + return + + smin, smax = min(scores), max(scores) markers = [] - for i, grasp in enumerate(grasps): - color = [1.0, 0.0, 0.0] # TODO choose color based on score + for i, (grasp, score) in enumerate(zip(grasps, scores)): + color = cmap((score - smin) / (smax - smin)) markers += create_grasp_markers(frame, grasp, color, "grasps", 4 * i) self.draw(markers) @@ -103,7 +108,8 @@ class Visualizer: self.draw([marker]) def quality(self, frame, voxel_size, quality): - points, values = grid_to_map_cloud(voxel_size, quality, threshold=0.8) + points, values = grid_to_map_cloud(voxel_size, quality, threshold=0.9) + values = (values - 0.9) / (1.0 - 0.9) # to increase contrast msg = to_cloud_msg(frame, points, intensities=values) self.quality_pub.publish(msg) diff --git a/scripts/run.py b/scripts/run.py index 0af15b3..f230bf8 100644 --- a/scripts/run.py +++ b/scripts/run.py @@ -16,7 +16,7 @@ def main(): parser = create_parser() args = parser.parse_args() - policy = make(args.policy, args.rate, args.filter_grasps) + policy = make(args.policy, args.rate) controller = GraspController(policy) logger = Logger(args) @@ -33,7 +33,6 @@ def create_parser(): parser.add_argument("--runs", type=int, default=10) parser.add_argument("--logdir", type=Path, default="logs") parser.add_argument("--rate", type=int, default=5) - parser.add_argument("--filter-grasps", action="store_true") parser.add_argument("--seed", type=int, default=12) return parser @@ -41,11 +40,10 @@ def create_parser(): class Logger: def __init__(self, args): stamp = datetime.now().strftime("%y%m%d-%H%M%S") - name = "{}_policy={},rate={},filter-grasps={},seed={}.csv".format( + name = "{}_policy={},rate={},seed={}.csv".format( stamp, args.policy, args.rate, - args.filter_grasps, args.seed, ) self.path = args.logdir / name