RPC Framework

The goal of this library is to enable seamless remote procedure calls, asynchronous events and exception propagation between C++ objects living in different processes or on different machines. This chapter introduces the use of the Lima2 RPC library.

The library relies on functionality provided by other libraries:

  • MPI and Boost.MPI for the transport layer

  • Boost.Serialization for serialization

  • Boost.Hana for introspection and generative programming

Usage

Asynchronous Events and Notification Channels

Asynchronous events are triggered by the server and handled by the client through a notification channel that can be either unbuffered (notifications are lost if they are not read fast enough) or buffered (notifications are queued and popped at will).

// Get event receiver
auto& event_recv = client.on_event.get_receiver();

// Get a value from the receiver (blocking)
T evt = event_recv.get();

// Try to get a value from the receiver (non-blocking);
// the optional is empty when no notification is pending
std::optional<T> maybe_evt = event_recv.try_get();
if (maybe_evt)
    BOOST_CHECK_EQUAL(hana::at_c<0>(*maybe_evt), "You've got mail");
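
The difference between the two channel policies can be illustrated with a small standalone sketch in standard C++17. It does not use the Lima2 API: `unbuffered_channel` and `buffered_channel` are hypothetical names, and the unbuffered variant is assumed here to keep only the most recent unread notification.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical illustration only; these types are not part of Lima2.

// Unbuffered: a single slot. A new notification overwrites an unread one,
// so a slow reader loses notifications.
template <typename T>
class unbuffered_channel
{
public:
    void push(T value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_slot = std::move(value);
    }

    // Non-blocking read: empty optional when nothing is pending.
    std::optional<T> try_get()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::optional<T> out = std::move(m_slot);
        m_slot.reset();
        return out;
    }

private:
    std::mutex m_mutex;
    std::optional<T> m_slot;
};

// Buffered: notifications are queued and popped in arrival order.
template <typename T>
class buffered_channel
{
public:
    void push(T value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(value));
    }

    // Non-blocking read: empty optional when the queue is empty.
    std::optional<T> try_get()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
            return std::nullopt;
        T out = std::move(m_queue.front());
        m_queue.pop_front();
        return out;
    }

    std::size_t pending() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

private:
    mutable std::mutex m_mutex;
    std::deque<T> m_queue;
};
```

Pushing two notifications before reading leaves one readable value in the unbuffered channel and two in the buffered one, which is the trade-off between bounded memory and lossless delivery.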

Procedure Calls

Defining the interface

The interface is defined using a simple class definition and a macro that provides the necessary boilerplate for introspecting the interface. Two macros are provided:

  • LIMA_RPC_MEMBER() provides introspection for member functions and data

  • LIMA_RPC_NESTED_TYPE() provides introspection for nested types

struct service
{
    std::string name() const;
    int exit();
};


struct calculator : service
{
    using value_type = double;

    std::string name() const { return "calculator"; }
    int exit() { ::exit(0); }

    double add(double v);
    double add2(double v, double v2);
    double sub(double v);
    double mult(double v);
    double div(double v);
    double result() const;

    void reset();

    void throw_exception();

    event<unbuffered_policy, std::string> on_event;
};

// This macro provides a way to mirror the members (functions and events) of the interface
LIMA_RPC_MEMBER(calculator, name, exit, add, add2, sub, mult, div, result);

// This macro provides a way to mirror the nested types of the interface
LIMA_RPC_NESTED_TYPE(calculator, value_type);
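
To give an idea of what mirroring members makes possible, the following standalone sketch builds a name-to-member table by hand and uses it to dispatch a call by name. It is an illustration in standard C++17 only: the library uses Boost.Hana for introspection and the code generated by its macros is not shown here. `counter`, `counter_members` and `call_by_name` are hypothetical names.

```cpp
#include <string_view>
#include <tuple>
#include <utility>

// Hypothetical interface used only for this illustration.
struct counter
{
    int value = 0;
    int add(int v) { value += v; return value; }
    int sub(int v) { value -= v; return value; }
};

// Hand-written stand-in for a table that an introspection macro could
// generate: each entry pairs a member name with a pointer to that member.
inline constexpr auto counter_members = std::make_tuple(
    std::make_pair(std::string_view("add"), &counter::add),
    std::make_pair(std::string_view("sub"), &counter::sub));

// Looks up `name` in the table and, if found, invokes the member on `obj`.
// Returns false (leaving `result` untouched) when no entry matches.
template <typename Obj, typename Members>
bool call_by_name(Obj& obj, const Members& members,
                  std::string_view name, int arg, int& result)
{
    bool found = false;
    auto try_one = [&](const auto& member) {
        if (member.first == name) {
            result = (obj.*(member.second))(arg);
            found = true;
        }
    };
    // Visit every (name, pointer) entry of the tuple.
    std::apply([&](const auto&... member) { (try_one(member), ...); }, members);
    return found;
}
```

A server receiving the string "add" and an argument over the wire could use such a table to reach the right member function without a hand-written switch.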

Defining the implementation

The implementation of the RPC service must be compatible with the interface definition.

class calculator_impl
{
public:
    calculator_impl() : m_result(0.0) {}
    calculator_impl(const calculator_impl&) = delete;

    std::string name() const { return "calculator"; }
    int exit() { /*MPI_Finalize();*/ ::exit(0); }

    double add(double v) { m_result += v; return m_result; }
    double sub(double v) { m_result -= v; return m_result; }
    double mult(double v) { m_result *= v; return m_result; }
    double div(double v) { m_result /= v; return m_result; }
    double add2(double v, double v2) { m_result += v + v2; return m_result; }

    double result() const { return m_result; }

    void reset() { m_result = 0.0; }

    void throw_exception() const { throw std::runtime_error("Oopsy"); }

    void trigger_event() const {
        on_event("You've got mail");
    }
    std::function<void(std::string)> on_event;

private:
    double m_result;
};
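
One way to read "compatible" is that the implementation offers every member of the interface with a matching signature. Assuming that reading, such a requirement can be checked at compile time with the detection idiom. The sketch below is standard C++17 and independent of Lima2; `has_add` and the three sample types are hypothetical.

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Primary template: Impl has no usable add(double).
template <typename Impl, typename = void>
struct has_add : std::false_type {};

// Selected when `impl.add(double)` is well-formed. The call result must
// also convert to double for the member to count as compatible.
template <typename Impl>
struct has_add<Impl,
               std::void_t<decltype(std::declval<Impl&>().add(std::declval<double>()))>>
    : std::is_convertible<decltype(std::declval<Impl&>().add(std::declval<double>())),
                          double>
{};

// Sample types (hypothetical) exercising the three possible outcomes.
struct good_impl    { double add(double v) { return v; } };
struct bad_impl     { std::string add(double) { return {}; } };
struct missing_impl {};

static_assert(has_add<good_impl>::value, "matching signature is accepted");
static_assert(!has_add<bad_impl>::value, "wrong return type is rejected");
static_assert(!has_add<missing_impl>::value, "missing member is rejected");
```

A check of this kind turns a mismatch between interface and implementation into a compile error instead of a failure at call time.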

Server and Clients

Server and clients communicate over an MPI communicator, or over a sub-communicator that includes only the subset of processes involved in a specific RPC service.

int main() {
    mpi::environment env;
    mpi::communicator world;

    // if Rank 0 => server
    if (world.rank() == 0)
    {
        using server_t = server<calculator, calculator_impl>;

        // Create and run server
        server_t server(world);
        server.listen();
    }
    // Else clients
    else
    {
        using client_t = rpc::client<calculator>;
        
        client_t::value_type val = 0.0;
        client_t client(world);

        // Register event callback
        client.on_event.set_delegate([](auto evt) {
            std::cout << "on_event: " << hana::at_c<0>(evt) << std::endl;
        });

        auto name = client.name();
        std::cout << "name: " << name << std::endl;
        
        val = client.add(5.0);
        std::cout << "val: " << val << std::endl;

        auto result = client.result();
        std::cout << "result: " << result << std::endl;

        client.reset();

        client.trigger_event();

        try {
            client.throw_exception();
        }
        catch (std::exception& ex)
        {
            std::cout << "ex: " << ex.what() << std::endl;
        }

        return client.exit();
    }
}
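
The `try`/`catch` block above works because the exception thrown on the server is reported back to the caller. A common way to achieve this, sketched below in standard C++17, is to catch the exception on the serving side, ship its message in the reply, and rethrow on the calling side. This is an assumption about the general technique, not a description of the Lima2 wire format; `reply`, `invoke_on_server` and `unwrap_on_client` are hypothetical names.

```cpp
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical reply message: either a value or an error description.
struct reply
{
    std::optional<double> value;
    std::string error;
};

// Serving side: run the call and convert any std::exception into data
// that can be serialized and sent back.
template <typename F>
reply invoke_on_server(F&& f)
{
    reply r;
    try {
        r.value = std::forward<F>(f)();
    }
    catch (const std::exception& ex) {
        r.error = ex.what();
    }
    return r;
}

// Calling side: return the value, or rethrow the remote error locally.
// In this sketch the original exception type is not preserved; only the
// message survives, wrapped in std::runtime_error.
inline double unwrap_on_client(const reply& r)
{
    if (!r.value)
        throw std::runtime_error(r.error);
    return *r.value;
}
```

With this scheme a caller can treat a remote failure like a local one, at the cost of losing the dynamic type of the original exception unless it is encoded in the reply as well.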

Master and Slaves

In this mode, commands are broadcast to every slave. A common return value is computed using a “consensus” function that reduces the individual return values from the slaves.

int main() {
    mpi::environment env;
    mpi::communicator world;

    // if Rank 0 => master
    if (world.rank() == 0)
    {
        // Create the master
        master<calculator> master(world);

        // Register event callback
        master.on_event.set_delegate([](auto evt) {
            std::cout << "on_event: " << hana::at_c<0>(evt) << std::endl;
        });

        auto name = master.name();
        std::cout << "name: " << name << std::endl;
        
        auto val = master.add(5.0);
        std::cout << "val: " << val << std::endl;

        auto result = master.result();
        std::cout << "result: " << result << std::endl;

        master.reset();

        master.trigger_event();

        try {
            master.throw_exception();
        }
        catch (std::exception& ex)
        {
            std::cout << "ex: " << ex.what() << std::endl;
        }

        return master.exit();
    }
    // Else slaves
    else
    {
        using slave_t = rpc::slave<calculator, calculator_impl>;

        slave_t slave(world, sub, 0, 0.0);
        slave.listen();
    }
}
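
As an illustration of what a consensus function might look like, the sketch below reduces the slaves' replies to a single value only when they all agree. The name `unanimous` and the choice of `std::optional` for the no-agreement case are assumptions; the library may use a different policy or signature.

```cpp
#include <algorithm>
#include <optional>
#include <vector>

// Hypothetical consensus function: returns the common value when every
// reply is equal, and an empty optional when replies differ or when there
// are no replies at all.
template <typename T>
std::optional<T> unanimous(const std::vector<T>& replies)
{
    if (replies.empty())
        return std::nullopt;

    const T& first = replies.front();
    const bool all_equal = std::all_of(
        replies.begin(), replies.end(),
        [&first](const T& r) { return r == first; });

    if (!all_equal)
        return std::nullopt;
    return first;
}
```

Other reductions fit the same shape, for example taking the minimum, the maximum or a majority vote; which one is appropriate depends on the command being broadcast.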