/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections.Generic;
using QuantConnect.Interfaces;
using QuantConnect.Logging;
using QuantConnect.Notifications;
using QuantConnect.Packets;
namespace QuantConnect.Messaging
{
///
/// Desktop implementation of messaging system for Lean Engine
///
public class EventMessagingHandler : IMessagingHandler
{
private AlgorithmNodePacket _job;
private volatile bool _loaded;
private Queue _queue;
///
/// Gets or sets whether this messaging handler has any current subscribers.
/// When set to false, messages won't be sent.
///
public bool HasSubscribers
{
get;
set;
}
///
/// Initialize the Messaging System Plugin.
///
/// The parameters required for initialization
public void Initialize(MessagingHandlerInitializeParameters initializeParameters)
{
_queue = new Queue();
ConsumerReadyEvent += () => { _loaded = true; };
}
///
/// Set Loaded to true
///
public void LoadingComplete()
{
_loaded = true;
}
///
/// Set the user communication channel
///
///
public void SetAuthentication(AlgorithmNodePacket job)
{
_job = job;
}
#pragma warning disable 1591
public delegate void DebugEventRaised(DebugPacket packet);
public event DebugEventRaised DebugEvent;
public delegate void SystemDebugEventRaised(SystemDebugPacket packet);
#pragma warning disable 0067 // SystemDebugEvent is not used currently; ignore the warning
public event SystemDebugEventRaised SystemDebugEvent;
#pragma warning restore 0067
public delegate void LogEventRaised(LogPacket packet);
public event LogEventRaised LogEvent;
public delegate void RuntimeErrorEventRaised(RuntimeErrorPacket packet);
public event RuntimeErrorEventRaised RuntimeErrorEvent;
public delegate void HandledErrorEventRaised(HandledErrorPacket packet);
public event HandledErrorEventRaised HandledErrorEvent;
public delegate void BacktestResultEventRaised(BacktestResultPacket packet);
public event BacktestResultEventRaised BacktestResultEvent;
public delegate void ConsumerReadyEventRaised();
public event ConsumerReadyEventRaised ConsumerReadyEvent;
#pragma warning restore 1591
///
/// Send any message with a base type of Packet.
///
public void Send(Packet packet)
{
//Until we're loaded queue it up
if (!_loaded)
{
_queue.Enqueue(packet);
return;
}
//Catch up if this is the first time
while (_queue.TryDequeue(out var item))
{
ProcessPacket(item);
}
//Finally process this new packet
ProcessPacket(packet);
}
///
/// Send any notification with a base type of Notification.
///
/// The notification to be sent.
public void SendNotification(Notification notification)
{
if (!notification.CanSend())
{
Log.Error("Messaging.SendNotification(): Send not implemented for notification of type: " + notification.GetType().Name);
return;
}
notification.Send();
}
///
/// Send any message with a base type of Packet that has been enqueued.
///
public void SendEnqueuedPackets()
{
while (_loaded && _queue.TryDequeue(out var item))
{
ProcessPacket(item);
}
}
///
/// Packet processing implementation
///
private void ProcessPacket(Packet packet)
{
//Packets we handled in the UX.
switch (packet.Type)
{
case PacketType.Debug:
var debug = (DebugPacket)packet;
OnDebugEvent(debug);
break;
case PacketType.SystemDebug:
var systemDebug = (SystemDebugPacket)packet;
OnSystemDebugEvent(systemDebug);
break;
case PacketType.Log:
var log = (LogPacket)packet;
OnLogEvent(log);
break;
case PacketType.RuntimeError:
var runtime = (RuntimeErrorPacket)packet;
OnRuntimeErrorEvent(runtime);
break;
case PacketType.HandledError:
var handled = (HandledErrorPacket)packet;
OnHandledErrorEvent(handled);
break;
case PacketType.BacktestResult:
var result = (BacktestResultPacket)packet;
OnBacktestResultEvent(result);
break;
}
}
///
/// Raise a debug event safely
///
protected virtual void OnDebugEvent(DebugPacket packet)
{
var handler = DebugEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Raise a system debug event safely
///
protected virtual void OnSystemDebugEvent(SystemDebugPacket packet)
{
var handler = DebugEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Handler for consumer ready code.
///
public virtual void OnConsumerReadyEvent()
{
var handler = ConsumerReadyEvent;
if (handler != null)
{
handler();
}
}
///
/// Raise a log event safely
///
protected virtual void OnLogEvent(LogPacket packet)
{
var handler = LogEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Raise a handled error event safely
///
protected virtual void OnHandledErrorEvent(HandledErrorPacket packet)
{
var handler = HandledErrorEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Raise runtime error safely
///
protected virtual void OnRuntimeErrorEvent(RuntimeErrorPacket packet)
{
var handler = RuntimeErrorEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Raise a backtest result event safely.
///
protected virtual void OnBacktestResultEvent(BacktestResultPacket packet)
{
var handler = BacktestResultEvent;
if (handler != null)
{
handler(packet);
}
}
///
/// Dispose of any resources
///
public virtual void Dispose()
{
}
}
}