Skip to main content
Version: 0.4.x

Custom Pipeline Construction

This example shows how to build a pipeline manually, adding an object detection node that uses an .easy model file.

from denkflow import Pipeline, ImageTensor, HubLicenseSource
import numpy as np

# --- Configuration ---
pat = "YOUR-PAT-TOKEN"
# Path to the individual model file downloaded from Network Details
easy_model_path = "path/to/your_model.easy"

# --- License Source Creation ---
# For custom pipelines using .easy files, a LicenseSource is typically needed
# when adding AI model nodes. The simplest way is using HubLicenseSource with a PAT:
license_source = HubLicenseSource.from_pat(pat, license_id="YOUR-LICENSE-ID-FOR-EASY-MODEL")

# --- Pipeline Construction ---
pipeline = Pipeline()

# Define input topic for the pipeline
input_image_topic = "pipeline/input_image"

# Add nodes (example: resize -> detect)
target_size_node = pipeline.add_const_tensor_node(
"target-size", np.array([640, 640], dtype=np.int64) # Example size
)

image_resize_node = pipeline.add_image_resize_node(
node_name="image-resize",
image_source=input_image_topic,
target_size_source=target_size_node.output,
device="cuda", # Execution device ("cuda", "cpu", etc.)
device_id=0 # Device index
)

object_detection_node = pipeline.add_object_detection_node(
node_name="object-detection",
image_source=image_resize_node.output, # Input from resize node
model_path=easy_model_path, # Path to the .easy model file
license_source=license_source, # Pass the created license source
device="cuda", # Execution device
device_id=0 # Device index
)

# --- Inspection ---
print("Custom Pipeline Structure:")
print(pipeline)

# --- Initialization ---
pipeline.initialize()

# --- Subscribe to Final Output ---
object_receiver = pipeline.subscribe_bounding_box_tensor(object_detection_node.output)

# --- Publish Input Image to the defined topic ---
image_path = "path/to/your/image.jpg" # Define image_path closer to its use
image_tensor = ImageTensor.from_file(image_path)
pipeline.publish_image_tensor(input_image_topic, image_tensor)

# --- Run Pipeline ---
pipeline.run()

# --- Receive and Process Results ---
bounding_box_tensor = object_receiver.receive()
objects = bounding_box_tensor.to_objects(0.5)

print(f"\nDetected {len(objects)} objects:")
for obj in objects:
print(f"- Class: {obj.class_label.name}, Confidence: {obj.confidence:.2f}")
print(f" BBox: ({obj.x1}, {obj.y1}), ({obj.x2}, {obj.y2})")