ThreadedSocketInitiator.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
27#include "Session.h"
28#include "Settings.h"
29
30namespace FIX
31{
33 Application& application,
34 MessageStoreFactory& factory,
35 const SessionSettings& settings ) throw( ConfigError )
36: Initiator( application, factory, settings ),
37 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
38 m_sendBufSize( 0 ), m_rcvBufSize( 0 )
39{
40 socket_init();
41}
42
44 Application& application,
45 MessageStoreFactory& factory,
46 const SessionSettings& settings,
47 LogFactory& logFactory ) throw( ConfigError )
48: Initiator( application, factory, settings, logFactory ),
49 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
50 m_sendBufSize( 0 ), m_rcvBufSize( 0 )
51{
52 socket_init();
53}
54
59
61throw ( ConfigError )
62{
63 const Dictionary& dict = s.get();
64
65 if( dict.has( RECONNECT_INTERVAL ) )
66 m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
67 if( dict.has( SOCKET_NODELAY ) )
68 m_noDelay = dict.getBool( SOCKET_NODELAY );
69 if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
70 m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
72 m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
73}
74
79
81{
82 while ( !isStopped() )
83 {
84 time_t now;
85 ::time( &now );
86
87 if ( (now - m_lastConnect) >= m_reconnectInterval )
88 {
89 Locker l( m_mutex );
90 connect();
91 m_lastConnect = now;
92 }
93
94 process_sleep( 1 );
95 }
96}
97
98bool ThreadedSocketInitiator::onPoll( double timeout )
99{
100 return false;
101}
102
104{
105 SocketToThread threads;
106 SocketToThread::iterator i;
107
108 {
109 Locker l(m_mutex);
110
111 time_t start = 0;
112 time_t now = 0;
113
114 ::time( &start );
115 while ( isLoggedOn() )
116 {
117 if( ::time(&now) -5 >= start )
118 break;
119 }
120
121 threads = m_threads;
122 m_threads.clear();
123 }
124
125 for ( i = threads.begin(); i != threads.end(); ++i )
126 socket_close( i->first );
127
128 for ( i = threads.begin(); i != threads.end(); ++i )
129 thread_join( i->second );
130 threads.clear();
131}
132
134{
135 try
136 {
137 Session* session = Session::lookupSession( s );
138 if( !session->isSessionTime(UtcTimeStamp()) ) return;
139
140 Log* log = session->getLog();
141
142 std::string address;
143 short port = 0;
144 std::string sourceAddress;
145 short sourcePort = 0;
146 getHost( s, d, address, port, sourceAddress, sourcePort );
147
148 int socket = socket_createConnector();
149 if( m_noDelay )
150 socket_setsockopt( socket, TCP_NODELAY );
151 if( m_sendBufSize )
152 socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
153 if( m_rcvBufSize )
154 socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
155
156 setPending( s );
157 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");
158
159 ThreadedSocketConnection* pConnection =
160 new ThreadedSocketConnection( s, socket, address, port, getLog(), sourceAddress, sourcePort );
161
162 ThreadPair* pair = new ThreadPair( this, pConnection );
163
164 {
165 Locker l( m_mutex );
166 thread_id thread;
167 if ( thread_spawn( &socketThread, pair, thread ) )
168 {
169 addThread( socket, thread );
170 }
171 else
172 {
173 delete pair;
174 pConnection->disconnect();
175 delete pConnection;
176 setDisconnected( s );
177 }
178 }
179 }
180 catch ( std::exception& ) {}
181}
182
184{
185 Locker l(m_mutex);
186
187 m_threads[ s ] = t;
188}
189
191{
192 Locker l(m_mutex);
193 SocketToThread::iterator i = m_threads.find( s );
194
195 if ( i != m_threads.end() )
196 {
197 thread_detach( i->second );
198 m_threads.erase( i );
199 }
200}
201
203{
204 ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
205
206 ThreadedSocketInitiator* pInitiator = pair->first;
207 ThreadedSocketConnection* pConnection = pair->second;
208 FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
209 FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
210 int socket = pConnection->getSocket();
211 delete pair;
212
213 pInitiator->lock();
214
215 if( !pConnection->connect() )
216 {
217 pInitiator->getLog()->onEvent( "Connection failed" );
218 pConnection->disconnect();
219 delete pConnection;
220 pInitiator->removeThread( socket );
221 pInitiator->setDisconnected( sessionID );
222 return 0;
223 }
224
225 pInitiator->setConnected( sessionID );
226 pInitiator->getLog()->onEvent( "Connection succeeded" );
227
228 pSession->next();
229
230 while ( pConnection->read() ) {}
231
232 delete pConnection;
233 if( !pInitiator->isStopped() )
234 pInitiator->removeThread( socket );
235
236 pInitiator->setDisconnected( sessionID );
237 return 0;
238}
239
241 std::string& address, short& port,
242 std::string& sourceAddress, short& sourcePort )
243{
244 int num = 0;
245 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
246 if ( i != m_sessionToHostNum.end() ) num = i->second;
247
248 std::stringstream hostStream;
249 hostStream << SOCKET_CONNECT_HOST << num;
250 std::string hostString = hostStream.str();
251
252 std::stringstream portStream;
253 portStream << SOCKET_CONNECT_PORT << num;
254 std::string portString = portStream.str();
255
256 if( d.has(hostString) && d.has(portString) )
257 {
258 address = d.getString( hostString );
259 port = ( short ) d.getInt( portString );
260
261 std::stringstream sourceHostStream;
262 sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
263 hostString = sourceHostStream.str();
264 if( d.has(hostString) )
265 sourceAddress = d.getString( hostString );
266
267 std::stringstream sourcePortStream;
268 sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
269 portString = sourcePortStream.str();
270 if( d.has(portString) )
271 sourcePort = ( short ) d.getInt( portString );
272 }
273 else
274 {
275 num = 0;
276 address = d.getString( SOCKET_CONNECT_HOST );
277 port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
278
280 sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
282 sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
283 }
284
285 m_sessionToHostNum[ s ] = ++num;
286}
287
288}
#define THREAD_PROC
Definition Utility.h:184
This interface must be implemented to define what your FIX application does.
Definition Application.h:44
For storage and retrieval of key/value pairs.
Definition Dictionary.h:37
int getInt(const std::string &) const
Get a value as a int.
bool getBool(const std::string &) const
Get a value as a bool.
bool has(const std::string &) const
Check if the dictionary contains a value for key.
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Base for classes which act as an initiator for establishing connections.
Definition Initiator.h:52
bool isLoggedOn()
Check to see if any sessions are currently logged on.
void setConnected(const SessionID &)
void setPending(const SessionID &)
void setDisconnected(const SessionID &)
void start()
Start initiator.
Log * getLog()
Definition Initiator.h:90
bool isStopped()
Definition Initiator.h:83
Locks/Unlocks a mutex using RAII.
Definition Mutex.h:96
This interface must be implemented to create a Log.
Definition Log.h:43
This interface must be implemented to log messages and events.
Definition Log.h:82
virtual void onEvent(const std::string &)=0
This interface must be implemented to create a MessageStore.
Maintains the state and implements the logic of a FIX session.
Definition Session.h:46
bool isSessionTime(const UtcTimeStamp &time)
Definition Session.h:108
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496
const SessionID & getSessionID() const
Definition Session.h:75
Log * getLog()
Definition Session.h:227
void next()
Definition Session.cpp:125
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition SessionID.h:31
Container for setting dictionaries mapped to sessions.
Encapsulates a socket file descriptor (multi-threaded).
Threaded Socket implementation of Initiator.
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
std::map< int, thread_id > SocketToThread
void onStop()
Implemented to stop a running initiator.
ThreadedSocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection * > ThreadPair
void getHost(const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
bool onPoll(double timeout)
Implemented to connect and poll for events.
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
static THREAD_PROC socketThread(void *p)
void onStart()
Implemented to start connecting to targets.
void doConnect(const SessionID &s, const Dictionary &d)
Implemented to connect a session to its target.
Date and Time represented in UTC.
Definition FieldTypes.h:583
const char SOCKET_CONNECT_PORT[]
const char SOCKET_SEND_BUFFER_SIZE[]
int socket_setsockopt(int s, int opt)
Definition Utility.cpp:208
void thread_detach(thread_id thread)
Definition Utility.cpp:447
int socket_createConnector()
Definition Utility.cpp:143
pthread_t thread_id
Definition Utility.h:190
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition Utility.cpp:416
void socket_close(int s)
Definition Utility.cpp:180
void process_sleep(double s)
Definition Utility.cpp:466
void thread_join(thread_id thread)
Definition Utility.cpp:437
const char SOCKET_NODELAY[]
void socket_init()
Definition Utility.cpp:81
const char SOCKET_RECEIVE_BUFFER_SIZE[]
void socket_term()
Definition Utility.cpp:96
const char RECONNECT_INTERVAL[]
const char SOCKET_CONNECT_HOST[]
const char SOCKET_CONNECT_SOURCE_PORT[]
const char SOCKET_CONNECT_SOURCE_HOST[]
Application is not configured correctly
Definition Exceptions.h:88
static std::string convert(signed_int value)
Application encountered serious error during runtime
Definition Exceptions.h:95

Generated on Mon Mar 4 2024 21:10:02 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001