Initiator.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
26#include "Initiator.h"
27#include "Utility.h"
28#include "Session.h"
29#include "SessionFactory.h"
30#include "HttpServer.h"
31#include <algorithm>
32#include <fstream>
33
34namespace FIX
35{
37 MessageStoreFactory& messageStoreFactory,
38 const SessionSettings& settings ) throw( ConfigError )
39: m_threadid( 0 ),
40 m_application( application ),
41 m_messageStoreFactory( messageStoreFactory ),
42 m_settings( settings ),
43 m_pLogFactory( 0 ),
44 m_pLog( 0 ),
45 m_firstPoll( true ),
46 m_stop( true )
47{ initialize(); }
48
50 MessageStoreFactory& messageStoreFactory,
51 const SessionSettings& settings,
52 LogFactory& logFactory ) throw( ConfigError )
53: m_threadid( 0 ),
54 m_application( application ),
55 m_messageStoreFactory( messageStoreFactory ),
56 m_settings( settings ),
57 m_pLogFactory( &logFactory ),
58 m_pLog( logFactory.create() ),
59 m_firstPoll( true ),
60 m_stop( true )
61{ initialize(); }
62
64{
65 std::set < SessionID > sessions = m_settings.getSessions();
66 std::set < SessionID > ::iterator i;
67
68 if ( !sessions.size() )
69 throw ConfigError( "No sessions defined" );
70
73
74 for ( i = sessions.begin(); i != sessions.end(); ++i )
75 {
76 if ( m_settings.get( *i ).getString( "ConnectionType" ) == "initiator" )
77 {
78 m_sessionIDs.insert( *i );
79 m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
80 setDisconnected( *i );
81 }
82 }
83
84 if ( !m_sessions.size() )
85 throw ConfigError( "No sessions defined for initiator" );
86}
87
89{
90 Sessions::iterator i;
91 for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
92 delete i->second;
93
94 if( m_pLogFactory && m_pLog )
96}
97
99 Responder& responder )
100{
101 Sessions::iterator i = m_sessions.find( sessionID );
102 if ( i != m_sessions.end() )
103 {
104 i->second->setResponder( &responder );
105 return i->second;
106 }
107 return 0;
108}
109
110Session* Initiator::getSession( const SessionID& sessionID ) const
111{
112 Sessions::const_iterator i = m_sessions.find( sessionID );
113 if( i != m_sessions.end() )
114 return i->second;
115 else
116 return 0;
117}
118
119const Dictionary* const Initiator::getSessionSettings( const SessionID& sessionID ) const
120{
121 try
122 {
123 return &m_settings.get( sessionID );
124 }
125 catch( ConfigError& )
126 {
127 return 0;
128 }
129}
130
132{
133 Locker l(m_mutex);
134
135 SessionIDs disconnected = m_disconnected;
136 SessionIDs::iterator i = disconnected.begin();
137 for ( ; i != disconnected.end(); ++i )
138 {
139 Session* pSession = Session::lookupSession( *i );
140 if ( pSession->isEnabled() && pSession->isSessionTime(UtcTimeStamp()) )
141 doConnect( *i, m_settings.get( *i ));
142 }
143}
144
145void Initiator::setPending( const SessionID& sessionID )
146{
147 Locker l(m_mutex);
148
149 m_pending.insert( sessionID );
150 m_connected.erase( sessionID );
151 m_disconnected.erase( sessionID );
152}
153
154void Initiator::setConnected( const SessionID& sessionID )
155{
156 Locker l(m_mutex);
157
158 m_pending.erase( sessionID );
159 m_connected.insert( sessionID );
160 m_disconnected.erase( sessionID );
161}
162
163void Initiator::setDisconnected( const SessionID& sessionID )
164{
165 Locker l(m_mutex);
166
167 m_pending.erase( sessionID );
168 m_connected.erase( sessionID );
169 m_disconnected.insert( sessionID );
170}
171
172bool Initiator::isPending( const SessionID& sessionID )
173{
174 Locker l(m_mutex);
175 return m_pending.find( sessionID ) != m_pending.end();
176}
177
178bool Initiator::isConnected( const SessionID& sessionID )
179{
180 Locker l(m_mutex);
181 return m_connected.find( sessionID ) != m_connected.end();
182}
183
184bool Initiator::isDisconnected( const SessionID& sessionID )
185{
186 Locker l(m_mutex);
187 return m_disconnected.find( sessionID ) != m_disconnected.end();
188}
189
191{
192 m_stop = false;
195
197
198 if( !thread_spawn( &startThread, this, m_threadid ) )
199 throw RuntimeError("Unable to spawn thread");
200}
201
202
204{
205 m_stop = false;
208
209 startThread(this);
210}
211
212bool Initiator::poll( double timeout ) throw ( ConfigError, RuntimeError )
213{
214 if( m_firstPoll )
215 {
216 m_stop = false;
217 onConfigure( m_settings );
218 onInitialize( m_settings );
219 connect();
220 m_firstPoll = false;
221 }
222
223 return onPoll( timeout );
224}
225
226void Initiator::stop( bool force )
227{
228 if( isStopped() ) return;
229
231
232 std::vector<Session*> enabledSessions;
233
234 SessionIDs connected;
235
236 {
237 Locker l(m_mutex);
238 connected = m_connected;
239 }
240
241 SessionIDs::iterator i = connected.begin();
242 for ( ; i != connected.end(); ++i )
243 {
244 Session* pSession = Session::lookupSession(*i);
245 if( pSession && pSession->isEnabled() )
246 {
247 enabledSessions.push_back( pSession );
248 pSession->logout();
249 }
250 }
251
252 if( !force )
253 {
254 for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
255 process_sleep( 1 );
256 }
257
258 {
259 Locker l(m_mutex);
260 for ( i = connected.begin(); i != connected.end(); ++i )
262 }
263
264 m_stop = true;
265 onStop();
266 if( m_threadid )
268 m_threadid = 0;
269
270 std::vector<Session*>::iterator session = enabledSessions.begin();
271 for( ; session != enabledSessions.end(); ++session )
272 (*session)->logon();
273}
274
276{
277 Locker l(m_mutex);
278
279 SessionIDs connected = m_connected;
280 SessionIDs::iterator i = connected.begin();
281 for ( ; i != connected.end(); ++i )
282 {
284 return true;
285 }
286 return false;
287}
288
290{
291 Initiator * pInitiator = static_cast < Initiator* > ( p );
292 pInitiator->onStart();
293 return 0;
294}
295}
#define THREAD_PROC
Definition Utility.h:184
This interface must be implemented to define what your FIX application does.
Definition Application.h:44
For storage and retrieval of key/value pairs.
Definition Dictionary.h:37
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
static void startGlobal(const SessionSettings &)
static void stopGlobal()
Base for classes which act as an initiator for establishing connections.
Definition Initiator.h:52
const Dictionary *const getSessionSettings(const SessionID &sessionID) const
virtual void onStart()=0
Implemented to start connecting to targets.
thread_id m_threadid
Definition Initiator.h:135
SessionIDs m_connected
Definition Initiator.h:131
bool isLoggedOn()
Check to see if any sessions are currently logged on.
void setConnected(const SessionID &)
std::set< SessionID > SessionIDs
Definition Initiator.h:124
void setPending(const SessionID &)
static THREAD_PROC startThread(void *p)
void initialize()
Definition Initiator.cpp:63
void stop(bool force=false)
Stop initiator.
void setDisconnected(const SessionID &)
Initiator(Application &, MessageStoreFactory &, const SessionSettings &)
Definition Initiator.cpp:36
SessionIDs m_pending
Definition Initiator.h:130
void start()
Start initiator.
virtual void doConnect(const SessionID &, const Dictionary &)=0
Implemented to connect a session to its target.
Session * getSession(const SessionID &sessionID, Responder &)
Definition Initiator.cpp:98
Sessions m_sessions
Definition Initiator.h:128
bool poll(double timeout=0.0)
Poll the initiator.
bool isDisconnected(const SessionID &)
SessionIDs m_disconnected
Definition Initiator.h:132
void block()
Block on the initiator.
bool isConnected(const SessionID &)
virtual void onConfigure(const SessionSettings &)
Implemented to configure initiator.
Definition Initiator.h:110
LogFactory * m_pLogFactory
Definition Initiator.h:141
SessionIDs m_sessionIDs
Definition Initiator.h:129
virtual void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
Definition Initiator.h:112
SessionSettings m_settings
Definition Initiator.h:139
bool isStopped()
Definition Initiator.h:83
bool isPending(const SessionID &)
virtual ~Initiator()
Definition Initiator.cpp:88
Application & m_application
Definition Initiator.h:136
MessageStoreFactory & m_messageStoreFactory
Definition Initiator.h:137
virtual void onStop()=0
Implemented to stop a running initiator.
Locks/Unlocks a mutex using RAII.
Definition Mutex.h:96
This interface must be implemented to create a Log.
Definition Log.h:43
virtual void destroy(Log *)=0
This interface must be implemented to create a MessageStore.
Interface implements sending on and disconnecting a transport.
Definition Responder.h:35
Responsible for creating Session objects.
Session * create(const SessionID &sessionID, const Dictionary &settings)
Maintains the state and implements the logic of a FIX session.
Definition Session.h:46
void logout(const std::string &reason="")
Definition Session.h:57
void setResponder(Responder *pR)
Definition Session.h:210
bool isEnabled()
Definition Session.h:59
bool isSessionTime(const UtcTimeStamp &time)
Definition Session.h:108
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496
const SessionID & getSessionID() const
Definition Session.h:75
bool isLoggedOn()
Definition Session.h:65
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition SessionID.h:31
Container for setting dictionaries mapped to sessions.
std::set< SessionID > getSessions() const
const Dictionary & get(const SessionID &) const
Get a dictionary for a session.
Date and Time represented in UTC.
Definition FieldTypes.h:583
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition Utility.cpp:416
void process_sleep(double s)
Definition Utility.cpp:466
void thread_join(thread_id thread)
Definition Utility.cpp:437
Application is not configured correctly
Definition Exceptions.h:88
Application encountered serious error during runtime
Definition Exceptions.h:95

Generated on Mon Mar 4 2024 21:10:02 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001