Event-Driven Communication¶
Before jumping into the example, we need to clarify the difference between data flow and control flow.
In programming, control flow is about when a function executes, while data
flow is about what inputs it receives. In iceoryx2 these two concepts are
separated on purpose, unlike in many network protocols where receiving data
automatically involves a syscall that wakes up the participant.
Messaging patterns like publish-subscribe and request-response define the data flow. The event messaging pattern handles control flow. Why split them? Because you don’t always want to pass data when triggering execution. Sometimes you just need to wake up a function with no additional input. Other times you need several inputs, and you don’t want to be interrupted until all of them have arrived. Mixing data and control flow forces you into hacks like sending empty messages or waking up too early.
By keeping them separate, iceoryx2 gives you explicit control over when your
system reacts.
Why It Matters¶
Take the emergency brake system of our robot Larry. It doesn’t need every single distance sample from the ultrasonic sensor. If the next obstacle detected is on another planet, we can ignore it. But when an obstacle is close, we suddenly care. And not just about the raw number but also the trend over time. If the object is receding, no action is needed; if it’s approaching, we may need to hit the brakes.
Now consider failure cases. If the sensor participant dies and never sends another update, Larry shouldn’t drive happily into oblivion. A safe fallback (parking, for example) is required. Another factor to consider: the emergency brake participant might start after the sensor participant. In that case, it still needs the most recent three distance samples to compute not just position, but relative speed and acceleration.
These scenarios show why we want independent streams for data and control signals.
Publisher Setup¶
We start by creating a node and defining two services with the same name:
a publish-subscribe service for distance samples,
an event service for control signals.
The subscriber needs the last three samples whenever it connects, so we configure both the subscriber buffer and history size accordingly. The subscriber also needs to hold on to/borrow three samples in parallel to compute position, speed, and acceleration.
use iceoryx2::prelude::*;
let node = NodeBuilder::new().create::<ipc::Service>()?;
let pubsub_service = node
.service_builder(&"distance_to_obstacle".try_into()?)
.publish_subscribe::<Distance>()
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()?;
import iceoryx2 as iox2
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)
pubsub_service = (
node.service_builder(iox2.ServiceName.new("distance_to_obstacle"))
.publish_subscribe(Distance)
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()
)
auto node = NodeBuilder().create<ServiceType::Ipc>().value();
auto pubsub_service = node.service_builder(ServiceName::create("distance_to_obstacle").value())
.publish_subscribe<Distance>()
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()
.value();
iox2_node_builder_h node_builder_handle = iox2_node_builder_new(NULL);
iox2_node_h node = NULL;
if (iox2_node_builder_create(node_builder_handle, NULL, iox2_service_type_e_IPC, &node) != IOX2_OK) {
printf("Could not create node!\n");
exit(-1);
}
const char* service_name_value = "distance_to_obstacle";
iox2_service_name_h service_name = NULL;
if (iox2_service_name_new(NULL, service_name_value, strlen(service_name_value), &service_name) != IOX2_OK) {
printf("Unable to create service name!\n");
exit(-1);
}
iox2_service_name_ptr service_name_ptr = iox2_cast_service_name_ptr(service_name);
iox2_service_builder_h service_builder = iox2_node_service_builder(&node, NULL, service_name_ptr);
iox2_service_builder_pub_sub_h service_builder_pub_sub = iox2_service_builder_pub_sub(service_builder);
const char* payload_type_name = "Distance";
if (iox2_service_builder_pub_sub_set_payload_type_details(&service_builder_pub_sub,
iox2_type_variant_e_FIXED_SIZE,
payload_type_name,
strlen(payload_type_name),
sizeof(struct Distance),
alignof(struct Distance))
!= IOX2_OK) {
printf("Unable to set type details\n");
exit(-1);
}
iox2_service_builder_pub_sub_set_subscriber_max_buffer_size(&service_builder_pub_sub, 3);
iox2_service_builder_pub_sub_set_history_size(&service_builder_pub_sub, 3);
iox2_service_builder_pub_sub_set_subscriber_max_borrowed_samples(&service_builder_pub_sub, 3);
iox2_port_factory_pub_sub_h pubsub_service = NULL;
if (iox2_service_builder_pub_sub_open_or_create(service_builder_pub_sub, NULL, &pubsub_service) != IOX2_OK) {
printf("Unable to create service!\n");
exit(-1);
}
For the event service, we configure a special event that fires if the sensor participant is identified as dead, allowing the emergency brake to switch to a safe state.
let ultra_sonic_service_dead = EventId::new(10);
let event_service = node
.service_builder(&"distance_to_obstacle".try_into()?)
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()?;
ultra_sonic_service_dead = iox2.EventId.new(10)
event_service = (
node.service_builder(iox2.ServiceName.new("distance_to_obstacle"))
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()
)
auto ultra_sonic_service_dead = EventId(10);
auto event_service = node.service_builder(ServiceName::create("distance_to_obstacle").value())
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()
.value();
size_t ultra_sonic_service_dead = 10;
iox2_service_builder_event_h service_builder_event =
iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(service_name)));
iox2_service_builder_event_set_notifier_dead_event(&service_builder_event, ultra_sonic_service_dead);
iox2_port_factory_event_h event_service = NULL;
if (iox2_service_builder_event_open_or_create(service_builder_event, NULL, &event_service) != IOX2_OK) {
printf("Unable to create event service!\n");
exit(-1);
}
Now we create a publisher for the distance samples and a notifier for control events.
let publisher = pubsub_service.publisher_builder().create()?;
let notifier = event_service.notifier_builder().create()?;
let obstacle_too_close = EventId::new(5);
publisher = pubsub_service.publisher_builder().create()
notifier = event_service.notifier_builder().create()
obstacle_too_close = iox2.EventId.new(5)
auto publisher = pubsub_service.publisher_builder().create().value();
auto notifier = event_service.notifier_builder().create().value();
auto obstacle_too_close = EventId(5);
iox2_port_factory_notifier_builder_h notifier_builder =
iox2_port_factory_event_notifier_builder(&event_service, NULL);
iox2_notifier_h notifier = NULL;
if (iox2_port_factory_notifier_builder_create(notifier_builder, NULL, ¬ifier) != IOX2_OK) {
printf("Unable to create notifier!\n");
exit(-1);
}
iox2_port_factory_publisher_builder_h publisher_builder =
iox2_port_factory_pub_sub_publisher_builder(&pubsub_service, NULL);
iox2_publisher_h publisher = NULL;
if (iox2_port_factory_publisher_builder_create(publisher_builder, NULL, &publisher) != IOX2_OK) {
printf("Unable to create publisher!\n");
exit(-1);
}
iox2_event_id_t obstacle_too_close = { .value = 5 };
The publishing loop: send the distance sample every 100 ms, and trigger an event if it’s below the threshold.
while node.wait(Duration::from_millis(100)).is_ok() {
let sample = publisher.loan_uninit()?;
let distance = get_ultra_sonic_sensor_distance();
let sample = sample.write_payload(Distance {
distance_in_meters: distance,
some_other_property: 42.0,
});
sample.send()?;
if distance < distance_threshold {
notifier.notify_with_custom_event_id(obstacle_too_close)?;
}
}
try:
while True:
node.wait(iox2.Duration.from_millis(100))
sample = publisher.loan_uninit()
distance = get_ultra_sonic_sensor_distance()
sample = sample.write_payload(
Distance(distance_in_meters=distance, some_other_property=42.0)
)
sample.send()
if distance < distance_threshold:
notifier.notify_with_custom_event_id(obstacle_too_close)
except iox2.NodeWaitFailure:
print("exit")
while (node.wait(iox2::bb::Duration::from_millis(100)).has_value()) {
auto sample = publisher.loan_uninit().value();
auto distance = get_ultra_sonic_sensor_distance();
auto initialized_sample = sample.write_payload(Distance { distance, 42.0 }); // NOLINT
send(std::move(initialized_sample)).value();
if (distance < distance_threshold) {
notifier.notify_with_custom_event_id(obstacle_too_close).value();
}
}
while (iox2_node_wait(&node, 0, 100000000) == IOX2_OK) {
double distance = get_ultra_sonic_sensor_distance();
iox2_sample_mut_h sample = NULL;
if (iox2_publisher_loan_slice_uninit(&publisher, NULL, &sample, 1) != IOX2_OK) {
printf("Failed to loan sample\n");
exit(-1);
}
struct Distance* payload = NULL;
iox2_sample_mut_payload_mut(&sample, (void**) &payload, NULL);
payload->distance_in_meters = distance;
payload->some_other_property = 42.0F; // NOLINT
if (iox2_sample_mut_send(sample, NULL) != IOX2_OK) {
printf("Failed to send sample\n");
exit(-1);
}
if (distance < DISTANCE_THRESHOLD) {
if (iox2_notifier_notify_with_custom_event_id(¬ifier, &obstacle_too_close, NULL) != IOX2_OK) {
printf("Failed to notify listener!\n");
exit(-1);
}
}
}
// release every handle once the loop exits
iox2_publisher_drop(publisher);
iox2_notifier_drop(notifier);
iox2_port_factory_event_drop(event_service);
iox2_port_factory_pub_sub_drop(pubsub_service);
iox2_service_name_drop(service_name);
iox2_node_drop(node);
Subscriber Setup¶
On the other side, we again create both services and their ports:
use iceoryx2::prelude::*;
let node = NodeBuilder::new().create::<ipc::Service>()?;
let pubsub_service = node
.service_builder(&"distance_to_obstacle".try_into()?)
.publish_subscribe::<Distance>()
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()?;
let ultra_sonic_service_dead = EventId::new(10);
let event_service = node
.service_builder(&"distance_to_obstacle".try_into()?)
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()?;
let subscriber = pubsub_service.subscriber_builder().create()?;
let listener = event_service.listener_builder().create()?;
import iceoryx2 as iox2
node = iox2.NodeBuilder.new().create(iox2.ServiceType.Ipc)
pubsub_service = (
node.service_builder(iox2.ServiceName.new("distance_to_obstacle"))
.publish_subscribe(Distance)
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()
)
ultra_sonic_service_dead = iox2.EventId.new(10)
event_service = (
node.service_builder(iox2.ServiceName.new("distance_to_obstacle"))
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()
)
subscriber = pubsub_service.subscriber_builder().create()
listener = event_service.listener_builder().create()
auto node = NodeBuilder().create<ServiceType::Ipc>().value();
auto pubsub_service = node.service_builder(ServiceName::create("distance_to_obstacle").value())
.publish_subscribe<Distance>()
.subscriber_max_buffer_size(3)
.history_size(3)
.subscriber_max_borrowed_samples(3)
.open_or_create()
.value();
auto ultra_sonic_service_dead = EventId(10);
auto event_service = node.service_builder(ServiceName::create("distance_to_obstacle").value())
.event()
.notifier_dead_event(ultra_sonic_service_dead)
.open_or_create()
.value();
auto subscriber = pubsub_service.subscriber_builder().create().value();
auto listener = event_service.listener_builder().create().value();
iox2_node_builder_h node_builder_handle = iox2_node_builder_new(NULL);
iox2_node_h node = NULL;
if (iox2_node_builder_create(node_builder_handle, NULL, iox2_service_type_e_IPC, &node) != IOX2_OK) {
printf("Could not create node!\n");
exit(-1);
}
const char* service_name_value = "distance_to_obstacle";
iox2_service_name_h service_name = NULL;
if (iox2_service_name_new(NULL, service_name_value, strlen(service_name_value), &service_name) != IOX2_OK) {
printf("Unable to create service name!\n");
exit(-1);
}
iox2_service_name_ptr service_name_ptr = iox2_cast_service_name_ptr(service_name);
iox2_service_builder_h service_builder = iox2_node_service_builder(&node, NULL, service_name_ptr);
iox2_service_builder_pub_sub_h service_builder_pub_sub = iox2_service_builder_pub_sub(service_builder);
const char* payload_type_name = "Distance";
if (iox2_service_builder_pub_sub_set_payload_type_details(&service_builder_pub_sub,
iox2_type_variant_e_FIXED_SIZE,
payload_type_name,
strlen(payload_type_name),
sizeof(struct Distance),
alignof(struct Distance))
!= IOX2_OK) {
printf("Unable to set type details\n");
exit(-1);
}
iox2_service_builder_pub_sub_set_subscriber_max_buffer_size(&service_builder_pub_sub, 3);
iox2_service_builder_pub_sub_set_history_size(&service_builder_pub_sub, 3);
iox2_service_builder_pub_sub_set_subscriber_max_borrowed_samples(&service_builder_pub_sub, 3);
iox2_port_factory_pub_sub_h pubsub_service = NULL;
if (iox2_service_builder_pub_sub_open_or_create(service_builder_pub_sub, NULL, &pubsub_service) != IOX2_OK) {
printf("Unable to create service!\n");
exit(-1);
}
iox2_service_builder_event_h service_builder_event =
iox2_service_builder_event(iox2_node_service_builder(&node, NULL, iox2_cast_service_name_ptr(service_name)));
iox2_service_builder_event_set_notifier_dead_event(&service_builder_event, ultra_sonic_service_dead.value);
iox2_port_factory_event_h event_service = NULL;
if (iox2_service_builder_event_open_or_create(service_builder_event, NULL, &event_service) != IOX2_OK) {
printf("Unable to create event service!\n");
exit(-1);
}
iox2_port_factory_subscriber_builder_h subscriber_builder =
iox2_port_factory_pub_sub_subscriber_builder(&pubsub_service, NULL);
iox2_subscriber_h subscriber = NULL;
if (iox2_port_factory_subscriber_builder_create(subscriber_builder, NULL, &subscriber) != IOX2_OK) {
printf("Unable to create subscriber!\n");
exit(-1);
}
iox2_port_factory_listener_builder_h listener_builder =
iox2_port_factory_event_listener_builder(&event_service, NULL);
iox2_listener_h listener = NULL;
if (iox2_port_factory_listener_builder_create(listener_builder, NULL, &listener) != IOX2_OK) {
printf("Unable to create listener!\n");
exit(-1);
}
Instead of polling every 100 ms, the subscriber just waits for events. When woken up, it processes them and goes back to sleep.
while node.wait(Duration::ZERO).is_ok() {
let mut sensor_is_dead = false;
let mut obstacle_detected = false;
listener.blocking_wait(|event| {
if event.id == ultra_sonic_service_dead {
sensor_is_dead = true;
}
if event.id == obstacle_too_close {
obstacle_detected = true;
}
})?;
if sensor_is_dead {
go_into_parking_position();
}
if obstacle_detected {
let mut last_samples = vec![];
while let Some(sample) = subscriber.receive()? {
last_samples.push(sample);
if last_samples.len() == 3 {
break;
}
}
perform_break(last_samples);
}
}
try:
while True:
sensor_is_dead = False
obstacle_detected = False
for event in listener.blocking_wait():
if event.id == ultra_sonic_service_dead:
sensor_is_dead = True
if event.id == obstacle_too_close:
obstacle_detected = True
if sensor_is_dead:
go_into_parking_position()
if obstacle_detected:
last_samples = []
while len(last_samples) < 3:
sample = subscriber.receive()
if sample is None:
break
last_samples.append(sample)
perform_break(last_samples)
except iox2.ListenerWaitError:
print("exit")
while (node.wait(iox2::bb::Duration::from_secs(0)).has_value()) {
auto sensor_is_dead = false;
auto obstacle_detected = false;
listener
.blocking_wait([&](auto event) {
if (event.id() == ultra_sonic_service_dead) {
sensor_is_dead = true;
}
if (event.id() == obstacle_too_close) {
obstacle_detected = true;
}
})
.has_value();
if (sensor_is_dead) {
go_into_parking_position();
}
if (obstacle_detected) {
std::vector<Sample<ServiceType::Ipc, Distance, void>> last_samples;
auto sample = subscriber.receive().value();
while (sample.has_value()) {
last_samples.push_back(std::move(sample.value()));
if (last_samples.size() == 3) {
break;
}
sample = subscriber.receive().value();
}
perform_break(last_samples);
}
}
struct received_events events;
iox2_sample_h last_samples[3];
uint64_t number_of_notifications = 0;
while (iox2_node_wait(&node, 0, 0) == IOX2_OK) {
events.sensor_is_dead = false;
events.obstacle_detected = false;
// C has no closures, so the wait callback must be a separate static
// function rather than being written inline. `collect_event` just
// records which events fired into `events`:
//
// static void collect_event(const iox2_event_id_t* id, uint64_t count, void* ctx) {
// struct received_events* events = ctx;
// events->sensor_is_dead |= id->value == ultra_sonic_service_dead.value;
// events->obstacle_detected |= id->value == obstacle_too_close.value;
// }
if (iox2_listener_blocking_wait(&listener, &number_of_notifications, collect_event, &events) != IOX2_OK) {
printf("Unable to wait for notification!\n");
exit(-1);
}
if (events.sensor_is_dead) {
go_into_parking_position();
}
if (events.obstacle_detected) {
size_t received = 0;
for (size_t i = 0; i < 3; ++i) {
if (iox2_subscriber_receive(&subscriber, NULL, &last_samples[i]) != IOX2_OK) {
printf("Failed to receive sample\n");
exit(-1);
}
if (last_samples[i] == NULL) {
break;
}
received++;
}
perform_break(last_samples, received);
for (size_t i = 0; i < received; ++i) {
iox2_sample_drop(last_samples[i]);
}
}
}
// release every handle once the loop exits
iox2_listener_drop(listener);
iox2_subscriber_drop(subscriber);
iox2_port_factory_event_drop(event_service);
iox2_port_factory_pub_sub_drop(pubsub_service);
iox2_service_name_drop(service_name);
iox2_node_drop(node);
Health Monitoring¶
The notifier’s “dead event” relies on health monitoring: iceoryx2 offers
building blocks that detects when a participant dies. How this works in detail
is covered in a separate tutorial.