Custom Pipeline Construction
This example shows how to build a pipeline manually, adding an object detection node that uses a .denkmodel
model file.
## Architecture Overview
Input Image → Resize → Object Detection → Filter → Extract Patches → OCR → Results
                │                              ↑
                └──────────────────────────────┘
(the resized image also feeds the patch-extraction step)
## Visual Pipeline Diagram
- Python
- C
from denkflow import Pipeline, ImageTensor, HubLicenseSource
import numpy as np
import cv2
# --- Configuration ---
# Placeholder credentials: a Personal Access Token (PAT) and the license ID
# it is paired with on the hub. Replace with real values before running.
pat = "xxx"
license_id = "xxx"
# Path to the individual model file downloaded from Network Details
od_model_path = "od.denkmodel"
ocr_model_path = "ocr.denkmodel"
# --- License Source Creation ---
# For custom pipelines using .denkmodel files, a LicenseSource is typically
# needed when adding AI model nodes. The simplest way is using
# HubLicenseSource with a PAT:
license_source = HubLicenseSource.from_pat(pat, license_id=license_id)
# --- Pipeline Construction ---
pipeline = Pipeline()
# Define the input topic for the pipeline; the image tensor is published to
# this topic further below.
input_image_topic = "pipeline/input_image"
# Add nodes (example: resize -> detect)
### object detection
# Constant tensor providing the resize target size (presumably height, width
# — confirm against the denkflow docs).
target_size_node = pipeline.add_const_tensor_node(
    "target-size",
    np.array([1088, 1280], dtype=np.int64),  # Example size
)
image_resize_node = pipeline.add_image_resize_node(
    node_name="image-resize",
    image_source=input_image_topic,
    target_size_source=target_size_node.output,
    device="cpu",  # Execution device ("cuda", "cpu", etc.)
    device_id=0,  # Device index
)
object_detection_node = pipeline.add_object_detection_node(
    node_name="object-detection",
    image_source=image_resize_node.output,  # Input from resize node
    model_path=od_model_path,  # Path to the .denkmodel model file
    license_source=license_source,  # Pass the created license source
    device="cpu",  # Execution device
    device_id=0,  # Device index
)
# Filter the raw detections by overlap (IoU) and confidence score.
bb_filter = pipeline.add_bounding_box_filter_node(
    node_name="bb-filter",
    bounding_boxes_source=object_detection_node.output,
    iou_threshold_source=0.7,  # drop overlapping boxes above this IoU
    score_threshold_source=0.5,  # drop detections below this confidence
    device="cpu",
    device_id=0,
)
###
### ocr
# Constant tensor providing the patch size the cutouts are resized to.
patch_size_node = pipeline.add_const_tensor_node(
    "patch-size",
    np.array([224, 224], dtype=np.int64),  # Example size
)
# Cut the filtered bounding boxes out of the resized image.
image_patches_node = pipeline.add_image_patches_node(
    node_name="image-patches",
    image_source=image_resize_node.output,  # Input from resize node
    bboxes_source=bb_filter.output,
    target_size_source=patch_size_node.output,
    device="cpu",  # Execution device
    device_id=0,  # Device index
)
ocr_node = pipeline.add_ocr_node(
    node_name="ocr-node",
    image_source="image-patches/cutouts",  # cutout patches from the patches node
    model_path=ocr_model_path,  # Path to the .denkmodel model file
    license_source=license_source,  # Pass the created license source
    device="cpu",  # Execution device
    device_id=0,  # Device index
)
###
# --- Inspection ---
# Printing the pipeline shows the node graph constructed above.
print("Custom Pipeline Structure:")
print(pipeline)
# --- Initialization ---
pipeline.initialize()
# --- Subscribe to Final Output ---
# Receivers for the filtered bounding boxes and the recognized texts.
object_receiver = pipeline.subscribe_bounding_box_tensor(bb_filter.output)
text_receiver = pipeline.subscribe_ocr_tensor(ocr_node.output)
# --- Publish Input Image to the defined topic ---
image_path = "test.png"  # Define image_path closer to its use
image_tensor = ImageTensor.from_file(image_path)
pipeline.publish_image_tensor(input_image_topic, image_tensor)
# --- Run Pipeline ---
pipeline.run()
# --- Receive and Process Results ---
# to_objects(0.5) keeps detections with a confidence of at least 0.5.
objects = object_receiver.receive().to_objects(0.5)
texts = text_receiver.receive().to_objects()
# Fix: the colon belongs after the label, not after the interpolated value.
print(f"objects: {objects}")
print(f"texts: {texts}")
# Detections and OCR results are produced in the same order, so pair them up.
for obj, text in zip(objects, texts):
    # Fix: the decorative characters below were mojibake in the original.
    print("\n📦 Object Detection Result:")
    print(f"  • Class: {obj.class_label.name}")
    print(f"  • Confidence: {obj.confidence:.2%}")
    print(f"  • Location: Top-Left ({obj.x1}, {obj.y1})")
    print(f"              Bottom-Right ({obj.x2}, {obj.y2})")
    print(f"  • Text: {text}")
    print("  " + "─" * 50)
# --- Visualization ---
# Read the original image again with OpenCV for annotation.
image = cv2.imread(image_path)
# BGR colors used for drawing.
colors = {
    "box": (0, 255, 0),  # Green for bounding box
    "text_bg": (0, 0, 0),  # Black for text background
    "text": (255, 255, 255),  # White for text
}
for obj, text in zip(objects, texts):
    # The code scales by image width/height, i.e. it treats obj.x1..y2 as
    # normalized [0, 1] coordinates — confirm against the denkflow docs.
    start_point = (int(obj.x1 * image.shape[1]), int(obj.y1 * image.shape[0]))
    end_point = (int(obj.x2 * image.shape[1]), int(obj.y2 * image.shape[0]))
    cv2.rectangle(image, start_point, end_point, colors["box"], 2)
    # Label: class name, confidence and recognized text.
    label = f"{obj.class_label.name} ({obj.confidence:.1%}): {text}"
    # Measure the rendered label so the background rectangle fits it.
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.6
    thickness = 1
    (text_width, text_height), baseline = cv2.getTextSize(
        label, font, font_scale, thickness
    )
    # Place the label above the box when there is room, otherwise below.
    text_x = int(obj.x1 * image.shape[1])
    text_y = (
        int(obj.y1 * image.shape[0]) - 10
        if int(obj.y1 * image.shape[0]) - 10 > text_height
        else int(obj.y1 * image.shape[0]) + text_height
    )
    # Draw a filled background rectangle, then the label text on top.
    cv2.rectangle(
        image,
        (text_x, text_y - text_height - baseline),
        (text_x + text_width, text_y + baseline),
        colors["text_bg"],
        -1,
    )  # Filled rectangle
    cv2.putText(
        image, label, (text_x, text_y), font, font_scale, colors["text"], thickness
    )
# Save the annotated image.
# Fix: the leading character in the message was mojibake in the original.
cv2.imwrite("output.png", image)
print("\n✨ Annotated image saved as 'output.png'")
Custom pipeline construction is currently not implemented in the C-API, but it is a planned feature.
## Node Details
Node Name | Type | Input(s) | Output | Purpose |
---|---|---|---|---|
target-size | Const Tensor | - | [1088, 1280] | Define resize dimensions |
image-resize | Image Resize | Input image + target size | Resized image | Standardize image size |
object-detection | Object Detection | Resized image | Bounding boxes | Detect objects in image |
bb-filter | Bounding Box Filter | Raw detections | Filtered bboxes | Remove low-confidence/overlapping detections |
patch-size | Const Tensor | - | [224, 224] | Define patch dimensions |
image-patches | Image Patches | Resized image + filtered bboxes + patch size | Image cutouts | Extract object regions |
ocr-node | OCR | Image cutouts | Text results | Extract text from patches |
## Filtering Parameters
- IoU Threshold: 0.7 (removes overlapping detections)
- Score Threshold: 0.5 (removes low-confidence detections)