You have a vision service detecting or classifying objects, but you need your machine to respond automatically – stop an arm when a person is nearby, sort items by color, or trigger an action when an anomaly appears. This guide shows you how to build a module that reads vision results and controls other resources based on what it sees.
The most common approach is to create a module that wraps an existing resource. The wrapper intercepts API calls, checks vision results, and decides whether to pass the call through or block it.
For example, a “safe arm” module wraps a real arm. When your code calls move_to_position, the wrapper first checks the vision service for people in the frame. If no one is detected, it passes the command to the real arm. If a person is detected, it raises an error.
This pattern works with any resource type: arms, bases, motors, or even other services.
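The control flow of the wrapper can be sketched without the Viam SDK. The example below is a minimal illustration, not the module itself: FakeArm, FakeDetector, SafeArm, and person_in_frame are hypothetical stand-ins for a real arm, a vision service, and the wrapper described above.

```python
import asyncio


class FakeArm:
    """Stand-in for a real arm; records the poses it was asked to reach."""

    def __init__(self):
        self.moves = []

    async def move_to_position(self, pose):
        self.moves.append(pose)


class FakeDetector:
    """Stand-in for a vision service; reports whether a person is in frame."""

    def __init__(self, person_present):
        self.person_present = person_present

    async def person_in_frame(self):
        return self.person_present


class SafeArm:
    """Wraps an arm and blocks movement while a person is detected."""

    def __init__(self, arm, detector):
        self.arm = arm
        self.detector = detector

    async def move_to_position(self, pose):
        if await self.detector.person_in_frame():
            raise ValueError("Person detected. Safe arm will not move.")
        await self.arm.move_to_position(pose)


async def demo():
    arm = FakeArm()
    detector = FakeDetector(person_present=False)
    safe_arm = SafeArm(arm, detector)

    # No person in frame: the call is forwarded to the wrapped arm.
    await safe_arm.move_to_position("pose-1")

    # Person in frame: the wrapper raises and the arm is never called.
    detector.person_present = True
    try:
        await safe_arm.move_to_position("pose-2")
        blocked = False
    except ValueError:
        blocked = True
    return arm.moves, blocked


moves, blocked = asyncio.run(demo())
```

Callers use the wrapper exactly as they would use the arm, which is why the pattern carries over to other resource types.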
Your module must implement a resource API. Pick the type that matches what you are controlling:
| Scenario | Resource type |
|---|---|
| Gate movement commands based on vision | The component being gated (arm, base, motor) |
| Classify images with custom logic | Vision service |
| Trigger actions across multiple resources | Generic service |
The choice determines which API methods you must implement. A wrapper around an arm implements the arm API. A standalone logic service might implement the generic service API.
Your module needs access to the vision service, a camera, and whatever resource it controls. Viam’s dependency system handles this: you declare required dependencies in validate_config, and viam-server ensures they are available before your module starts.
Install the Viam CLI and generate a module template. Replace <ORGANIZATION-ID> with your organization ID.
    viam module generate --language python --model-name safe-arm \
      --name my-vision-module --public-namespace <ORGANIZATION-ID> --public
The CLI creates a project directory with the files you need. The only file you need to modify is the model file in src/models/.
Open the generated model file (for example, src/models/safe-arm.py) and add imports for the vision service and any resources you will control:
    from typing import cast
    from viam.services.vision import *
Import additional resource types as needed. For the safe arm example:
    from viam.components.arm import Arm
The validate_config method parses your module’s configuration and returns a list of required dependencies. This tells viam-server to wait until all dependencies are available before starting your module.
Your module needs at minimum a camera name and a vision service name:
    @classmethod
    def validate_config(
        cls, config: ComponentConfig
    ) -> Tuple[Sequence[str], Sequence[str]]:
        req_deps = []
        fields = config.attributes.fields
        if "camera_name" not in fields:
            raise Exception("missing required camera_name attribute")
        elif not fields["camera_name"].HasField("string_value"):
            raise Exception("camera_name must be a string")
        camera_name = fields["camera_name"].string_value
        if not camera_name:
            raise ValueError("camera_name cannot be empty")
        req_deps.append(camera_name)
        if "vision_name" not in fields:
            raise Exception("missing required vision_name attribute")
        elif not fields["vision_name"].HasField("string_value"):
            raise Exception("vision_name must be a string")
        vision_name = fields["vision_name"].string_value
        if not vision_name:
            raise ValueError("vision_name cannot be empty")
        req_deps.append(vision_name)
        return req_deps, []
Add validation for any other resources your module wraps. For the safe arm, add arm_name to the required dependencies the same way.
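Because the check is the same for every attribute, it can be factored into a helper. The sketch below is hypothetical: require_string and required_dependencies are illustrative names, and a plain dict stands in for config.attributes.fields so the example runs without the SDK. In the real module the type check would use HasField("string_value") as shown above.

```python
def require_string(attributes, key):
    """Return attributes[key] if it is a non-empty string, else raise."""
    if key not in attributes:
        raise ValueError(f"missing required {key} attribute")
    value = attributes[key]
    if not isinstance(value, str):
        raise ValueError(f"{key} must be a string")
    if not value:
        raise ValueError(f"{key} cannot be empty")
    return value


def required_dependencies(attributes):
    """Collect the resource names the safe arm depends on."""
    keys = ("camera_name", "vision_name", "arm_name")
    return [require_string(attributes, key) for key in keys]


# Assumption: a plain dict stands in for the parsed module configuration.
deps = required_dependencies(
    {
        "camera_name": "my-camera",
        "vision_name": "my-detector",
        "arm_name": "my-arm",
    }
)
```

Adding another wrapped resource then only requires adding its attribute name to the tuple of keys.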
The reconfigure method runs when the module starts and whenever configuration changes. Use it to get references to your dependencies:
    def reconfigure(
        self, config: ComponentConfig,
        dependencies: Mapping[ResourceName, ResourceBase]
    ):
        camera_name = config.attributes.fields["camera_name"].string_value
        vision_name = config.attributes.fields["vision_name"].string_value
        vision_resource_name = VisionClient.get_resource_name(vision_name)
        if vision_resource_name not in dependencies:
            raise KeyError(f"Vision service '{vision_name}' not found in "
                           f"dependencies. Available: "
                           f"{list(dependencies.keys())}")
        self.vision_service = cast(VisionClient,
                                   dependencies[vision_resource_name])
        self.camera_name = camera_name
        return super().reconfigure(config, dependencies)
For the safe arm, also initialize the arm reference:
        arm_name = config.attributes.fields["arm_name"].string_value
        arm_resource_name = Arm.get_resource_name(arm_name)
        self.arm = cast(Arm, dependencies[arm_resource_name])
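The arm lookup above indexes the dependency mapping directly, so a missing arm surfaces as a bare KeyError. One option is to route every lookup through a small helper that reports what is available. This is a sketch under assumptions: get_dependency is a hypothetical name, and plain strings stand in for the ResourceName keys that the real mapping uses.

```python
def get_dependency(dependencies, resource_name, description):
    """Look up a dependency, failing with a message that lists what exists."""
    if resource_name not in dependencies:
        raise KeyError(
            f"{description} '{resource_name}' not found in dependencies. "
            f"Available: {list(dependencies.keys())}"
        )
    return dependencies[resource_name]


# Assumption: string keys and values stand in for ResourceName keys
# and resource clients.
dependencies = {"vision/my-detector": "detector-client"}
vision = get_dependency(dependencies, "vision/my-detector", "Vision service")
```

A lookup for a name that is not in the mapping then fails with the same descriptive message used for the vision service.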
Create a helper method that queries the vision service and returns a decision. If your vision service is a detector, check its detections:
    async def _is_safe(self):
        detections = await self.vision_service.get_detections_from_camera(
            self.camera_name)
        for d in detections:
            # Treat any sufficiently confident "Person" detection as unsafe.
            if d.confidence > 0.4 and d.class_name == "Person":
                self.logger.warning(
                    f"Detected {d.class_name} "
                    f"with confidence {d.confidence}.")
                return False
        return True
If your vision service is a classifier, check its classifications instead:
    async def _is_safe(self):
        # Request the top 4 classifications for the current frame.
        classifications = (
            await self.vision_service.get_classifications_from_camera(
                self.camera_name, 4))
        for c in classifications:
            if c.confidence > 0.6 and c.class_name == "UNSAFE":
                self.logger.warning(
                    f"Classification {c.class_name} "
                    f"with confidence {c.confidence}.")
                return False
        return True
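The decision itself is a pure function of the results, so it can be pulled out and tested on its own. The sketch below assumes a stand-in Detection dataclass with only the class_name and confidence fields read above; is_safe, blocked_labels, and min_confidence are hypothetical names, not part of the Viam API.

```python
from dataclasses import dataclass


@dataclass
class Detection:
    """Stand-in with the two fields the safety check reads."""
    class_name: str
    confidence: float


def is_safe(detections, blocked_labels=("Person",), min_confidence=0.4):
    """Return False if any blocked label exceeds the confidence threshold."""
    return not any(
        d.class_name in blocked_labels and d.confidence > min_confidence
        for d in detections
    )


frame = [Detection("Person", 0.55), Detection("Chair", 0.9)]
strict = is_safe(frame, min_confidence=0.4)    # 0.55 > 0.4, so unsafe
lenient = is_safe(frame, min_confidence=0.7)   # 0.55 <= 0.7, so safe
```

A lower threshold blocks more often, including on uncertain detections. A higher threshold lets more commands through and risks missing a real person.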
Override the API methods where you want vision-based gating. For the safe arm:
    async def move_to_position(
        self,
        pose: Pose,
        *,
        extra: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        **kwargs
    ):
        if await self._is_safe():
            await self.arm.move_to_position(
                pose, extra=extra, timeout=timeout)
        else:
            raise ValueError(
                "Person detected. Safe arm will not move.")

    async def move_to_joint_positions(
        self,
        positions: JointPositions,
        *,
        extra: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        **kwargs
    ):
        if await self._is_safe():
            await self.arm.move_to_joint_positions(
                positions, extra=extra, timeout=timeout)
        else:
            raise ValueError(
                "Person detected. Safe arm will not move.")
Pass through all other methods to the underlying resource:
    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> Mapping[str, ValueTypes]:
        # timeout is keyword-only, so it must be forwarded by name.
        return await self.arm.do_command(
            command, timeout=timeout, **kwargs)
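The gated methods repeat the same check before forwarding the call. If you gate more than a couple of methods, one way to avoid the repetition is a decorator. This is a sketch under assumptions, not part of the generated module: vision_gated and DemoArm are hypothetical names, and DemoArm toggles a flag by hand instead of querying a vision service.

```python
import asyncio
import functools


def vision_gated(method):
    """Run the wrapped coroutine only if self._is_safe() returns True."""

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        if not await self._is_safe():
            raise ValueError("Person detected. Safe arm will not move.")
        return await method(self, *args, **kwargs)

    return wrapper


class DemoArm:
    """Stand-in wrapper; safe is set by hand instead of querying vision."""

    def __init__(self):
        self.safe = True
        self.calls = []

    async def _is_safe(self):
        return self.safe

    @vision_gated
    async def move_to_position(self, pose):
        self.calls.append(("position", pose))

    @vision_gated
    async def move_to_joint_positions(self, positions):
        self.calls.append(("joints", positions))


async def demo():
    arm = DemoArm()
    await arm.move_to_position("pose-1")        # allowed
    arm.safe = False
    try:
        await arm.move_to_joint_positions([0, 90])  # blocked
    except ValueError as err:
        return arm.calls, str(err)
    return arm.calls, None


calls, error = asyncio.run(demo())
```

Methods without the decorator, such as the pass-through above, stay ungated.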
Configure the module as a local module on your machine:
Point the local module at your module's run.sh (for example, /home/user/my-vision-module/run.sh).
Add your resource as a local component:
Click +, select Local module, then Local component.
Fill in the model as listed in meta.json, the resource type (for example, arm), and a name (for example, safe-arm-1).
Add the configuration attributes:
    {
      "camera_name": "my-camera",
      "vision_name": "my-detector",
      "arm_name": "my-arm"
    }
Save and use the TEST panel to verify behavior.
Once your module works locally:
If your module wraps another resource, update any services or processes that reference the original. For example, if motion planning used my-arm, update it to use safe-arm-1 so all movement commands go through the vision check.
Adjust the confidence threshold in _is_safe and observe how it affects sensitivity.