Classes | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends | List of all members
FIX::ThreadedSocketAcceptor Class Reference

Threaded Socket implementation of Acceptor. More...

#include <ThreadedSocketAcceptor.h>

Inheritance diagram for FIX::ThreadedSocketAcceptor:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketAcceptor:
Collaboration graph
[legend]

Classes

struct  AcceptorThreadInfo
 
struct  ConnectionThreadInfo
 

Public Member Functions

 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketAcceptor ()
 
- Public Member Functions inherited from FIX::Acceptor
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Acceptor ()
 
LoggetLog ()
 
void start () throw ( ConfigError, RuntimeError )
 Start acceptor.
 
void block () throw ( ConfigError, RuntimeError )
 Block on the acceptor.
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the acceptor.
 
void stop (bool force=false)
 Stop acceptor.
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on.
 
SessiongetSession (const std::string &msg, Responder &)
 
const std::set< SessionID > & getSessions () const
 
SessiongetSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
ApplicationgetApplication ()
 
MessageStoreFactorygetMessageStoreFactory ()
 

Private Types

typedef std::set< int > Sockets
 
typedef std::set< SessionIDSessions
 
typedef std::map< int, SessionsPortToSessions
 
typedef std::map< int, int > SocketToPort
 
typedef std::map< int, thread_idSocketToThread
 

Private Member Functions

bool readSettings (const SessionSettings &)
 
void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize acceptor.
 
void onStart ()
 Implemented to start listening for connections.
 
bool onPoll (double timeout)
 Implemented to connect and poll for events.
 
void onStop ()
 Implemented to stop a running acceptor.
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 

Static Private Member Functions

static THREAD_PROC socketAcceptorThread (void *p)
 
static THREAD_PROC socketConnectionThread (void *p)
 

Private Attributes

Sockets m_sockets
 
PortToSessions m_portToSessions
 
SocketToPort m_socketToPort
 
SocketToThread m_threads
 
Mutex m_mutex
 

Friends

class SocketConnection
 

Additional Inherited Members

- Protected Attributes inherited from FIX::Acceptor
SessionSettings m_settings
 

Detailed Description

Threaded Socket implementation of Acceptor.

Definition at line 36 of file ThreadedSocketAcceptor.h.

Member Typedef Documentation

◆ PortToSessions

typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions
private

Definition at line 73 of file ThreadedSocketAcceptor.h.

◆ Sessions

typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions
private

Definition at line 72 of file ThreadedSocketAcceptor.h.

◆ Sockets

typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets
private

Definition at line 71 of file ThreadedSocketAcceptor.h.

◆ SocketToPort

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort
private

Definition at line 74 of file ThreadedSocketAcceptor.h.

◆ SocketToThread

typedef std::map< int, thread_id > FIX::ThreadedSocketAcceptor::SocketToThread
private

Definition at line 75 of file ThreadedSocketAcceptor.h.

Constructor & Destructor Documentation

◆ ThreadedSocketAcceptor() [1/2]

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings 
)
throw (ConfigError
)

Definition at line 32 of file ThreadedSocketAcceptor.cpp.

36: Acceptor( application, factory, settings )
37{ socket_init(); }
Acceptor(Application &, MessageStoreFactory &, const SessionSettings &)
Definition Acceptor.cpp:36
void socket_init()
Definition Utility.cpp:81

References FIX::socket_init().

◆ ThreadedSocketAcceptor() [2/2]

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings,
LogFactory logFactory 
)
throw (ConfigError
)

Definition at line 39 of file ThreadedSocketAcceptor.cpp.

44: Acceptor( application, factory, settings, logFactory )
45{
46 socket_init();
47}

References FIX::socket_init().

◆ ~ThreadedSocketAcceptor()

FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor ( )
virtual

Definition at line 49 of file ThreadedSocketAcceptor.cpp.

50{
51 socket_term();
52}
void socket_term()
Definition Utility.cpp:96

References FIX::socket_term().

Member Function Documentation

◆ addThread()

void FIX::ThreadedSocketAcceptor::addThread ( int  s,
thread_id  t 
)
private

Definition at line 168 of file ThreadedSocketAcceptor.cpp.

169{
170 Locker l(m_mutex);
171
172 m_threads[ s ] = t;
173}

References m_mutex, and m_threads.

Referenced by onStart(), and socketAcceptorThread().

◆ onConfigure()

void FIX::ThreadedSocketAcceptor::onConfigure ( const SessionSettings )
throw (ConfigError
)
privatevirtual

Implemented to configure acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 54 of file ThreadedSocketAcceptor.cpp.

56{
57 std::set<SessionID> sessions = s.getSessions();
58 std::set<SessionID>::iterator i;
59 for( i = sessions.begin(); i != sessions.end(); ++i )
60 {
61 const Dictionary& settings = s.get( *i );
62 settings.getInt( SOCKET_ACCEPT_PORT );
63 if( settings.has(SOCKET_REUSE_ADDRESS) )
64 settings.getBool( SOCKET_REUSE_ADDRESS );
65 if( settings.has(SOCKET_NODELAY) )
66 settings.getBool( SOCKET_NODELAY );
67 }
68}
const char SOCKET_ACCEPT_PORT[]
const char SOCKET_REUSE_ADDRESS[]
const char SOCKET_NODELAY[]

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.

◆ onInitialize()

void FIX::ThreadedSocketAcceptor::onInitialize ( const SessionSettings )
throw (RuntimeError
)
privatevirtual

Implemented to initialize acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 70 of file ThreadedSocketAcceptor.cpp.

72{
73 short port = 0;
74 std::set<int> ports;
75
76 std::set<SessionID> sessions = s.getSessions();
77 std::set<SessionID>::iterator i = sessions.begin();
78 for( ; i != sessions.end(); ++i )
79 {
80 const Dictionary& settings = s.get( *i );
81 port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
82
83 m_portToSessions[port].insert( *i );
84
85 if( ports.find(port) != ports.end() )
86 continue;
87 ports.insert( port );
88
89 const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
90 settings.getBool( SOCKET_REUSE_ADDRESS ) : true;
91
92 const bool noDelay = settings.has( SOCKET_NODELAY ) ?
93 settings.getBool( SOCKET_NODELAY ) : false;
94
95 const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
96 settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;
97
98 const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
99 settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
100
101 int socket = socket_createAcceptor( port, reuseAddress );
102 if( socket < 0 )
103 {
104 SocketException e;
105 socket_close( socket );
106 throw RuntimeError( "Unable to create, bind, or listen to port "
107 + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
108 }
109 if( noDelay )
110 socket_setsockopt( socket, TCP_NODELAY );
111 if( sendBufSize )
112 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
113 if( rcvBufSize )
114 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
115
116 m_socketToPort[socket] = port;
117 m_sockets.insert( socket );
118 }
119}
const char SOCKET_SEND_BUFFER_SIZE[]
int socket_setsockopt(int s, int opt)
Definition Utility.cpp:208
void socket_close(int s)
Definition Utility.cpp:180
const char SOCKET_RECEIVE_BUFFER_SIZE[]
int socket_createAcceptor(int port, bool reuse)
Definition Utility.cpp:120
static std::string convert(signed_int value)

References FIX::IntConvertor::convert(), FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, FIX::SOCKET_REUSE_ADDRESS, FIX::SOCKET_SEND_BUFFER_SIZE, and FIX::socket_setsockopt().

◆ onPoll()

bool FIX::ThreadedSocketAcceptor::onPoll ( double  second)
privatevirtual

Implemented to connect and poll for events.

Implements FIX::Acceptor.

Definition at line 135 of file ThreadedSocketAcceptor.cpp.

136{
137 return false;
138}

◆ onStart()

void FIX::ThreadedSocketAcceptor::onStart ( )
privatevirtual

Implemented to start listening for connections.

Implements FIX::Acceptor.

Definition at line 121 of file ThreadedSocketAcceptor.cpp.

122{
123 Sockets::iterator i;
124 for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
125 {
126 Locker l( m_mutex );
127 int port = m_socketToPort[*i];
128 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
129 thread_id thread;
130 thread_spawn( &socketAcceptorThread, info, thread );
131 addThread( *i, thread );
132 }
133}
static THREAD_PROC socketAcceptorThread(void *p)
pthread_t thread_id
Definition Utility.h:190
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition Utility.cpp:416

References addThread(), m_mutex, m_sockets, m_socketToPort, socketAcceptorThread(), and FIX::thread_spawn().

◆ onStop()

void FIX::ThreadedSocketAcceptor::onStop ( )
privatevirtual

Implemented to stop a running acceptor.

Implements FIX::Acceptor.

Definition at line 140 of file ThreadedSocketAcceptor.cpp.

141{
142 SocketToThread threads;
143 SocketToThread::iterator i;
144
145 {
146 Locker l(m_mutex);
147
148 time_t start = 0;
149 time_t now = 0;
150
151 ::time( &start );
152 while ( isLoggedOn() )
153 {
154 if( ::time(&now) -5 >= start )
155 break;
156 }
157
158 threads = m_threads;
159 m_threads.clear();
160 }
161
162 for ( i = threads.begin(); i != threads.end(); ++i )
163 socket_close( i->first );
164 for ( i = threads.begin(); i != threads.end(); ++i )
165 thread_join( i->second );
166}
void start()
Start acceptor.
Definition Acceptor.cpp:158
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition Acceptor.cpp:230
std::map< int, thread_id > SocketToThread
void thread_join(thread_id thread)
Definition Utility.cpp:437

References FIX::Acceptor::isLoggedOn(), m_mutex, m_threads, FIX::socket_close(), FIX::Acceptor::start(), and FIX::thread_join().

◆ readSettings()

bool FIX::ThreadedSocketAcceptor::readSettings ( const SessionSettings )
private

◆ removeThread()

void FIX::ThreadedSocketAcceptor::removeThread ( int  s)
private

Definition at line 175 of file ThreadedSocketAcceptor.cpp.

176{
177 Locker l(m_mutex);
178 SocketToThread::iterator i = m_threads.find( s );
179 if ( i != m_threads.end() )
180 {
181 thread_detach( i->second );
182 m_threads.erase( i );
183 }
184}
void thread_detach(thread_id thread)
Definition Utility.cpp:447

References m_mutex, m_threads, and FIX::thread_detach().

Referenced by socketAcceptorThread(), and socketConnectionThread().

◆ socketAcceptorThread()

THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread ( void *  p)
staticprivate

Definition at line 186 of file ThreadedSocketAcceptor.cpp.

187{
188 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
189
190 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
191 int s = info->m_socket;
192 int port = info->m_port;
193 delete info;
194
195 int noDelay = 0;
196 int sendBufSize = 0;
197 int rcvBufSize = 0;
198 socket_getsockopt( s, TCP_NODELAY, noDelay );
199 socket_getsockopt( s, SO_SNDBUF, sendBufSize );
200 socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
201
202 int socket = 0;
203 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
204 {
205 if( noDelay )
206 socket_setsockopt( socket, TCP_NODELAY );
207 if( sendBufSize )
208 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
209 if( rcvBufSize )
210 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
211
212 Sessions sessions = pAcceptor->m_portToSessions[port];
213
214 ThreadedSocketConnection * pConnection =
215 new ThreadedSocketConnection
216 ( socket, sessions, pAcceptor->getLog() );
217
218 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
219
220 {
221 Locker l( pAcceptor->m_mutex );
222
223 std::stringstream stream;
224 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
225
226 if( pAcceptor->getLog() )
227 pAcceptor->getLog()->onEvent( stream.str() );
228
229 thread_id thread;
230 if ( !thread_spawn( &socketConnectionThread, info, thread ) )
231 {
232 delete info;
233 delete pConnection;
234 }
235 else
236 pAcceptor->addThread( socket, thread );
237 }
238 }
239
240 if( !pAcceptor->isStopped() )
241 pAcceptor->removeThread( s );
242
243 return 0;
244}
static THREAD_PROC socketConnectionThread(void *p)
ThreadedSocketAcceptor(Application &, MessageStoreFactory &, const SessionSettings &)
int socket_accept(int s)
Definition Utility.cpp:164
int socket_getsockopt(int s, int opt, int &optval)
Definition Utility.cpp:233
const char * socket_peername(int socket)
Definition Utility.cpp:353

References addThread(), FIX::Acceptor::getLog(), FIX::Acceptor::isStopped(), m_mutex, FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_pAcceptor, FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_port, m_portToSessions, FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_socket, FIX::Log::onEvent(), removeThread(), FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), and FIX::thread_spawn().

Referenced by onStart().

◆ socketConnectionThread()

THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread ( void *  p)
staticprivate

Definition at line 246 of file ThreadedSocketAcceptor.cpp.

247{
248 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
249
250 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
251 ThreadedSocketConnection* pConnection = info->m_pConnection;
252 delete info;
253
254 int socket = pConnection->getSocket();
255
256 while ( pConnection->read() ) {}
257 delete pConnection;
258 if( !pAcceptor->isStopped() )
259 pAcceptor->removeThread( socket );
260 return 0;
261}

References FIX::ThreadedSocketConnection::getSocket(), FIX::Acceptor::isStopped(), FIX::ThreadedSocketAcceptor::ConnectionThreadInfo::m_pAcceptor, FIX::ThreadedSocketAcceptor::ConnectionThreadInfo::m_pConnection, FIX::ThreadedSocketConnection::read(), and removeThread().

Referenced by socketAcceptorThread().

Friends And Related Symbol Documentation

◆ SocketConnection

friend class SocketConnection
friend

Definition at line 38 of file ThreadedSocketAcceptor.h.

Member Data Documentation

◆ m_mutex

Mutex FIX::ThreadedSocketAcceptor::m_mutex
private

◆ m_portToSessions

PortToSessions FIX::ThreadedSocketAcceptor::m_portToSessions
private

Definition at line 90 of file ThreadedSocketAcceptor.h.

Referenced by socketAcceptorThread().

◆ m_sockets

Sockets FIX::ThreadedSocketAcceptor::m_sockets
private

Definition at line 89 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

◆ m_socketToPort

SocketToPort FIX::ThreadedSocketAcceptor::m_socketToPort
private

Definition at line 91 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

◆ m_threads

SocketToThread FIX::ThreadedSocketAcceptor::m_threads
private

Definition at line 92 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

Generated on Thu Feb 29 2024 22:38:19 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001