/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using QuantConnect.Brokerages;
using QuantConnect.Data;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Orders;
using QuantConnect.Securities;
using QuantConnect.Util;
namespace QuantConnect.Tests.Brokerages
{
public abstract class BrokerageTests
{
// NOTE: the class is declared abstract; the order test cases are kept here and rely on the
// brokerage specific members (symbol, security type, ...) that derived types must provide
// brokerage under test, created lazily by the Brokerage property and reset to null in Setup
private IBrokerage _brokerage;
// backing field of the lazily created OrderProvider property
private OrderProvider _orderProvider;
// backing field of the lazily created SecurityProvider property
private SecurityProvider _securityProvider;
/// <summary>
/// Event set by HandleFillEvents once an order event reports OrderStatus.Filled
/// </summary>
protected ManualResetEvent OrderFillEvent { get; } = new ManualResetEvent(false);
#region Test initialization and cleanup
/// <summary>
/// Prepares every test: installs the NUnit log handler, drops the cached brokerage and providers so
/// they get re-created lazily, then cancels the open orders and liquidates the holdings of the account.
/// </summary>
[SetUp]
public void Setup()
{
    Log.LogHandler = new NUnitLogHandler();

    Log.Trace("");
    Log.Trace("");
    Log.Trace("--- SETUP ---");
    Log.Trace("");
    Log.Trace("");

    // we want to regenerate these for each test
    _brokerage = null;
    _orderProvider = null;
    _securityProvider = null;

    try
    {
        Thread.Sleep(1000);
        CancelOpenOrders();
        LiquidateHoldings();
        Thread.Sleep(1000);
    }
    catch
    {
        // NUnit does not run [TearDown] when [SetUp] throws, so the brokerage that was lazily
        // created by the calls above has to be released here or it would stay connected
        if (_brokerage != null)
        {
            try
            {
                DisposeBrokerage(_brokerage);
            }
            catch (Exception disposeError)
            {
                // keep the original setup failure as the reported error
                Log.Error(disposeError);
            }
            _brokerage = null;
        }
        throw;
    }
}
/// <summary>
/// Cleans up after every test: cancels the open orders, liquidates the holdings and, whatever
/// happened before, disposes the brokerage (if one was created) and resets the fill event.
/// </summary>
[TearDown]
public void Teardown()
{
    try
    {
        foreach (var line in new[] { "", "", "--- TEARDOWN ---", "", "" })
        {
            Log.Trace(line);
        }

        Thread.Sleep(1000);
        CancelOpenOrders();
        LiquidateHoldings();
        Thread.Sleep(1000);
    }
    finally
    {
        // read the field, not the property: we do not want to create a brokerage just to dispose it
        var current = _brokerage;
        if (current != null)
        {
            DisposeBrokerage(current);
        }

        OrderFillEvent.Reset();
    }
}
/// <summary>
/// Gets the brokerage under test; it is created through InitializeBrokerage on first access
/// </summary>
public IBrokerage Brokerage => _brokerage ??= InitializeBrokerage();
/// <summary>
/// Creates the brokerage through CreateBrokerage, connects it, loads the open orders and account
/// holdings it reports into the providers and subscribes the fixture's event handlers.
/// </summary>
/// <returns>The connected brokerage instance</returns>
private IBrokerage InitializeBrokerage()
{
    Log.Trace("");
    Log.Trace("- INITIALIZING BROKERAGE -");
    Log.Trace("");

    var instance = CreateBrokerage(OrderProvider, SecurityProvider);
    instance.Connect();

    if (!instance.IsConnected)
    {
        Assert.Fail("Failed to connect to brokerage");
    }

    Log.Trace("");
    Log.Trace("GET OPEN ORDERS");
    Log.Trace("");

    var existingOrders = instance.GetOpenOrders();
    foreach (var existingOrder in existingOrders)
    {
        OrderProvider.Add(existingOrder);
    }

    Log.Trace("");
    Log.Trace("GET ACCOUNT HOLDINGS");
    Log.Trace("");

    var existingHoldings = instance.GetAccountHoldings();
    foreach (var existingHolding in existingHoldings)
    {
        // these securities don't need to be real, they only back the ISecurityProvider
        // implementation that brokerages require to track holdings
        SecurityProvider.GetSecurity(existingHolding.Symbol)
            .Holdings
            .SetHoldings(existingHolding.AveragePrice, existingHolding.Quantity);
    }

    instance.OrdersStatusChanged += HandleFillEvents;
    instance.OrderIdChanged += HandleOrderIdChangedEvents;

    return instance;
}
/// <summary>
/// Reacts to a brokerage order id change: traces the event and hands it over to the order provider.
/// </summary>
/// <param name="_">The sender of the event (unused)</param>
/// <param name="brokerageOrderIdChangedEvent">The event data describing the order id change</param>
private void HandleOrderIdChangedEvents(object _, BrokerageOrderIdChangedEvent brokerageOrderIdChangedEvent)
{
    var traceMessage = $"ORDER ID CHANGED: {brokerageOrderIdChangedEvent}";

    Log.Trace("");
    Log.Trace(traceMessage);
    Log.Trace("");

    OrderProvider.HandlerBrokerageOrderIdChangedEvent(brokerageOrderIdChangedEvent);
}
/// <summary>
/// Handles the order status changes raised by the brokerage. For fills it validates the fill price
/// and the sign of the fill quantity and updates the tracked holdings; it then copies the new status
/// onto the tracked order and sets <see cref="OrderFillEvent"/> once the order is completely filled.
/// </summary>
/// <param name="sender">The sender of the event (unused)</param>
/// <param name="orderEvents">The order events sent by the brokerage; only the first one is processed</param>
private void HandleFillEvents(object sender, List<OrderEvent> orderEvents)
{
    if (orderEvents == null || orderEvents.Count == 0)
    {
        // nothing to process, avoid an out of range access below
        return;
    }

    Log.Trace("");
    Log.Trace($"ORDER STATUS CHANGED: {string.Join(",", orderEvents.Select(x => x.ToString()))}");
    Log.Trace("");

    var orderEvent = orderEvents[0];

    // we need to keep this maintained properly
    if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
    {
        Log.Trace("FILL EVENT: " + orderEvent.FillQuantity + " units of " + orderEvent.Symbol.ToString());

        var eventFillPrice = orderEvent.FillPrice;
        var eventFillQuantity = orderEvent.FillQuantity;

        Assert.Greater(eventFillPrice, 0m);

        // the fill quantity is signed: positive for buys, negative for sells
        switch (orderEvent.Direction)
        {
            case OrderDirection.Buy:
                Assert.Greater(eventFillQuantity, 0m);
                break;
            case OrderDirection.Sell:
                Assert.Less(eventFillQuantity, 0m);
                break;
            default:
                throw new ArgumentException($"{nameof(BrokerageTests)}.{nameof(HandleFillEvents)}: Not Recognize order Event Direction = {orderEvent.Direction}");
        }

        var holding = SecurityProvider.GetSecurity(orderEvent.Symbol).Holdings;
        holding.SetHoldings(eventFillPrice, holding.Quantity + eventFillQuantity);

        Log.Trace("--HOLDINGS: " + SecurityProvider[orderEvent.Symbol].Holdings);
    }

    // update order mapping
    var order = OrderProvider.GetOrderById(orderEvent.OrderId);
    order.Status = orderEvent.Status;

    if (orderEvent.Status == OrderStatus.Filled)
    {
        // set the event after we actually update the order status
        OrderFillEvent.Set();
    }
}
/// <summary>
/// Gets or sets the brokerage name passed to the SecurityProvider when it is created,
/// BrokerageName.Default unless changed or overridden
/// </summary>
protected virtual BrokerageName BrokerageName { get; set; } = BrokerageName.Default;
/// <summary>
/// Gets the order provider used by the fixture, created on first access
/// </summary>
public OrderProvider OrderProvider => _orderProvider ??= new OrderProvider();
/// <summary>
/// Gets the security provider used by the fixture, created on first access from the
/// <see cref="BrokerageName"/> and the <see cref="OrderProvider"/>
/// </summary>
public SecurityProvider SecurityProvider =>
    _securityProvider ??= new SecurityProvider(new(), BrokerageName, OrderProvider);
/// <summary>
/// Creates the brokerage under test. InitializeBrokerage calls Connect() on the returned
/// instance and fails the test when it does not report being connected.
/// </summary>
/// <param name="orderProvider">The order provider the brokerage should use</param>
/// <param name="securityProvider">The security provider the brokerage should use</param>
/// <returns>The brokerage instance under test</returns>
protected abstract IBrokerage CreateBrokerage(IOrderProvider orderProvider, ISecurityProvider securityProvider);
/// <summary>
/// Disposes of the brokerage and any external resources started in order to create it
/// </summary>
/// <param name="brokerage">The brokerage instance to be disposed of</param>
protected virtual void DisposeBrokerage(IBrokerage brokerage)
{
// detach the handlers subscribed in InitializeBrokerage before shutting the brokerage down
brokerage.OrdersStatusChanged -= HandleFillEvents;
brokerage.OrderIdChanged -= HandleOrderIdChangedEvents;
brokerage.Disconnect();
brokerage.DisposeSafely();
}
/// <summary>
/// Closes every non-zero position reported by the brokerage with a market order and waits for
/// each order to fill, so that tests start from (and leave behind) a flat account.
/// </summary>
protected void LiquidateHoldings()
{
    Log.Trace("");
    Log.Trace("LIQUIDATE HOLDINGS");
    Log.Trace("");

    var openPositions = Brokerage.GetAccountHoldings().Where(x => x.Quantity != 0);
    foreach (var position in openPositions)
    {
        Log.Trace("Liquidating: " + position);

        // opposite quantity flattens the position
        var closingOrder = GetMarketOrder(position.Symbol, -position.Quantity);
        PlaceOrderWaitForStatus(closingOrder, OrderStatus.Filled);
    }
}
/// <summary>
/// Requests the cancellation of every open order reported by the brokerage.
/// The result of each cancel request is not checked.
/// </summary>
protected void CancelOpenOrders()
{
    Log.Trace("");
    Log.Trace("CANCEL OPEN ORDERS");
    Log.Trace("");

    var brokerage = Brokerage;
    foreach (var pendingOrder in brokerage.GetOpenOrders())
    {
        Log.Trace("Canceling: " + pendingOrder);
        brokerage.CancelOrder(pendingOrder);
    }
}
#endregion
/// <summary>
/// Gets the symbol to be traded, must be shortable
/// </summary>
protected abstract Symbol Symbol { get; }
/// <summary>
/// Gets the security type associated with the <see cref="Symbol"/>
/// </summary>
protected abstract SecurityType SecurityType { get; }
/// <summary>
/// Returns whether or not the brokers order methods implementation are async
/// </summary>
/// <returns>True when the order methods are async; also the default answer of IsCancelAsync</returns>
protected abstract bool IsAsync();
/// <summary>
/// Tells whether the brokerage cancels orders asynchronously.
/// Unless overridden this is the same answer as <see cref="IsAsync"/>.
/// </summary>
/// <returns>True when the cancel order implementation is async</returns>
protected virtual bool IsCancelAsync() => IsAsync();
/// <summary>
/// Gets the current market price of the specified security
/// </summary>
/// <param name="symbol">The symbol whose price is requested</param>
/// <returns>The price, logged as the "Ask" and handed to ModifyOrderToFill by ModifyOrderUntilFilled</returns>
protected abstract decimal GetAskPrice(Symbol symbol);
/// <summary>
/// Gets the quantity used by the order tests when none is specified, 1 unless overridden
/// </summary>
/// <returns>The default order quantity</returns>
protected virtual decimal GetDefaultQuantity() => 1m;
/// <summary>
/// Verifies the brokerage under test reports being connected
/// </summary>
[Test]
public void IsConnected() => Assert.IsTrue(Brokerage.IsConnected);
/// <summary>
/// Places a long order, cancels it and verifies the cancellation: the result of the cancel request,
/// the <see cref="OrderStatus.Canceled"/> event (when a successful cancellation is expected) and the
/// order no longer being reported as open. A second cancel request is then sent, which must not
/// trigger another canceled event.
/// </summary>
/// <param name="parameters">The parameters used to create the order and the expected results</param>
public virtual void CancelOrders(OrderTestParameters parameters)
{
    const int secondsTimeout = 20;

    Log.Trace("");
    Log.Trace("CANCEL ORDERS");
    Log.Trace("");

    var order = PlaceOrderWaitForStatus(parameters.CreateLongOrder(GetDefaultQuantity()), parameters.ExpectedStatus);

    using var canceledOrderStatusEvent = new ManualResetEvent(false);
    EventHandler<List<OrderEvent>> orderStatusCallback = (sender, fills) =>
    {
        var status = fills[0].Status;
        order.Status = status;
        if (status == OrderStatus.Canceled)
        {
            canceledOrderStatusEvent.Set();
        }
    };
    Brokerage.OrdersStatusChanged += orderStatusCallback;

    try
    {
        var cancelResult = false;
        try
        {
            cancelResult = Brokerage.CancelOrder(order);
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }

        // async cancellations are expected to report success for the request itself
        Assert.AreEqual(IsCancelAsync() || parameters.ExpectedCancellationResult, cancelResult);

        if (parameters.ExpectedCancellationResult)
        {
            // We expect the OrderStatus.Canceled event
            canceledOrderStatusEvent.WaitOneAssertFail(1000 * secondsTimeout, "Order timedout to cancel");
        }

        var openOrders = Brokerage.GetOpenOrders();
        var cancelledOrder = openOrders.FirstOrDefault(x => x.Id == order.Id);
        Assert.IsNull(cancelledOrder);

        canceledOrderStatusEvent.Reset();

        var cancelResultSecondTime = false;
        try
        {
            cancelResultSecondTime = Brokerage.CancelOrder(order);
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }

        Assert.AreEqual(IsCancelAsync(), cancelResultSecondTime);

        // We do NOT expect the OrderStatus.Canceled event
        Assert.IsFalse(canceledOrderStatusEvent.WaitOne(new TimeSpan(0, 0, 10)));
    }
    finally
    {
        // always detach: the handler captures the wait handle that gets disposed when we leave,
        // a failed assertion above must not leave it subscribed
        Brokerage.OrdersStatusChanged -= orderStatusCallback;
    }
}
/// <summary>
/// Places a long order of the default quantity and waits for the expected status
/// </summary>
/// <param name="parameters">The parameters used to create the order and the status to wait for</param>
public virtual void LongFromZero(OrderTestParameters parameters)
{
    foreach (var line in new[] { "", "LONG FROM ZERO", "" })
    {
        Log.Trace(line);
    }

    var longOrder = parameters.CreateLongOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(longOrder, parameters.ExpectedStatus);
}
/// <summary>
/// Opens a long position with a market order and then places a short order of the same
/// quantity, waiting for the expected status
/// </summary>
/// <param name="parameters">The parameters used to create the orders and the status to wait for</param>
public virtual void CloseFromLong(OrderTestParameters parameters)
{
    foreach (var line in new[] { "", "CLOSE FROM LONG", "" })
    {
        Log.Trace(line);
    }

    // first go long
    var openingOrder = parameters.CreateLongMarketOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(openingOrder, OrderStatus.Filled);

    // now close it
    var closingOrder = parameters.CreateShortOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(closingOrder, parameters.ExpectedStatus);
}
/// <summary>
/// Places a short order of the default quantity and waits for the expected status
/// </summary>
/// <param name="parameters">The parameters used to create the order and the status to wait for</param>
public virtual void ShortFromZero(OrderTestParameters parameters)
{
    foreach (var line in new[] { "", "SHORT FROM ZERO", "" })
    {
        Log.Trace(line);
    }

    var shortOrder = parameters.CreateShortOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(shortOrder, parameters.ExpectedStatus);
}
/// <summary>
/// Opens a short position with a market order and then places a long order of the same
/// quantity, waiting for the expected status
/// </summary>
/// <param name="parameters">The parameters used to create the orders and the status to wait for</param>
public virtual void CloseFromShort(OrderTestParameters parameters)
{
    foreach (var line in new[] { "", "CLOSE FROM SHORT", "" })
    {
        Log.Trace(line);
    }

    // first go short
    var openingOrder = parameters.CreateShortMarketOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(openingOrder, OrderStatus.Filled);

    // now close it
    var closingOrder = parameters.CreateLongOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(closingOrder, parameters.ExpectedStatus);
}
/// <summary>
/// Opens a long position with a market order and then places a short order of twice the default
/// quantity; when requested by the parameters the order keeps being modified until it fills
/// </summary>
/// <param name="parameters">The parameters used to create the orders and the status to wait for</param>
public virtual void ShortFromLong(OrderTestParameters parameters)
{
    foreach (var line in new[] { "", "SHORT FROM LONG", "" })
    {
        Log.Trace(line);
    }

    // first go long
    var openingOrder = parameters.CreateLongMarketOrder(GetDefaultQuantity());
    PlaceOrderWaitForStatus(openingOrder);

    // now go net short
    var reversingOrder = parameters.CreateShortOrder(2 * GetDefaultQuantity());
    var placedOrder = PlaceOrderWaitForStatus(reversingOrder, parameters.ExpectedStatus);

    if (!parameters.ModifyUntilFilled)
    {
        return;
    }

    ModifyOrderUntilFilled(placedOrder, parameters);
}
/// <summary>
/// Opens a short position with a market order and then places a long order of twice the default
/// quantity; when requested by the parameters the order keeps being modified until it fills
/// </summary>
/// <param name="parameters">The parameters used to create the orders and the status to wait for</param>
public virtual void LongFromShort(OrderTestParameters parameters)
{
    Log.Trace("");
    Log.Trace("LONG FROM SHORT");
    Log.Trace("");

    // first go short; the quantity is passed unsigned like every other CreateShort* call in this
    // class (e.g. CloseFromShort), the order factory is the one deciding the direction
    PlaceOrderWaitForStatus(parameters.CreateShortMarketOrder(GetDefaultQuantity()), OrderStatus.Filled);

    // now go long
    var order = PlaceOrderWaitForStatus(parameters.CreateLongOrder(2 * GetDefaultQuantity()), parameters.ExpectedStatus);

    if (parameters.ModifyUntilFilled)
    {
        ModifyOrderUntilFilled(order, parameters);
    }
}
/// <summary>
/// Places a long order, updates it, and then cancels it. Verifies that each operation completes successfully.
/// </summary>
/// <param name="parameters">The parameters for creating and managing the order.</param>
/// <param name="quantityIncrement">The increment to add to the order quantity during the update.</param>
/// <param name="limitPriceIncrement">The increment to add to the order's limit price during the update.</param>
/// <param name="stopPriceIncrement">The increment to add to the order's stop price during the update.</param>
/// <exception cref="AssertionException">Thrown if the order fails to update or cancel as expected.</exception>
public virtual void LongFromZeroUpdateAndCancel(OrderTestParameters parameters, decimal quantityIncrement = 1, decimal limitPriceIncrement = 0.01m, decimal stopPriceIncrement = 0.01m)
{
    Log.Trace("");
    Log.Trace("LONG FROM ZERO THEN UPDATE AND CANCEL");
    Log.Trace("");

    var order = PlaceOrderWaitForStatus(parameters.CreateLongOrder(GetDefaultQuantity()), parameters.ExpectedStatus);

    using var updatedOrderStatusEvent = new AutoResetEvent(false);
    using var canceledOrderStatusEvent = new AutoResetEvent(false);

    EventHandler<List<OrderEvent>> brokerageOnOrdersStatusChanged = (_, orderEvents) =>
    {
        var eventOrderStatus = orderEvents[0].Status;

        order.Status = eventOrderStatus;

        switch (eventOrderStatus)
        {
            case OrderStatus.UpdateSubmitted:
                updatedOrderStatusEvent.Set();
                break;
            case OrderStatus.Canceled:
                canceledOrderStatusEvent.Set();
                break;
        }
    };

    Brokerage.OrdersStatusChanged += brokerageOnOrdersStatusChanged;

    try
    {
        var newQuantity = order.Quantity + quantityIncrement;

        // only the order types carrying a limit / stop price get that price updated
        decimal? limitPrice = order switch
        {
            LimitOrder lo => lo.LimitPrice,
            StopLimitOrder slo => slo.LimitPrice,
            LimitIfTouchedOrder lito => lito.LimitPrice,
            _ => null
        };

        decimal? stopPrice = order switch
        {
            StopMarketOrder smo => smo.StopPrice,
            StopLimitOrder slo => slo.StopPrice,
            _ => null
        };

        // lifted addition: the result stays null when the order has no such price
        decimal? newLimitPrice = limitPrice + limitPriceIncrement;
        decimal? newStopPrice = stopPrice + stopPriceIncrement;

        Log.Trace("");
        Log.Trace($"UPDATE ORDER FIELDS: \n" +
            $" oldQuantity = {order.Quantity}, newQuantity = {newQuantity}\n" +
            $" oldLimitPrice = {limitPrice}, newLimitPrice = {newLimitPrice}\n" +
            $" oldStopPrice = {stopPrice}, newStopPrice = {newStopPrice}");
        Log.Trace("");

        var updateOrderFields = new UpdateOrderFields()
        {
            Quantity = newQuantity,
            LimitPrice = newLimitPrice,
            StopPrice = newStopPrice
        };

        order.ApplyUpdateOrderRequest(new UpdateOrderRequest(DateTime.UtcNow, order.Id, updateOrderFields));

        if (!Brokerage.UpdateOrder(order) || !updatedOrderStatusEvent.WaitOne(TimeSpan.FromSeconds(5)))
        {
            Assert.Fail("Order is not updated well.");
        }

        if (!Brokerage.CancelOrder(order) || !canceledOrderStatusEvent.WaitOne(TimeSpan.FromSeconds(5)))
        {
            Assert.Fail("Order is not canceled well.");
        }
    }
    finally
    {
        // always detach: the handler captures the wait handles that get disposed when we leave,
        // a failed assertion above must not leave it subscribed
        Brokerage.OrdersStatusChanged -= brokerageOnOrdersStatusChanged;
    }
}
/// <summary>
/// Verifies the brokerage reports at least one cash balance entry
/// </summary>
[Test]
public virtual void GetCashBalanceContainsSomething()
{
    foreach (var line in new[] { "", "GET CASH BALANCE", "" })
    {
        Log.Trace(line);
    }

    var hasAnyBalance = Brokerage.GetCashBalance().Any();
    Assert.IsTrue(hasAnyBalance);
}
/// <summary>
/// Verifies that buying the default quantity of <see cref="Symbol"/> with a market order increases
/// the quantity reported by the brokerage for that symbol by exactly the same amount
/// </summary>
[Test]
public virtual void GetAccountHoldings()
{
    foreach (var line in new[] { "", "GET ACCOUNT HOLDINGS", "" })
    {
        Log.Trace(line);
    }

    var holdingsBefore = Brokerage.GetAccountHoldings();

    PlaceOrderWaitForStatus(GetMarketOrder(Symbol, GetDefaultQuantity()));

    // give the brokerage some time to reflect the fill in the account
    Thread.Sleep(3000);

    var holdingsAfter = Brokerage.GetAccountHoldings();

    // a missing entry means there is no position in the symbol
    var quantityBefore = holdingsBefore.FirstOrDefault(x => x.Symbol == Symbol)?.Quantity ?? 0m;
    var quantityAfter = holdingsAfter.FirstOrDefault(x => x.Symbol == Symbol)?.Quantity ?? 0m;

    Assert.AreEqual(GetDefaultQuantity(), quantityAfter - quantityBefore);
}
/// <summary>
/// Places a very large market order and tracks the quantity still to be filled from the order
/// events, expecting it to reach zero within the waiting time
/// </summary>
[Test, Explicit("This test requires reading the output and selection of a low volume security for the Brokerage")]
public void PartialFills()
{
    using var manualResetEvent = new ManualResetEvent(false);

    var qty = 1000000m;
    var remaining = qty;
    var sync = new object();
    EventHandler<List<OrderEvent>> onOrdersStatusChanged = (sender, orderEvents) =>
    {
        lock (sync)
        {
            var orderEvent = orderEvents[0];
            remaining -= orderEvent.FillQuantity;
            Log.Trace("Remaining: " + remaining + " FillQuantity: " + orderEvent.FillQuantity);
            if (orderEvent.Status == OrderStatus.Filled)
            {
                manualResetEvent.Set();
            }
        }
    };
    Brokerage.OrdersStatusChanged += onOrdersStatusChanged;

    try
    {
        // pick a security with low, but some, volume
        var symbol = Symbols.EURUSD;
        var order = GetMarketOrder(symbol, qty);
        OrderProvider.Add(order);
        Brokerage.PlaceOrder(order);

        // pause for a while to wait for fills to come in
        manualResetEvent.WaitOne(7500);

        decimal finalRemaining;
        lock (sync)
        {
            // read under the same lock the handler uses to update the value
            finalRemaining = remaining;
        }

        Log.Trace("Remaining: " + finalRemaining);
        Assert.AreEqual(0, finalRemaining);
    }
    finally
    {
        // the handler captures the wait handle disposed when we leave, it must not stay subscribed
        Brokerage.OrdersStatusChanged -= onOrdersStatusChanged;
    }
}
/// <summary>
/// Updates the specified order in the brokerage until it fills or reaches a timeout
/// </summary>
/// <param name="order">The order to be modified</param>
/// <param name="parameters">The order test parameters that define how to modify the order</param>
/// <param name="secondsTimeout">Maximum amount of time to wait until the order fills</param>
protected virtual void ModifyOrderUntilFilled(Order order, OrderTestParameters parameters, double secondsTimeout = 90)
{
    if (order.Status == OrderStatus.Filled)
    {
        return;
    }

    EventHandler<List<OrderEvent>> brokerageOnOrdersStatusChanged = (sender, args) =>
    {
        var orderEvent = args[0];
        order.Status = orderEvent.Status;
        if (orderEvent.Status == OrderStatus.Canceled || orderEvent.Status == OrderStatus.Invalid)
        {
            Log.Trace("ModifyOrderUntilFilled(): " + order);
            // NOTE(review): this throws on whichever thread the brokerage raises the event on,
            // confirm it actually fails the running test
            Assert.Fail("Unexpected order status: " + orderEvent.Status);
        }
    };

    Brokerage.OrdersStatusChanged += brokerageOnOrdersStatusChanged;

    try
    {
        Log.Trace("");
        Log.Trace("MODIFY UNTIL FILLED: " + order);
        Log.Trace("");

        var stopwatch = Stopwatch.StartNew();
        // each iteration first waits up to 3 seconds for a fill before attempting another update
        while (!order.Status.IsClosed() && !OrderFillEvent.WaitOne(3000) && stopwatch.Elapsed.TotalSeconds < secondsTimeout)
        {
            OrderFillEvent.Reset();
            if (order.Status == OrderStatus.PartiallyFilled)
            {
                continue;
            }

            var marketPrice = GetAskPrice(order.Symbol);
            Log.Trace("BrokerageTests.ModifyOrderUntilFilled(): Ask: " + marketPrice);

            var updateOrder = parameters.ModifyOrderToFill(Brokerage, order, marketPrice);
            if (!updateOrder)
            {
                continue;
            }

            if (order.Status.IsClosed())
            {
                break;
            }

            Log.Trace("BrokerageTests.ModifyOrderUntilFilled(): " + order);
            if (!Brokerage.UpdateOrder(order))
            {
                // could be filling already, partial fill: not fatal, the loop will try again
                Log.Trace("BrokerageTests.ModifyOrderUntilFilled(): the brokerage did not accept the update");
            }
        }
    }
    finally
    {
        // never leave the handler subscribed, even if something above throws
        Brokerage.OrdersStatusChanged -= brokerageOnOrdersStatusChanged;
    }

    Assert.AreEqual(OrderStatus.Filled, order.Status, $"Brokerage failed to update the order: {order.Status}");
}
/// <summary>
/// Places the specified order with the brokerage and waits until the expected status is received
/// through an OrdersStatusChanged event. The order is added to the <see cref="OrderProvider"/>
/// before being sent to the brokerage.
/// </summary>
/// <param name="order">The order to be submitted</param>
/// <param name="expectedStatus">The status to wait for</param>
/// <param name="secondsTimeout">Maximum amount of time to wait for <paramref name="expectedStatus"/></param>
/// <param name="allowFailedSubmission">Allow failed order submission</param>
/// <returns>The same order that was submitted.</returns>
protected Order PlaceOrderWaitForStatus(Order order, OrderStatus expectedStatus = OrderStatus.Filled,
    double secondsTimeout = 30.0, bool allowFailedSubmission = false)
{
    using var requiredStatusEvent = new ManualResetEvent(false);
    using var desiredStatusEvent = new ManualResetEvent(false);

    EventHandler<List<OrderEvent>> brokerageOnOrdersStatusChanged = (sender, args) =>
    {
        var orderEvent = args[0];

        order.Status = orderEvent.Status;

        // no matter what, every order should fire at least one of these
        if (orderEvent.Status == OrderStatus.Submitted || orderEvent.Status == OrderStatus.Invalid)
        {
            Log.Trace("");
            Log.Trace("SUBMITTED: " + orderEvent);
            Log.Trace("");
            try
            {
                requiredStatusEvent.Set();
            }
            catch (ObjectDisposedException)
            {
                // an event still in flight while this method is returning, nobody is waiting anymore
            }
        }
        // make sure we fire the status we're expecting
        if (orderEvent.Status == expectedStatus)
        {
            Log.Trace("");
            Log.Trace("EXPECTED: " + orderEvent);
            Log.Trace("");
            try
            {
                desiredStatusEvent.Set();
            }
            catch (ObjectDisposedException)
            {
                // an event still in flight while this method is returning, nobody is waiting anymore
            }
        }
    };

    Brokerage.OrdersStatusChanged += brokerageOnOrdersStatusChanged;

    try
    {
        OrderFillEvent.Reset();

        OrderProvider.Add(order);
        if (!Brokerage.PlaceOrder(order) && !allowFailedSubmission)
        {
            Assert.Fail("Brokerage failed to place the order: " + order);
        }

        // This is due to IB simulating stop orders https://www.interactivebrokers.com/en/trading/orders/stop.php
        // which causes the Status.Submitted order event to never be set
        bool assertOrderEventStatus = !(Brokerage.Name == "Interactive Brokers Brokerage"
            && new[] { OrderType.StopMarket, OrderType.StopLimit }.Contains(order.Type));
        if (assertOrderEventStatus)
        {
            requiredStatusEvent.WaitOneAssertFail((int)(1000 * secondsTimeout), "Expected every order to fire a submitted or invalid status event");
            desiredStatusEvent.WaitOneAssertFail((int)(1000 * secondsTimeout), "OrderStatus " + expectedStatus + " was not encountered within the timeout. Order Id:" + order.Id);
        }
        else
        {
            // best effort wait only, no assertion for the simulated stop orders
            requiredStatusEvent.WaitOne((int)(1000 * secondsTimeout));
        }
    }
    finally
    {
        // detach even when an assertion above throws, the handler captures wait handles
        // that are disposed as soon as this method exits
        Brokerage.OrdersStatusChanged -= brokerageOnOrdersStatusChanged;
    }

    return order;
}
/// <summary>
/// Creates a subscription data config for the given symbol and resolution, using UTC for both
/// time zone arguments
/// </summary>
/// <typeparam name="T">The data type of the subscription</typeparam>
/// <param name="symbol">The symbol of the subscription</param>
/// <param name="resolution">The resolution of the subscription</param>
/// <returns>The new subscription data config</returns>
protected static SubscriptionDataConfig GetSubscriptionDataConfig<T>(Symbol symbol, Resolution resolution)
{
    return new SubscriptionDataConfig(
        typeof(T),
        symbol,
        resolution,
        TimeZones.Utc,
        TimeZones.Utc,
        true,
        true,
        false);
}
/// <summary>
/// Starts a task that keeps pulling data points from the enumerator, handing each one to the
/// callback, until the enumerator ends or cancellation is requested
/// </summary>
/// <param name="enumerator">The source of the data points</param>
/// <param name="cancellationToken">The token source used to stop the processing</param>
/// <param name="callback">Optional action invoked with every data point</param>
protected static void ProcessFeed(IEnumerator<BaseData> enumerator, CancellationTokenSource cancellationToken, Action<BaseData> callback = null)
{
    Task.Factory.StartNew(() =>
    {
        try
        {
            while (enumerator.MoveNext() && !cancellationToken.IsCancellationRequested)
            {
                BaseData tick = enumerator.Current;
                callback?.Invoke(tick);
            }
        }
        catch (AssertionException)
        {
            // assertions raised by the callback are not swallowed, they fault the task instead
            throw;
        }
        catch (Exception err)
        {
            Log.Error(err.Message);
        }
    }, cancellationToken.Token);
}
/// <summary>
/// Creates a market order with id 1 for the given symbol and quantity, priced in the quote
/// currency found in the symbol properties database
/// </summary>
/// <param name="symbol">The symbol to trade</param>
/// <param name="quantity">The signed quantity of the order</param>
/// <returns>The new market order</returns>
private MarketOrder GetMarketOrder(Symbol symbol, decimal quantity)
{
    var database = SymbolPropertiesDatabase.FromDataFolder();
    var market = symbol.ID.Market;
    var securityType = symbol.SecurityType;
    // fall back to USD when there is no brokerage or it reports no account currency
    var accountCurrency = Brokerage?.AccountBaseCurrency ?? Currencies.USD;
    var symbolProperties = database.GetSymbolProperties(market, symbol, securityType, accountCurrency);

    var marketOrder = new MarketOrder(symbol, quantity, DateTime.UtcNow);
    marketOrder.Id = 1;
    marketOrder.PriceCurrency = symbolProperties.QuoteCurrency;
    return marketOrder;
}
}
}