1.
Exture+는 Exture의 통신방식과 다릅니다. 동기식이 아닌 비동기식입니다. 비동기식이기때문에 과다 주문이 발생하여 매매체결시스템에 영향을 줄 수 있습니다. 통신방식과 알고리즘트레이딩에 대한 규제를 이유로 Exture+는 과다호가관리를 하고 있습니다. 영어로 하면 Throttle입니다.
구글에서 Throttle을 검색하면 웹에서 대역폭을 관리하는 기능으로 소개하고 있습니다. 틀리지 않습니다만 Exture+의 경우는 대역폭이 아닌 건수가 관리대상입니다. 자본시장에서 사용하는 메시징제품을 보면 Throttle기능이 내장되어 있습니다. 그중 소스로 확인할 수 있는 것이 OpenMAMA입니다. 소스를 보시면 아시겠지만 NYSE가 인수한 Wombat의 제품을 그대로 공개하였습니다.
아래는 소스입니다. FEP를 개발하시는 분들에게 도움이 되리라 생각합니다. 속도를 고려한다고 하면 FEP 보다는 알고리즘트레이딩을 하는 매매시스템에서 직접 관리하도록 하는 것도 방법입니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
/* $Id$ * * OpenMAMA: The open middleware agnostic messaging API * Copyright (C) 2011 NYSE Technologies, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301 USA */ #ifndef ThrottleH__ #define ThrottleH__ #include "mama/status.h" #include "list.h" /** * processes messages at a specified rate (msgs/sec). if the rate is 0 the * messages are sent as quickly as possible. * messages dispatched for throttling must provide an action callback. * */ #if defined(__cplusplus) extern "C" { #endif typedef struct wombatThrottle_* wombatThrottle; typedef void (*throttleCb)(void *closure1, void *closure2 ); typedef struct actionHandle* wombatThrottleAction; /** * allocate a new wombatThrottle. throttle is enabled by default. */ mama_status wombatThrottle_allocate (wombatThrottle *throttle); /** * crete a new wombatThrottle. */ mama_status wombatThrottle_create (wombatThrottle throttle, mamaQueue queue); /** * destroy this wombatThrottle and release any * resources associated with it. * destroy () waits for the dispatchThread to complete its current cycle before * killing it. Therefore, destroy () might take a long time to complete in some * cases. However, any messages remaining in the queue after the current * cycle are NOT processed. */ mama_status wombatThrottle_destroy (wombatThrottle throttle ); /** * Set the current throttle rate in messages/second. A rate less than or * equal to 0 disables throttling, and messages are sent as quickly as * possible. In this way, setRate () acts as a toggle for throttling. * * If rate is set to 0 while there are still messages waiting for dispatch, * those messages will be processed without delay by the dispatchThread. * * @param rate approximate desired throttle rate in msgs (jobs)/second */ void wombatThrottle_setRate (wombatThrottle throttle, double rate ); /** * Return the current throttle rate in messages per second. If throttling * is disabled, this method returns 0. * * @return the rate. 0 if throttling is disabled. */ double wombatThrottle_getRate (wombatThrottle throttle ); /** * Dispatch a throttled task. If throtle is disabled (mRate == 0), the task * is run immediately. Otherwise, the task is placed on the queue and run in * the order it was added according to the throttle rate. * * Specifying an owner allows tasks to be removed before they are dispatched. * This is useful if an object is being destroyed or closed. * * @param owner The owner of the task. * @param actionCB Function to invoke when the task is dispatched. * @param closure1 User-defined 1st parameter to actionCB function * @param closure2 User-defined 2nd parameter to actionCB function * @param immediate. If true put the action on the front of the throttle * queue. */ mama_status wombatThrottle_dispatch (wombatThrottle throttle, void* owner, throttleCb action, void* closure1, void* closure2, int immediate, wombatThrottleAction* handle); /** * Removed the messages in the queue for the specified owner. * * @param owner The message's "owner", or identifying void* */ mama_status wombatThrottle_removeMessagesForOwner (wombatThrottle throttle, void *owner ); /** * Removed the messages in the queue contained in the specified list of * pointers to wombatThrottleActions. * */ mama_status wombatThrottle_removeMessagesFromList (wombatThrottle throttle, wList list ); /** * Lock the throttle. Must be paired with call to wombatThrottle_unlock. * * The lock is recursive. */ void wombatThrottle_lock (wombatThrottle throttle ); /** * Unlock the throttle. Must be paired with call to wombatThrottle_lock. * * The lock is recursive. */ void wombatThrottle_unlock (wombatThrottle throttle ); /** * Set how frequently the throttle runs. * * @param interval Time in seconds between runs. */ mama_status wombatThrottle_setInterval (wombatThrottle throttle, double interval); /** * Remove an action. * * @parma action The action to remove. */ mama_status wombatThrottle_removeAction (wombatThrottle throttle, wombatThrottleAction action); #if defined(__cplusplus) } #endif #endif /* ThrottleH__ */ |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
/* $Id$ * * OpenMAMA: The open middleware agnostic messaging API * Copyright (C) 2011 NYSE Technologies, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301 USA */ /** * Processes messages at a specified rate (msgs/sec). If the rate is 0 the * messages are sent as quickly as possible. * Messages dispatched for throttling must provide an action callback. * */ #include #include "mama/mama.h" #include "mama/timer.h" #include "throttle.h" #include "list.h" #include "wlock.h" #include "wombat/wincompat.h" #define self ((wombatThrottleImpl*)(throttle)) #define WOMBAT_THROTTLE_DEFAULT_RATE 1000.0 /* msgs/sec */ typedef struct MsgProperties_ { void* mOwner; throttleCb mActionCb; void* mClosure1; void* mClosure2; } MsgProperties; typedef struct wombatThrottle_ { wList mMsgQueue; double mRate; /* msgs/second */ int mMsgsSentThisInterval; int mMessagesPerInterval; double mInterval; /* in seconds */ mamaQueue mQueue; mamaTimer mTimer; /* This lock is used to syncrhonize calls to timer_destroy to prevent * a race condition. */ wLock mTimerLock; } wombatThrottleImpl; /* forward declarations */ static void MAMACALLTYPE wombatThrottle_timerCB (mamaTimer timer, void *throttle); static void wombatThrottle_cleanUp (wombatThrottle throttle); static int wombatThrottle_sendQueuedMessage (wombatThrottle throttle); mama_status wombatThrottle_allocate (wombatThrottle *throttle) { wombatThrottleImpl *impl = (wombatThrottleImpl*) malloc (sizeof (wombatThrottleImpl)); if (impl == NULL) return (MAMA_STATUS_NOMEM); memset (impl, 0, sizeof (wombatThrottleImpl)); impl->mMsgQueue = list_create (sizeof (MsgProperties)); impl->mInterval = 0.1; /* Create the timer lock. */ impl->mTimerLock = wlock_create (); /* dispatchThread wait ()s immediately after creation because * mRate == 0.0 Call setRate () last to signal () it. * setRate () effectively toggles throttle on and off */ wombatThrottle_setRate (impl, WOMBAT_THROTTLE_DEFAULT_RATE); *throttle = (wombatThrottle)impl; return MAMA_STATUS_OK; } mama_status wombatThrottle_create (wombatThrottle throttle, mamaQueue queue) { self->mQueue = queue; return MAMA_STATUS_OK; } mama_status wombatThrottle_destroy (wombatThrottle throttle) { if (self == NULL) return MAMA_STATUS_OK; /* Acquire the lock before destroying the timer. */ wlock_lock (self->mTimerLock); /* Destroy the timer. */ if (self->mTimer != NULL) { mamaTimer_destroy (self->mTimer); self->mTimer = NULL; } /* Release the lock. */ wlock_unlock (self->mTimerLock); /* Destroy the timer lock. */ if (self->mTimerLock != NULL) { wlock_destroy (self->mTimerLock); self->mTimerLock = NULL; } wombatThrottle_cleanUp (throttle); return MAMA_STATUS_OK; } mama_status wombatThrottle_setInterval (wombatThrottle throttle, double interval) { if (self == NULL) return MAMA_STATUS_NULL_ARG; self->mInterval = interval; return MAMA_STATUS_OK; } static void wombatThrottle_cleanUp (wombatThrottle throttle) { list_destroy (self->mMsgQueue, NULL, NULL); bzero (self, sizeof (wombatThrottleImpl)); free (self); } double wombatThrottle_getRate (wombatThrottle throttle) { return self->mRate; } void wombatThrottle_setRate (wombatThrottle throttle, double rate) { if (rate < 0.0) rate = 0.0; self->mRate = rate; /* calc interval size (in microsecs) and #msgs/interval based on mRate. * Usually the interval size (mInterval) will be .1 sec. */ if (self->mRate == 0.0) { /* mRate == 0.0 means throttle is disabled. * We want to process any/all remaining jobs without throttling. * The dispatchThread will do so when it sees mRate == 0.0 * However, if the thread is in the middle of a normal * cycle, it will complete the cycle and possibly sleep before * it notices mRate == 0. Thus, disabling throttle won't always * have an instant effect. This could be easily changed. */ return; } if (self->mRate < 10) { self->mInterval = 1.0; /* 1 second */ self->mMessagesPerInterval = self->mRate; } else { self->mMessagesPerInterval = (int) (self->mRate * self->mInterval); if (self->mMessagesPerInterval <= 0) self->mMessagesPerInterval = 1; } } mama_status wombatThrottle_dispatch (wombatThrottle throttle, void *owner, throttleCb actionCb, void *closure1, void *closure2, int immediate, wombatThrottleAction *handle) { MsgProperties* info = NULL; if (self->mRate <= 0.0) { /* throttle is not in use, execute immediately */ mama_log (MAMA_LOG_LEVEL_FINEST, "wombatThrottle_dispatch (): " "no throttle; triggering action."); actionCb (closure1, closure2); } else { list_lock (self->mMsgQueue); info = (MsgProperties*)list_allocate_element (self->mMsgQueue); /* Acquire the lock before creating the timer. */ wlock_lock (self->mTimerLock); /* Create the timer. */ if (self->mTimer == NULL) { mamaTimer_create (&self->mTimer, self->mQueue, wombatThrottle_timerCB, self->mInterval, self); mama_log (MAMA_LOG_LEVEL_FINEST, "wombatThrottle_dispatch (): " "creating throttle timer (%p).", self->mTimer); } /* Release the lock. */ wlock_unlock (self->mTimerLock); if (info == NULL) { list_unlock (self->mMsgQueue); return (MAMA_STATUS_NOMEM); } info->mOwner = owner; info->mActionCb = actionCb; info->mClosure1 = closure1; info->mClosure2 = closure2; if (handle != NULL) { *handle = (wombatThrottleAction)info; } if (immediate) { list_push_front (self->mMsgQueue, info); } else { list_push_back (self->mMsgQueue, info); } list_unlock (self->mMsgQueue); } return MAMA_STATUS_OK; } static void wombatThrottle_dispatchMessagesUntilQuota (wombatThrottle throttle) { /* Send queued messages until we reach the limit for this * interval or empty the queue. */ while (self->mMsgsSentThisInterval++ < self->mMessagesPerInterval) { if (!wombatThrottle_sendQueuedMessage (throttle)) { /* msg queue is empty, nothing left to do */ return; } } } /** * Removes the oldest job (message) from the queue and "runs" it. * * @return 1 if a job was processed, 0 if the queue was empty. */ static int wombatThrottle_sendQueuedMessage (wombatThrottle throttle) { MsgProperties *info; list_lock (self->mMsgQueue); if (NULL != (info = list_pop_front (self->mMsgQueue))) { /* Process a throttled message.*/ info->mActionCb (info->mClosure1, info->mClosure2); list_free_element (self->mMsgQueue, info); list_unlock (self->mMsgQueue); return 1; } list_unlock (self->mMsgQueue); /* We did not send a message. */ return 0; } typedef struct biclosure_ { wombatThrottleImpl *impl; void* owner; } biclosure; static int gRemoveCount = 0; static void removeMessagesForOwnerCb (wList list, void *element, void *closure) { MsgProperties *info = (MsgProperties*)element; biclosure* pair = (biclosure*)closure; if (pair->owner == info->mOwner) { list_remove_element (pair->impl->mMsgQueue, element); list_free_element (pair->impl->mMsgQueue, element); gRemoveCount++; } } mama_status wombatThrottle_removeMessagesForOwner (wombatThrottle throttle, void *owner) { biclosure closure; if (NULL==throttle) return MAMA_STATUS_NULL_ARG; closure.impl = self; closure.owner = owner; gRemoveCount = 0; list_for_each (self->mMsgQueue, removeMessagesForOwnerCb, (void*)(&closure)); mama_log (MAMA_LOG_LEVEL_FINE, "wombatThrottle_removeMessagesForOwner (): " "%d Messages removed from queue.", gRemoveCount); return MAMA_STATUS_OK; } static void removeMessagesFromListCb (wList list, void *element, void *throttle) { wombatThrottleAction *action = (wombatThrottleAction*)element; list_remove_element (self->mMsgQueue, *action); list_free_element (self->mMsgQueue, *action); gRemoveCount++; } mama_status wombatThrottle_removeMessagesFromList (wombatThrottle throttle, wList list) { gRemoveCount = 0; list_lock (self->mMsgQueue); list_for_each (list, removeMessagesFromListCb, self); mama_log (MAMA_LOG_LEVEL_FINE, "wombatThrottle_removeMessagesFromList (): " "%d Messages removed from queue list.", gRemoveCount); list_unlock (self->mMsgQueue); return MAMA_STATUS_OK; } mama_status wombatThrottle_removeAction (wombatThrottle throttle, wombatThrottleAction action) { list_remove_element (self->mMsgQueue, action); list_free_element (self->mMsgQueue, action); return MAMA_STATUS_OK; } /** * Runs all queued tasks up to a fixed #/cycle. The quota and cycle * duration are calculated in #wombatThrottle_setRate. After quota * is reached for the current cycle, thread sleeps for the remainder. * When the queue is empty, dispatchThread wait ()s. */ static void MAMACALLTYPE wombatThrottle_timerCB (mamaTimer timer, void *throttle) { /* If mRate == 0, throttle has been deactivated. Finish any remaining * jobs in queue, and wait. If mRate is set again before we finish * purging the queue, we exit the while and don't call wait () */ while (self->mRate == 0.0 && wombatThrottle_sendQueuedMessage (throttle)) { } self->mMsgsSentThisInterval = 0; if (list_size (self->mMsgQueue) == 0) { /* Acquire the lock before destroying the timer. */ wlock_lock (self->mTimerLock); /* Destroy the timer. */ if (self->mTimer != NULL) { mamaTimer_destroy (self->mTimer); self->mTimer = NULL; } /* Release the lock. */ wlock_unlock (self->mTimerLock); } wombatThrottle_dispatchMessagesUntilQuota (throttle); } void wombatThrottle_lock (wombatThrottle throttle) { list_lock (self->mMsgQueue); } void wombatThrottle_unlock (wombatThrottle throttle) { list_unlock (self->mMsgQueue); } |
2.
NYSE와 같은 해외 거래소들이 왜 자신들의 저작물을 오픈소스로 공개할까요? 자본시장내에서 IT와 관련한 주도권을 획득하기 위함입니다. IT주도권은 거래소의 영향력으로 이어지고 거래소의 수익에도 이어집니다. 구글이 안드로이드를 공개하여 스마트폰 생태계에서 주도권을 얻는 것과 같은 이치입니다. Exture+를 개발하면서 Infiniband와 관련한 라이브러리를 개발하였다고 합니다. 한국거래소나 코스콤의 중요한 저작물입니다. 앞으로 한국거래소나 코스콤이 어떻게 오픈소스정책을 펴나갈지 알 수 없지만 오픈소스 프로젝트화하면 어떨까 생각합니다. 목표는 RDMA의 기술적 주도권을 확보하는 것이겠죠.
아래는 코스콤 오픈소스 샵이 ‘자본시장IT의 미래 오픈소스에서 답을 구하라’라는 제목으로 OpenTechnet Summit 2013에서 발표한 자료입니다.
아래는 Black Duck과 North Bridge라는 회사가 오픈소소의 미래(Future of Open Source Survey)라는 이름으로 설문조사한 결과입니다. 한국자료는 아니지만.