Cross-host communication

Learning Objectives

This tutorial extends an iceoryx2 system across host boundaries using the tunnel:

  1. Enable cross-host communication with the iox2 tunnel CLI, without touching application code

  2. Embed the tunnel in your own application for full control over its execution

  3. Drive the tunnel in polling and reactive modes

  4. Switch the communication mechanism by instantiating a different backend

So far Larry has been a self-contained system: his sensors, his control logic, and his actuators all reside on one host and talk over iceoryx2’s shared memory. But shared memory does not extend beyond the boundary of the host.

Imagine now that the user wants to remotely track where Larry is and how much battery he has left. Some kind of dashboard hosted on another device on the same network.

The required data is already flowing through iceoryx2’s shared memory. A gateway or a tunnel can hook into that flow and propagate it to other hosts. To get the data to the dashboard, we will use a network tunnel, which propagates the raw shared memory payloads over the network.

Tip

A gateway or tunnel can be implemented for any communication mechanism, not just network communication. All that is required is an implementation of the Backend traits available in iceoryx2-services-tunnel-backend.

For example, the tunnel has been used to extend communication between co-processors on a single board that have their own memory using cross-chip communication APIs.

Application setup

As in previous articles in this series, the data flowing through Larry are defined as ZeroCopySend structs:

use iceoryx2::prelude::*;

#[derive(Debug, Clone, Copy, ZeroCopySend)]
// shared type name; must match across languages
#[type_name("BatteryState")]
#[repr(C)]
pub struct BatteryState {
    pub charge_percent: f32,
}

#[derive(Debug, Clone, Copy, ZeroCopySend)]
// shared type name; must match across languages
#[type_name("Position")]
#[repr(C)]
pub struct Position {
    pub x: f32,
    pub y: f32,
}
import ctypes

class BatteryState(ctypes.Structure):
    """Battery telemetry payload."""

    _fields_ = [("charge_percent", ctypes.c_float)]

    # shared type name; must match across languages
    @staticmethod
    def type_name() -> str:
        return "BatteryState"


class Position(ctypes.Structure):
    """Position telemetry payload."""

    _fields_ = [("x", ctypes.c_float), ("y", ctypes.c_float)]

    # shared type name; must match across languages
    @staticmethod
    def type_name() -> str:
        return "Position"
struct BatteryState {
    // shared type name; must match across languages
    static constexpr const char* IOX2_TYPE_NAME = "BatteryState";
    float charge_percent;
};

struct Position {
    // shared type name; must match across languages
    static constexpr const char* IOX2_TYPE_NAME = "Position";
    float x;
    float y;
};
// shared type name; must match across languages
#define BATTERY_STATE_TYPE_NAME "BatteryState"
struct BatteryState {
    float charge_percent;
};

// shared type name; must match across languages
#define POSITION_TYPE_NAME "Position"
struct Position {
    float x;
    float y;
};

For demonstration purposes, we use the event-driven communication execution pattern, which is the same approach used by many popular network communication libraries. However, with iceoryx2, you are free to choose the best approach that suits your application — the tunnel can be configured to work with any approach. See execution control patterns for an overview.

Note

The tunnel currently only supports the publish-subscribe and event messaging patterns. Support for request-response and blackboard is planned.

We set Larry up to send a notification on a corresponding event service along with every sample that is published:

use core::time::Duration;
use cross_host_communication::telemetry_data::{BatteryState, Position};
use iceoryx2::prelude::*;

const CYCLE_TIME: Duration = Duration::from_secs(1);

let node = NodeBuilder::new()
    .name(&"Larry".try_into()?)
    .create::<ipc::Service>()?;

let battery_publisher = node
    .service_builder(&"larry/battery".try_into()?)
    .publish_subscribe::<BatteryState>()
    .open_or_create()?
    .publisher_builder()
    .create()?;
let battery_notifier = node
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .notifier_builder()
    .create()?;

let position_publisher = node
    .service_builder(&"larry/position".try_into()?)
    .publish_subscribe::<Position>()
    .open_or_create()?
    .publisher_builder()
    .create()?;
let position_notifier = node
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .notifier_builder()
    .create()?;

while node.wait(CYCLE_TIME).is_ok() {
    let battery = battery_publisher.loan_uninit()?;
    let battery = battery.write_payload(BatteryState {
        charge_percent: 87.0,
    });
    battery.send()?;
    battery_notifier.notify()?;

    let position = position_publisher.loan_uninit()?;
    let position = position.write_payload(Position { x: 1.0, y: 2.0 });
    position.send()?;
    position_notifier.notify()?;
}
import iceoryx2 as iox2
from telemetry_data import BatteryState, Position

cycle_time = iox2.Duration.from_secs(1)

iox2.set_log_level_from_env_or(iox2.LogLevel.Info)
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)

battery_publisher = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .publish_subscribe(BatteryState)
    .open_or_create()
    .publisher_builder()
    .create()
)
battery_notifier = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .event()
    .open_or_create()
    .notifier_builder()
    .create()
)

position_publisher = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .publish_subscribe(Position)
    .open_or_create()
    .publisher_builder()
    .create()
)
position_notifier = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .event()
    .open_or_create()
    .notifier_builder()
    .create()
)

try:
    while True:
        node.wait(cycle_time)

        battery = battery_publisher.loan_uninit()
        battery = battery.write_payload(BatteryState(charge_percent=87.0))
        battery.send()
        battery_notifier.notify()

        position = position_publisher.loan_uninit()
        position = position.write_payload(Position(x=1.0, y=2.0))
        position.send()
        position_notifier.notify()

except iox2.NodeWaitFailure:
    print("exit")
constexpr iox2::bb::Duration CYCLE_TIME = iox2::bb::Duration::from_secs(1);

auto main() -> int {
    using namespace iox2;
    set_log_level_from_env_or(LogLevel::Info);

    auto node = NodeBuilder().name(NodeName::create("Larry").value()).create<ServiceType::Ipc>().value();

    auto battery_publisher = node.service_builder(ServiceName::create("larry/battery").value())
                                 .publish_subscribe<BatteryState>()
                                 .open_or_create()
                                 .value()
                                 .publisher_builder()
                                 .create()
                                 .value();
    auto battery_notifier = node.service_builder(ServiceName::create("larry/battery").value())
                                .event()
                                .open_or_create()
                                .value()
                                .notifier_builder()
                                .create()
                                .value();

    auto position_publisher = node.service_builder(ServiceName::create("larry/position").value())
                                  .publish_subscribe<Position>()
                                  .open_or_create()
                                  .value()
                                  .publisher_builder()
                                  .create()
                                  .value();
    auto position_notifier = node.service_builder(ServiceName::create("larry/position").value())
                                 .event()
                                 .open_or_create()
                                 .value()
                                 .notifier_builder()
                                 .create()
                                 .value();

    while (node.wait(CYCLE_TIME).has_value()) {
        auto battery = battery_publisher.loan_uninit().value();
        auto initialized_battery = battery.write_payload(BatteryState { 87.0F }); // NOLINT
        send(std::move(initialized_battery)).has_value();
        battery_notifier.notify().has_value();

        auto position = position_publisher.loan_uninit().value();
        auto initialized_position = position.write_payload(Position { 1.0F, 2.0F }); // NOLINT
        send(std::move(initialized_position)).has_value();
        position_notifier.notify().has_value();
    }

    return 0;
}
#define CYCLE_TIME_SECS 1
#define CYCLE_TIME_NANOS 0

int main(void) {
    iox2_set_log_level_from_env_or(iox2_log_level_e_INFO);
    int ret_val = 0;

    // create node
    iox2_node_builder_h node_builder = iox2_node_builder_new(NULL);
    iox2_node_h node = NULL;
    ret_val = iox2_node_builder_create(node_builder, NULL, iox2_service_type_e_IPC, &node);
    if (ret_val != IOX2_OK) {
        printf("Could not create node! Error: %d\n", ret_val);
        goto end;
    }

    // create battery publish-subscribe service
    iox2_service_name_h battery_name = NULL;
    const char* battery_name_value = "larry/battery";
    iox2_service_name_new(NULL, battery_name_value, strlen(battery_name_value), &battery_name);
    iox2_service_builder_pub_sub_h battery_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&battery_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          BATTERY_STATE_TYPE_NAME,
                                                          strlen(BATTERY_STATE_TYPE_NAME),
                                                          sizeof(struct BatteryState),
                                                          alignof(struct BatteryState));
    iox2_port_factory_pub_sub_h battery_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(battery_builder, NULL, &battery_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery service! Error: %d\n", ret_val);
        goto drop_node;
    }

    // create position publish-subscribe service
    iox2_service_name_h position_name = NULL;
    const char* position_name_value = "larry/position";
    iox2_service_name_new(NULL, position_name_value, strlen(position_name_value), &position_name);
    iox2_service_builder_pub_sub_h position_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&position_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          POSITION_TYPE_NAME,
                                                          strlen(POSITION_TYPE_NAME),
                                                          sizeof(struct Position),
                                                          alignof(struct Position));
    iox2_port_factory_pub_sub_h position_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(position_builder, NULL, &position_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position service! Error: %d\n", ret_val);
        goto drop_battery_service;
    }

    // create battery event service
    iox2_service_builder_event_h battery_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_port_factory_event_h battery_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(battery_event_builder, NULL, &battery_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery event service! Error: %d\n", ret_val);
        goto drop_position_service;
    }

    // create position event service
    iox2_service_builder_event_h position_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_port_factory_event_h position_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(position_event_builder, NULL, &position_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position event service! Error: %d\n", ret_val);
        goto drop_battery_event_service;
    }

    // create publishers
    iox2_publisher_h battery_publisher = NULL;
    iox2_port_factory_publisher_builder_create(
        iox2_port_factory_pub_sub_publisher_builder(&battery_service, NULL), NULL, &battery_publisher);
    iox2_publisher_h position_publisher = NULL;
    iox2_port_factory_publisher_builder_create(
        iox2_port_factory_pub_sub_publisher_builder(&position_service, NULL), NULL, &position_publisher);

    // create notifiers
    iox2_notifier_h battery_notifier = NULL;
    iox2_port_factory_notifier_builder_create(
        iox2_port_factory_event_notifier_builder(&battery_event_service, NULL), NULL, &battery_notifier);
    iox2_notifier_h position_notifier = NULL;
    iox2_port_factory_notifier_builder_create(
        iox2_port_factory_event_notifier_builder(&position_event_service, NULL), NULL, &position_notifier);

    while (iox2_node_wait(&node, CYCLE_TIME_SECS, CYCLE_TIME_NANOS) == IOX2_OK) {
        // publish battery state and notify
        iox2_sample_mut_h battery_sample = NULL;
        if (iox2_publisher_loan_slice_uninit(&battery_publisher, NULL, &battery_sample, 1) == IOX2_OK) {
            struct BatteryState* battery_payload = NULL;
            iox2_sample_mut_payload_mut(&battery_sample, (void**) &battery_payload, NULL);
            battery_payload->charge_percent = 87.0F; // NOLINT
            iox2_sample_mut_send(battery_sample, NULL);
            iox2_notifier_notify(&battery_notifier, NULL);
        }

        // publish position and notify
        iox2_sample_mut_h position_sample = NULL;
        if (iox2_publisher_loan_slice_uninit(&position_publisher, NULL, &position_sample, 1) == IOX2_OK) {
            struct Position* position_payload = NULL;
            iox2_sample_mut_payload_mut(&position_sample, (void**) &position_payload, NULL);
            position_payload->x = 1.0F; // NOLINT
            position_payload->y = 2.0F; // NOLINT
            iox2_sample_mut_send(position_sample, NULL);
            iox2_notifier_notify(&position_notifier, NULL);
        }
    }

    iox2_notifier_drop(position_notifier);
    iox2_notifier_drop(battery_notifier);
    iox2_publisher_drop(position_publisher);
    iox2_publisher_drop(battery_publisher);
    iox2_port_factory_event_drop(position_event_service);
drop_battery_event_service:
    iox2_port_factory_event_drop(battery_event_service);
drop_position_service:
    iox2_port_factory_pub_sub_drop(position_service);
drop_battery_service:
    iox2_port_factory_pub_sub_drop(battery_service);
drop_node:
    iox2_node_drop(node);
end:
    return ret_val;
}

In the dashboard application running on another host, we set up listeners to wake the thread when new data arrives, and subscribers to receive the data. A WaitSet is used to react to events from both listeners in a single thread.

use cross_host_communication::telemetry_data::{BatteryState, Position};
use iceoryx2::prelude::*;

let node = NodeBuilder::new()
    .name(&"Dashboard".try_into()?)
    .create::<ipc::Service>()?;

let battery_subscriber = node
    .service_builder(&"larry/battery".try_into()?)
    .publish_subscribe::<BatteryState>()
    .open_or_create()?
    .subscriber_builder()
    .create()?;
let battery_listener = node
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;

let position_subscriber = node
    .service_builder(&"larry/position".try_into()?)
    .publish_subscribe::<Position>()
    .open_or_create()?
    .subscriber_builder()
    .create()?;
let position_listener = node
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let battery_guard = waitset.attach_notification(&battery_listener)?;
let position_guard = waitset.attach_notification(&position_listener)?;

let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
    if attachment_id.has_event_from(&battery_guard) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        battery_listener.try_wait(|_| {}).unwrap();
        while let Ok(Some(sample)) = battery_subscriber.receive() {
            println!("battery: {}%", sample.charge_percent);
        }
    } else if attachment_id.has_event_from(&position_guard) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        position_listener.try_wait(|_| {}).unwrap();
        while let Ok(Some(sample)) = position_subscriber.receive() {
            println!("position: ({}, {})", sample.x, sample.y);
        }
    }

    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;
import iceoryx2 as iox2
from telemetry_data import BatteryState, Position

iox2.set_log_level_from_env_or(iox2.LogLevel.Info)
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)

battery_subscriber = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .publish_subscribe(BatteryState)
    .open_or_create()
    .subscriber_builder()
    .create()
)
battery_listener = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .event()
    .open_or_create()
    .listener_builder()
    .create()
)

position_subscriber = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .publish_subscribe(Position)
    .open_or_create()
    .subscriber_builder()
    .create()
)
position_listener = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .event()
    .open_or_create()
    .listener_builder()
    .create()
)

waitset = iox2.WaitSetBuilder.new().create(iox2.ServiceType.Ipc)
battery_guard = waitset.attach_notification(battery_listener)
position_guard = waitset.attach_notification(position_listener)

try:
    while True:
        attachments, result = waitset.wait_and_process()
        if result in (
            iox2.WaitSetRunResult.TerminationRequest,
            iox2.WaitSetRunResult.Interrupt,
        ):
            break

        for attachment_id in attachments:
            if attachment_id.has_event_from(battery_guard):
                # drain every pending notification, otherwise the WaitSet wakes
                # again immediately and spins
                battery_listener.try_wait()
                sample = battery_subscriber.receive()
                while sample is not None:
                    print("battery:", sample.payload().contents.charge_percent, "%")
                    sample = battery_subscriber.receive()
            elif attachment_id.has_event_from(position_guard):
                # drain every pending notification, otherwise the WaitSet wakes
                # again immediately and spins
                position_listener.try_wait()
                sample = position_subscriber.receive()
                while sample is not None:
                    position = sample.payload().contents
                    print("position:", (position.x, position.y))
                    sample = position_subscriber.receive()

except iox2.WaitSetRunError:
    print("exit")
auto main() -> int {
    using namespace iox2;
    set_log_level_from_env_or(LogLevel::Info);

    auto node = NodeBuilder().name(NodeName::create("Dashboard").value()).create<ServiceType::Ipc>().value();

    auto battery_subscriber = node.service_builder(ServiceName::create("larry/battery").value())
                                  .publish_subscribe<BatteryState>()
                                  .open_or_create()
                                  .value()
                                  .subscriber_builder()
                                  .create()
                                  .value();
    auto battery_listener = node.service_builder(ServiceName::create("larry/battery").value())
                                .event()
                                .open_or_create()
                                .value()
                                .listener_builder()
                                .create()
                                .value();

    auto position_subscriber = node.service_builder(ServiceName::create("larry/position").value())
                                   .publish_subscribe<Position>()
                                   .open_or_create()
                                   .value()
                                   .subscriber_builder()
                                   .create()
                                   .value();
    auto position_listener = node.service_builder(ServiceName::create("larry/position").value())
                                 .event()
                                 .open_or_create()
                                 .value()
                                 .listener_builder()
                                 .create()
                                 .value();

    auto waitset = WaitSetBuilder().create<ServiceType::Ipc>().value();
    auto battery_guard = waitset.attach_notification(battery_listener).value();
    auto position_guard = waitset.attach_notification(position_listener).value();

    auto on_event = [&](WaitSetAttachmentId<ServiceType::Ipc> attachment_id) -> CallbackProgression {
        if (attachment_id.has_event_from(battery_guard)) {
            // drain every pending notification, otherwise the WaitSet wakes
            // again immediately and spins
            battery_listener.try_wait([](auto) {}).value();
            auto battery = battery_subscriber.receive().value();
            while (battery.has_value()) {
                std::cout << "battery: " << battery->payload().charge_percent << "%" << std::endl;
                battery = battery_subscriber.receive().value();
            }
        } else if (attachment_id.has_event_from(position_guard)) {
            // drain every pending notification, otherwise the WaitSet wakes
            // again immediately and spins
            position_listener.try_wait([](auto) {}).value();
            auto position = position_subscriber.receive().value();
            while (position.has_value()) {
                std::cout << "position: (" << position->payload().x << ", " << position->payload().y << ")"
                          << std::endl;
                position = position_subscriber.receive().value();
            }
        }

        return CallbackProgression::Continue;
    };

    waitset.wait_and_process(on_event).value();

    return 0;
}
struct CallbackContext {
    iox2_waitset_guard_h_ref battery_guard;
    iox2_waitset_guard_h_ref position_guard;
    iox2_listener_h_ref battery_listener;
    iox2_listener_h_ref position_listener;
    iox2_subscriber_h_ref battery_subscriber;
    iox2_subscriber_h_ref position_subscriber;
};

// no-op callback: draining only needs to clear the queue, not inspect events
static void drain_event(const iox2_event_id_t* event_id, uint64_t count, void* context) {
    (void) event_id;
    (void) count;
    (void) context;
}

// called whenever a listener attached to the WaitSet has received an event
static iox2_callback_progression_e on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
    struct CallbackContext* ctx = (struct CallbackContext*) context;

    uint64_t number_of_notifications = 0;

    if (iox2_waitset_attachment_id_has_event_from(&attachment_id, ctx->battery_guard)) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        iox2_listener_try_wait(ctx->battery_listener, &number_of_notifications, drain_event, NULL);

        iox2_sample_h battery_sample = NULL;
        while (iox2_subscriber_receive(ctx->battery_subscriber, NULL, &battery_sample) == IOX2_OK
               && battery_sample != NULL) {
            struct BatteryState* payload = NULL;
            iox2_sample_payload(&battery_sample, (const void**) &payload, NULL);
            printf("battery: %.1f%%\n", (double) payload->charge_percent);
            iox2_sample_drop(battery_sample);
            battery_sample = NULL;
        }
    } else if (iox2_waitset_attachment_id_has_event_from(&attachment_id, ctx->position_guard)) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        iox2_listener_try_wait(ctx->position_listener, &number_of_notifications, drain_event, NULL);

        iox2_sample_h position_sample = NULL;
        while (iox2_subscriber_receive(ctx->position_subscriber, NULL, &position_sample) == IOX2_OK
               && position_sample != NULL) {
            struct Position* payload = NULL;
            iox2_sample_payload(&position_sample, (const void**) &payload, NULL);
            printf("position: (%.1f, %.1f)\n", (double) payload->x, (double) payload->y);
            iox2_sample_drop(position_sample);
            position_sample = NULL;
        }
    }

    iox2_waitset_attachment_id_drop(attachment_id);
    return iox2_callback_progression_e_CONTINUE;
}

// NOLINTBEGIN(readability-function-size)
int main(void) {
    iox2_set_log_level_from_env_or(iox2_log_level_e_INFO);
    int ret_val = 0;

    // create node
    iox2_node_builder_h node_builder = iox2_node_builder_new(NULL);
    iox2_node_h node = NULL;
    ret_val = iox2_node_builder_create(node_builder, NULL, iox2_service_type_e_IPC, &node);
    if (ret_val != IOX2_OK) {
        printf("Could not create node! Error: %d\n", ret_val);
        goto end;
    }

    // create battery publish-subscribe service
    iox2_service_name_h battery_name = NULL;
    const char* battery_name_value = "larry/battery";
    iox2_service_name_new(NULL, battery_name_value, strlen(battery_name_value), &battery_name);
    iox2_service_builder_pub_sub_h battery_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&battery_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          BATTERY_STATE_TYPE_NAME,
                                                          strlen(BATTERY_STATE_TYPE_NAME),
                                                          sizeof(struct BatteryState),
                                                          alignof(struct BatteryState));
    iox2_port_factory_pub_sub_h battery_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(battery_builder, NULL, &battery_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery service! Error: %d\n", ret_val);
        goto drop_node;
    }

    // create position publish-subscribe service
    iox2_service_name_h position_name = NULL;
    const char* position_name_value = "larry/position";
    iox2_service_name_new(NULL, position_name_value, strlen(position_name_value), &position_name);
    iox2_service_builder_pub_sub_h position_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&position_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          POSITION_TYPE_NAME,
                                                          strlen(POSITION_TYPE_NAME),
                                                          sizeof(struct Position),
                                                          alignof(struct Position));
    iox2_port_factory_pub_sub_h position_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(position_builder, NULL, &position_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position service! Error: %d\n", ret_val);
        goto drop_battery_service;
    }

    // create battery event service
    iox2_service_builder_event_h battery_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_port_factory_event_h battery_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(battery_event_builder, NULL, &battery_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery event service! Error: %d\n", ret_val);
        goto drop_position_service;
    }

    // create position event service
    iox2_service_builder_event_h position_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_port_factory_event_h position_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(position_event_builder, NULL, &position_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position event service! Error: %d\n", ret_val);
        goto drop_battery_event_service;
    }

    // create subscribers
    iox2_subscriber_h battery_subscriber = NULL;
    iox2_port_factory_subscriber_builder_create(
        iox2_port_factory_pub_sub_subscriber_builder(&battery_service, NULL), NULL, &battery_subscriber);
    iox2_subscriber_h position_subscriber = NULL;
    iox2_port_factory_subscriber_builder_create(
        iox2_port_factory_pub_sub_subscriber_builder(&position_service, NULL), NULL, &position_subscriber);

    // create listeners
    iox2_listener_h battery_listener = NULL;
    iox2_port_factory_listener_builder_create(
        iox2_port_factory_event_listener_builder(&battery_event_service, NULL), NULL, &battery_listener);
    iox2_listener_h position_listener = NULL;
    iox2_port_factory_listener_builder_create(
        iox2_port_factory_event_listener_builder(&position_event_service, NULL), NULL, &position_listener);

    // create waitset and attach both listeners
    iox2_waitset_builder_h waitset_builder = NULL;
    iox2_waitset_builder_new(NULL, &waitset_builder);
    iox2_waitset_h waitset = NULL;
    ret_val = iox2_waitset_builder_create(waitset_builder, iox2_service_type_e_IPC, NULL, &waitset);
    if (ret_val != IOX2_OK) {
        printf("Could not create waitset! Error: %d\n", ret_val);
        goto drop_listeners;
    }

    iox2_waitset_guard_h battery_guard = NULL;
    ret_val = iox2_waitset_attach_notification(
        &waitset, iox2_listener_get_file_descriptor(&battery_listener), NULL, &battery_guard);
    if (ret_val != IOX2_OK) {
        printf("Could not attach battery listener! Error: %d\n", ret_val);
        goto drop_waitset;
    }

    iox2_waitset_guard_h position_guard = NULL;
    ret_val = iox2_waitset_attach_notification(
        &waitset, iox2_listener_get_file_descriptor(&position_listener), NULL, &position_guard);
    if (ret_val != IOX2_OK) {
        printf("Could not attach position listener! Error: %d\n", ret_val);
        goto drop_battery_guard;
    }

    struct CallbackContext context;
    context.battery_guard = &battery_guard;
    context.position_guard = &position_guard;
    context.battery_listener = &battery_listener;
    context.position_listener = &position_listener;
    context.battery_subscriber = &battery_subscriber;
    context.position_subscriber = &position_subscriber;

    iox2_waitset_run_result_e result = iox2_waitset_run_result_e_STOP_REQUEST;
    ret_val = iox2_waitset_wait_and_process(&waitset, on_event, (void*) &context, &result);
    if (ret_val != IOX2_OK) {
        printf("Failure in WaitSet wait_and_process loop! Error: %d\n", ret_val);
    }

    iox2_waitset_guard_drop(position_guard);
drop_battery_guard:
    iox2_waitset_guard_drop(battery_guard);
drop_waitset:
    iox2_waitset_drop(waitset);
drop_listeners:
    iox2_listener_drop(position_listener);
    iox2_listener_drop(battery_listener);
    iox2_subscriber_drop(position_subscriber);
    iox2_subscriber_drop(battery_subscriber);
    iox2_port_factory_event_drop(position_event_service);
drop_battery_event_service:
    iox2_port_factory_event_drop(battery_event_service);
drop_position_service:
    iox2_port_factory_pub_sub_drop(position_service);
drop_battery_service:
    iox2_port_factory_pub_sub_drop(battery_service);
drop_node:
    iox2_node_drop(node);
end:
    return ret_val;
}
// NOLINTEND(readability-function-size)

Neither application is aware that the data will be propagated over the network. They are both set up as regular iceoryx2 applications.

Tunneling over the network

With the applications set up, we now need to spin up the tunnel to extend the communication over the network. The tunnel can either be run in a separate process or embedded into the application. First, let’s use the iceoryx2-cli to run the tunnel in a separate process.

Installing the CLI

First install the iceoryx2-cli which is the entry point for all CLI commands:

cargo install iceoryx2-cli

Next, the CLI for a specific tunnel backend must be installed. For this article, we use the Zenoh backend:

cargo install iceoryx2-integrations-zenoh-tunnel-cli

The command iox2 tunnel --list can be used to verify that the installation was successful. This command lists all installed tunnel backends discovered by the CLI.

The tunnel should be installed on both hosts.

Running a tunnel process

The CLI can now be used to start a tunnel process on both hosts:

iox2 tunnel zenoh

Each tunnel discovers iceoryx2 services, both on its own host and over the communication mechanism used by the chosen backend implementation. Larry’s tunnel sees larry/battery and larry/position and forwards all samples published to them over the wire. The dashboard tunnel receives these samples from the wire, and injects them into the iceoryx2 shared memory to be received by subscribers to matching services on the dashboard host. The two applications communicate over services, as if they were running on the same host.

By default the tunnel polls for new services and pending samples every 100 ms, however since the event-based communication pattern is used, it is possible to configure the tunnel to execute reactively.

The tunnel running on Larry only has work to do when new samples are published on the services that need to be tunneled:

iox2 tunnel zenoh --listener "larry/battery" --listener "larry/position"

The tunnel running on the dashboard host only has work to do when data arrives over the backend:

iox2 tunnel zenoh --reactive-backend

Nothing here is application code. Both applications were written, compiled, and deployed with no knowledge that a tunnel exists. Communicating across the hosts is purely a deployment decision.

Embedding the tunnel

Note

The embeddable tunnel is currently only available as a Rust library. C++, C, and Python applications must use the tunnel from a separate process with the iox2 tunnel CLI shown above; the remainder of this section applies to Rust.

Running the tunnel as its own process is convenient, but it’s not the only option. The tunnel is a library, and it spawns no threads of its own so you have full control over when and where to run it — it can be executed within a dedicated time slot of a time-triggered architecture, or on a separate thread that you control, pinned to a specific core so as not to interfere with application execution.

First add the generic tunnel and your chosen backend as dependencies:

iceoryx2-services-tunnel = { version = "X.Y.Z" }
iceoryx2-integrations-zenoh-tunnel-backend = { version = "X.Y.Z" }

The tunnel can then be instantiated in your application via the provided builder. The chosen backend implementation is passed as a generic on creation, here we again use the ZenohBackend, but any other implementation can be dropped in, even your own. Specifying polled() in the build returns a tunnel that can be manually driven:

use core::time::Duration;
use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;

const POLL_INTERVAL: Duration = Duration::from_millis(100);

let mut tunnel = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .polled()
    .create()?;

while tunnel.node().wait(POLL_INTERVAL).is_ok() {
    tunnel.discover()?;
    tunnel.propagate()?;
}

Execution is driven via two API calls. The discover() call reconciles which services exist locally and over the wire. The propagate() call moves the data in both directions: from the wire into iceoryx2 shared memory and vice versa. Calling the two on the polling tunnel in a timed loop replicates the default behavior of the CLI.

Now let’s take a look at how to replicate the reactive configurations provided by the tunnel. Recapping the details, both tunnels must be set up to react to different things:

  1. Larry’s tunnel reacts to the services notified when a sample is published

  2. The dashboard’s tunnel reacts to data arriving over the network

Since Larry’s tunnel does not react to network communication, it can be created in polling mode. Listeners are created for the services notified with each publish, and a WaitSet is set up to react to notifications from both of them in a single thread:

use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;

let mut tunnel = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .polled()
    .create()?;

// wake whenever Larry publishes locally, so freshly produced samples are
// pushed out promptly
let battery_listener = tunnel
    .node()
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;
let position_listener = tunnel
    .node()
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let _battery_guard = waitset.attach_notification(&battery_listener)?;
let _position_guard = waitset.attach_notification(&position_listener)?;

waitset.wait_and_process(|_| {
    let _ = battery_listener.try_wait(|_| {});
    let _ = position_listener.try_wait(|_| {});
    let _ = tunnel.discover();
    let _ = tunnel.propagate();

    CallbackProgression::Continue
})?;

The dashboard tunnel only cares about data arriving over the network, so it is created in reactive mode, which provides a listener along with the created tunnel. This listener is notified when new data is available from the backend specified on creation. No WaitSet is needed here, since there is only one listener to wait on:

use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;

// reactive mode hands back a listener that wakes when the backend has
// delivered something to ingest from the wire
let (mut tunnel, listener) = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .reactive()
    .create()?;

while listener.blocking_wait(|_| {}).is_ok() {
    tunnel.discover()?;
    tunnel.propagate()?;
}

Both approaches run the same discover() and propagate() calls; they differ only in what decides when those calls happen. Once again, the tunnel has no background threads, so you are free to choose where and when these operations run, allowing you to plan for and minimize unpredictability.

Switching the mechanism

Let’s say that later in development you decide a different mechanism should carry the cross-host communication. Perhaps you want to reach the dashboard over the internet by integrating with existing MQTT infrastructure, or you need to bridge co-processors on a single board that each keep their own memory and can only communicate over a cross-chip API.

All that is required, is to switch out the chosen backend when creating the tunnel. In our examples above, we have been using Zenoh, so the tunnels were created with:

Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>

The first parameter is the local iceoryx2 service type; the second is the backend. The tunnel itself knows nothing about Zenoh. It is written against the backend traits, and discover() and propagate() are the same calls regardless of what carries the bytes.

Switching over to a different mechanism is therefore only a change of the type specified on tunnel creation:

Tunnel::<ipc::Service, SomeOtherBackend<ipc::Service>>

The publishers, the subscribers, the embed loops, and the choice of polled or reactive mode are all untouched.

Over time, additional backend implementations will be provided upstream alongside iceoryx2, however if there isn’t an existing implementation for the mechanism you would like to use, it is possible to implement your own using the traits provided in iceoryx2-services-tunnel-backend.

Further Reading

Gateways and Tunnels

The feature that this tutorial puts to work.

Gateways and Tunnels
Event-Driven Communication

The control-flow pattern the dashboard and tunnel employ.

Event-Driven Communication
Execution Control Patterns

Overview of execution patterns possible with iceoryx2.

Execution Control Patterns