/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System.Collections.Generic;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Notifications;
using QuantConnect.Packets;

namespace QuantConnect.Messaging
{
    /// <summary>
    /// Desktop implementation of messaging system for Lean Engine.
    /// Packets are buffered in an internal queue until the handler is marked as loaded,
    /// after which each packet is dispatched to the .NET event matching its <see cref="PacketType"/>.
    /// </summary>
    public class EventMessagingHandler : IMessagingHandler
    {
        // Job packet stored by SetAuthentication; not read anywhere else in this class.
        private AlgorithmNodePacket _job;

        // True once LoadingComplete() has been called or ConsumerReadyEvent has been raised.
        private volatile bool _loaded;

        // Packets received before the handler was loaded, in arrival order.
        private Queue<Packet> _queue;

        /// <summary>
        /// Gets or sets whether this messaging handler has any current subscribers.
        /// When set to false, messages won't be sent.
        /// </summary>
        /// <remarks>
        /// Nothing in this class reads the flag: <see cref="Send"/> dispatches regardless of its value,
        /// so honoring it is left to the caller.
        /// </remarks>
        public bool HasSubscribers { get; set; }

        /// <summary>
        /// Initialize the Messaging System Plugin.
        /// Creates the pending-packet queue and marks the handler as loaded whenever
        /// <see cref="ConsumerReadyEvent"/> is raised.
        /// </summary>
        /// <param name="initializeParameters">The parameters required for initialization (not used by this implementation)</param>
        public void Initialize(MessagingHandlerInitializeParameters initializeParameters)
        {
            _queue = new Queue<Packet>();

            ConsumerReadyEvent += () => { _loaded = true; };
        }

        /// <summary>
        /// Set Loaded to true
        /// </summary>
        public void LoadingComplete()
        {
            _loaded = true;
        }

        /// <summary>
        /// Set the user communication channel
        /// </summary>
        /// <param name="job">The job packet to remember for this handler</param>
        public void SetAuthentication(AlgorithmNodePacket job)
        {
            _job = job;
        }

#pragma warning disable 1591
        // One delegate/event pair per packet type dispatched by ProcessPacket,
        // plus ConsumerReadyEvent which flips the handler into the loaded state.
        public delegate void DebugEventRaised(DebugPacket packet);
        public event DebugEventRaised DebugEvent;

        public delegate void SystemDebugEventRaised(SystemDebugPacket packet);
        public event SystemDebugEventRaised SystemDebugEvent;

        public delegate void LogEventRaised(LogPacket packet);
        public event LogEventRaised LogEvent;

        public delegate void RuntimeErrorEventRaised(RuntimeErrorPacket packet);
        public event RuntimeErrorEventRaised RuntimeErrorEvent;

        public delegate void HandledErrorEventRaised(HandledErrorPacket packet);
        public event HandledErrorEventRaised HandledErrorEvent;

        public delegate void BacktestResultEventRaised(BacktestResultPacket packet);
        public event BacktestResultEventRaised BacktestResultEvent;

        public delegate void ConsumerReadyEventRaised();
        public event ConsumerReadyEventRaised ConsumerReadyEvent;
#pragma warning restore 1591

        /// <summary>
        /// Send any message with a base type of Packet.
        /// While the handler is not loaded the packet is only enqueued; once loaded, any
        /// previously enqueued packets are dispatched first so ordering is preserved.
        /// </summary>
        /// <param name="packet">The packet to dispatch or enqueue</param>
        public void Send(Packet packet)
        {
            // Until we're loaded queue it up
            if (!_loaded)
            {
                _queue.Enqueue(packet);
                return;
            }

            // Catch up on anything that arrived before we were loaded
            while (_queue.TryDequeue(out var pending))
            {
                ProcessPacket(pending);
            }

            // Finally process this new packet
            ProcessPacket(packet);
        }

        /// <summary>
        /// Send any notification with a base type of Notification.
        /// Logs an error and returns without sending when the notification reports it cannot be sent.
        /// </summary>
        /// <param name="notification">The notification to be sent.</param>
        public void SendNotification(Notification notification)
        {
            if (!notification.CanSend())
            {
                Log.Error("Messaging.SendNotification(): Send not implemented for notification of type: " + notification.GetType().Name);
                return;
            }

            notification.Send();
        }

        /// <summary>
        /// Send any message with a base type of Packet that has been enqueued.
        /// Does nothing while the handler is not loaded.
        /// </summary>
        public void SendEnqueuedPackets()
        {
            // _loaded is re-checked on every iteration, exactly as the dequeue is.
            while (_loaded && _queue.TryDequeue(out var pending))
            {
                ProcessPacket(pending);
            }
        }

        /// <summary>
        /// Packet processing implementation: routes the packet to the event raiser that matches
        /// its <see cref="Packet.Type"/>. Packet types without a case are silently ignored.
        /// </summary>
        /// <param name="packet">The packet to dispatch</param>
        private void ProcessPacket(Packet packet)
        {
            // Packets we handle in the UX.
            switch (packet.Type)
            {
                case PacketType.Debug:
                    OnDebugEvent((DebugPacket)packet);
                    break;

                case PacketType.SystemDebug:
                    OnSystemDebugEvent((SystemDebugPacket)packet);
                    break;

                case PacketType.Log:
                    OnLogEvent((LogPacket)packet);
                    break;

                case PacketType.RuntimeError:
                    OnRuntimeErrorEvent((RuntimeErrorPacket)packet);
                    break;

                case PacketType.HandledError:
                    OnHandledErrorEvent((HandledErrorPacket)packet);
                    break;

                case PacketType.BacktestResult:
                    OnBacktestResultEvent((BacktestResultPacket)packet);
                    break;
            }
        }

        /// <summary>
        /// Raise a debug event safely
        /// </summary>
        /// <param name="packet">The debug packet to publish</param>
        protected virtual void OnDebugEvent(DebugPacket packet)
        {
            DebugEvent?.Invoke(packet);
        }

        /// <summary>
        /// Raise a system debug event safely.
        /// </summary>
        /// <remarks>
        /// Previously this method raised <see cref="DebugEvent"/>, so <see cref="SystemDebugEvent"/>
        /// could never fire. It now raises <see cref="SystemDebugEvent"/>; when that event has no
        /// subscribers the packet is still forwarded to <see cref="DebugEvent"/> so consumers that
        /// only listen for debug packets keep receiving system debug messages.
        /// </remarks>
        /// <param name="packet">The system debug packet to publish</param>
        protected virtual void OnSystemDebugEvent(SystemDebugPacket packet)
        {
            var systemHandler = SystemDebugEvent;
            if (systemHandler != null)
            {
                systemHandler(packet);
                return;
            }

            // Legacy routing: no dedicated subscriber, deliver through the debug event.
            DebugEvent?.Invoke(packet);
        }

        /// <summary>
        /// Handler for consumer ready code.
        /// Raises <see cref="ConsumerReadyEvent"/>, which after <see cref="Initialize"/> marks the handler as loaded.
        /// </summary>
        public virtual void OnConsumerReadyEvent()
        {
            ConsumerReadyEvent?.Invoke();
        }

        /// <summary>
        /// Raise a log event safely
        /// </summary>
        /// <param name="packet">The log packet to publish</param>
        protected virtual void OnLogEvent(LogPacket packet)
        {
            LogEvent?.Invoke(packet);
        }

        /// <summary>
        /// Raise a handled error event safely
        /// </summary>
        /// <param name="packet">The handled error packet to publish</param>
        protected virtual void OnHandledErrorEvent(HandledErrorPacket packet)
        {
            HandledErrorEvent?.Invoke(packet);
        }

        /// <summary>
        /// Raise runtime error safely
        /// </summary>
        /// <param name="packet">The runtime error packet to publish</param>
        protected virtual void OnRuntimeErrorEvent(RuntimeErrorPacket packet)
        {
            RuntimeErrorEvent?.Invoke(packet);
        }

        /// <summary>
        /// Raise a backtest result event safely.
        /// </summary>
        /// <param name="packet">The backtest result packet to publish</param>
        protected virtual void OnBacktestResultEvent(BacktestResultPacket packet)
        {
            BacktestResultEvent?.Invoke(packet);
        }

        /// <summary>
        /// Dispose of any resources
        /// </summary>
        public virtual void Dispose()
        {
        }
    }
}