Overview
Features
Download
Documentation
Community
Add-Ons & Services

please help with threaded server

A general discussion forum.

please help with threaded server

Postby gtbasher » 23 Sep 2012, 15:14

Hi there,

I'm new to network programming. I have been trying to write a threaded server application that accepts multiple connections from clients. It works fine with one thread, but I can't get it working with multiple connections.

Hope you can help. Thanks!

Code: Select all

#include <Poco/Exception.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Net/Socket.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/Net/ServerSocket.h>
#include <Poco/Thread.h>
#include <Poco/Runnable.h>
#include <Poco/ThreadPool.h>
using Poco::Exception;
using Poco::Net::SocketAddress;
using Poco::Net::Socket;
using Poco::Net::StreamSocket;
using Poco::Net::ServerSocket;
using Poco::Thread;
using Poco::Runnable;
using Poco::ThreadPool;

#include <iostream>
#include <sstream>
#include <algorithm>

#define BUFFER_SIZE 32

// Runnable that serves one accepted client socket: it reads messages
// terminated by "\n\r", prints each one, and sends it back in uppercase.
// Takes ownership of the StreamSocket passed to the constructor and deletes
// it in the destructor.
class Connection : public Runnable
{
public:
   //constructor
   //socket: heap-allocated StreamSocket; this object deletes it on destruction
   Connection(StreamSocket *socket) : m_socket(socket) {}
   //destructor
   //NOTE(review): because the socket is deleted here, a Connection has to stay
   //alive for as long as any thread is still executing run() on it.
   ~Connection()
   {
      if(m_socket) delete m_socket;
   }
   //methods
   //Thread entry point. Loops forever reading, printing and echoing messages;
   //the only way out of the loop is an Exception, which is printed to std::cout.
   virtual void run()
   {
      try
      {
         //NOTE(review): address() presumably reports the socket's own (local)
         //endpoint; peerAddress() would identify the client - confirm against Poco docs
         std::string host = m_socket->address().host().toString();
         int port = m_socket->address().port();
         //talk to the client
         while(true)
         {
            std::string msg_in = _read_message();
            std::cout << host << ":" << port << ": " << msg_in << std::endl;
            //echo message back in uppercase
            std::transform(msg_in.begin(),msg_in.end(),msg_in.begin(),::toupper);
            std::string msg_out = msg_in;
            _send_message(msg_out);
         }
      }
      catch(Exception &e)
      {
         std::cout << e.displayText() << std::endl;
      }
   }
private:
   //methods
   //read the message. does not include termination sequence.
   //Blocks in receiveBytes() until a "\n\r" pair is seen, then returns every
   //byte accumulated before it.
   std::string _read_message()
   {
      //clear the accumulation buffer
      m_accum_buffer.clear();
      int num_bytes;
      char prev_byte; //assigned below but never read
      while(true)
      {
         //NOTE(review): a return value of 0 is not handled; the outer loop
         //simply calls receiveBytes() again (presumably 0 means the peer
         //closed the connection - confirm against Poco docs)
         num_bytes = m_socket->receiveBytes(m_buffer,BUFFER_SIZE);
         //append the bytes to the accumulation buffer
         for(int i=0; i<num_bytes; i++)
         {
            //The terminator is only recognised when '\n' and '\r' are both in
            //this chunk and '\n' is not the chunk's first byte (i > 0). A pair
            //split across two receiveBytes() calls is appended as ordinary data.
            if(i > 0 && (i+1) < num_bytes && m_buffer[i] == '\n' && m_buffer[i+1] == '\r')
            {
               //got message
               //any bytes after the terminator in this chunk are discarded:
               //m_buffer is overwritten by the next receiveBytes() call
               return m_accum_buffer;
            }
            m_accum_buffer += m_buffer[i];
            prev_byte = m_buffer[i];
         }
      }
   }
   //does not assume termination sequence has been appended
   //Sends message followed by "\n\r" with a single sendBytes() call; the
   //return value of sendBytes() is not checked.
   void _send_message(const std::string &message)
   {
      std::string the_msg = message + "\n\r"; //add termination sequence
      m_socket->sendBytes((const void *)the_msg.data(),the_msg.length());
   }
   //data
   StreamSocket *m_socket;       //owned; deleted in the destructor
   char m_buffer[BUFFER_SIZE];   //scratch space for one receiveBytes() call
   std::string m_accum_buffer;   //bytes of the message currently being read
protected:
   //
};

// Listens on localhost:54321, accepts up to MAX connections, starts one
// thread per connection, then joins them all. Exceptions are printed to
// std::cerr. argc/argv are unused.
int main(int argc,char **argv)
{
   try
   {
      SocketAddress sa("localhost",54321);
      ServerSocket sock(sa);
      sock.listen();

      const int MAX = 5;
      int used = 0;
      Thread threads[MAX];
      while(used < MAX)
      {
         StreamSocket *ss = new StreamSocket(sock.acceptConnection());
         //NOTE(review): conn is local to this loop body, so ~Connection runs
         //(and deletes ss) at the closing brace of every iteration, while the
         //thread started below is presumably still executing conn.run() on it.
         Connection conn(ss);
         //Thread thread;
         //thread.start(conn);
         //thread.join(); //blocks. how to do it?
         threads[used++].start(conn);
      }
      //join all the threads
      //only reached after MAX connections have been accepted
      for(int i=0; i<used; i++)
      {
         threads[i].join();
      }
   }
   catch(Exception &e)
   {
      std::cerr << e.displayText() << std::endl;
   }

   return 0;
}
gtbasher
 
Posts: 8
Joined: 15 Sep 2012, 14:23

Re: please help with threaded server

Postby gtbasher » 23 Sep 2012, 15:49

I know there is the TCPServer framework, but I would like to learn how to do it from scratch.

I know it's a thread problem, just not sure exactly why. I'm going to paste the problem code (the main function) here so there is less stuff to read through:

Code: Select all
// Body of main() only: accept up to MAX connections, start one thread per
// connection, then join them all.
try
   {
      SocketAddress sa("localhost",54321);
      ServerSocket sock(sa);
      sock.listen();

      const int MAX = 5;
      int used = 0;
      Thread threads[MAX];
      while(used < MAX)
      {
         StreamSocket *ss = new StreamSocket(sock.acceptConnection());
         //NOTE(review): conn is destroyed at the end of each iteration, which
         //deletes ss while the thread started below presumably still uses it.
         Connection conn(ss);
         //Thread thread;
         //thread.start(conn);
         //thread.join(); //blocks. how to do it?
         threads[used++].start(conn);
      }
      //join all the threads
      for(int i=0; i<used; i++)
      {
         threads[i].join();
      }
   }
   catch(Exception &e)
   {
      std::cerr << e.displayText() << std::endl;
   }


Hope someone can help. Thanks!
gtbasher
 
Posts: 8
Joined: 15 Sep 2012, 14:23

Re: please help with threaded server

Postby gtbasher » 23 Sep 2012, 18:01

Hello again, I tried creating a separate thread to handle the connections.

Code: Select all
// Runnable that opens a server socket on the given port, accepts up to MAX
// connections, and starts one Connection thread per accepted socket.
class ConnectionHandler : public Runnable
{
public:
   //constructors
   //port: TCP port the server socket is opened on when run() executes
   ConnectionHandler(int port) : m_port(port) {}
   //destructor
   virtual ~ConnectionHandler() {}
   //methods
   //Thread entry point: accept loop followed by a join of every worker thread.
   //Any Exception is printed to std::cout and ends the method.
   virtual void run()
   {
      //listen for connections on m_port
      try
      {
         std::cout << "Server attempting to open socket on port " << m_port << std::endl;
         m_socket = ServerSocket(m_port);
         m_socket.listen();
         std::cout << "Server listening on port " << m_port << std::endl;

         //accept MAX connections
         const int MAX = 5;
         int used = 0;
         Thread threads[MAX];
         while(used < MAX)
         {
            std::cout << "Waiting for connection..." << std::endl;
            StreamSocket *ss = new StreamSocket(m_socket.acceptConnection());
            std::cout << "Established connection, spawning connection thread..." << std::endl;
            //NOTE(review): conn lives only until the end of this iteration;
            //its destructor deletes ss while the thread started below is
            //presumably still running on it. That would be consistent with the
            //"I/O error: 9" reported right after the first connection.
            Connection conn(ss);
            //Thread thread;
            //thread.start(conn);
            //thread.join(); //blocks. how to do it?
            threads[used++].start(conn);
         }
         //join all the threads
         for(int i=0; i<used; i++)
         {
            threads[i].join();
         }
      }
      catch(Exception &e)
      {
         std::cout << e.displayText() << std::endl;
      }
   }
private:
   //data
   int m_port;            //port passed to the constructor
   ServerSocket m_socket; //assigned in run()
};

// Runs a ConnectionHandler for port 2222 on its own thread and waits for it
// to finish. argc/argv are unused.
int main(int argc,char **argv)
{
   ConnectionHandler connHandler(2222);
   Thread connHandlerThread;
   connHandlerThread.start(connHandler);
   //blocks until ConnectionHandler::run() returns
   connHandlerThread.join();

   return 0;
}


I get this error:

Code: Select all
Server attempting to open socket on port 2222
Server listening on port 2222
Waiting for connection...
Established connection, spawning connection thread...
Waiting for connection...
I/O error: 9


Please help. Thanks.
gtbasher
 
Posts: 8
Joined: 15 Sep 2012, 14:23

Re: please help with threaded server

Postby gtbasher » 23 Sep 2012, 18:31

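Got it working. The problem was that each Connection was a local variable inside the accept loop, so it was destroyed (and its socket deleted) at the end of the iteration while its thread was still running. Keeping the Connection objects in an array that lives as long as the threads fixes it.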

Code: Select all

#include <Poco/Exception.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Net/Socket.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/Net/ServerSocket.h>
#include <Poco/Thread.h>
#include <Poco/Runnable.h>
#include <Poco/ThreadPool.h>
using Poco::Exception;
using Poco::Net::SocketAddress;
using Poco::Net::Socket;
using Poco::Net::StreamSocket;
using Poco::Net::ServerSocket;
using Poco::Thread;
using Poco::Runnable;
using Poco::ThreadPool;

#include <iostream>
#include <sstream>
#include <algorithm>

#define BUFFER_SIZE 32

class Connection : public Runnable
{
public:
   //constructor
   Connection() : m_socket(0) {}
   Connection(StreamSocket *socket) : m_socket(socket) {}
   //destructor
   ~Connection()
   {
      std::cout << "Destructing Connection..." << std::endl;
      if(m_socket) delete m_socket;
   }
   //methods
   void set_socket(StreamSocket *socket) { m_socket = socket; }
   virtual void run()
   {
      try
      {
         std::string host = m_socket->address().host().toString();
         int port = m_socket->address().port();
         //talk to the server
         while(true)
         {
            std::string msg_in = _read_message();
            std::cout << host << ":" << port << ": " << msg_in << std::endl;
            if(msg_in == "quit")
            {
               //bye bye
               std::cout << host << ":" << port << " is closing the connection." << std::endl;
               break;
            }
            //echo message back in uppercase
            std::transform(msg_in.begin(),msg_in.end(),msg_in.begin(),::toupper);
            std::string msg_out = msg_in;
            _send_message(msg_out);
         }
         m_socket->close();
      }
      catch(Exception &e)
      {
         std::cout << e.displayText() << std::endl;
      }
   }
private:
   //methods
   //read the message. does not include termination sequence.
   std::string _read_message()
   {
      //clear the accumulation buffer
      m_accum_buffer.clear();
      int num_bytes;
      char prev_byte;
      while(true)
      {
         num_bytes = m_socket->receiveBytes(m_buffer,BUFFER_SIZE);
         //append the bytes to the accumulation buffer
         for(int i=0; i<num_bytes; i++)
         {
            if(i > 0 && (i+1) < num_bytes && m_buffer[i] == '\n' && m_buffer[i+1] == '\r')
            {
               //got message
               return m_accum_buffer;
            }
            m_accum_buffer += m_buffer[i];
            prev_byte = m_buffer[i];
         }
      }
   }
   //does not assume termination sequence has been appended
   void _send_message(const std::string &message)
   {
      std::string the_msg = message + "\n\r"; //add termination sequence
      m_socket->sendBytes((const void *)the_msg.data(),the_msg.length());
   }
   //data
   StreamSocket *m_socket;
   char m_buffer[BUFFER_SIZE];
   std::string m_accum_buffer;
protected:
   //
};

class ConnectionHandler : public Runnable
{
public:
   //constructors
   ConnectionHandler(int port) : m_port(port) {}
   //destructor
   virtual ~ConnectionHandler() {}
   //methods
   virtual void run()
   {
      //listen for connections on m_port
      try
      {
         std::cout << "Server attempting to open socket on port " << m_port << std::endl;
         m_socket = ServerSocket(m_port);
         m_socket.listen();
         std::cout << "Server listening on port " << m_port << std::endl;

         //accept MAX connections
         const int MAX = 5;
         int used = 0;
         Thread threads[MAX];
         Connection conns[MAX];
         while(used < MAX)
         {
            std::cout << "Waiting for connection..." << std::endl;
            StreamSocket *ss = new StreamSocket(m_socket.acceptConnection());
            std::cout << "Established connection, spawning connection thread..." << std::endl;
            conns[used].set_socket(ss);
            //Thread thread;
            //thread.start(conn);
            //thread.join(); //blocks. how to do it?
            threads[used].start(conns[used]);
            used++;
            std::cout << used << " threads running..." << std::endl;
         }
         //join all the threads
         for(int i=0; i<used; i++)
         {
            threads[i].join();
         }
      }
      catch(Exception &e)
      {
         std::cout << e.displayText() << std::endl;
      }
   }
private:
   //data
   int m_port;
   ServerSocket m_socket;
};

int main(int argc,char **argv)
{
   ConnectionHandler connHandler(2222);
   Thread connHandlerThread;
   connHandlerThread.start(connHandler);
   connHandlerThread.join();

   return 0;
}
gtbasher
 
Posts: 8
Joined: 15 Sep 2012, 14:23


Return to General Discussion

Who is online

Users browsing this forum: No registered users and 0 guests

cron