SO 5.6 InDepth Delivery Filters

Yauheni Akhotnikau edited this page Jan 10, 2020 · 3 revisions

There are situations when an agent must filter the messages it receives. A good example is receiving run-time monitoring information from the SObjectizer Environment (see Run-Time Monitoring): there are a lot of messages, and an agent has to handle only a few of them:

void timer_thread_stats_listener::evt_timer_quantities(
    const so_5::stats::messages::quantity< std::size_t > & evt )
{
    // Ignore messages unrelated to timer thread.
    if( so_5::stats::prefixes::timer_thread() != evt.m_prefix )
        return;
    ... // Processing of related to timer thread messages.
}

This kind of filtering is inefficient and has a significant run-time cost. Every message the agent is subscribed to is stored in the agent's queue, then extracted from it, then passed to the agent's event handler, only to be thrown out. Message delivery filters are a more efficient message-filtering mechanism.

A message delivery filter is a predicate that receives a message instance and returns true if the message is allowed to be delivered to the corresponding recipient, or false if the message must be thrown out.

For the example above a message delivery filter can be set as:

void timer_thread_stats_listener::so_define_agent()
{
    so_set_delivery_filter(
        so_environment().stats_controller().mbox(),
        []( const so_5::stats::messages::quantity< std::size_t > & msg ) {
            return so_5::stats::prefixes::timer_thread() == msg.m_prefix;
        } );
    ...
}

A delivery filter must be a lambda function (or a functional object) with exactly one argument: a constant reference to the message instance. It must return bool.
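The functional-object form of these requirements can be sketched as below. This is only an illustration: msg_sample and key_filter are hypothetical names, and the code uses nothing but the standard library, so it compiles without SObjectizer. The assumption is that an instance such as key_filter{ 1 } would be passed as the second argument of so_set_delivery_filter() in place of a lambda.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical message type used only for this illustration.
struct msg_sample
{
    int m_key;
    std::string m_value;
};

// A functional object with the required shape: exactly one argument
// (a constant reference to the message) and a bool result.
struct key_filter
{
    int m_expected_key;

    bool operator()( const msg_sample & msg ) const noexcept
    {
        return m_expected_key == msg.m_key;
    }
};

// Compile-time check of that shape (requires C++17).
static_assert(
    std::is_invocable_r_v< bool, const key_filter &, const msg_sample & >,
    "a filter must accept a const reference and return bool" );
```

The call operator is const and touches no mutable state, so one instance can be invoked for many messages.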

An agent stores all the delivery filters it has set. All of them are destroyed automatically at the end of the agent's lifetime. A delivery filter can also be dropped and destroyed explicitly with the so_drop_delivery_filter() method:

void timer_thread_stats_listener::evt_some_action()
{
    so_drop_delivery_filter<
            so_5::stats::messages::quantity< std::size_t >
        >( so_environment().stats_controller().mbox() );
    ...
}

To change a delivery filter it is enough to call so_set_delivery_filter() again; there is no need to call so_drop_delivery_filter() first:

// Setting the filter for the first time.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
        return /* some predicate... */;
    } );

... // Some messages are sent to mbox here.

// Replacing the filter with a new one.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
        return /* another predicate... */;
    } );

... // Some messages are sent to mbox here.

The so_set_delivery_filter() method passes a reference to the delivery filter to the corresponding mbox. From then on the mbox knows that, before delivering a message to that recipient, it must invoke the delivery filter to check whether the delivery is necessary. If the delivery filter returns false, the message is not stored in the recipient's queue at all.

Since the introduction of delivery filters, multi-producer multi-consumer (MPMC) mboxes work this way:

  • there is a list of subscribers for every message type;
  • every subscriber in that list can have an optional delivery filter;
  • during message delivery the mbox walks through that list and checks the delivery filter of every subscriber. If there is a delivery filter and it returns false, the message is not delivered to that subscriber. If there is no delivery filter, or the delivery filter returns true, the message is stored in the subscriber's event queue.
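The delivery walk described in the list above can be modelled with a few lines of standard C++. This is a toy sketch, not SObjectizer's implementation: toy_mbox, toy_subscriber and this simplified current_temperature are hypothetical names, and a single message type is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical message type for the illustration.
struct current_temperature
{
    int m_value;
};

// A toy subscriber: an event queue plus an optional delivery filter.
struct toy_subscriber
{
    std::string m_name;
    // An empty std::function means "no delivery filter".
    std::function< bool( const current_temperature & ) > m_filter;
    std::vector< current_temperature > m_queue;
};

// A toy MPMC mbox for a single message type.
struct toy_mbox
{
    std::vector< toy_subscriber > m_subscribers;

    // Walks through the subscriber list and consults each filter.
    // Returns the number of queues the message was stored in.
    std::size_t deliver( const current_temperature & msg )
    {
        std::size_t delivered = 0;
        for( auto & s : m_subscribers )
        {
            if( s.m_filter && !s.m_filter( msg ) )
                continue; // Rejected: the message never reaches this queue.
            s.m_queue.push_back( msg );
            ++delivered;
        }
        return delivered;
    }
};
```

With one subscriber without a filter and one that accepts only values above 45, delivering 10 fills one queue and delivering 50 fills both.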

Please note that a delivery filter is applied to a message only once: during message dispatching inside the mbox. Once a message is stored in an agent's event queue, it is not checked again when it is extracted for processing. This can be very important if the delivery filter is replaced by a new one. For example:

void my_agent::evt_demo()
{
    // All actions are performed inside an event handler. This means
    // that the agent can handle the messages sent here only after
    // this event handler finishes.

    // MPMC-mbox is necessary.
    const auto mbox = so_environment().create_mbox();

    // Create event subscription for message.
    so_subscribe( mbox ).event( []( const current_temperature & evt ) {
        cout << "temperature is: " << evt.value() << endl;
    } );

    // Send the first message. Temperature is 10 degrees.
    so_5::send< current_temperature >( mbox, 10 );

    // Sets the first filter.
    so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
        // Allows only messages with values outside of normal range [0,45].
        return !( 0 <= evt.value() && evt.value() <= 45 );
    } );

    // Send the second message. Temperature is -10 degrees.
    so_5::send< current_temperature >( mbox, -10 );

    // Replace the filter with another.
    so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
        // Allows only messages with values in very narrow range.
        return ( 20 <= evt.value() && evt.value() <= 22 );
    } );

    // Send the third message. Temperature is 21 degrees.
    so_5::send< current_temperature >( mbox, 21 );
}

In that case the agent will receive all three messages: the first one because there was no filter at the time of the send operation, the second one because it passes the first delivery filter, and the third one because it passes the second delivery filter.

This behavior could surprise a developer who expects a delivery filter to be applied to messages that are already in the agent's queue. It is not.
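The same sequence can be replayed on a toy model that checks the filter at send time only. This is a sketch under simplifying assumptions, not SObjectizer code: toy_agent and replay_evt_demo are hypothetical names, and the temperature is reduced to a plain int.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy stand-in for an agent with a single subscription.
class toy_agent
{
public:
    using filter_t = std::function< bool( int ) >;

    // Replaces the current filter; already queued messages are untouched.
    void set_filter( filter_t f ) { m_filter = std::move( f ); }

    // "Send": the filter is consulted here and only here.
    void send( int temperature )
    {
        if( !m_filter || m_filter( temperature ) )
            m_queue.push_back( temperature );
    }

    // "Event handling": drains the queue without any filter check.
    std::vector< int > handle_all()
    {
        std::vector< int > handled( m_queue.begin(), m_queue.end() );
        m_queue.clear();
        return handled;
    }

private:
    filter_t m_filter;
    std::deque< int > m_queue;
};

// Replays the send/set-filter sequence from evt_demo() above.
inline std::vector< int > replay_evt_demo()
{
    toy_agent agent;
    agent.send( 10 );
    agent.set_filter( []( int v ) { return !( 0 <= v && v <= 45 ); } );
    agent.send( -10 );
    agent.set_filter( []( int v ) { return 20 <= v && v <= 22; } );
    agent.send( 21 );
    return agent.handle_all();
}
```

In this model replay_evt_demo() returns all three values, 10, -10 and 21. The last filter would reject the first two, but they were accepted before it was set.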

Important note. A delivery filter can be set only for an MPMC-mbox. It is impossible to set a delivery filter for an MPSC (direct) mbox.

This is because the main purpose of an MPSC-mbox is to be a very fast and efficient direct channel to an agent. Any additional check would slow the MPSC-mbox down.

There are some requirements for delivery filter implementations:

  • a delivery filter must be as fast as possible. It is called during the message delivery operation, so a slow filter can significantly slow down a part of the application or the whole of it;
  • a delivery filter must be thread-safe. It will be called on different working threads, possibly at the same time.

Ideally, a delivery filter is a small, lightweight, side-effect-free lambda function (like the ones in the examples above).
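The two requirements can be illustrated with plain standard C++. The sketch below is not part of SObjectizer and all its names are hypothetical. The first predicate is the ideal case: it is stateless, so concurrent calls cannot interfere. The second one shows one possible way to stay thread-safe when a filter has to read a value that another thread may change.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Hypothetical message type for the illustration.
struct current_temperature
{
    int m_value;
};

// Stateless predicate: nothing is shared, so concurrent calls are safe.
inline bool is_abnormal( const current_temperature & msg ) noexcept
{
    return !( 0 <= msg.m_value && msg.m_value <= 45 );
}

// Predicate with shared state: the threshold may be changed by another
// thread, so it is read through std::atomic instead of a plain int.
inline auto make_threshold_filter(
    std::shared_ptr< std::atomic< int > > threshold )
{
    return [threshold]( const current_temperature & msg ) {
        return msg.m_value > threshold->load( std::memory_order_relaxed );
    };
}
```

Both predicates only read data and do a single comparison, which keeps the cost of every delivery attempt low.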

Setting up a delivery filter and subscribing to an event are different operations that are not tightly related. It is possible to set a delivery filter without subscribing to the message. It is also possible to subscribe to a message without setting a delivery filter.

When a delivery filter is set but there are no subscriptions to the message, the mbox stores only the filter. The mbox knows that there are no subscriptions to the message and will not call the delivery filter during message dispatch.

A delivery filter and a subscription for a message can be handled independently. For example:

using namespace std;
using namespace so_5;

// Message to be filtered.
struct msg_sample
{
   int m_key;
   string m_value;
};

// A signal for starting the second part of the example.
struct msg_second_part : public signal_t {};

// A signal for finishing the example.
struct msg_shutdown : public signal_t {};

// Main example agent.
// Ordinary agent is necessary because a delivery filter can be set
// only by ordinary agent.
class a_example_t final : public agent_t
{
public :
   a_example_t( context_t ctx )
      :  agent_t( ctx )
      ,  m_mbox( so_environment().create_mbox() )
   {}

   void so_define_agent() override
   {
      so_subscribe_self()
         .event( &a_example_t::evt_second_part )
         .event( [this](mhood_t< msg_shutdown >) {
            so_deregister_agent_coop_normally();
         } );
   }

   void so_evt_start() override
   {
      // Subscribe for the message. Without filter.
      so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
        cout << "[first]: " << evt.m_key << "-" << evt.m_value << endl;
      } );
      // Sending several messages...
      // All of them will be stored to the agent's queue and handled.
      send< msg_sample >( m_mbox, 0, "only-subscription" );
      send< msg_sample >( m_mbox, 1, "only-subscription" );

      // Setting a delivery filter for message.
      so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
        return 1 == evt.m_key;
      } );
      // Sending several messages...
      // Only one will be stored to the agent's queue and handled.
      send< msg_sample >( m_mbox, 0, "subscription-and-filter" );
      send< msg_sample >( m_mbox, 1, "subscription-and-filter" );

      // Take time for processing already queued messages.
      send< msg_second_part >( *this );
   }

   void evt_second_part(mhood_t< msg_second_part >)
   {
      // Drop the subscription.
      so_drop_subscription< msg_sample >( m_mbox );
      // Sending several messages...
      // None of them will be stored to the agent's queue or handled.
      send< msg_sample >( m_mbox, 0, "only-filter" );
      send< msg_sample >( m_mbox, 1, "only-filter" );

      // Subscribe for the message again.
      so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
        cout << "[second]: " << evt.m_key << "-" << evt.m_value << endl;
      } );
      // Sending several messages...
      // Only one will be stored to the agent's queue and handled.
      send< msg_sample >( m_mbox, 0, "subscription-and-filter-2" );
      send< msg_sample >( m_mbox, 1, "subscription-and-filter-2" );

      // Changing the filter to a new one.
      so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
        return 0 == evt.m_key;
      } );
      // Sending several messages...
      // Only one will be stored to the agent's queue and handled.
      send< msg_sample >( m_mbox, 0, "subscription-and-filter-3" );
      send< msg_sample >( m_mbox, 1, "subscription-and-filter-3" );

      // Dropping the filter.
      so_drop_delivery_filter< msg_sample >( m_mbox );
      // Sending several messages...
      // All of them will be stored to the agent's queue and handled.
      send< msg_sample >( m_mbox, 0, "only-subscription-2" );
      send< msg_sample >( m_mbox, 1, "only-subscription-2" );

      // Example could be finished.
      send< msg_shutdown >( *this );
   }

private :
   // Separate MPMC-mbox is necessary for delivery filter.
   const mbox_t m_mbox;
};