Open API - ProtoOASpotEvent event just fires once unless...
Open API - ProtoOASpotEvent event just fires once unless...
26 Feb 2024, 00:44
I'm working a C# application and taking references from the documentation and the example on Github. I've managed to get everything up and running how I need except one part which works but does not make sense to me.
- I'm sending a ProtoOASubscribeSpotsReq and passing in the symbolIds (exactly like the sample code)
- I'm subscribing and listening to the ProtoOASpotEvent
- This event fires ONLY once and I get the data for all subscribed symbols (but only once)
- The breakpoint does not get trigged nor the console.writeline so I know it's not being triggered again
- HOWEVER, if I add in a function GetSymbolsQuote (see below) and put a foreach to call it then the ProtoOASpotEvent function gets called continuously and works as expected.
My doubt is that the channel type, reader, yeild, etc is a C# thing and nothing to do with Open API. Because the GetSymbolsQuote function technically does not do anything to Open API or the ProtoOASpotEvent event to trigger and only creates a C# channel which can be read.
Please let me know what's up. Any guidance would be appreciated, thanks!
PS - Confusion also arises because channel or no channel, the ProtoOASpotEvent should trigger everytime then only I can write it to the channel but what's up the requirement of using GetSymbolsQuote to trigger ProtoOASpotEvent I don't get.
All relevant code below.
---
async Task<ProtoOASubscribeSpotsRes> SubscribeToSpots(long[] symbolIds)
{
var taskCompletionSource = new TaskCompletionSource<ProtoOASubscribeSpotsRes>();
IDisposable? disposable = null;
if (openClient != null && settings != null)
{
disposable = openClient.OfType<ProtoOASubscribeSpotsRes>()
.Where(response => response.CtidTraderAccountId == settings.CTraderAccountId)
.Subscribe(response =>
{
//Console.WriteLine(JsonConvert.SerializeObject(response));
taskCompletionSource.SetResult(response);
disposable?.Dispose();
});
var protoOASubscribeSpotsReq = new ProtoOASubscribeSpotsReq
{
CtidTraderAccountId = settings.CTraderAccountId,
SubscribeToSpotTimestamp = true
};
protoOASubscribeSpotsReq.SymbolId.AddRange(symbolIds);
await openClient.SendMessage(protoOASubscribeSpotsReq);
return await taskCompletionSource.Task;
}
return null!;
}
void SubscribeToUpdates()
{
if(openClient != null)
{
_subscribedAccountQuoteChannels.Clear();
openClient.OfType<ProtoOASpotEvent>().Subscribe(OnSpotEvent);
openClient.OfType<ProtoOAExecutionEvent>().Subscribe(OnExecutionEvent);
openClient.OfType<ProtoErrorRes>().Subscribe(OnErrorRes);
openClient.OfType<ProtoOAErrorRes>().Subscribe(OnOaErrorRes);
openClient.OfType<ProtoOAOrderErrorEvent>().Subscribe(OnOrderErrorRes);
}
}
void OnSpotEvent(ProtoOASpotEvent spotEvent)
{
if(accountModel != null)
{
var symbol = accountModel.Symbols.FirstOrDefault(symbol => symbol.Id == spotEvent.SymbolId);
if (symbol != null)
{
double bid = symbol.Bid;
double ask = symbol.Ask;
if (spotEvent.HasBid) bid = symbol.Data.GetPriceFromRelative((long)spotEvent.Bid);
if (spotEvent.HasAsk) ask = symbol.Data.GetPriceFromRelative((long)spotEvent.Ask);
var quote = new SymbolQuote(symbol.Id, bid, ask);
symbol.OnTick(quote);
Console.WriteLine(spotEvent.Timestamp + " -> " + symbol.Name + " " + bid);
if (symbol.QuoteAsset.AssetId == accountModel.DepositAsset.AssetId && symbol.TickValue == 0)
{
symbol.TickValue = symbol.Data.GetTickValue(symbol.QuoteAsset, accountModel.DepositAsset, null);
}
else if (symbol.ConversionSymbols.Count > 0 && symbol.ConversionSymbols.All(symbol => symbol.Bid != 0))
{
symbol.TickValue = symbol.Data.GetTickValue(symbol.QuoteAsset, accountModel.DepositAsset, symbol.ConversionSymbols.Select(symbol => new Tuple<ProtoOAAsset, ProtoOAAsset, double>(symbol.BaseAsset, symbol.QuoteAsset, symbol.Bid)));
}
if (_subscribedAccountQuoteChannels.TryGetValue(spotEvent.CtidTraderAccountId, out var quotesChannel))
{
quotesChannel.Writer.TryWrite(quote);
}
}
}
}
---
async IAsyncEnumerable<SymbolQuote> GetSymbolsQuote()
{
if(settings != null)
{
var channel = Channel.CreateUnbounded<SymbolQuote>();
_subscribedAccountQuoteChannels.AddOrUpdate(settings.CTraderAccountId, channel, (key, oldChannel) => channel);
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var quote))
{
yield return quote;
}
}
}
}
await foreach (var quote in GetSymbolsQuote())
{
//Console.WriteLine(quote.Id + " -> " + quote.Bid);
}