Skip to main content
Version: 0.5.x [Latest Alpha]

Custom Pipeline Construction

This example shows how to build a pipeline manually, adding an object detection node that uses a .denkmodel model file.

šŸ“Š Architecture Overview​

Input Image → Resize → Object Detection → Filter → Extract Patches → OCR → Results
                ā”‚                                        ↑
                ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
         (the resized image feeds both detection and patch extraction)

šŸŽØ Visual Pipeline Diagram​

# Build a custom denkflow pipeline by hand: resize -> object detection ->
# bounding-box filter -> patch extraction -> OCR, then visualize the results.
from denkflow import Pipeline, ImageTensor, HubLicenseSource
import numpy as np
import cv2

# --- Configuration ---
pat = "xxx"  # Hub personal access token (placeholder)
license_id = "xxx"  # License used by the AI model nodes (placeholder)
# Paths to the individual model files downloaded from Network Details
od_model_path = "od.denkmodel"
ocr_model_path = "ocr.denkmodel"

# --- License Source Creation ---
# For custom pipelines using .denkmodel files, a LicenseSource is typically needed
# when adding AI model nodes. The simplest way is using HubLicenseSource with a PAT:
license_source = HubLicenseSource.from_pat(pat, license_id=license_id)

# --- Pipeline Construction ---
pipeline = Pipeline()

# Define input topic for the pipeline; images published here feed the resize node.
input_image_topic = "pipeline/input_image"

# Add nodes (example: resize -> detect)

### object detection
target_size_node = pipeline.add_const_tensor_node(
    "target-size",
    np.array([1088, 1280], dtype=np.int64),  # Example size (presumably [height, width] -- confirm against docs)
)

image_resize_node = pipeline.add_image_resize_node(
    node_name="image-resize",
    image_source=input_image_topic,  # Reads raw images from the input topic
    target_size_source=target_size_node.output,
    device="cpu",  # Execution device ("cuda", "cpu", etc.)
    device_id=0,  # Device index
)

object_detection_node = pipeline.add_object_detection_node(
    node_name="object-detection",
    image_source=image_resize_node.output,  # Input from resize node
    model_path=od_model_path,  # Path to the .denkmodel model file
    license_source=license_source,  # Pass the created license source
    device="cpu",  # Execution device
    device_id=0,  # Device index
)

# Suppress overlapping and low-confidence detections before patch extraction.
bb_filter = pipeline.add_bounding_box_filter_node(
    node_name="bb-filter",
    bounding_boxes_source=object_detection_node.output,
    iou_threshold_source=0.7,  # Remove boxes overlapping above 0.7 IoU
    score_threshold_source=0.5,  # Drop detections scored below 0.5
    device="cpu",
    device_id=0,
)

###
### ocr
patch_size_node = pipeline.add_const_tensor_node(
    "patch-size",
    np.array([224, 224], dtype=np.int64),  # Example size
)
image_patches_node = pipeline.add_image_patches_node(
    node_name="image-patches",
    image_source=image_resize_node.output,  # Input from resize node
    bboxes_source=bb_filter.output,  # Filtered boxes define the cutout regions
    target_size_source=patch_size_node.output,
    device="cpu",  # Execution device
    device_id=0,  # Device index
)

ocr_node = pipeline.add_ocr_node(
    node_name="ocr-node",
    image_source="image-patches/cutouts",  # Cutout patches emitted by the image-patches node
    model_path=ocr_model_path,  # Path to the .denkmodel model file
    license_source=license_source,  # Pass the created license source
    device="cpu",  # Execution device
    device_id=0,  # Device index
)

###
# --- Inspection ---
print("Custom Pipeline Structure:")
print(pipeline)

# --- Initialization ---
pipeline.initialize()

# --- Subscribe to Final Output ---
# Subscriptions are created before run() so the results are captured.
object_receiver = pipeline.subscribe_bounding_box_tensor(bb_filter.output)
text_receiver = pipeline.subscribe_ocr_tensor(ocr_node.output)

# --- Publish Input Image to the defined topic ---
image_path = "test.png"  # Input image, reused below for visualization
image_tensor = ImageTensor.from_file(image_path)
pipeline.publish_image_tensor(input_image_topic, image_tensor)

# --- Run Pipeline ---
pipeline.run()

# --- Receive and Process Results ---
objects = object_receiver.receive().to_objects(0.5)  # 0.5 = confidence threshold
texts = text_receiver.receive().to_objects()

print(f"objects {objects}:")
print(f"texts {texts}:")

# NOTE(review): zip pairs detections with OCR texts positionally — assumes the
# OCR node preserves the order of the filtered bounding boxes; confirm.
for obj, text in zip(objects, texts):
    print(f"\nšŸ“¦ Object Detection Result:")
    print(f" • Class: {obj.class_label.name}")
    print(f" • Confidence: {obj.confidence:.2%}")
    print(f" • Location: Top-Left ({obj.x1}, {obj.y1})")
    print(f" Bottom-Right ({obj.x2}, {obj.y2})")
    print(f" • Text: {text}")
    print(" " + "─" * 50)


# Read the original image
image = cv2.imread(image_path)

# Colors for visualization (BGR tuples, as OpenCV expects)
colors = {
    "box": (0, 255, 0),  # Green for bounding box
    "text_bg": (0, 0, 0),  # Black for text background
    "text": (255, 255, 255),  # White for text
}

for obj, text in zip(objects, texts):
    # Draw bounding box. Coordinates appear to be normalized to [0, 1] and are
    # scaled back to pixel space here -- TODO confirm with denkflow docs.
    start_point = (int(obj.x1 * image.shape[1]), int(obj.y1 * image.shape[0]))
    end_point = (int(obj.x2 * image.shape[1]), int(obj.y2 * image.shape[0]))
    cv2.rectangle(image, start_point, end_point, colors["box"], 2)

    # Prepare label text
    label = f"{obj.class_label.name} ({obj.confidence:.1%}): {text}"

    # Calculate text size and position
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.6
    thickness = 1
    (text_width, text_height), baseline = cv2.getTextSize(
        label, font, font_scale, thickness
    )

    # Draw text background: place the label above the box when there is room,
    # otherwise just below the box's top edge.
    text_x = int(obj.x1 * image.shape[1])
    text_y = (
        int(obj.y1 * image.shape[0]) - 10
        if int(obj.y1 * image.shape[0]) - 10 > text_height
        else int(obj.y1 * image.shape[0]) + text_height
    )
    cv2.rectangle(
        image,
        (text_x, text_y - text_height - baseline),
        (text_x + text_width, text_y + baseline),
        colors["text_bg"],
        -1,
    )  # Filled rectangle

    # Draw text
    cv2.putText(
        image, label, (text_x, text_y), font, font_scale, colors["text"], thickness
    )

# Save the annotated image
cv2.imwrite("output.png", image)
print("\n✨ Annotated image saved as 'output.png'")

Node Details​

| Node Name | Type | Input(s) | Output | Purpose |
| --- | --- | --- | --- | --- |
| target-size | Const Tensor | - | [1088, 1280] | Define resize dimensions |
| image-resize | Image Resize | Input image + target size | Resized image | Standardize image size |
| object-detection | Object Detection | Resized image | Bounding boxes | Detect objects in image |
| bb-filter | Bounding Box Filter | Raw detections | Filtered bboxes | Remove low-confidence/overlapping detections |
| patch-size | Const Tensor | - | [224, 224] | Define patch dimensions |
| image-patches | Image Patches | Resized image + filtered bboxes + patch size | Image cutouts | Extract object regions |
| ocr-node | OCR | Image cutouts | Text results | Extract text from patches |

Filtering Parameters​

  • IoU Threshold: 0.7 (removes overlapping detections)
  • Score Threshold: 0.5 (removes low-confidence detections)