Thursday, April 20, 2017

C++ Design: Multiple TCP clients, boost asio and observers

In my system, I have to juggle a bunch of TCP clients, and I am a bit confused about how to design it [most of my experience is in C, hence the insecurity]. I am using boost ASIO for managing the connections. These are the components I have:

  • A TCPStream class: a thin wrapper over boost asio
  • An IPC protocol class, which implements a protocol over TCP: each message starts with a type and a length field, so individual messages can be read out of the stream
  • Connection classes, which handle the messages
  • An Observer class, which monitors the connections

I am writing pseudo-C++ code to be concise. I think you will get the idea.

class TCPStream {
   boost::asio::ip::tcp::socket socket_;
public:

   // Forwards the completion handler to the underlying async connect.
   template <typename F>
   void connect (F f)
   {
       socket_.connect(f);
   }

   // Forwards the completion handler to the underlying async read.
   template <typename F>
   void read (F f)
   {
       socket_.read(f);
   }
};
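For reference, a minimal sketch of what this thin wrapper might look like against the real boost asio calls. The endpoint and buffer parameters and the io_service plumbing are assumptions that the pseudocode above leaves out:

#include <boost/asio.hpp>
#include <cstddef>
#include <utility>

class TCPStream {
    boost::asio::ip::tcp::socket socket_;
public:
    explicit TCPStream(boost::asio::io_service &io) : socket_(io) {}

    // f is invoked as f(const boost::system::error_code&) once the
    // connect attempt completes.
    template <typename F>
    void connect(const boost::asio::ip::tcp::endpoint &ep, F f)
    {
        socket_.async_connect(ep, std::move(f));
    }

    // f is invoked as f(const boost::system::error_code&, std::size_t);
    // the caller owns 'data' and must keep it alive until the handler runs.
    template <typename F>
    void read(void *data, std::size_t size, F f)
    {
        socket_.async_read_some(boost::asio::buffer(data, size), std::move(f));
    }
};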

class IpcProtocol : public TCPStream {
public:
    template <typename F>
    void read (F f)
    {
        TCPStream::read(
              [f] (auto buffer, auto err) {

                // Read errors would be forwarded to f as well, so the
                // connection class can react to them.
                while (auto msg = read_individual_message(buffer)) {
                      // **** This bends the usual callback pattern: ideally
                      // there should be a callback per individual message.
                      // Here the same callback is called for N messages, but
                      // since it is the same callback every time this should
                      // be fine - it just avoids some function calls.
                      f(msg, err);
                }
              });
    }
};
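For context, here is one way read_individual_message could pull framed messages out of an accumulated byte buffer. The Msg struct and the exact header layout (one byte of type, two bytes of host-order length) are assumptions made up for this sketch, since the post does not pin down the wire format; with this shape the loop above would pass *msg to the callback.

#include <cstdint>
#include <cstring>
#include <optional>   // C++17; boost::optional works the same way pre-17
#include <vector>

// Hypothetical framed message: a 1-byte type and a 2-byte length are
// assumed here purely for illustration.
struct Msg {
    uint8_t type;
    std::vector<uint8_t> payload;
};

// Consumes one complete message from the front of 'buffer' if enough bytes
// have arrived, otherwise returns nothing and leaves the buffer untouched
// so the next read can complete the frame.
std::optional<Msg> read_individual_message(std::vector<uint8_t> &buffer)
{
    const std::size_t header_size = 3;          // type (1) + length (2)
    if (buffer.size() < header_size)
        return std::nullopt;

    uint16_t len;
    std::memcpy(&len, buffer.data() + 1, sizeof(len));
    if (buffer.size() < header_size + len)      // payload not fully here yet
        return std::nullopt;

    Msg msg;
    msg.type = buffer[0];
    msg.payload.assign(buffer.begin() + header_size,
                       buffer.begin() + header_size + len);
    buffer.erase(buffer.begin(), buffer.begin() + header_size + len);
    return msg;
}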

Let's say I have a bunch of TCP connections and a handler class for each connection. Let's name them Connection1, Connection2, ...

class Connection {
public:
    virtual ~Connection() = default;
    virtual int type() = 0;
};

class Connection1 : public Connection,
                    public std::enable_shared_from_this<Connection1> {

   shared_ptr<IpcProtocol> ipc_;

   int type ()
   {
       return 1;
   }

   void start ()
   {
       ipc_->connect([self = shared_from_this()](){ self->connected(); });

       ipc_->read(
            [self = shared_from_this()](auto msg, auto err) {

              if (!err) {
                  self->process(msg);
              } else {
                  self->error();
              }
            });
   }

   void connected ()
   {
       observer.notify_connect(shared_from_this());
   }

   void error ()
   {
       observer.notify_error(shared_from_this());
   }
};

This pattern repeats, one way or another, for all connections. Messages are processed by the connection class itself, but it lets an observer know about other events [connect, error]. The reasons:

  1. Restart the connection every time it disconnects.
  2. Several components need to know when the connection is established, so that they can send their initial request/configuration to the server.
  3. There are things that need to be done based on the connection status of multiple connections, e.g. if connection1 and connection2 are established, then start connection3 (see the listener sketch after the Observer class below).

I added an Observer class in the middle so that the listeners do not have to attach directly to the connection every time it is restarted. Each time a connection breaks, the connection object is deleted and a new one is created.

class Listeners {
public:
    virtual ~Listeners() = default;
    virtual void notify_error(shared_ptr<Connection>) = 0;
    virtual void notify_connect(shared_ptr<Connection>) = 0;
    virtual bool interested(int type) = 0;
};


class Observer {
   std::vector<Listeners *> listeners_;
public:

   void notify_connect(shared_ptr<Connection> connection)
   {
        for (auto *listener : listeners_) {
            if (listener->interested(connection->type())) {
                listener->notify_connect(connection);
            }
        }
   }

   void notify_error(shared_ptr<Connection> connection)
   {
        for (auto *listener : listeners_) {
            if (listener->interested(connection->type())) {
                listener->notify_error(connection);
            }
        }
   }
};
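As a usage example, here is a hypothetical listener built on the Listeners interface above that covers reasons 1 and 3 from the list: restart a broken connection, and start connection3 once connection1 and connection2 are both up. The Coordinator name and the two private hooks are assumptions about how the rest of the system is wired up.

class Coordinator : public Listeners {
    bool conn1_up_ = false;
    bool conn2_up_ = false;
public:
    // Only interested in connection1 and connection2 events.
    bool interested(int type) override
    {
        return type == 1 || type == 2;
    }

    void notify_connect(shared_ptr<Connection> connection) override
    {
        if (connection->type() == 1) conn1_up_ = true;
        if (connection->type() == 2) conn2_up_ = true;

        // Reason 3: once both prerequisites are established, start
        // connection3.
        if (conn1_up_ && conn2_up_)
            start_connection3();
    }

    void notify_error(shared_ptr<Connection> connection) override
    {
        if (connection->type() == 1) conn1_up_ = false;
        if (connection->type() == 2) conn2_up_ = false;

        // Reason 1: the broken connection object is thrown away and a
        // replacement is created and started.
        restart_connection(connection->type());
    }

private:
    // Hypothetical hooks into the rest of the system.
    void start_connection3()          { /* create and start Connection3 */ }
    void restart_connection(int type) { /* schedule a new Connection object */ }
};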

Now, a rough prototype of this works, but I was wondering whether this class design is any good. There are multiple streaming servers which will continuously produce state and send it to my module, which programs that state into hardware. This needs to be extensible, as more clients will be added in the future.

Threading

The legacy code had one thread per TCP connection, and this worked fine. Here I am trying to handle multiple connections on the same thread. There will still be multiple threads calling io_service, so the observer will run on multiple threads. I am planning to have a mutex per listener, so that listeners won't receive multiple events concurrently.
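A minimal sketch of that plan, assuming the per-listener mutex simply serializes the two notify calls (the SerializedListener name is made up for illustration):

#include <mutex>

class SerializedListener : public Listeners {
    std::mutex m_;   // one mutex per listener, as planned above
public:
    bool interested(int type) override { return true; }

    void notify_connect(shared_ptr<Connection> connection) override
    {
        // Only one io_service thread at a time can deliver an event
        // to this listener.
        std::lock_guard<std::mutex> lock(m_);
        // ... react to the connection coming up ...
    }

    void notify_error(shared_ptr<Connection> connection) override
    {
        std::lock_guard<std::mutex> lock(m_);
        // ... react to the connection going down ...
    }
};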
