Custom Pipeline Construction
This example shows how to build a pipeline manually, adding an object detection node that uses an .easy
model file.
- Python
from denkflow import Pipeline, ImageTensor, HubLicenseSource
import numpy as np
# --- Configuration ---
# Path to the individual .easy model file downloaded from Network Details.
easy_model_path = "path/to/your_model.easy"
# Token handed to HubLicenseSource.from_pat() below.
pat = "YOUR-PAT-TOKEN"
# License id that belongs to the .easy model referenced above.
license_id = "YOUR-LICENSE-ID-FOR-EASY-MODEL"

# --- License Source Creation ---
# Custom pipelines that add AI model nodes from .easy files typically need a
# LicenseSource. Building a HubLicenseSource from a PAT is the simplest way.
license_source = HubLicenseSource.from_pat(
    pat,
    license_id=license_id,
)
# --- Pipeline Construction ---
pipeline = Pipeline()

# Topic on which the input image is published to the pipeline.
input_image_topic = "pipeline/input_image"

# Execution settings shared by the resize and detection nodes.
execution_device = "cuda"  # Execution device ("cuda", "cpu", etc.)
execution_device_id = 0  # Device index

# Node chain (example): resize -> detect.
# Constant tensor node that provides the resize target (example size).
target_size = np.array([640, 640], dtype=np.int64)
target_size_node = pipeline.add_const_tensor_node("target-size", target_size)

# Resize node: reads the image from the input topic and the target size from
# the constant tensor node's output.
image_resize_node = pipeline.add_image_resize_node(
    node_name="image-resize",
    image_source=input_image_topic,
    target_size_source=target_size_node.output,
    device=execution_device,
    device_id=execution_device_id,
)

# Detection node: consumes the resize node's output and loads the .easy model
# file using the license source created earlier in the script.
object_detection_node = pipeline.add_object_detection_node(
    node_name="object-detection",
    image_source=image_resize_node.output,
    model_path=easy_model_path,
    license_source=license_source,
    device=execution_device,
    device_id=execution_device_id,
)
# --- Inspection ---
# Print the assembled pipeline so its structure can be checked.
print("Custom Pipeline Structure:")
print(pipeline)

# --- Initialization ---
pipeline.initialize()

# --- Subscribe to Final Output ---
# The receiver for the detection node's output is created before the input
# image is published and before the pipeline is run.
object_receiver = pipeline.subscribe_bounding_box_tensor(
    object_detection_node.output
)

# --- Publish Input Image to the defined topic ---
# Load the image from disk and publish it on the pipeline's input topic.
image_path = "path/to/your/image.jpg"
image_tensor = ImageTensor.from_file(image_path)
pipeline.publish_image_tensor(input_image_topic, image_tensor)

# --- Run Pipeline ---
pipeline.run()

# --- Receive and Process Results ---
bounding_box_tensor = object_receiver.receive()
# NOTE(review): 0.5 is presumably a confidence threshold - confirm against the
# denkflow API documentation.
objects = bounding_box_tensor.to_objects(0.5)

print(f"\nDetected {len(objects)} objects:")
# Both print calls belong to the loop body: each one reads fields of `obj`.
for obj in objects:
    print(f"- Class: {obj.class_label.name}, Confidence: {obj.confidence:.2f}")
    print(f" BBox: ({obj.x1}, {obj.y1}), ({obj.x2}, {obj.y2})")