/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System;
using System.Linq;
using Newtonsoft.Json;
using System.Threading;
using QuantConnect.Data;
using QuantConnect.Orders;
using QuantConnect.Logging;
using System.Threading.Tasks;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
using QuantConnect.Orders.Fees;
using System.Collections.Generic;
using System.Collections.Concurrent;
using QuantConnect.Brokerages.CrossZero;

namespace QuantConnect.Brokerages
{
    /// <summary>
    /// Represents the base Brokerage implementation. This provides logging on brokerage events.
    /// </summary>
    public abstract class Brokerage : IBrokerage
    {
        // 7:45 AM (New York time zone)
        private static readonly TimeSpan LiveBrokerageCashSyncTime = new TimeSpan(7, 45, 0);

        // guards PerformCashSync against concurrent/reentrant execution
        private readonly object _performCashSyncReentranceGuard = new object();

        // starts as 'true' so the first sync only becomes due once the New York date rolls over
        private bool _syncedLiveBrokerageCashToday = true;

        // UTC ticks of the last successful cash sync; accessed through Interlocked
        private long _lastSyncTimeTicks = DateTime.UtcNow.Ticks;

        /// <summary>
        /// Event that fires each time the brokerage order id changes
        /// </summary>
        public event EventHandler<BrokerageOrderIdChangedEvent> OrderIdChanged;

        /// <summary>
        /// Event that fires each time the status for a list of orders change
        /// </summary>
        public event EventHandler<List<OrderEvent>> OrdersStatusChanged;

        /// <summary>
        /// Event that fires each time an order is updated in the brokerage side
        /// </summary>
        /// <remarks>
        /// These are not status changes but mainly price changes, like the stop price of a trailing stop order
        /// </remarks>
        public event EventHandler<OrderUpdateEvent> OrderUpdated;

        /// <summary>
        /// Event that fires each time a short option position is assigned
        /// </summary>
        public event EventHandler<OrderEvent> OptionPositionAssigned;

        /// <summary>
        /// Event that fires each time an option position has changed
        /// </summary>
        public event EventHandler<OptionNotificationEventArgs> OptionNotification;

        /// <summary>
        /// Event that fires each time there's a brokerage side generated order
        /// </summary>
        public event EventHandler<NewBrokerageOrderNotificationEventArgs> NewBrokerageOrderNotification;

        /// <summary>
        /// Event that fires each time a delisting occurs
        /// </summary>
        public event EventHandler<DelistingNotificationEventArgs> DelistingNotification;

        /// <summary>
        /// Event that fires each time a user's brokerage account is changed
        /// </summary>
        public event EventHandler<AccountEvent> AccountChanged;

        /// <summary>
        /// Event that fires when an error is encountered in the brokerage
        /// </summary>
        public event EventHandler<BrokerageMessageEvent> Message;

        /// <summary>
        /// Gets the name of the brokerage
        /// </summary>
        public string Name { get; }

        /// <summary>
        /// Returns true if we're currently connected to the broker
        /// </summary>
        public abstract bool IsConnected { get; }

        /// <summary>
        /// Enables or disables concurrent processing of messages to and from the brokerage.
        /// </summary>
        public bool ConcurrencyEnabled { get; set; }

        /// <summary>
        /// Creates a new Brokerage instance with the specified name
        /// </summary>
        /// <param name="name">The name of the brokerage</param>
        protected Brokerage(string name)
        {
            Name = name;
        }

        /// <summary>
        /// Places a new order and assigns a new broker ID to the order
        /// </summary>
        /// <param name="order">The order to be placed</param>
        /// <returns>True if the request for a new order has been placed, false otherwise</returns>
        public abstract bool PlaceOrder(Order order);

        /// <summary>
        /// Updates the order with the same id
        /// </summary>
        /// <param name="order">The new order information</param>
        /// <returns>True if the request was made for the order to be updated, false otherwise</returns>
        public abstract bool UpdateOrder(Order order);

        /// <summary>
        /// Cancels the order with the specified ID
        /// </summary>
        /// <param name="order">The order to cancel</param>
        /// <returns>True if the request was made for the order to be canceled, false otherwise</returns>
        public abstract bool CancelOrder(Order order);

        /// <summary>
        /// Connects the client to the broker's remote servers
        /// </summary>
        public abstract void Connect();

        /// <summary>
        /// Disconnects the client from the broker's remote servers
        /// </summary>
        public abstract void Disconnect();

        /// <summary>
        /// Dispose of the brokerage instance
        /// </summary>
        public virtual void Dispose()
        {
            // NOP
        }

        /// <summary>
        /// Event invocator for the <see cref="OrdersStatusChanged"/> event.
        /// Exceptions thrown by subscribers are logged and swallowed.
        /// </summary>
        /// <param name="orderEvents">The list of order events</param>
        protected virtual void OnOrderEvents(List<OrderEvent> orderEvents)
        {
            try
            {
                OrdersStatusChanged?.Invoke(this, orderEvents);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="OrdersStatusChanged"/> event, for a single order event.
        /// Wraps the event in a one-element list and forwards it to <see cref="OnOrderEvents"/>.
        /// </summary>
        /// <param name="e">The order event</param>
        protected virtual void OnOrderEvent(OrderEvent e)
        {
            OnOrderEvents(new List<OrderEvent> { e });
        }

        /// <summary>
        /// Event invocator for the <see cref="OrderUpdated"/> event
        /// </summary>
        /// <param name="e">The update event</param>
        protected virtual void OnOrderUpdated(OrderUpdateEvent e)
        {
            try
            {
                OrderUpdated?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="OrderIdChanged"/> event
        /// </summary>
        /// <param name="e">The BrokerageOrderIdChangedEvent</param>
        protected virtual void OnOrderIdChangedEvent(BrokerageOrderIdChangedEvent e)
        {
            try
            {
                OrderIdChanged?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="OptionPositionAssigned"/> event
        /// </summary>
        /// <param name="e">The OrderEvent</param>
        protected virtual void OnOptionPositionAssigned(OrderEvent e)
        {
            try
            {
                Log.Debug("Brokerage.OptionPositionAssigned(): " + e);

                OptionPositionAssigned?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="OptionNotification"/> event
        /// </summary>
        /// <param name="e">The OptionNotification event arguments</param>
        protected virtual void OnOptionNotification(OptionNotificationEventArgs e)
        {
            try
            {
                Log.Debug("Brokerage.OnOptionNotification(): " + e);

                OptionNotification?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="NewBrokerageOrderNotification"/> event
        /// </summary>
        /// <param name="e">The NewBrokerageOrderNotification event arguments</param>
        protected virtual void OnNewBrokerageOrderNotification(NewBrokerageOrderNotificationEventArgs e)
        {
            try
            {
                Log.Debug("Brokerage.OnNewBrokerageOrderNotification(): " + e);

                NewBrokerageOrderNotification?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="DelistingNotification"/> event
        /// </summary>
        /// <param name="e">The DelistingNotification event arguments</param>
        protected virtual void OnDelistingNotification(DelistingNotificationEventArgs e)
        {
            try
            {
                Log.Debug("Brokerage.OnDelistingNotification(): " + e);

                DelistingNotification?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="AccountChanged"/> event
        /// </summary>
        /// <param name="e">The AccountEvent</param>
        protected virtual void OnAccountChanged(AccountEvent e)
        {
            try
            {
                Log.Trace($"Brokerage.OnAccountChanged(): {e}");

                AccountChanged?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="Message"/> event.
        /// Error messages are logged at error level, everything else at trace level.
        /// </summary>
        /// <param name="e">The error</param>
        protected virtual void OnMessage(BrokerageMessageEvent e)
        {
            try
            {
                if (e.Type == BrokerageMessageType.Error)
                {
                    Log.Error("Brokerage.OnMessage(): " + e);
                }
                else
                {
                    Log.Trace("Brokerage.OnMessage(): " + e);
                }

                Message?.Invoke(this, e);
            }
            catch (Exception err)
            {
                Log.Error(err);
            }
        }

        /// <summary>
        /// Helper method that will try to get the live holdings from the provided brokerage data collection
        /// else will default to the algorithm state
        /// </summary>
        /// <remarks>Holdings will removed from the provided collection on the first call, since this method is expected to be called only
        /// once on initialize, after which the algorithm should use Lean accounting</remarks>
        /// <param name="brokerageData">Optional brokerage data; the "live-holdings" entry is consumed (removed) if present</param>
        /// <param name="securities">Securities used as fallback source; may be null</param>
        /// <returns>The holdings, never null</returns>
        protected virtual List<Holding> GetAccountHoldings(Dictionary<string, string> brokerageData, IEnumerable<Security> securities)
        {
            if (Log.DebuggingEnabled)
            {
                Log.Debug("Brokerage.GetAccountHoldings(): starting...");
            }

            // Remove() takes the key out of the collection: we only want to return the cached value on the first request
            if (brokerageData != null && brokerageData.Remove("live-holdings", out var value) && !string.IsNullOrEmpty(value))
            {
                if (Log.DebuggingEnabled)
                {
                    Log.Debug($"Brokerage.GetAccountHoldings(): raw value: {value}");
                }

                var result = JsonConvert.DeserializeObject<List<Holding>>(value);
                if (result == null)
                {
                    return new List<Holding>();
                }
                Log.Trace($"Brokerage.GetAccountHoldings(): sourcing holdings from provided brokerage data, found {result.Count} entries");
                return result;
            }

            // fall back to the non-empty positions of the provided securities, ordered by symbol
            return securities?.Where(security => security.Holdings.AbsoluteQuantity > 0)
                .OrderBy(security => security.Symbol)
                .Select(security => new Holding(security)).ToList() ?? new List<Holding>();
        }

        /// <summary>
        /// Helper method that will try to get the live cash balance from the provided brokerage data collection
        /// else will default to the algorithm state
        /// </summary>
        /// <remarks>Cash balance will removed from the provided collection on the first call, since this method is expected to be called only
        /// once on initialize, after which the algorithm should use Lean accounting</remarks>
        /// <param name="brokerageData">Optional brokerage data; the "live-cash-balance" entry is consumed (removed) if present</param>
        /// <param name="cashBook">Cash book used as fallback source; may be null</param>
        /// <returns>The cash balances, never null</returns>
        protected virtual List<CashAmount> GetCashBalance(Dictionary<string, string> brokerageData, CashBook cashBook)
        {
            if (Log.DebuggingEnabled)
            {
                Log.Debug("Brokerage.GetCashBalance(): starting...");
            }

            // Remove() takes the key out of the collection: we only want to return the cached value on the first request
            if (brokerageData != null && brokerageData.Remove("live-cash-balance", out var value) && !string.IsNullOrEmpty(value))
            {
                var result = JsonConvert.DeserializeObject<List<CashAmount>>(value);
                if (result == null)
                {
                    return new List<CashAmount>();
                }
                Log.Trace($"Brokerage.GetCashBalance(): sourcing cash balance from provided brokerage data, found {result.Count} entries");
                return result;
            }

            return cashBook?.Select(x => new CashAmount(x.Value.Amount, x.Value.Symbol)).ToList() ?? new List<CashAmount>();
        }

        /// <summary>
        /// Gets all open orders on the account.
        /// NOTE: The order objects returned do not have QC order IDs.
        /// </summary>
        /// <returns>The open orders returned from IB</returns>
        public abstract List<Order> GetOpenOrders();

        /// <summary>
        /// Gets all holdings for the account
        /// </summary>
        /// <returns>The current holdings from the account</returns>
        public abstract List<Holding> GetAccountHoldings();

        /// <summary>
        /// Gets the current cash balance for each currency held in the brokerage account
        /// </summary>
        /// <returns>The current cash balance for each currency available for trading</returns>
        public abstract List<CashAmount> GetCashBalance();

        /// <summary>
        /// Specifies whether the brokerage will instantly update account balances
        /// </summary>
        public virtual bool AccountInstantlyUpdated => false;

        /// <summary>
        /// Returns the brokerage account's base currency
        /// </summary>
        public virtual string AccountBaseCurrency { get; protected set; }

        /// <summary>
        /// Gets the history for the requested security
        /// </summary>
        /// <param name="request">The historical data request</param>
        /// <returns>An enumerable of bars covering the span specified in the request; this base implementation returns an empty sequence</returns>
        public virtual IEnumerable<BaseData> GetHistory(HistoryRequest request)
        {
            return Enumerable.Empty<BaseData>();
        }

        /// <summary>
        /// Gets the position that might result given the specified order direction and the current holdings quantity.
        /// This is useful for brokerages that require more specific direction information than provided by the OrderDirection enum
        /// (e.g. Tradier differentiates Buy/Sell and BuyToOpen/BuyToCover/SellShort/SellToClose)
        /// </summary>
        /// <param name="orderDirection">The order direction</param>
        /// <param name="holdingsQuantity">The current holdings quantity</param>
        /// <returns>The order position</returns>
        protected static OrderPosition GetOrderPosition(OrderDirection orderDirection, decimal holdingsQuantity)
        {
            return BrokerageExtensions.GetOrderPosition(orderDirection, holdingsQuantity);
        }

        #region IBrokerageCashSynchronizer implementation

        /// <summary>
        /// Gets the date of the last sync (New York time zone)
        /// </summary>
        protected DateTime LastSyncDate => LastSyncDateTimeUtc.ConvertFromUtc(TimeZones.NewYork).Date;

        /// <summary>
        /// Gets the datetime of the last sync (UTC)
        /// </summary>
        public DateTime LastSyncDateTimeUtc => new DateTime(Interlocked.Read(ref _lastSyncTimeTicks));

        /// <summary>
        /// Returns whether the brokerage should perform the cash synchronization
        /// </summary>
        /// <param name="currentTimeUtc">The current time (UTC)</param>
        /// <returns>True if the cash sync should be performed</returns>
        public virtual bool ShouldPerformCashSync(DateTime currentTimeUtc)
        {
            // every morning flip this switch back
            var currentTimeNewYork = currentTimeUtc.ConvertFromUtc(TimeZones.NewYork);
            if (_syncedLiveBrokerageCashToday && currentTimeNewYork.Date != LastSyncDate)
            {
                _syncedLiveBrokerageCashToday = false;
            }

            return !_syncedLiveBrokerageCashToday && currentTimeNewYork.TimeOfDay >= LiveBrokerageCashSyncTime;
        }

        /// <summary>
        /// Synchronizes the cashbook with the brokerage account
        /// </summary>
        /// <param name="algorithm">The algorithm instance</param>
        /// <param name="currentTimeUtc">The current time (UTC)</param>
        /// <param name="getTimeSinceLastFill">A function which returns the time elapsed since the last fill</param>
        /// <returns>
        /// True if the cash sync was performed successfully; false if another sync is already
        /// in progress or the brokerage cash balance could not be retrieved
        /// </returns>
        public virtual bool PerformCashSync(IAlgorithm algorithm, DateTime currentTimeUtc, Func<TimeSpan> getTimeSinceLastFill)
        {
            // prevent reentrance in this method.
            // The lock is taken BEFORE the try block so that the finally below only ever releases a lock
            // we actually own; calling Monitor.Exit without owning it throws SynchronizationLockException.
            if (!Monitor.TryEnter(_performCashSyncReentranceGuard))
            {
                Log.Trace("Brokerage.PerformCashSync(): Reentrant call, cash sync not performed");
                return false;
            }

            try
            {
                Log.Trace("Brokerage.PerformCashSync(): Sync cash balance");

                List<CashAmount> balances = null;
                try
                {
                    balances = GetCashBalance();
                }
                catch (Exception err)
                {
                    Log.Error(err, "Error in GetCashBalance:");
                }

                // empty cash balance is valid, if there was No error/exception
                if (balances == null)
                {
                    Log.Trace("Brokerage.PerformCashSync(): No cash balances available, cash sync not performed");
                    return false;
                }

                // Adds currency to the cashbook that the user might have deposited
                foreach (var balance in balances)
                {
                    if (!algorithm.Portfolio.CashBook.ContainsKey(balance.Currency))
                    {
                        Log.Trace($"Brokerage.PerformCashSync(): Unexpected cash found {balance.Currency} {balance.Amount}", true);
                        algorithm.Portfolio.SetCash(balance.Currency, balance.Amount, 0);
                    }
                }

                // deltas above 2% of the total portfolio value are worth logging
                var totalPortfolioValueThreshold = algorithm.Portfolio.TotalPortfolioValue * 0.02m;

                // if we were returned our balances, update everything and flip our flag as having performed sync today
                foreach (var kvp in algorithm.Portfolio.CashBook)
                {
                    var cash = kvp.Value;

                    // update the cash if the entry is found in the balances
                    var balanceCash = balances.Find(balance => balance.Currency == cash.Symbol);
                    if (balanceCash != default(CashAmount))
                    {
                        // compare in account currency
                        var delta = cash.Amount - balanceCash.Amount;
                        if (cash.ConversionRate == 0
                            || Math.Abs(algorithm.Portfolio.CashBook.ConvertToAccountCurrency(delta, cash.Symbol)) > totalPortfolioValueThreshold)
                        {
                            // log the delta between Lean and the brokerage
                            Log.Trace($"Brokerage.PerformCashSync(): {balanceCash.Currency} Delta: {delta:0.00}", true);
                        }
                        algorithm.Portfolio.CashBook[cash.Symbol].SetAmount(balanceCash.Amount);
                    }
                    else
                    {
                        // Set the cash amount to zero if cash entry not found in the balances
                        Log.Trace($"Brokerage.PerformCashSync(): {cash.Symbol} was not found in brokerage cash balance, setting the amount to 0", true);
                        algorithm.Portfolio.CashBook[cash.Symbol].SetAmount(0);
                    }
                }

                _syncedLiveBrokerageCashToday = true;
                // written atomically to pair with the Interlocked.Read in LastSyncDateTimeUtc
                Interlocked.Exchange(ref _lastSyncTimeTicks, currentTimeUtc.Ticks);
            }
            finally
            {
                Monitor.Exit(_performCashSyncReentranceGuard);
            }

            // fire off this task to check if we've had recent fills, if we have then we'll invalidate the cash sync
            // and do it again until we're confident in it
            Task.Delay(TimeSpan.FromSeconds(10)).ContinueWith(_ =>
            {
                // we want to make sure this is a good value, so check for any recent fills
                if (getTimeSinceLastFill() <= TimeSpan.FromSeconds(20))
                {
                    // this will cause us to come back in and reset cash again until we
                    // haven't processed a fill for +- 10 seconds of the set cash time
                    _syncedLiveBrokerageCashToday = false;
                    Log.Trace("Brokerage.PerformCashSync(): Unverified cash sync - resync required.");
                }
                else
                {
                    Log.Trace("Brokerage.PerformCashSync(): Verified cash sync.");

                    algorithm.Portfolio.LogMarginInformation();
                }
            });

            return true;
        }

        #endregion

        #region CrossZeroOrder implementation

        /// <summary>
        /// A dictionary to store the relationship between the Lean order id and the pending
        /// second part of a cross zero order.
        /// </summary>
        private readonly ConcurrentDictionary<int, CrossZeroSecondOrderRequest> _leanOrderByBrokerageCrossingOrders = new();

        /// <summary>
        /// An object used to lock the critical sections that place cross zero orders and
        /// access <see cref="LeanOrderByZeroCrossBrokerageOrderId"/>.
        /// </summary>
        private readonly object _lockCrossZeroObject = new();

        /// <summary>
        /// A thread-safe dictionary that maps brokerage order IDs to their corresponding Order objects.
        /// </summary>
        /// <remarks>
        /// This ConcurrentDictionary is used to maintain a mapping between Zero Cross brokerage order IDs and Lean Order objects.
        /// The property is protected and get-only, so it cannot be assigned a new instance after initialization.
        /// </remarks>
        protected ConcurrentDictionary<string, Order> LeanOrderByZeroCrossBrokerageOrderId { get; } = new();

        /// <summary>
        /// Places an order that crosses zero (transitions from a short position to a long position or vice versa) and returns the response.
        /// This method should be overridden in a derived class to implement brokerage-specific logic for placing such orders.
        /// </summary>
        /// <param name="crossZeroOrderRequest">The request object containing details of the cross zero order to be placed.</param>
        /// <param name="isPlaceOrderWithLeanEvent">
        /// A boolean indicating whether the order should be placed with triggering a Lean event.
        /// Default is <c>true</c>, meaning Lean events will be triggered.
        /// </param>
        /// <returns>
        /// A <see cref="CrossZeroOrderResponse"/> object indicating the result of the order placement.
        /// </returns>
        /// <exception cref="NotImplementedException">
        /// Thrown if the method is not overridden in a derived class.
        /// </exception>
        protected virtual CrossZeroOrderResponse PlaceCrossZeroOrder(CrossZeroFirstOrderRequest crossZeroOrderRequest, bool isPlaceOrderWithLeanEvent = true)
        {
            throw new NotImplementedException($"{nameof(PlaceCrossZeroOrder)} method should be overridden in the derived class to handle brokerage-specific logic.");
        }

        /// <summary>
        /// Attempts to place an order that may cross the zero position.
        /// If the order needs to be split into two parts due to crossing zero,
        /// this method handles the split and placement accordingly.
        /// </summary>
        /// <param name="order">The order to be placed. Must not be <c>null</c>.</param>
        /// <param name="holdingQuantity">The current holding quantity of the order's symbol.</param>
        /// <returns>
        /// <c>true</c> if the order crosses zero and the first part was successfully placed;
        /// <c>false</c> if the first part of the order could not be placed;
        /// <c>null</c> if the order does not cross zero.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown if <paramref name="order"/> is <c>null</c>.
        /// </exception>
        protected bool? TryCrossZeroPositionOrder(Order order, decimal holdingQuantity)
        {
            if (order == null)
            {
                throw new ArgumentNullException(nameof(order), "The order parameter cannot be null.");
            }

            // do we need to split the order into two pieces?
            var crossesZero = BrokerageExtensions.OrderCrossesZero(holdingQuantity, order.Quantity);
            if (!crossesZero)
            {
                return null;
            }

            // first we need an order to close out the current position
            var (firstOrderQuantity, secondOrderQuantity) = GetQuantityOnCrossPosition(holdingQuantity, order.Quantity);

            var firstOrderPartRequest = new CrossZeroFirstOrderRequest(order, order.Type, firstOrderQuantity, holdingQuantity,
                GetOrderPosition(order.Direction, holdingQuantity));

            // the remainder is stored now and submitted by TryHandleRemainingCrossZeroOrder
            // once the closing order is reported as filled
            var secondOrderPartRequest = new CrossZeroSecondOrderRequest(order, order.Type, secondOrderQuantity, 0m,
                GetOrderPosition(order.Direction, 0m), firstOrderPartRequest);

            _leanOrderByBrokerageCrossingOrders.AddOrUpdate(order.Id, secondOrderPartRequest);

            CrossZeroOrderResponse response;
            lock (_lockCrossZeroObject)
            {
                // issue the first order to close the position
                response = PlaceCrossZeroOrder(firstOrderPartRequest);
                if (response.IsOrderPlacedSuccessfully)
                {
                    var orderId = response.BrokerageOrderId;
                    if (!order.BrokerId.Contains(orderId))
                    {
                        order.BrokerId.Add(orderId);
                    }
                }
            }

            if (!response.IsOrderPlacedSuccessfully)
            {
                OnOrderEvent(new OrderEvent(order, DateTime.UtcNow, OrderFee.Zero, $"{nameof(Brokerage)}: {response.Message}")
                {
                    Status = OrderStatus.Invalid
                });

                // remove the contingent order if we weren't successful in placing the first
                _leanOrderByBrokerageCrossingOrders.TryRemove(order.Id, out _);
                return false;
            }

            return true;
        }

        /// <summary>
        /// Gets the quantity to use when updating the given Lean order at the brokerage, taking into
        /// account whether the order is a pending cross zero order.
        /// </summary>
        /// <param name="leanOrder">The Lean order to check.</param>
        /// <param name="quantity">
        /// The first part quantity if the order is a pending cross zero order; otherwise the Lean order quantity.
        /// </param>
        /// <returns>
        /// <c>false</c> if the order is a pending cross zero order whose quantity differs from the one
        /// originally submitted (not supported); otherwise, <c>true</c>.
        /// </returns>
        /// <exception cref="ArgumentNullException">Thrown when the <paramref name="leanOrder"/> is null.</exception>
        protected bool TryGetUpdateCrossZeroOrderQuantity(Order leanOrder, out decimal quantity)
        {
            if (leanOrder == null)
            {
                throw new ArgumentNullException(nameof(leanOrder), "The provided leanOrder cannot be null.");
            }

            // Check if the order is a CrossZeroOrder.
            if (_leanOrderByBrokerageCrossingOrders.TryGetValue(leanOrder.Id, out var crossZeroOrderRequest))
            {
                // If it is a CrossZeroOrder, use the first part of the quantity for the update.
                quantity = crossZeroOrderRequest.FirstPartCrossZeroOrder.OrderQuantity;

                // If the quantities of the LeanOrder do not match, return false. Don't support.
                if (crossZeroOrderRequest.LeanOrder.Quantity != leanOrder.Quantity)
                {
                    return false;
                }
            }
            else
            {
                // If it is not a CrossZeroOrder, use the original order quantity.
                quantity = leanOrder.Quantity;
            }

            return true;
        }

        /// <summary>
        /// Attempts to retrieve or remove a cross-zero order based on the brokerage order ID and its status.
        /// </summary>
        /// <param name="brokerageOrderId">The unique identifier of the brokerage order.</param>
        /// <param name="leanOrderStatus">The updated status of the order received from the brokerage</param>
        /// <param name="leanOrder">
        /// When this method returns, contains the <see cref="Order"/> object associated with the given brokerage order ID,
        /// if the operation was successful; otherwise, <c>null</c>.
        /// This parameter is passed uninitialized.
        /// </param>
        /// <returns>
        /// <c>true</c> if the method successfully retrieves or removes the order; otherwise, <c>false</c>.
        /// </returns>
        /// <remarks>
        /// The method locks on a private object while accessing the collection of orders.
        /// If the status is filled, canceled or invalid, the order is removed from the collection.
        /// For any other status it is retrieved but not removed. If the order is not found, the method returns <c>false</c>.
        /// </remarks>
        protected bool TryGetOrRemoveCrossZeroOrder(string brokerageOrderId, OrderStatus leanOrderStatus, out Order leanOrder)
        {
            lock (_lockCrossZeroObject)
            {
                if (LeanOrderByZeroCrossBrokerageOrderId.TryGetValue(brokerageOrderId, out leanOrder))
                {
                    switch (leanOrderStatus)
                    {
                        case OrderStatus.Filled:
                        case OrderStatus.Canceled:
                        case OrderStatus.Invalid:
                            LeanOrderByZeroCrossBrokerageOrderId.TryRemove(brokerageOrderId, out var _);
                            break;
                    }
                    return true;
                }
                // Return false if the brokerage order ID does not correspond to a cross-zero order
                return false;
            }
        }

        /// <summary>
        /// Attempts to handle any remaining orders that cross the zero boundary.
        /// </summary>
        /// <param name="leanOrder">The order object that needs to be processed.</param>
        /// <param name="orderEvent">The event object containing order event details.</param>
        /// <returns>
        /// <c>true</c> if the order had a pending second part, the event status was filled, the event was emitted
        /// as partially filled and the submission of the second part was scheduled; otherwise, <c>false</c>.
        /// </returns>
        protected bool TryHandleRemainingCrossZeroOrder(Order leanOrder, OrderEvent orderEvent)
        {
            if (leanOrder != null && orderEvent != null && _leanOrderByBrokerageCrossingOrders.TryGetValue(leanOrder.Id, out var brokerageOrder))
            {
                switch (orderEvent.Status)
                {
                    case OrderStatus.Filled:
                        // if we have a contingent that needs to be submitted then we can't respect the 'Filled' state from the order
                        // because the Lean order hasn't been technically filled yet, so mark it as 'PartiallyFilled'
                        orderEvent.Status = OrderStatus.PartiallyFilled;
                        _leanOrderByBrokerageCrossingOrders.Remove(leanOrder.Id, out var _);
                        break;
                    case OrderStatus.Canceled:
                    case OrderStatus.Invalid:
                        // the first part will never fill, so the second part is dropped
                        _leanOrderByBrokerageCrossingOrders.Remove(leanOrder.Id, out var _);
                        return false;
                    default:
                        return false;
                }

                OnOrderEvent(orderEvent);

                // submit the second part asynchronously
                Task.Run(() =>
                {
#pragma warning disable CA1031 // Do not catch general exception types
                    try
                    {
                        var response = default(CrossZeroOrderResponse);
                        lock (_lockCrossZeroObject)
                        {
                            Log.Trace($"{nameof(Brokerage)}.{nameof(TryHandleRemainingCrossZeroOrder)}: Submit the second part of cross order by Id:{leanOrder.Id}");
                            response = PlaceCrossZeroOrder(brokerageOrder, false);

                            if (response.IsOrderPlacedSuccessfully)
                            {
                                // add the new brokerage id for retrieval later
                                var orderId = response.BrokerageOrderId;
                                if (!leanOrder.BrokerId.Contains(orderId))
                                {
                                    leanOrder.BrokerId.Add(orderId);
                                }

                                // leanOrder is a clone, here we can add the new brokerage order Id for the second part of the cross zero
                                OnOrderIdChangedEvent(new BrokerageOrderIdChangedEvent { OrderId = leanOrder.Id, BrokerId = leanOrder.BrokerId });
                                LeanOrderByZeroCrossBrokerageOrderId.AddOrUpdate(orderId, leanOrder);
                            }
                        }

                        if (!response.IsOrderPlacedSuccessfully)
                        {
                            // if we failed to place this order I don't know what to do, we've filled the first part
                            // and failed to place the second... strange. Should we invalidate the rest of the order??
                            Log.Error($"{nameof(Brokerage)}.{nameof(TryHandleRemainingCrossZeroOrder)}: Failed to submit contingent order.");
                            var message = $"{leanOrder.Symbol} Failed submitting the second part of cross order for " +
                                $"LeanOrderId: {leanOrder.Id.ToStringInvariant()} Filled - BrokerageOrderId: {response.BrokerageOrderId}. " +
                                $"{response.Message}";
                            OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Warning, "CrossZeroFailed", message));
                            OnOrderEvent(new OrderEvent(leanOrder, DateTime.UtcNow, OrderFee.Zero) { Status = OrderStatus.Canceled });
                        }
                    }
                    catch (Exception err)
                    {
                        Log.Error(err);
                        OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Warning, "CrossZeroOrderError", "Error occurred submitting cross zero order: " + err.Message));
                        OnOrderEvent(new OrderEvent(leanOrder, DateTime.UtcNow, OrderFee.Zero) { Status = OrderStatus.Canceled });
                    }
#pragma warning restore CA1031 // Do not catch general exception types
                });
                return true;
            }
            return false;
        }

        /// <summary>
        /// Calculates the quantities needed to close the current position and establish a new position based on the provided order.
        /// </summary>
        /// <param name="holdingQuantity">The quantity currently held in the position that needs to be closed.</param>
        /// <param name="orderQuantity">The quantity defined in the new order to be established.</param>
        /// <returns>
        /// A tuple containing:
        /// <list type="bullet">
        /// <item><description>The quantity needed to close the current position (the negated holding quantity).</description></item>
        /// <item><description>The quantity needed to establish the new position (the order quantity minus the closing quantity).</description></item>
        /// </list>
        /// </returns>
        private static (decimal closePositionQuantity, decimal newPositionQuantity) GetQuantityOnCrossPosition(decimal holdingQuantity, decimal orderQuantity)
        {
            // first we need an order to close out the current position
            var firstOrderQuantity = -holdingQuantity;
            var secondOrderQuantity = orderQuantity - firstOrderQuantity;

            return (firstOrderQuantity, secondOrderQuantity);
        }

        #endregion
    }
}