/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using QuantConnect.Logging;
using System.Collections.Generic;
using QuantConnect.Configuration;
namespace QuantConnect.Brokerages
{
///
/// Brokerage helper class to lock message stream while executing an action, for example placing an order
///
public class BrokerageConcurrentMessageHandler : IDisposable
where T : class
{
private readonly Action _processMessages;
private readonly Queue _messageBuffer;
private readonly ILock _lock;
private readonly ManualResetEventSlim _messagesProcessedEvent;
private readonly int _maxMessageBufferSize;
///
/// Creates a new instance
///
/// The action to call for each new message
public BrokerageConcurrentMessageHandler(Action processMessages)
: this(processMessages, false)
{
}
///
/// Creates a new instance
///
/// The action to call for each new message
/// Whether to enable concurrent order submission
public BrokerageConcurrentMessageHandler(Action processMessages, bool concurrencyEnabled)
{
_processMessages = processMessages;
_messageBuffer = new Queue();
_lock = concurrencyEnabled ? new ReaderWriterLockWrapper() : new MonitorWrapper();
_messagesProcessedEvent = new ManualResetEventSlim(false);
_maxMessageBufferSize = Config.GetInt("brokerage-concurrent-message-handler-buffer-size", 20);
}
///
/// Disposes of the resources used by this instance
///
public void Dispose()
{
_lock.Dispose();
_messagesProcessedEvent.Dispose();
}
///
/// Will process or enqueue a message for later processing it
///
/// The new message
public void HandleNewMessage(T message)
{
lock (_messageBuffer)
{
if (_lock.TryEnterReadLockImmediately())
{
try
{
ProcessMessages(message);
}
finally
{
_lock.ExitReadLock();
}
}
else if (message != default)
{
// if someone has the lock just enqueue the new message they will process any remaining messages
// if by chance they are about to free the lock, no worries, we will always process first any remaining message first see 'ProcessMessages'
_messageBuffer.Enqueue(message);
}
}
}
///
/// Lock the streaming processing while we're sending orders as sometimes they fill before the call returns.
///
public void WithLockedStream(Action code)
{
// Let's limit the amount of messages we can buffer, so we wait until
// consumers process a full queue of messages before we potentially add more
var queueIsFull = false;
lock (_messageBuffer)
{
queueIsFull = _messageBuffer.Count >= _maxMessageBufferSize;
}
if (queueIsFull)
{
_messagesProcessedEvent.Wait();
_messagesProcessedEvent.Reset();
}
_lock.EnterWriteLock();
try
{
code();
}
finally
{
// once we finish our 'code' we will process any message that come through,
// to make sure no message get's left behind (race condition between us finishing 'ProcessMessages'
// and some message being enqueued to it, we just take a lock on the buffer
lock (_messageBuffer)
{
var lockedStreams = _lock.CurrentWriteCount;
// we release the semaphore first so by the time we release '_messageBuffer' any new message is processed immediately and not enqueued
_lock.ExitWriteLock();
// only process if no other threads will process them after us
if (lockedStreams == 1)
{
ProcessMessages();
}
}
}
}
///
/// Process any pending message and the provided one if any
///
/// To be called owing the stream lock
private void ProcessMessages(T message = null)
{
try
{
if (message != null)
{
_messageBuffer.Enqueue(message);
}
// double check there isn't any pending message
while (_messageBuffer.TryDequeue(out var e))
{
try
{
_processMessages(e);
}
catch (Exception ex)
{
Log.Error(ex);
}
}
}
finally
{
_messagesProcessedEvent.Set();
}
}
private interface ILock : IDisposable
{
int CurrentWriteCount { get; }
void EnterReadLock();
void ExitReadLock();
bool TryEnterReadLockImmediately();
void EnterWriteLock();
void ExitWriteLock();
}
///
/// A simple reader/writer lock implementation that allows us to switch the meaning of read and write locks
/// so that it can be used for single reader and multiple writers scenario.
///
/// We want to allow multiple producers so, for example, a brokerage can be placing multiple orders concurrently,
/// since the transaction handler can have multiple threads processing orders.
/// But, on the other side, we need to ensure that messages are processed only when no producers are writing
/// to the stream (hence only one reader). For example, a brokerage needs the to lock the stream and
/// only handle incoming order event messages after it releases the lock, but we now support multiple streams
/// (so multiple orders) so we wait for all the current producers to release the lock before processing any messages.
///
private class ReaderWriterLockWrapper : ILock
{
private readonly ReaderWriterLockSlim _lock;
public int CurrentWriteCount => _lock.CurrentReadCount;
public ReaderWriterLockWrapper()
{
_lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
}
public void EnterReadLock() => _lock.EnterWriteLock();
public void ExitReadLock() => _lock.ExitWriteLock();
public bool TryEnterReadLockImmediately() => _lock.TryEnterWriteLock(0);
public void EnterWriteLock() => _lock.EnterReadLock();
public void ExitWriteLock() => _lock.ExitReadLock();
public void Dispose()
{
_lock.Dispose();
}
}
private class MonitorWrapper : ILock
{
private readonly object _lockObject;
private long _currentWriteCount;
public int CurrentWriteCount => (int)Interlocked.Read(ref _currentWriteCount);
public MonitorWrapper()
{
_lockObject = new object();
}
public void EnterReadLock() => Monitor.Enter(_lockObject);
public void ExitReadLock() => Monitor.Exit(_lockObject);
public bool TryEnterReadLockImmediately() => Monitor.TryEnter(_lockObject);
public void EnterWriteLock()
{
Monitor.Enter(_lockObject);
Interlocked.Exchange(ref _currentWriteCount, 1);
}
public void ExitWriteLock()
{
Interlocked.Exchange(ref _currentWriteCount, 0);
Monitor.Exit(_lockObject);
}
public void Dispose()
{
}
}
}
}