Single view baselines

This commit is contained in:
Michel Breyer 2021-08-25 18:29:10 +02:00
parent fc8d9a8791
commit 5501c2ae42
6 changed files with 89 additions and 207 deletions

View File

@ -1,10 +1,6 @@
from .policy import register
from .baselines import *
from .nbv import *
register("single-view", SingleView)
register("top", TopView)
register("random", RandomView)
register("fixed-trajectory", FixedTrajectory)
register("alignment", AlignmentView)
register("nbv", NextBestView)
register("initial-view", InitialView)
register("front-view", FrontView)
register("top-view", TopView)

View File

@ -1,123 +1,32 @@
import numpy as np
import scipy.interpolate
import rospy
from .policy import BasePolicy
from .policy import SingleViewPolicy
from vgn.utils import look_at
class SingleView(BasePolicy):
"""
Process a single image from the initial viewpoint.
"""
class InitialView(SingleViewPolicy):
def update(self, img, extrinsic):
self.integrate_img(img, extrinsic)
self.best_grasp = self.compute_best_grasp()
self.done = True
self.target = extrinsic
super().update(img, extrinsic)
class TopView(BasePolicy):
"""
Move the camera to a top-down view of the target object.
"""
class FrontView(SingleViewPolicy):
def activate(self, bbox):
super().activate(bbox)
eye = np.r_[self.center[:2], self.center[2] + 0.3]
l, theta = 0.25, np.deg2rad(30)
eye = np.r_[
self.center[0] - l * np.sin(theta),
self.center[1],
self.center[2] + l * np.cos(theta),
]
up = np.r_[1.0, 0.0, 0.0]
self.target = look_at(eye, self.center, up)
def update(self, img, extrinsic):
self.integrate_img(img, extrinsic)
error = extrinsic.translation - self.target.translation
if np.linalg.norm(error) < 0.01:
self.best_grasp = self.compute_best_grasp()
self.done = True
return self.target
class RandomView(BasePolicy):
"""
Move the camera to a random viewpoint on a circle centered above the target.
"""
def __init__(self, rate, filter_grasps):
super().__init__(rate, filter_grasps)
self.r = 0.06 # radius of the circle
self.h = 0.3 # distance above bbox center
class TopView(SingleViewPolicy):
def activate(self, bbox):
super().activate(bbox)
t = np.random.uniform(np.pi, 3.0 * np.pi)
eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h]
eye = np.r_[self.center[:2], self.center[2] + 0.25]
up = np.r_[1.0, 0.0, 0.0]
self.target = look_at(eye, self.center, up)
def update(self, img, extrinsic):
self.integrate_img(img, extrinsic)
error = extrinsic.translation - self.target.translation
if np.linalg.norm(error) < 0.01:
self.best_grasp = self.compute_best_grasp()
self.done = True
return self.target
class FixedTrajectory(BasePolicy):
"""
Follow a pre-defined circular trajectory centered above the target object.
"""
def __init__(self, rate, filter_grasps):
super().__init__(rate, filter_grasps)
self.r = 0.08
self.h = 0.3
self.duration = 6.0
self.m = scipy.interpolate.interp1d([0, self.duration], [np.pi, 3.0 * np.pi])
def activate(self, bbox):
super().activate(bbox)
self.tic = rospy.Time.now()
def update(self, img, extrinsic):
self.integrate_img(img, extrinsic)
elapsed_time = (rospy.Time.now() - self.tic).to_sec()
if elapsed_time > self.duration:
self.best_grasp = self.compute_best_grasp()
self.done = True
else:
t = self.m(elapsed_time)
eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h]
up = np.r_[1.0, 0.0, 0.0]
target = look_at(eye, self.center, up)
return target
class AlignmentView(BasePolicy):
"""
Align the camera with an initial grasp prediction as proposed in (Gualtieri, 2017).
"""
def activate(self, bbox):
super().activate(bbox)
self.target = None
def update(self, img, extrinsic):
self.integrate_img(img, extrinsic)
if not self.target:
grasp = self.compute_best_grasp()
if not grasp:
self.done = True
return
R, t = grasp.pose.rotation, grasp.pose.translation
eye = R.apply([0.0, 0.0, -0.16]) + t
center = t
up = np.r_[1.0, 0.0, 0.0]
self.target = look_at(eye, center, up)
error = extrinsic.translation - self.target.translation
if np.linalg.norm(error) < 0.01:
self.best_grasp = self.compute_best_grasp()
self.done = True
return self.target

View File

@ -117,11 +117,11 @@ class GraspController:
return T_base_grasp
def collect_info(self, result):
points = [p.translation for p in self.policy.viewpoints]
points = [p.translation for p in self.policy.views]
d = np.sum([np.linalg.norm(p2 - p1) for p1, p2 in zip(points, points[1:])])
info = {
"result": result,
"viewpoint_count": len(points),
"view_count": len(points),
"distance": d,
}
info.update(Timer.timers)

View File

@ -1,4 +1,5 @@
import numpy as np
from numpy.core.fromnumeric import sort
from sensor_msgs.msg import CameraInfo
from pathlib import Path
import rospy
@ -12,17 +13,8 @@ from vgn.utils import *
class Policy:
def activate(self, bbox):
raise NotImplementedError
def update(self, img, extrinsic):
raise NotImplementedError
class BasePolicy(Policy):
def __init__(self, rate=5, filter_grasps=False):
def __init__(self, rate=5):
self.rate = rate
self.filter_grasps = filter_grasps
self.load_parameters()
self.init_visualizer()
@ -32,8 +24,6 @@ class BasePolicy(Policy):
info_topic = rospy.get_param("active_grasp/camera/info_topic")
msg = rospy.wait_for_message(info_topic, CameraInfo, rospy.Duration(2.0))
self.intrinsic = from_camera_info_msg(msg)
self.vgn = VGN(Path(rospy.get_param("vgn/model")))
self.score_fn = lambda g: g.pose.translation[2] # TODO
def init_visualizer(self):
self.vis = Visualizer()
@ -41,91 +31,74 @@ class BasePolicy(Policy):
def activate(self, bbox):
self.bbox = bbox
self.center = 0.5 * (bbox.min + bbox.max)
self.T_base_task = Transform.translation(self.center - np.full(3, 0.15))
self.T_task_base = self.T_base_task.inv()
tf.broadcast(self.T_base_task, self.base_frame, self.task_frame)
rospy.sleep(1.0) # wait for the transform to be published
N, self.T = 40, 10
grid_shape = (N,) * 3
self.tsdf = UniformTSDFVolume(0.3, N)
self.qual_hist = np.zeros((self.T,) + grid_shape, np.float32)
self.rot_hist = np.zeros((self.T, 4) + grid_shape, np.float32)
self.width_hist = np.zeros((self.T,) + grid_shape, np.float32)
self.viewpoints = []
self.done = False
self.best_grasp = None
self.calibrate_task_frame()
self.vis.clear()
self.vis.bbox(self.base_frame, bbox)
def integrate_img(self, img, extrinsic):
self.viewpoints.append(extrinsic.inv())
self.tsdf.integrate(img, self.intrinsic, extrinsic * self.T_base_task)
tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size
self.tsdf = UniformTSDFVolume(0.3, 40)
self.vgn = VGN(Path(rospy.get_param("vgn/model")))
if self.filter_grasps:
out = self.vgn.predict(self.tsdf.get_grid())
t = (len(self.viewpoints) - 1) % self.T
self.qual_hist[t, ...] = out.qual
self.rot_hist[t, ...] = out.rot
self.width_hist[t, ...] = out.width
self.best_grasp = None
self.done = False
mean_qual = self.compute_mean_quality()
self.vis.quality(self.task_frame, voxel_size, mean_qual)
def calibrate_task_frame(self):
self.center = 0.5 * (self.bbox.min + self.bbox.max)
self.T_base_task = Transform.translation(self.center - np.full(3, 0.15))
self.T_task_base = self.T_base_task.inv()
tf.broadcast(self.T_base_task, self.base_frame, self.task_frame)
rospy.sleep(0.1)
self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud())
self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud())
self.vis.path(self.base_frame, self.viewpoints)
def sort_grasps(self, in_grasps):
grasps, scores = [], []
def compute_best_grasp(self):
if self.filter_grasps:
qual = self.compute_mean_quality()
index_list = select_local_maxima(qual, 0.9, 3)
grasps = [g for i in index_list if (g := self.select_mean_at(i))]
else:
out = self.vgn.predict(self.tsdf.get_grid())
qual = out.qual
index_list = select_local_maxima(qual, 0.9, 3)
grasps = [select_at(out, i) for i in index_list]
grasps = [from_voxel_coordinates(g, self.tsdf.voxel_size) for g in grasps]
grasps = self.transform_and_reject(grasps)
grasps = sort_grasps(grasps, self.score_fn)
self.vis.quality(self.task_frame, self.tsdf.voxel_size, qual)
self.vis.grasps(self.base_frame, grasps)
return grasps[0] if len(grasps) > 0 else None
def compute_mean_quality(self):
qual = np.mean(self.qual_hist, axis=0, where=self.qual_hist > 0.0)
return np.nan_to_num(qual, copy=False) # mean of empty slices returns nan
def select_mean_at(self, index):
i, j, k = index
ts = np.flatnonzero(self.qual_hist[:, i, j, k])
if len(ts) < 3:
return
ori = Rotation.from_quat([self.rot_hist[t, :, i, j, k] for t in ts])
pos = np.array([i, j, k], dtype=np.float64)
width = self.width_hist[ts, i, j, k].mean()
qual = self.qual_hist[ts, i, j, k].mean()
return Grasp(Transform(ori.mean(), pos), width, qual)
def transform_and_reject(self, grasps):
result = []
for grasp in grasps:
for grasp in in_grasps:
pose = self.T_base_task * grasp.pose
tip = pose.rotation.apply([0, 0, 0.05]) + pose.translation
if self.bbox.is_inside(tip):
grasp.pose = pose
result.append(grasp)
return result
grasps.append(grasp)
scores.append(self.score_fn(grasp))
grasps, scores = np.asarray(grasps), np.asarray(scores)
indices = np.argsort(-scores)
return grasps[indices], scores[indices]
def score_fn(self, grasp):
# return grasp.quality
return grasp.pose.translation[2]
def update(sekf, img, extrinsic):
raise NotImplementedError
class SingleViewPolicy(Policy):
"""Plan grasps from a single view of the target object."""
def update(self, img, extrinsic):
error = extrinsic.translation - self.target.translation
if np.linalg.norm(error) < 0.01:
self.views = [extrinsic.inv()]
self.tsdf.integrate(img, self.intrinsic, extrinsic * self.T_base_task)
tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size
self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud())
self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud())
out = self.vgn.predict(tsdf_grid)
self.vis.quality(self.task_frame, voxel_size, out.qual)
grasps = select_grid(voxel_size, out, threshold=0.95)
grasps, scores = self.sort_grasps(grasps)
self.vis.grasps(self.base_frame, grasps, scores)
rospy.sleep(1.0)
self.best_grasp = grasps[0] if len(grasps) > 0 else None
self.done = True
else:
return self.target
registry = {}

View File

@ -33,6 +33,7 @@ class Visualizer:
self.scene_cloud_pub.publish(msg)
self.map_cloud_pub.publish(msg)
self.quality_pub.publish(msg)
rospy.sleep(0.1)
def draw(self, markers):
self.marker_pub.publish(MarkerArray(markers=markers))
@ -44,10 +45,14 @@ class Visualizer:
marker = create_cube_marker(frame, pose, scale, color, ns="bbox")
self.draw([marker])
def grasps(self, frame, grasps):
def grasps(self, frame, grasps, scores):
if len(grasps) == 0:
return
smin, smax = min(scores), max(scores)
markers = []
for i, grasp in enumerate(grasps):
color = [1.0, 0.0, 0.0] # TODO choose color based on score
for i, (grasp, score) in enumerate(zip(grasps, scores)):
color = cmap((score - smin) / (smax - smin))
markers += create_grasp_markers(frame, grasp, color, "grasps", 4 * i)
self.draw(markers)
@ -103,7 +108,8 @@ class Visualizer:
self.draw([marker])
def quality(self, frame, voxel_size, quality):
points, values = grid_to_map_cloud(voxel_size, quality, threshold=0.8)
points, values = grid_to_map_cloud(voxel_size, quality, threshold=0.9)
values = (values - 0.9) / (1.0 - 0.9) # to increase contrast
msg = to_cloud_msg(frame, points, intensities=values)
self.quality_pub.publish(msg)

View File

@ -16,7 +16,7 @@ def main():
parser = create_parser()
args = parser.parse_args()
policy = make(args.policy, args.rate, args.filter_grasps)
policy = make(args.policy, args.rate)
controller = GraspController(policy)
logger = Logger(args)
@ -33,7 +33,6 @@ def create_parser():
parser.add_argument("--runs", type=int, default=10)
parser.add_argument("--logdir", type=Path, default="logs")
parser.add_argument("--rate", type=int, default=5)
parser.add_argument("--filter-grasps", action="store_true")
parser.add_argument("--seed", type=int, default=12)
return parser
@ -41,11 +40,10 @@ def create_parser():
class Logger:
def __init__(self, args):
stamp = datetime.now().strftime("%y%m%d-%H%M%S")
name = "{}_policy={},rate={},filter-grasps={},seed={}.csv".format(
name = "{}_policy={},rate={},seed={}.csv".format(
stamp,
args.policy,
args.rate,
args.filter_grasps,
args.seed,
)
self.path = args.logdir / name