Custom pipeline construction
This example shows how to build a pipeline manually, adding an object detection node that uses a .denkmodel model file.
📊 Architecture overview
Input Image → Resize → Object Detection → Filter → Extract Patches → OCR → Results
                ↓                                      ↑
                └──────────────────────────────────────┘
🎨 Visual pipeline diagram
- Python
- C / C++
from denkflow import Pipeline, ImageTensor, HubLicenseSource
import numpy as np
import cv2
# --- Configuration ---
# Placeholder credentials: replace "xxx" with your own personal access token (PAT)
# and license id before running. Avoid committing real credentials to source control.
pat = "xxx"
license_id = "xxx"
# Paths to the individual model files downloaded from Network Details
od_model_path = "od.denkmodel"  # passed to the object detection node
ocr_model_path = "ocr.denkmodel"  # passed to the OCR node
# --- License Source Creation ---
# For custom pipelines with per-model `.denkmodel` files, a LicenseSource is typically needed
# when adding AI model nodes. The simplest way is using HubLicenseSource with a PAT:
license_source = HubLicenseSource.from_pat(pat, license_id=license_id)
# --- Pipeline Construction ---
# Create the pipeline object that all nodes below are added to.
pipeline = Pipeline()
# Define input topic for the pipeline; the input image is published to this topic later
# and the resize node reads from it.
input_image_topic = "pipeline/input_image"
# Add nodes (example: resize -> detect -> filter)
### object detection
# Constant tensor node providing the target size consumed by the resize node below.
# NOTE(review): the axis order of [1088, 1280] (height/width) is not evident here — confirm.
target_size_node = pipeline.add_const_tensor_node(
    "target-size",
    np.array([1088, 1280], dtype=np.int64),  # Example size
)
# Resize the image published on the input topic to the target size.
image_resize_node = pipeline.add_image_resize_node(
    node_name="image-resize",
    image_source=input_image_topic,
    target_size_source=target_size_node.output,
    resize_mode="CenterPadBlack",  # also: "Stretch", "CenterPadWhite"
    execution_provider="cpu",  # Execution provider ("cpu", "cuda", "tensorrt", "directml", "openvino")
    device_id=0,  # Device id for the execution provider
)
# Run the object detection model on the resized image.
object_detection_node = pipeline.add_object_detection_node(
    node_name="object-detection",
    image_source=image_resize_node.output,  # Input from resize node
    model_path=od_model_path,  # Path to the .denkmodel file
    license_source=license_source,  # Pass the created license source
    execution_provider="cpu",  # Execution provider
    device_id=0,  # Device id for the execution provider
)
# Filter the detector output using an IoU threshold of 0.7 and a score threshold of 0.5.
# The thresholds are passed here as literal values.
bb_filter = pipeline.add_bounding_box_filter_node(
    node_name="bb-filter",
    bounding_boxes_source=object_detection_node.output,
    iou_threshold_source=0.7,
    score_threshold_source=0.5,
    execution_provider="cpu",
    device_id=0,
)
###
### ocr
# Constant tensor node providing the size each extracted patch is resized to.
patch_size_node = pipeline.add_const_tensor_node(
    "patch-size",
    np.array([224, 224], dtype=np.int64),  # Example size
)
# Cut patches out of the resized image, one region per filtered bounding box.
image_patches_node = pipeline.add_image_patches_node(
    node_name="image-patches",
    image_source=image_resize_node.output,  # Input from resize node
    bboxes_source=bb_filter.output,
    target_size_source=patch_size_node.output,
    resize_mode="CenterPadBlack",  # also: "Stretch", "CenterPadWhite"
    resize_method="Bilinear",  # also: "Nearest", "Bicubic", "Area"
    bounding_box_extend_ratio=0.0,  # extend each bbox by this fraction before cropping
    execution_provider="cpu",  # Execution provider
    device_id=0,  # Device id for the execution provider
)
# Run the OCR model on the patch cutouts.
# NOTE(review): the input topic is spelled out as a string literal, so it has to stay in sync
# with node_name="image-patches" above — confirm whether image_patches_node.output refers to
# the same topic and could be used instead.
ocr_node = pipeline.add_ocr_node(
    node_name="ocr-node",
    image_source="image-patches/cutouts",  # Patch cutouts topic from image-patches node
    model_path=ocr_model_path,  # Path to the .denkmodel file
    license_source=license_source,  # Pass the created license source
    execution_provider="cpu",  # Execution provider
    device_id=0,  # Device id for the execution provider
)
###
# --- Inspection ---
# Print the pipeline object to review what was constructed before running it.
print("Custom Pipeline Structure:")
print(pipeline)
# --- Initialization ---
pipeline.initialize()
# --- Subscribe to Final Output ---
# One receiver per output of interest: the filtered bounding boxes and the OCR output.
object_receiver = pipeline.subscribe(bb_filter.output)
text_receiver = pipeline.subscribe(ocr_node.output)
# --- Publish Input Image to the defined topic ---
image_path = "test.png"  # Same file is read again with OpenCV for the visualization step
image_tensor = ImageTensor.from_file(image_path)
# For in-memory image data such as NumPy arrays, see the Creating ImageTensors guide:
# ../creating-image-tensors.md
pipeline.publish_image_tensor(input_image_topic, image_tensor)
# --- Run Pipeline ---
pipeline.run()
# --- Receive and Process Results ---
# Detections are converted to objects with a 0.5 threshold argument; the OCR
# conversion takes no argument.
objects = object_receiver.receive_bounding_box_tensor().to_objects(0.5)
texts = text_receiver.receive_ocr_tensor().to_objects()
print(f"objects {objects}:")
print(f"texts {texts}:")
# Report every detection together with the OCR text at the same index.
# zip() stops at the shorter of the two sequences.
for obj, text in zip(objects, texts):
    print(
        "\n📦 Object Detection Result:",
        f" • Class: {obj.class_label.name}",
        f" • Confidence: {obj.confidence:.2%}",
        f" • Location: Top-Left ({obj.x1}, {obj.y1})",
        f" Bottom-Right ({obj.x2}, {obj.y2})",
        f" • Text: {text}",
        " " + "─" * 50,
        sep="\n",
    )
# Read the original image. cv2.imread returns None (it does not raise) when the
# file is missing or cannot be decoded, so fail early with a clear message.
image = cv2.imread(image_path)
if image is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")
image_height, image_width = image.shape[:2]
# Colors for visualization
colors = {
    "box": (0, 255, 0),  # Green for bounding box
    "text_bg": (0, 0, 0),  # Black for text background
    "text": (255, 255, 255),  # White for text
}
# Text rendering settings are the same for every label, so define them once.
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
thickness = 1
for obj, text in zip(objects, texts):
    # Scale the box corners to pixel coordinates of the loaded image.
    # NOTE(review): this treats obj.x1/y1/x2/y2 as fractions of the image size.
    # The detector ran on the "CenterPadBlack"-resized image, so boxes may be
    # offset on the original image when aspect ratios differ — confirm.
    start_point = (int(obj.x1 * image_width), int(obj.y1 * image_height))
    end_point = (int(obj.x2 * image_width), int(obj.y2 * image_height))
    # Draw bounding box
    cv2.rectangle(image, start_point, end_point, colors["box"], 2)
    # Prepare label text
    label = f"{obj.class_label.name} ({obj.confidence:.1%}): {text}"
    # Calculate text size and position
    (text_width, text_height), baseline = cv2.getTextSize(
        label, font, font_scale, thickness
    )
    # Place the label above the box when there is room, otherwise just inside
    # the top edge so it is not cut off at the image border.
    text_x, box_top = start_point
    text_y = box_top - 10 if box_top - 10 > text_height else box_top + text_height
    # Draw text background
    cv2.rectangle(
        image,
        (text_x, text_y - text_height - baseline),
        (text_x + text_width, text_y + baseline),
        colors["text_bg"],
        -1,
    )  # Filled rectangle
    # Draw text
    cv2.putText(
        image, label, (text_x, text_y), font, font_scale, colors["text"], thickness
    )
# Save the annotated image. cv2.imwrite reports failure through its return
# value, so only announce success when the file was actually written.
if not cv2.imwrite("output.png", image):
    raise OSError("Could not write annotated image to 'output.png'")
print("\n✨ Annotated image saved as 'output.png'")
The C API supports the full custom-pipeline workflow. The example below mirrors the Python pipeline above (resize → detect → filter → patches → OCR).
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "denkflow.h"
/*
 * Print the outcome of a denkflow call and abort the program on failure.
 *
 * code: result returned by the denkflow function.
 * fn:   label printed in front of the result code (the function name, optionally
 *       with a short hint).
 *
 * Always prints "<fn>: <code>". When code is not DenkflowResult_Ok, the last
 * error message is fetched and printed in parentheses, and the process exits
 * with EXIT_FAILURE; otherwise the line is terminated and the function returns.
 */
static void handle_error(enum DenkflowResult code, const char* fn) {
    printf("%s: %d", fn, (int)code);
    if (code == DenkflowResult_Ok) {
        printf("\n");
        return;
    }

    char* buffer = (char*)malloc(DENKFLOW_ERROR_BUFFER_SIZE);
    if (buffer == NULL) {
        /* Without a buffer the message cannot be fetched; still fail loudly. */
        printf(" (out of memory while fetching the error message)\n");
        exit(EXIT_FAILURE);
    }
    /* Start from an empty string so uninitialized memory is never printed
     * should the call below leave the buffer untouched. */
    buffer[0] = '\0';
    denkflow_get_last_error(buffer);
    printf(" (%s)\n", buffer);
    free(buffer);
    exit(EXIT_FAILURE);
}
/*
 * Build, run and read back the custom pipeline:
 * resize -> object detection -> bounding box filter -> image patches -> OCR.
 * Every denkflow call is wrapped in handle_error(), which exits the process on
 * the first failure, so the cleanup at the end only runs on the success path.
 */
int main(void) {
    /* All handles start as NULL and are filled in by the denkflow calls below. */
    DenkflowHubLicenseSource* license = NULL;
    DenkflowPipeline* pipeline = NULL;
    DenkflowConstTensorNodeReference* target_size = NULL;
    DenkflowConstTensorNodeReference* patch_size = NULL;
    DenkflowImageResizeNodeReference* resize = NULL;
    DenkflowImageObjectDetectionNodeReference* detector = NULL;
    DenkflowBoundingBoxFilterNodeReference* bb_filter = NULL;
    DenkflowImagePatchesNodeReference* patches = NULL;
    DenkflowOCRNodeReference* ocr = NULL;
    DenkflowInitializedPipeline* initialized = NULL;
    DenkflowReceiverTensor* obj_receiver = NULL;
    DenkflowReceiverTensor* txt_receiver = NULL;
    DenkflowImageTensor* image = NULL;
    DenkflowBoundingBoxTensor* bbox_tensor = NULL;
    DenkflowOcrTensor* ocr_tensor = NULL;
    DenkflowBoundingBoxResults* bbox_results = NULL;
    DenkflowOcrResults* ocr_results = NULL;
    /* Values for the two constant tensor nodes (resize target and patch size). */
    int64_t target_size_values[2] = {1088, 1280};
    int64_t patch_size_values[2] = {224, 224};
    /* Replace the placeholder PAT and license id with real values. */
    handle_error(denkflow_hub_license_source_from_pat(&license, "YOUR-PAT", "YOUR-LICENSE-ID", NULL),
                 "denkflow_hub_license_source_from_pat");
    handle_error(denkflow_pipeline_new(&pipeline), "denkflow_pipeline_new");
    /* --- Node construction --- */
    handle_error(denkflow_pipeline_add_const_tensor_node_int(&target_size, pipeline, "target-size",
                                                             target_size_values, 2),
                 "denkflow_pipeline_add_const_tensor_node_int (target-size)");
    handle_error(denkflow_pipeline_add_const_tensor_node_int(&patch_size, pipeline, "patch-size",
                                                             patch_size_values, 2),
                 "denkflow_pipeline_add_const_tensor_node_int (patch-size)");
    /* The resize node reads the image from the "pipeline/input_image" topic. */
    handle_error(denkflow_pipeline_add_image_resize_node(
                     &resize, pipeline, "image-resize", "pipeline/input_image", target_size->output,
                     DenkflowResizeMode_CenterPadBlack, "cpu", 0),
                 "denkflow_pipeline_add_image_resize_node");
    handle_error(denkflow_pipeline_add_image_object_detection_node(
                     &detector, pipeline, "object-detection", resize->output, "od.denkmodel",
                     (void*)license, "cpu", 0),
                 "denkflow_pipeline_add_image_object_detection_node");
    /* NOTE(review): the thresholds are given as the strings "thresholds/iou" and
     * "thresholds/score", and nothing in this example publishes to topics with
     * those names — confirm how the threshold values (0.7 / 0.5 in the Python
     * example) are meant to be supplied here. */
    handle_error(denkflow_pipeline_add_bounding_box_filter_node(
                     &bb_filter, pipeline, "bb-filter", detector->output,
                     "thresholds/iou", "thresholds/score", "cpu", 0),
                 "denkflow_pipeline_add_bounding_box_filter_node");
    /* Patches are cut from the resized image using the filtered bounding boxes. */
    handle_error(denkflow_pipeline_add_image_patches_node(
                     &patches, pipeline, "image-patches", resize->output, bb_filter->output,
                     patch_size->output,
                     DenkflowResizeMode_CenterPadBlack,
                     DenkflowImagePatchesNodeResizeMethod_Bilinear,
                     "cpu", 0),
                 "denkflow_pipeline_add_image_patches_node");
    handle_error(denkflow_pipeline_add_ocr_node(
                     &ocr, pipeline, "ocr-node", patches->output, "ocr.denkmodel",
                     (void*)license, "cpu", 0),
                 "denkflow_pipeline_add_ocr_node");
    /* --- Initialization and subscriptions --- */
    /* The pipeline handle is passed by address and is not freed below —
     * presumably ownership moves into `initialized`; confirm. */
    handle_error(denkflow_initialize_pipeline(&initialized, &pipeline),
                 "denkflow_initialize_pipeline");
    handle_error(denkflow_initialized_pipeline_subscribe(&obj_receiver, initialized, bb_filter->output),
                 "denkflow_initialized_pipeline_subscribe (objects)");
    handle_error(denkflow_initialized_pipeline_subscribe(&txt_receiver, initialized, ocr->output),
                 "denkflow_initialized_pipeline_subscribe (text)");
    /* --- Publish the input image and run --- */
    handle_error(denkflow_image_tensor_from_file(&image, "test.png"),
                 "denkflow_image_tensor_from_file");
    /* The image handle is passed by address and is not freed below —
     * presumably it is consumed by the publish call; confirm. */
    handle_error(denkflow_initialized_pipeline_publish_image_tensor(
                     initialized, "pipeline/input_image", &image),
                 "denkflow_initialized_pipeline_publish_image_tensor");
    /* NOTE(review): 8000 is presumably a timeout in milliseconds — confirm. */
    handle_error(denkflow_initialized_pipeline_run(initialized, 8000),
                 "denkflow_initialized_pipeline_run");
    /* --- Receive results --- */
    handle_error(denkflow_receiver_receive_bounding_box_tensor(&bbox_tensor, obj_receiver),
                 "denkflow_receiver_receive_bounding_box_tensor");
    handle_error(denkflow_bounding_box_tensor_to_objects(&bbox_results, bbox_tensor, 0.5f),
                 "denkflow_bounding_box_tensor_to_objects");
    handle_error(denkflow_receiver_receive_ocr_tensor(&ocr_tensor, txt_receiver),
                 "denkflow_receiver_receive_ocr_tensor");
    handle_error(denkflow_ocr_tensor_to_objects(&ocr_results, ocr_tensor),
                 "denkflow_ocr_tensor_to_objects");
    /* Print each detection next to the OCR string at the same index; the loop
     * stops at the shorter of the two result arrays. */
    for (int i = 0; i < bbox_results->bounding_boxes_length && i < ocr_results->ocr_strings_length; ++i) {
        printf("%s (%.2f): %s\n",
               bbox_results->bounding_boxes[i].class_label.name,
               bbox_results->bounding_boxes[i].confidence,
               ocr_results->ocr_strings[i].c_string);
    }
    /* --- Cleanup --- */
    /* NOTE(review): the node references (target_size, patch_size, resize,
     * detector, bb_filter, patches, ocr) are not freed here — confirm whether
     * they need denkflow_free_object as well. */
    denkflow_free_object((void**)&bbox_results);
    denkflow_free_object((void**)&ocr_results);
    denkflow_free_object((void**)&bbox_tensor);
    denkflow_free_object((void**)&ocr_tensor);
    denkflow_free_object((void**)&obj_receiver);
    denkflow_free_object((void**)&txt_receiver);
    denkflow_free_object((void**)&initialized);
    denkflow_free_object((void**)&license);
    return 0;
}
Node details
| Node Name | Type | Input(s) | Output | Purpose |
|---|---|---|---|---|
| target-size | Const Tensor | - | [1088, 1280] | Define resize dimensions |
| image-resize | Image Resize | Input image + target size | Resized image | Standardize image size |
| object-detection | Object Detection | Resized image | Bounding boxes | Detect objects in image |
| bb-filter | Bounding Box Filter | Raw detections | Filtered bboxes | Remove low-confidence/overlapping detections |
| patch-size | Const Tensor | - | [224, 224] | Define patch dimensions |
| image-patches | Image Patches | Resized image + filtered bboxes + patch size | Image cutouts | Extract object regions |
| ocr-node | OCR | Image cutouts | Text results | Extract text from patches |
Filtering parameters
- IoU Threshold: 0.7 (removes overlapping detections)
- Score Threshold: 0.5 (removes low-confidence detections)