Cross-host communication
Learning Objectives
This tutorial extends an iceoryx2 system across host boundaries using the
tunnel:
Enable cross-host communication with the iox2 tunnel CLI, without touching application code
Embed the tunnel in your own application for full control over its execution
Drive the tunnel in polling and reactive modes
Switch the communication mechanism by instantiating a different backend
So far Larry has been a self-contained system: his sensors, his control logic,
and his actuators all reside on one host and talk over iceoryx2’s
shared memory. But shared memory does not extend beyond the boundary of the
host.
Imagine now that the user wants to remotely track where Larry is and how much battery he has left, for example from a dashboard hosted on another device on the same network.
The required data is already flowing through iceoryx2’s shared memory. A
gateway or a tunnel can hook into that
flow and propagate it to other hosts. To get the data to the dashboard,
we will use a network tunnel, which propagates the raw shared memory payloads
over the network.
Tip
A gateway or tunnel can be implemented for any communication mechanism, not
just network communication. All that is required is an implementation of
the Backend traits available in iceoryx2-services-tunnel-backend.
For example, the tunnel has been used to extend communication between co-processors on a single board that have their own memory using cross-chip communication APIs.
Application setup
As in previous articles in this series, the data flowing through Larry are
defined as ZeroCopySend structs:
use iceoryx2::prelude::*;
#[derive(Debug, Clone, Copy, ZeroCopySend)]
// shared type name; must match across languages
#[type_name("BatteryState")]
#[repr(C)]
pub struct BatteryState {
    pub charge_percent: f32,
}
#[derive(Debug, Clone, Copy, ZeroCopySend)]
// shared type name; must match across languages
#[type_name("Position")]
#[repr(C)]
pub struct Position {
    pub x: f32,
    pub y: f32,
}
import ctypes
class BatteryState(ctypes.Structure):
    """Battery telemetry payload."""
    _fields_ = [("charge_percent", ctypes.c_float)]
    # shared type name; must match across languages
    @staticmethod
    def type_name() -> str:
        return "BatteryState"
class Position(ctypes.Structure):
    """Position telemetry payload."""
    _fields_ = [("x", ctypes.c_float), ("y", ctypes.c_float)]
    # shared type name; must match across languages
    @staticmethod
    def type_name() -> str:
        return "Position"
struct BatteryState {
    // shared type name; must match across languages
    static constexpr const char* IOX2_TYPE_NAME = "BatteryState";
    float charge_percent;
};
struct Position {
    // shared type name; must match across languages
    static constexpr const char* IOX2_TYPE_NAME = "Position";
    float x;
    float y;
};
// shared type name; must match across languages
#define BATTERY_STATE_TYPE_NAME "BatteryState"
struct BatteryState {
    float charge_percent;
};
// shared type name; must match across languages
#define POSITION_TYPE_NAME "Position"
struct Position {
    float x;
    float y;
};
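Because the tunnel forwards raw shared memory payloads, the bytes have to mean the same thing on every host and in every language. Beyond the shared type name, it can therefore be worth checking that size and alignment agree as well. The sketch below uses local copies of the two payload structs without the ZeroCopySend derive, so that it compiles with the standard library alone; the helper `layout_of` is a hypothetical name introduced for this illustration, and the expected values assume a mainstream target where `f32` is 4 bytes with 4-byte alignment.

```rust
use std::mem::{align_of, size_of};

// Local copies of the tutorial's payload types. The iceoryx2 derive is left
// out on purpose so that this sketch needs nothing but the standard library.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct BatteryState {
    pub charge_percent: f32,
}

#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct Position {
    pub x: f32,
    pub y: f32,
}

/// Hypothetical helper: returns (size, alignment) of `T` in bytes.
fn layout_of<T>() -> (usize, usize) {
    (size_of::<T>(), align_of::<T>())
}

fn main() {
    // Compare these numbers with sizeof/alignof of the C and C++ structs.
    println!("BatteryState: {:?}", layout_of::<BatteryState>());
    println!("Position:     {:?}", layout_of::<Position>());
}
```

If the numbers printed here differ from `sizeof` and `alignof` of the corresponding C or C++ struct, the payload definitions have drifted apart and should be reconciled before any data is exchanged.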
For demonstration purposes, we use the event-driven communication pattern, which is the same approach
used by many popular network communication libraries. However, with iceoryx2,
you are free to choose the approach that best suits your application, and the
tunnel can be configured to work with any of them. See execution control
patterns for an overview.
Note
The tunnel currently only supports the publish-subscribe and event messaging patterns. Support for request-response and blackboard is planned.
We set Larry up to send a notification on a corresponding event service along with every sample that is published:
use core::time::Duration;
use cross_host_communication::telemetry_data::{BatteryState, Position};
use iceoryx2::prelude::*;
const CYCLE_TIME: Duration = Duration::from_secs(1);
let node = NodeBuilder::new()
    .name(&"Larry".try_into()?)
    .create::<ipc::Service>()?;
let battery_publisher = node
    .service_builder(&"larry/battery".try_into()?)
    .publish_subscribe::<BatteryState>()
    .open_or_create()?
    .publisher_builder()
    .create()?;
let battery_notifier = node
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .notifier_builder()
    .create()?;
let position_publisher = node
    .service_builder(&"larry/position".try_into()?)
    .publish_subscribe::<Position>()
    .open_or_create()?
    .publisher_builder()
    .create()?;
let position_notifier = node
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .notifier_builder()
    .create()?;
while node.wait(CYCLE_TIME).is_ok() {
    let battery = battery_publisher.loan_uninit()?;
    let battery = battery.write_payload(BatteryState {
        charge_percent: 87.0,
    });
    battery.send()?;
    battery_notifier.notify()?;
    let position = position_publisher.loan_uninit()?;
    let position = position.write_payload(Position { x: 1.0, y: 2.0 });
    position.send()?;
    position_notifier.notify()?;
}
import iceoryx2 as iox2
from telemetry_data import BatteryState, Position
cycle_time = iox2.Duration.from_secs(1)
iox2.set_log_level_from_env_or(iox2.LogLevel.Info)
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)
battery_publisher = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .publish_subscribe(BatteryState)
    .open_or_create()
    .publisher_builder()
    .create()
)
battery_notifier = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .event()
    .open_or_create()
    .notifier_builder()
    .create()
)
position_publisher = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .publish_subscribe(Position)
    .open_or_create()
    .publisher_builder()
    .create()
)
position_notifier = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .event()
    .open_or_create()
    .notifier_builder()
    .create()
)
try:
    while True:
        node.wait(cycle_time)
        battery = battery_publisher.loan_uninit()
        battery = battery.write_payload(BatteryState(charge_percent=87.0))
        battery.send()
        battery_notifier.notify()
        position = position_publisher.loan_uninit()
        position = position.write_payload(Position(x=1.0, y=2.0))
        position.send()
        position_notifier.notify()
except iox2.NodeWaitFailure:
    print("exit")
constexpr iox2::bb::Duration CYCLE_TIME = iox2::bb::Duration::from_secs(1);
auto main() -> int {
    using namespace iox2;
    set_log_level_from_env_or(LogLevel::Info);
    auto node = NodeBuilder().name(NodeName::create("Larry").value()).create<ServiceType::Ipc>().value();
    auto battery_publisher = node.service_builder(ServiceName::create("larry/battery").value())
        .publish_subscribe<BatteryState>()
        .open_or_create()
        .value()
        .publisher_builder()
        .create()
        .value();
    auto battery_notifier = node.service_builder(ServiceName::create("larry/battery").value())
        .event()
        .open_or_create()
        .value()
        .notifier_builder()
        .create()
        .value();
    auto position_publisher = node.service_builder(ServiceName::create("larry/position").value())
        .publish_subscribe<Position>()
        .open_or_create()
        .value()
        .publisher_builder()
        .create()
        .value();
    auto position_notifier = node.service_builder(ServiceName::create("larry/position").value())
        .event()
        .open_or_create()
        .value()
        .notifier_builder()
        .create()
        .value();
    while (node.wait(CYCLE_TIME).has_value()) {
        auto battery = battery_publisher.loan_uninit().value();
        auto initialized_battery = battery.write_payload(BatteryState { 87.0F }); // NOLINT
        send(std::move(initialized_battery)).has_value();
        battery_notifier.notify().has_value();
        auto position = position_publisher.loan_uninit().value();
        auto initialized_position = position.write_payload(Position { 1.0F, 2.0F }); // NOLINT
        send(std::move(initialized_position)).has_value();
        position_notifier.notify().has_value();
    }
    return 0;
}
#define CYCLE_TIME_SECS 1
#define CYCLE_TIME_NANOS 0
int main(void) {
    iox2_set_log_level_from_env_or(iox2_log_level_e_INFO);
    int ret_val = 0;
    // create node
    iox2_node_builder_h node_builder = iox2_node_builder_new(NULL);
    iox2_node_h node = NULL;
    ret_val = iox2_node_builder_create(node_builder, NULL, iox2_service_type_e_IPC, &node);
    if (ret_val != IOX2_OK) {
        printf("Could not create node! Error: %d\n", ret_val);
        goto end;
    }
    // create battery publish-subscribe service
    iox2_service_name_h battery_name = NULL;
    const char* battery_name_value = "larry/battery";
    iox2_service_name_new(NULL, battery_name_value, strlen(battery_name_value), &battery_name);
    iox2_service_builder_pub_sub_h battery_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&battery_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          BATTERY_STATE_TYPE_NAME,
                                                          strlen(BATTERY_STATE_TYPE_NAME),
                                                          sizeof(struct BatteryState),
                                                          alignof(struct BatteryState));
    iox2_port_factory_pub_sub_h battery_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(battery_builder, NULL, &battery_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery service! Error: %d\n", ret_val);
        goto drop_node;
    }
    // create position publish-subscribe service
    iox2_service_name_h position_name = NULL;
    const char* position_name_value = "larry/position";
    iox2_service_name_new(NULL, position_name_value, strlen(position_name_value), &position_name);
    iox2_service_builder_pub_sub_h position_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&position_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          POSITION_TYPE_NAME,
                                                          strlen(POSITION_TYPE_NAME),
                                                          sizeof(struct Position),
                                                          alignof(struct Position));
    iox2_port_factory_pub_sub_h position_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(position_builder, NULL, &position_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position service! Error: %d\n", ret_val);
        goto drop_battery_service;
    }
    // create battery event service
    iox2_service_builder_event_h battery_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_port_factory_event_h battery_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(battery_event_builder, NULL, &battery_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery event service! Error: %d\n", ret_val);
        goto drop_position_service;
    }
    // create position event service
    iox2_service_builder_event_h position_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_port_factory_event_h position_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(position_event_builder, NULL, &position_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position event service! Error: %d\n", ret_val);
        goto drop_battery_event_service;
    }
    // create publishers
    iox2_publisher_h battery_publisher = NULL;
    iox2_port_factory_publisher_builder_create(
        iox2_port_factory_pub_sub_publisher_builder(&battery_service, NULL), NULL, &battery_publisher);
    iox2_publisher_h position_publisher = NULL;
    iox2_port_factory_publisher_builder_create(
        iox2_port_factory_pub_sub_publisher_builder(&position_service, NULL), NULL, &position_publisher);
    // create notifiers
    iox2_notifier_h battery_notifier = NULL;
    iox2_port_factory_notifier_builder_create(
        iox2_port_factory_event_notifier_builder(&battery_event_service, NULL), NULL, &battery_notifier);
    iox2_notifier_h position_notifier = NULL;
    iox2_port_factory_notifier_builder_create(
        iox2_port_factory_event_notifier_builder(&position_event_service, NULL), NULL, &position_notifier);
    while (iox2_node_wait(&node, CYCLE_TIME_SECS, CYCLE_TIME_NANOS) == IOX2_OK) {
        // publish battery state and notify
        iox2_sample_mut_h battery_sample = NULL;
        if (iox2_publisher_loan_slice_uninit(&battery_publisher, NULL, &battery_sample, 1) == IOX2_OK) {
            struct BatteryState* battery_payload = NULL;
            iox2_sample_mut_payload_mut(&battery_sample, (void**) &battery_payload, NULL);
            battery_payload->charge_percent = 87.0F; // NOLINT
            iox2_sample_mut_send(battery_sample, NULL);
            iox2_notifier_notify(&battery_notifier, NULL);
        }
        // publish position and notify
        iox2_sample_mut_h position_sample = NULL;
        if (iox2_publisher_loan_slice_uninit(&position_publisher, NULL, &position_sample, 1) == IOX2_OK) {
            struct Position* position_payload = NULL;
            iox2_sample_mut_payload_mut(&position_sample, (void**) &position_payload, NULL);
            position_payload->x = 1.0F; // NOLINT
            position_payload->y = 2.0F; // NOLINT
            iox2_sample_mut_send(position_sample, NULL);
            iox2_notifier_notify(&position_notifier, NULL);
        }
    }
    iox2_notifier_drop(position_notifier);
    iox2_notifier_drop(battery_notifier);
    iox2_publisher_drop(position_publisher);
    iox2_publisher_drop(battery_publisher);
    iox2_port_factory_event_drop(position_event_service);
drop_battery_event_service:
    iox2_port_factory_event_drop(battery_event_service);
drop_position_service:
    iox2_port_factory_pub_sub_drop(position_service);
drop_battery_service:
    iox2_port_factory_pub_sub_drop(battery_service);
drop_node:
    iox2_node_drop(node);
end:
    return ret_val;
}
In the dashboard application running on another host, we set up listeners to
wake the thread when new data arrives, and subscribers to receive the data.
A WaitSet is used to react
to events from both listeners in a single thread.
use cross_host_communication::telemetry_data::{BatteryState, Position};
use iceoryx2::prelude::*;
let node = NodeBuilder::new()
    .name(&"Dashboard".try_into()?)
    .create::<ipc::Service>()?;
let battery_subscriber = node
    .service_builder(&"larry/battery".try_into()?)
    .publish_subscribe::<BatteryState>()
    .open_or_create()?
    .subscriber_builder()
    .create()?;
let battery_listener = node
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;
let position_subscriber = node
    .service_builder(&"larry/position".try_into()?)
    .publish_subscribe::<Position>()
    .open_or_create()?
    .subscriber_builder()
    .create()?;
let position_listener = node
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;
let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let battery_guard = waitset.attach_notification(&battery_listener)?;
let position_guard = waitset.attach_notification(&position_listener)?;
let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
    if attachment_id.has_event_from(&battery_guard) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        battery_listener.try_wait(|_| {}).unwrap();
        while let Ok(Some(sample)) = battery_subscriber.receive() {
            println!("battery: {}%", sample.charge_percent);
        }
    } else if attachment_id.has_event_from(&position_guard) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        position_listener.try_wait(|_| {}).unwrap();
        while let Ok(Some(sample)) = position_subscriber.receive() {
            println!("position: ({}, {})", sample.x, sample.y);
        }
    }
    CallbackProgression::Continue
};
waitset.wait_and_process(on_event)?;
import iceoryx2 as iox2
from telemetry_data import BatteryState, Position
iox2.set_log_level_from_env_or(iox2.LogLevel.Info)
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)
battery_subscriber = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .publish_subscribe(BatteryState)
    .open_or_create()
    .subscriber_builder()
    .create()
)
battery_listener = (
    node.service_builder(iox2.ServiceName.new("larry/battery"))
    .event()
    .open_or_create()
    .listener_builder()
    .create()
)
position_subscriber = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .publish_subscribe(Position)
    .open_or_create()
    .subscriber_builder()
    .create()
)
position_listener = (
    node.service_builder(iox2.ServiceName.new("larry/position"))
    .event()
    .open_or_create()
    .listener_builder()
    .create()
)
waitset = iox2.WaitSetBuilder.new().create(iox2.ServiceType.Ipc)
battery_guard = waitset.attach_notification(battery_listener)
position_guard = waitset.attach_notification(position_listener)
try:
    while True:
        attachments, result = waitset.wait_and_process()
        if result in (
            iox2.WaitSetRunResult.TerminationRequest,
            iox2.WaitSetRunResult.Interrupt,
        ):
            break
        for attachment_id in attachments:
            if attachment_id.has_event_from(battery_guard):
                # drain every pending notification, otherwise the WaitSet wakes
                # again immediately and spins
                battery_listener.try_wait()
                sample = battery_subscriber.receive()
                while sample is not None:
                    print("battery:", sample.payload().contents.charge_percent, "%")
                    sample = battery_subscriber.receive()
            elif attachment_id.has_event_from(position_guard):
                # drain every pending notification, otherwise the WaitSet wakes
                # again immediately and spins
                position_listener.try_wait()
                sample = position_subscriber.receive()
                while sample is not None:
                    position = sample.payload().contents
                    print("position:", (position.x, position.y))
                    sample = position_subscriber.receive()
except iox2.WaitSetRunError:
    print("exit")
auto main() -> int {
    using namespace iox2;
    set_log_level_from_env_or(LogLevel::Info);
    auto node = NodeBuilder().name(NodeName::create("Dashboard").value()).create<ServiceType::Ipc>().value();
    auto battery_subscriber = node.service_builder(ServiceName::create("larry/battery").value())
        .publish_subscribe<BatteryState>()
        .open_or_create()
        .value()
        .subscriber_builder()
        .create()
        .value();
    auto battery_listener = node.service_builder(ServiceName::create("larry/battery").value())
        .event()
        .open_or_create()
        .value()
        .listener_builder()
        .create()
        .value();
    auto position_subscriber = node.service_builder(ServiceName::create("larry/position").value())
        .publish_subscribe<Position>()
        .open_or_create()
        .value()
        .subscriber_builder()
        .create()
        .value();
    auto position_listener = node.service_builder(ServiceName::create("larry/position").value())
        .event()
        .open_or_create()
        .value()
        .listener_builder()
        .create()
        .value();
    auto waitset = WaitSetBuilder().create<ServiceType::Ipc>().value();
    auto battery_guard = waitset.attach_notification(battery_listener).value();
    auto position_guard = waitset.attach_notification(position_listener).value();
    auto on_event = [&](WaitSetAttachmentId<ServiceType::Ipc> attachment_id) -> CallbackProgression {
        if (attachment_id.has_event_from(battery_guard)) {
            // drain every pending notification, otherwise the WaitSet wakes
            // again immediately and spins
            battery_listener.try_wait([](auto) {}).value();
            auto battery = battery_subscriber.receive().value();
            while (battery.has_value()) {
                std::cout << "battery: " << battery->payload().charge_percent << "%" << std::endl;
                battery = battery_subscriber.receive().value();
            }
        } else if (attachment_id.has_event_from(position_guard)) {
            // drain every pending notification, otherwise the WaitSet wakes
            // again immediately and spins
            position_listener.try_wait([](auto) {}).value();
            auto position = position_subscriber.receive().value();
            while (position.has_value()) {
                std::cout << "position: (" << position->payload().x << ", " << position->payload().y << ")"
                          << std::endl;
                position = position_subscriber.receive().value();
            }
        }
        return CallbackProgression::Continue;
    };
    waitset.wait_and_process(on_event).value();
    return 0;
}
struct CallbackContext {
    iox2_waitset_guard_h_ref battery_guard;
    iox2_waitset_guard_h_ref position_guard;
    iox2_listener_h_ref battery_listener;
    iox2_listener_h_ref position_listener;
    iox2_subscriber_h_ref battery_subscriber;
    iox2_subscriber_h_ref position_subscriber;
};
// no-op callback: draining only needs to clear the queue, not inspect events
static void drain_event(const iox2_event_id_t* event_id, uint64_t count, void* context) {
    (void) event_id;
    (void) count;
    (void) context;
}
// called whenever a listener attached to the WaitSet has received an event
static iox2_callback_progression_e on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
    struct CallbackContext* ctx = (struct CallbackContext*) context;
    uint64_t number_of_notifications = 0;
    if (iox2_waitset_attachment_id_has_event_from(&attachment_id, ctx->battery_guard)) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        iox2_listener_try_wait(ctx->battery_listener, &number_of_notifications, drain_event, NULL);
        iox2_sample_h battery_sample = NULL;
        while (iox2_subscriber_receive(ctx->battery_subscriber, NULL, &battery_sample) == IOX2_OK
               && battery_sample != NULL) {
            struct BatteryState* payload = NULL;
            iox2_sample_payload(&battery_sample, (const void**) &payload, NULL);
            printf("battery: %.1f%%\n", (double) payload->charge_percent);
            iox2_sample_drop(battery_sample);
            battery_sample = NULL;
        }
    } else if (iox2_waitset_attachment_id_has_event_from(&attachment_id, ctx->position_guard)) {
        // drain every pending notification, otherwise the WaitSet wakes
        // again immediately and spins
        iox2_listener_try_wait(ctx->position_listener, &number_of_notifications, drain_event, NULL);
        iox2_sample_h position_sample = NULL;
        while (iox2_subscriber_receive(ctx->position_subscriber, NULL, &position_sample) == IOX2_OK
               && position_sample != NULL) {
            struct Position* payload = NULL;
            iox2_sample_payload(&position_sample, (const void**) &payload, NULL);
            printf("position: (%.1f, %.1f)\n", (double) payload->x, (double) payload->y);
            iox2_sample_drop(position_sample);
            position_sample = NULL;
        }
    }
    iox2_waitset_attachment_id_drop(attachment_id);
    return iox2_callback_progression_e_CONTINUE;
}
// NOLINTBEGIN(readability-function-size)
int main(void) {
    iox2_set_log_level_from_env_or(iox2_log_level_e_INFO);
    int ret_val = 0;
    // create node
    iox2_node_builder_h node_builder = iox2_node_builder_new(NULL);
    iox2_node_h node = NULL;
    ret_val = iox2_node_builder_create(node_builder, NULL, iox2_service_type_e_IPC, &node);
    if (ret_val != IOX2_OK) {
        printf("Could not create node! Error: %d\n", ret_val);
        goto end;
    }
    // create battery publish-subscribe service
    iox2_service_name_h battery_name = NULL;
    const char* battery_name_value = "larry/battery";
    iox2_service_name_new(NULL, battery_name_value, strlen(battery_name_value), &battery_name);
    iox2_service_builder_pub_sub_h battery_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&battery_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          BATTERY_STATE_TYPE_NAME,
                                                          strlen(BATTERY_STATE_TYPE_NAME),
                                                          sizeof(struct BatteryState),
                                                          alignof(struct BatteryState));
    iox2_port_factory_pub_sub_h battery_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(battery_builder, NULL, &battery_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery service! Error: %d\n", ret_val);
        goto drop_node;
    }
    // create position publish-subscribe service
    iox2_service_name_h position_name = NULL;
    const char* position_name_value = "larry/position";
    iox2_service_name_new(NULL, position_name_value, strlen(position_name_value), &position_name);
    iox2_service_builder_pub_sub_h position_builder =
        iox2_service_builder_pub_sub(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_service_builder_pub_sub_set_payload_type_details(&position_builder,
                                                          iox2_type_variant_e_FIXED_SIZE,
                                                          POSITION_TYPE_NAME,
                                                          strlen(POSITION_TYPE_NAME),
                                                          sizeof(struct Position),
                                                          alignof(struct Position));
    iox2_port_factory_pub_sub_h position_service = NULL;
    ret_val = iox2_service_builder_pub_sub_open_or_create(position_builder, NULL, &position_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position service! Error: %d\n", ret_val);
        goto drop_battery_service;
    }
    // create battery event service
    iox2_service_builder_event_h battery_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(battery_name)));
    iox2_port_factory_event_h battery_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(battery_event_builder, NULL, &battery_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create battery event service! Error: %d\n", ret_val);
        goto drop_position_service;
    }
    // create position event service
    iox2_service_builder_event_h position_event_builder =
        iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(position_name)));
    iox2_port_factory_event_h position_event_service = NULL;
    ret_val = iox2_service_builder_event_open_or_create(position_event_builder, NULL, &position_event_service);
    if (ret_val != IOX2_OK) {
        printf("Could not create position event service! Error: %d\n", ret_val);
        goto drop_battery_event_service;
    }
    // create subscribers
    iox2_subscriber_h battery_subscriber = NULL;
    iox2_port_factory_subscriber_builder_create(
        iox2_port_factory_pub_sub_subscriber_builder(&battery_service, NULL), NULL, &battery_subscriber);
    iox2_subscriber_h position_subscriber = NULL;
    iox2_port_factory_subscriber_builder_create(
        iox2_port_factory_pub_sub_subscriber_builder(&position_service, NULL), NULL, &position_subscriber);
    // create listeners
    iox2_listener_h battery_listener = NULL;
    iox2_port_factory_listener_builder_create(
        iox2_port_factory_event_listener_builder(&battery_event_service, NULL), NULL, &battery_listener);
    iox2_listener_h position_listener = NULL;
    iox2_port_factory_listener_builder_create(
        iox2_port_factory_event_listener_builder(&position_event_service, NULL), NULL, &position_listener);
    // create waitset and attach both listeners
    iox2_waitset_builder_h waitset_builder = NULL;
    iox2_waitset_builder_new(NULL, &waitset_builder);
    iox2_waitset_h waitset = NULL;
    ret_val = iox2_waitset_builder_create(waitset_builder, iox2_service_type_e_IPC, NULL, &waitset);
    if (ret_val != IOX2_OK) {
        printf("Could not create waitset! Error: %d\n", ret_val);
        goto drop_listeners;
    }
    iox2_waitset_guard_h battery_guard = NULL;
    ret_val = iox2_waitset_attach_notification(
        &waitset, iox2_listener_get_file_descriptor(&battery_listener), NULL, &battery_guard);
    if (ret_val != IOX2_OK) {
        printf("Could not attach battery listener! Error: %d\n", ret_val);
        goto drop_waitset;
    }
    iox2_waitset_guard_h position_guard = NULL;
    ret_val = iox2_waitset_attach_notification(
        &waitset, iox2_listener_get_file_descriptor(&position_listener), NULL, &position_guard);
    if (ret_val != IOX2_OK) {
        printf("Could not attach position listener! Error: %d\n", ret_val);
        goto drop_battery_guard;
    }
    struct CallbackContext context;
    context.battery_guard = &battery_guard;
    context.position_guard = &position_guard;
    context.battery_listener = &battery_listener;
    context.position_listener = &position_listener;
    context.battery_subscriber = &battery_subscriber;
    context.position_subscriber = &position_subscriber;
    iox2_waitset_run_result_e result = iox2_waitset_run_result_e_STOP_REQUEST;
    ret_val = iox2_waitset_wait_and_process(&waitset, on_event, (void*) &context, &result);
    if (ret_val != IOX2_OK) {
        printf("Failure in WaitSet wait_and_process loop! Error: %d\n", ret_val);
    }
    iox2_waitset_guard_drop(position_guard);
drop_battery_guard:
    iox2_waitset_guard_drop(battery_guard);
drop_waitset:
    iox2_waitset_drop(waitset);
drop_listeners:
    iox2_listener_drop(position_listener);
    iox2_listener_drop(battery_listener);
    iox2_subscriber_drop(position_subscriber);
    iox2_subscriber_drop(battery_subscriber);
    iox2_port_factory_event_drop(position_event_service);
drop_battery_event_service:
    iox2_port_factory_event_drop(battery_event_service);
drop_position_service:
    iox2_port_factory_pub_sub_drop(position_service);
drop_battery_service:
    iox2_port_factory_pub_sub_drop(battery_service);
drop_node:
    iox2_node_drop(node);
end:
    return ret_val;
}
// NOLINTEND(readability-function-size)
Neither application is aware that the data will be propagated over the network.
They are both set up as regular iceoryx2 applications.
Tunneling over the network
With the applications set up, we now need to spin up the tunnel to extend
the communication over the network. The tunnel can either be run in a separate
process or embedded into the application. First, let’s use the iceoryx2-cli
to run the tunnel in a separate process.
Installing the CLI
First install iceoryx2-cli, which is the entry point for all CLI commands:
cargo install iceoryx2-cli
Next, the CLI for a specific tunnel backend must be installed. For this article, we use the Zenoh backend:
cargo install iceoryx2-integrations-zenoh-tunnel-cli
The command iox2 tunnel --list can be used to verify that the installation
was successful. This command lists all installed tunnel backends discovered
by the CLI.
The tunnel should be installed on both hosts.
Running a tunnel process
The CLI can now be used to start a tunnel process on both hosts:
iox2 tunnel zenoh
Each tunnel discovers iceoryx2 services, both on its own host and over
the communication mechanism used by the chosen backend implementation.
Larry’s tunnel sees larry/battery and larry/position and forwards all
samples published to them over the wire. The dashboard tunnel receives these
samples from the wire, and injects them into the iceoryx2 shared memory to be
received by subscribers to matching services on the dashboard host. The two
applications communicate over services, as if they were running on the same
host.
By default, the tunnel polls for new services and pending samples every 100 ms. However, since the applications use the event-based communication pattern, the tunnel can instead be configured to execute reactively.
The tunnel running on Larry only has work to do when new samples are published on the services that need to be tunneled:
iox2 tunnel zenoh --listener "larry/battery" --listener "larry/position"
The tunnel running on the dashboard host only has work to do when data arrives over the backend:
iox2 tunnel zenoh --reactive-backend
Nothing here is application code. Both applications were written, compiled, and deployed with no knowledge that a tunnel exists. Communicating across the hosts is purely a deployment decision.
Embedding the tunnel
Note
The embeddable tunnel is currently only available as a Rust library. C++, C,
and Python applications must use the tunnel from a separate process with the
iox2 tunnel CLI shown above; the remainder of this section applies to Rust.
Running the tunnel as its own process is convenient, but it is not the only option. The tunnel is a library that spawns no threads of its own, so you have full control over when and where it runs. It can be executed within a dedicated time slot of a time-triggered architecture, or on a separate thread that you control, pinned to a specific core so as not to interfere with application execution.
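As a rough illustration of the "separate thread that you control" option, the sketch below runs a cyclic worker on its own thread and stops it through a shared flag, using only the standard library. The function names are hypothetical, the loop body is a placeholder for the tunnel calls introduced later in this section, and core pinning is left out because it requires platform-specific APIs. Creating the tunnel inside the worker thread is an assumption made here so that nothing has to be moved between threads.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: spawns a worker that performs one cycle of work,
/// checks the stop flag, sleeps for `period`, and repeats.
fn spawn_cyclic_worker(
    period: Duration,
    stop: Arc<AtomicBool>,
    cycles: Arc<AtomicUsize>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Assumption: an embedded tunnel would be created here, inside the
        // thread that drives it.
        loop {
            // Placeholder for the per-cycle tunnel work.
            cycles.fetch_add(1, Ordering::Relaxed);
            if stop.load(Ordering::Relaxed) {
                break;
            }
            thread::sleep(period);
        }
    })
}

/// Runs the worker briefly and returns how many cycles it completed.
fn dedicated_thread_demo() -> usize {
    let stop = Arc::new(AtomicBool::new(false));
    let cycles = Arc::new(AtomicUsize::new(0));
    let worker = spawn_cyclic_worker(Duration::from_millis(5), stop.clone(), cycles.clone());

    // The application thread is free to do its own work in the meantime.
    thread::sleep(Duration::from_millis(50));

    stop.store(true, Ordering::Relaxed);
    worker.join().unwrap();
    cycles.load(Ordering::Relaxed)
}

fn main() {
    println!("worker completed {} cycles", dedicated_thread_demo());
}
```

The application decides the period, the lifetime, and the thread on which the work happens; nothing runs behind its back.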
First add the generic tunnel and your chosen backend as dependencies:
iceoryx2-services-tunnel = { version = "X.Y.Z" }
iceoryx2-integrations-zenoh-tunnel-backend = { version = "X.Y.Z" }
The tunnel can then be instantiated in your application via the provided
builder. The chosen backend implementation is passed as a generic parameter on
creation. Here we again use the ZenohBackend, but any other implementation can
be dropped in, even your own. Specifying polled() on the builder returns a
tunnel that is driven manually:
use core::time::Duration;
use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;
const POLL_INTERVAL: Duration = Duration::from_millis(100);
let mut tunnel = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .polled()
    .create()?;
while tunnel.node().wait(POLL_INTERVAL).is_ok() {
    tunnel.discover()?;
    tunnel.propagate()?;
}
Execution is driven via two API calls. The discover() call reconciles which
services exist locally and over the wire. The propagate() call moves the
data in both directions: from the wire into iceoryx2 shared memory and vice
versa. Calling the two on the polling tunnel in a timed loop
replicates the default behavior of the CLI.
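To make the division of labour between the two calls more tangible, here is a toy model written against the standard library only. `Wire`, `ToyTunnel`, and every method on them are hypothetical stand-ins invented for this sketch; they are not the iceoryx2 tunnel API and say nothing about how the real tunnel is implemented. The model only mirrors the description above: `discover` reconciles the set of known services, and `propagate` moves samples in both directions.

```rust
use std::collections::{BTreeSet, VecDeque};

/// (service name, raw payload bytes)
type Sample = (String, Vec<u8>);

/// Hypothetical stand-in for whatever carries bytes between two hosts.
#[derive(Default)]
struct Wire {
    announced: BTreeSet<String>,
    inbox: [VecDeque<Sample>; 2], // one inbox per host
}

/// Hypothetical stand-in for the tunnel running on one host.
struct ToyTunnel {
    side: usize,                 // 0 or 1, selects this host's inbox
    local: BTreeSet<String>,     // services that exist on this host
    known: BTreeSet<String>,     // services known after discovery
    outgoing: VecDeque<Sample>,  // samples published locally
    delivered: Vec<Sample>,      // samples injected from the wire
}

impl ToyTunnel {
    fn new(side: usize, local: &[&str]) -> Self {
        ToyTunnel {
            side,
            local: local.iter().map(|s| s.to_string()).collect(),
            known: BTreeSet::new(),
            outgoing: VecDeque::new(),
            delivered: Vec::new(),
        }
    }

    /// Announce local services and learn about the ones announced remotely.
    fn discover(&mut self, wire: &mut Wire) {
        wire.announced.extend(self.local.iter().cloned());
        self.known = wire.announced.clone();
    }

    /// Move data in both directions: local to wire, and wire to local.
    fn propagate(&mut self, wire: &mut Wire) {
        let peer = 1 - self.side;
        wire.inbox[peer].extend(self.outgoing.drain(..));
        self.delivered.extend(wire.inbox[self.side].drain(..));
    }
}

/// One polling cycle on each host; returns what the dashboard side ends up with.
fn toy_cycle() -> (usize, Vec<Sample>) {
    let mut wire = Wire::default();
    let mut larry = ToyTunnel::new(0, &["larry/battery", "larry/position"]);
    let mut dashboard = ToyTunnel::new(1, &[]);

    larry
        .outgoing
        .push_back(("larry/battery".to_string(), 87.0f32.to_ne_bytes().to_vec()));

    larry.discover(&mut wire);
    larry.propagate(&mut wire);
    dashboard.discover(&mut wire);
    dashboard.propagate(&mut wire);

    (dashboard.known.len(), dashboard.delivered)
}

fn main() {
    let (known, delivered) = toy_cycle();
    println!("dashboard knows {known} services, received {} sample(s)", delivered.len());
}
```

In this model, skipping `discover` would leave the dashboard unaware of Larry's services, and skipping `propagate` would leave the sample stuck in Larry's outgoing queue, which is why the loop calls both on every cycle.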
Now let’s take a look at how to replicate the reactive configurations provided by the tunnel CLI. To recap, the two tunnels must react to different things:
Larry’s tunnel reacts to the services notified when a sample is published
The dashboard’s tunnel reacts to data arriving over the network
Since Larry’s tunnel does not react to network communication, it can be
created in polling mode. Listeners are created for the services notified with
each publish, and a WaitSet is
set up to react to notifications from both of them in a single thread:
use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;
let mut tunnel = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .polled()
    .create()?;
// wake whenever Larry publishes locally, so freshly produced samples are
// pushed out promptly
let battery_listener = tunnel
    .node()
    .service_builder(&"larry/battery".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;
let position_listener = tunnel
    .node()
    .service_builder(&"larry/position".try_into()?)
    .event()
    .open_or_create()?
    .listener_builder()
    .create()?;
let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let _battery_guard = waitset.attach_notification(&battery_listener)?;
let _position_guard = waitset.attach_notification(&position_listener)?;
waitset.wait_and_process(|_| {
    let _ = battery_listener.try_wait(|_| {});
    let _ = position_listener.try_wait(|_| {});
    let _ = tunnel.discover();
    let _ = tunnel.propagate();
    CallbackProgression::Continue
})?;
The dashboard tunnel only cares about data arriving over the network, so it is
created in reactive mode, which provides a listener along with the created
tunnel. This listener is notified when new data is available from the backend
specified on creation.
No WaitSet is needed here,
since there is only one listener to wait on:
use iceoryx2::prelude::*;
use iceoryx2_integrations_zenoh_tunnel_backend::ZenohBackend;
use iceoryx2_services_tunnel::Tunnel;
// reactive mode hands back a listener that wakes when the backend has
// delivered something to ingest from the wire
let (mut tunnel, listener) = Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>::new()
    .reactive()
    .create()?;
while listener.blocking_wait(|_| {}).is_ok() {
    tunnel.discover()?;
    tunnel.propagate()?;
}
Both approaches run the same discover() and propagate() calls; they differ
only in what decides when those calls happen. Once again, the tunnel has no
background threads, so you are free to choose where and when these operations
run, allowing you to plan for and minimize unpredictability.
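The idea that only the trigger differs can be sketched with the standard library alone. In the example below, `drive` is a hypothetical helper that runs a step each time a trigger fires; a sleeping timer stands in for the polling wait and an `mpsc` channel stands in for the listener. None of these names come from iceoryx2, and the step closure is a placeholder for the `discover()` and `propagate()` pair.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `step` every time `trigger` returns true and
/// stops as soon as it returns false. Returns the number of executed steps.
fn drive(mut trigger: impl FnMut() -> bool, mut step: impl FnMut()) -> usize {
    let mut cycles = 0;
    while trigger() {
        step();
        cycles += 1;
    }
    cycles
}

/// Polling: the trigger is the passage of time (three ticks, then stop).
fn polled_demo() -> usize {
    let mut ticks_left = 3;
    drive(
        move || {
            if ticks_left == 0 {
                return false;
            }
            ticks_left -= 1;
            thread::sleep(Duration::from_millis(10));
            true
        },
        || { /* placeholder for discover + propagate */ },
    )
}

/// Reactive: the trigger blocks until a notification arrives and ends when
/// the notifying side goes away.
fn reactive_demo() -> usize {
    let (tx, rx) = mpsc::channel::<()>();
    let producer = thread::spawn(move || {
        for _ in 0..3 {
            tx.send(()).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });
    let cycles = drive(|| rx.recv().is_ok(), || { /* same placeholder step */ });
    producer.join().unwrap();
    cycles
}

fn main() {
    println!("polled cycles:   {}", polled_demo());
    println!("reactive cycles: {}", reactive_demo());
}
```

Both demos pass the same kind of step to `drive`; only the closure that decides when to run it changes, which is the same split the two embedding modes make.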
Switching the mechanism
Let’s say that later in development you decide a different mechanism should carry the cross-host communication. Perhaps you want to reach the dashboard over the internet by integrating with existing MQTT infrastructure, or you need to bridge co-processors on a single board that each keep their own memory and can only communicate over a cross-chip API.
All that is required is to switch out the chosen backend when creating the tunnel. In our examples above, we have been using Zenoh, so the tunnels were created with:
Tunnel::<ipc::Service, ZenohBackend<ipc::Service>>
The first parameter is the local iceoryx2 service type; the second is the
backend. The tunnel itself knows nothing about Zenoh. It is written against the
backend traits, and discover() and propagate() are the same calls
regardless of what carries the bytes.
Switching over to a different mechanism is therefore only a change of the type specified on tunnel creation:
Tunnel::<ipc::Service, SomeOtherBackend<ipc::Service>>
The publishers, the subscribers, the embed loops, and the choice of polled or reactive mode are all untouched.
Over time, additional backend implementations
will be provided upstream alongside iceoryx2. If there is no
existing implementation for the mechanism you would like to use, you can
implement your own using the traits provided in iceoryx2-services-tunnel-backend.
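The general shape of "generic code written against a trait, with the backend chosen by a type parameter" can be sketched as follows. `ToyBackend`, `QueueBackend`, `FramedBackend`, and `SketchTunnel` are hypothetical names invented for this illustration; the real traits in iceoryx2-services-tunnel-backend are not reproduced here and will look different. Both toy backends simply loop data back to the sender so that the example stays self-contained.

```rust
use std::collections::VecDeque;

/// Hypothetical, heavily simplified backend abstraction (not the real traits).
trait ToyBackend: Default {
    fn send(&mut self, payload: &[u8]);
    fn receive(&mut self) -> Option<Vec<u8>>;
}

/// Backend 1: keeps whole payloads in a FIFO queue.
#[derive(Default)]
struct QueueBackend {
    queue: VecDeque<Vec<u8>>,
}

impl ToyBackend for QueueBackend {
    fn send(&mut self, payload: &[u8]) {
        self.queue.push_back(payload.to_vec());
    }
    fn receive(&mut self) -> Option<Vec<u8>> {
        self.queue.pop_front()
    }
}

/// Backend 2: a byte stream with a 4-byte big-endian length prefix per frame.
#[derive(Default)]
struct FramedBackend {
    stream: VecDeque<u8>,
}

impl ToyBackend for FramedBackend {
    fn send(&mut self, payload: &[u8]) {
        let len = payload.len() as u32;
        self.stream.extend(len.to_be_bytes().iter().copied());
        self.stream.extend(payload.iter().copied());
    }
    fn receive(&mut self) -> Option<Vec<u8>> {
        if self.stream.len() < 4 {
            return None;
        }
        let header: Vec<u8> = self.stream.drain(..4).collect();
        let len = u32::from_be_bytes([header[0], header[1], header[2], header[3]]) as usize;
        // Frames are always written completely in this toy, so the body is present.
        Some(self.stream.drain(..len).collect())
    }
}

/// Generic over the backend: this code never names a concrete mechanism.
struct SketchTunnel<B: ToyBackend> {
    backend: B,
    delivered: Vec<Vec<u8>>,
}

impl<B: ToyBackend> SketchTunnel<B> {
    fn new() -> Self {
        SketchTunnel { backend: B::default(), delivered: Vec::new() }
    }

    /// Push one payload out and ingest everything the backend hands back.
    fn propagate(&mut self, outgoing: &[u8]) {
        self.backend.send(outgoing);
        while let Some(payload) = self.backend.receive() {
            self.delivered.push(payload);
        }
    }
}

/// The only thing that changes between mechanisms is the type parameter `B`.
fn round_trip<B: ToyBackend>(payload: &[u8]) -> Vec<Vec<u8>> {
    let mut tunnel = SketchTunnel::<B>::new();
    tunnel.propagate(payload);
    tunnel.delivered
}

fn main() {
    println!("{:?}", round_trip::<QueueBackend>(&[1, 2, 3]));
    println!("{:?}", round_trip::<FramedBackend>(&[1, 2, 3]));
}
```

The two backends use different internal representations, yet `SketchTunnel` and `round_trip` are untouched when one is swapped for the other.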
Further Reading
The feature that this tutorial puts to work.
The control-flow pattern the dashboard and tunnel employ.
Overview of execution patterns possible with iceoryx2.