-
Notifications
You must be signed in to change notification settings - Fork 47
SO 5.8 ByExample Delivery Filters
Note. It is better to read SO-5.8 Basics before this text.
There could be situations where an agent have to deal with big amount of messages but need to handle only few of them. Let imagine an agent which listen messages from temperature or pressure sensor. The sensor could send several messages per seconds but the agent needs only messages with abnormal values inside. How to filter out the messages with normal values and ignore them?
Until v.5.5.5 there was no way to filter message stream. If an agent had wanted to ignore some messages then the agent received all of them, checked their content and threw out messages the agent not interested for.
But this approach has significant run-time cost. Every message needs to be stored to an event queue, then enqueued, then an event handler must be called for it. If an agent needs to receiver hundreds of messages to handle only dozens of them it is very inefficient.
To solve this problem a message delivery filters have introduced in v.5.5.5. They allows to check message’s content before message will be placed to an event queue. It means that an agent will receive only those messages which have an appropriate content. E.g. instead of hundreds of messages with inappropriate content the receiver will get only dozens messages with appropriate values inside.
The delivery filters are described in the corresponding section, please see SO-5.8 In-depth Message Delivery Filters for the details. This example show how delivery filters could be set and dropped. And how they influenced the messages flow.
The example shows effects of:
- receiving messages without delivery filters;
- receiving messages with delivery filters;
- attempt of sending messages with delivery filters but without subscription for messages;
- changing of delivery filters.
There is just one agent which handles one message type and two signals.
Instances of msg_sample message type are used to show effect of delivery filters.
Signal of type msg_second_part is necessary to split agent’s work into two parts. It is necessary because there is a moment in agent’s logic where all previously sent messages must be extracted from the event queue and processed. Only then the work of agent could be continued.
Signal of type msg_shutdown is necessary to finish example work.
It is important to note that agent uses two mboxes. The direct mbox is used for msg_second_part and msg_shutdown signals. It is because those signals are not subjects for delivery filters. And an anonymous MPMC-mbox is used for dealing with msg_sample messages. It is because delivery filters can be set only for MPMC-mboxes.
The agent just sends pairs of msg_sample messages to the dedicated MPMC-mboxes. There are several sends in different circumstances. Every send produces different results:
[first]: 0-only-subscription
[first]: 1-only-subscription
[first]: 1-subscription-and-filter
[second]: 1-subscription-and-filter-2
[second]: 0-subscription-and-filter-3
[second]: 0-only-subscription-2
[second]: 1-only-subscription-2
The first send is when there is only a subscription to msg_sample without any delivery filters. Two message instances must be received by the agent.
[first]: 0-only-subscription
[first]: 1-only-subscription
The second send is when there is a delivery filter. This filter allows only delivery of messages with value 1 inside. Because of that only one message instance should be received.
[first]: 1-subscription-and-filter
The third send is when there is the same delivery filter but there is no subscription to msg_sample. No messages must be received. Because of that there is no lines with “only-filter” strings in the output.
The fourth send is when there is the same delivery filter and a new subscription. Only one message instance should be received and it must be processed by new event handler.
[second]: 1-subscription-and-filter-2
The fifth send is when there is a new delivery filter but the same event handler. The new filter passes only messages with value 0 inside. Because of that only one message instance should be received:
[second]: 0-subscription-and-filter-3
The sixth send is when there is no delivery filter but the same event handler. Two message instances should be received:
[second]: 0-only-subscription-2
[second]: 1-only-subscription-2
#include <so_5/all.hpp>
#include <iostream>
using namespace std;
using namespace so_5;
// Message to be filtered.
struct msg_sample
{
int m_key;
string m_value;
};
// A signal for doing second part of example.
struct msg_second_part : public signal_t {};
// A signal for finish the example.
struct msg_shutdown : public signal_t {};
// Main example agent.
// Ordinary agent is necessary because a delivery filter can be set
// only by ordinary agent.
class a_example_t final : public agent_t
{
public :
a_example_t( context_t ctx )
: agent_t( ctx )
, m_mbox( so_environment().create_mbox() )
{}
void so_define_agent() override
{
so_subscribe_self()
.event( &a_example_t::evt_second_part )
.event( [this](mhood_t< msg_shutdown >) {
so_deregister_agent_coop_normally();
} );
}
void so_evt_start() override
{
// Subscribe for the message. Without filter.
so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
cout << "[first]: " << evt.m_key << "-" << evt.m_value << endl;
} );
// Sending several messages...
// All of them will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "only-subscription" );
send< msg_sample >( m_mbox, 1, "only-subscription" );
// Setting a delivery filter for message.
so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
return 1 == evt.m_key;
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter" );
// Take time for processing already queued messages.
send< msg_second_part >( *this );
}
void evt_second_part(mhood_t< msg_second_part >)
{
// Drop the subscription.
so_drop_subscription< msg_sample >( m_mbox );
// Sending several messages...
// No one of them will be stored to the agent's queue nor handled.
send< msg_sample >( m_mbox, 0, "only-filter" );
send< msg_sample >( m_mbox, 1, "only-filter" );
// Subscribe for the message again.
so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
cout << "[second]: " << evt.m_key << "-" << evt.m_value << endl;
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter-2" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter-2" );
// Changing the filter to new one.
so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
return 0 == evt.m_key;
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter-3" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter-3" );
// Dropping the filter.
so_drop_delivery_filter< msg_sample >( m_mbox );
// Sending several messages...
// All of them will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "only-subscription-2" );
send< msg_sample >( m_mbox, 1, "only-subscription-2" );
// Example could be finished.
send< msg_shutdown >( *this );
}
private :
// Separate MPMC-mbox is necessary for delivery filter.
const mbox_t m_mbox;
};
int main()
{
try
{
so_5::launch( []( environment_t & env ) {
env.register_agent_as_coop( env.make_agent< a_example_t >() );
} );
return 0;
}
catch( const std::exception & x )
{
cerr << "Exception: " << x.what() << endl;
}
return 2;
}