|
| 1 | +defmodule Indicator.Ohlc do |
| 2 | + require Logger |
| 3 | + alias Core.Struct.TradeEvent |
| 4 | + |
| 5 | + @pubsub_client Application.compile_env(:core, :pubsub_client) |
| 6 | + @enforce_keys [:symbol, :start_time, :duration] |
| 7 | + defstruct [:symbol, :start_time, :duration, :open, :high, :low, :close] |
| 8 | + |
| 9 | + def process([_ | _] = ohlcs, %TradeEvent{} = trade_event) do |
| 10 | + {old_ohlcs, new_ohlcs} = merge_prices(ohlcs, trade_event.price, trade_event.trade_time) |
| 11 | + Enum.each(old_ohlcs, &maybe_broadcast/1) |
| 12 | + new_ohlcs |
| 13 | + end |
| 14 | + |
| 15 | + def process(symbol, %TradeEvent{} = trade_event) do |
| 16 | + generate_ohlcs(symbol, trade_event.price, trade_event.trade_time) |
| 17 | + end |
| 18 | + |
| 19 | + def merge_prices(ohlcs, price, trade_time) do |
| 20 | + results = ohlcs |> Enum.map(&merge_price(&1, price, trade_time)) |
| 21 | + |
| 22 | + { |
| 23 | + results |> Enum.map(&elem(&1, 0)) |> Enum.filter(& &1), |
| 24 | + results |> Enum.map(&elem(&1, 1)) |
| 25 | + } |
| 26 | + end |
| 27 | + |
| 28 | + def merge_price(%__MODULE__{} = ohlc, price, trade_time) do |
| 29 | + if within_current_timeframe(ohlc.start_time, ohlc.duration, trade_time) do |
| 30 | + {nil, %{ohlc | low: min(ohlc.low, price), high: max(ohlc.high, price), close: price}} |
| 31 | + else |
| 32 | + {ohlc, generate_ohlc(ohlc.symbol, ohlc.duration, price, trade_time)} |
| 33 | + end |
| 34 | + end |
| 35 | + |
| 36 | + def within_current_timeframe(start_time, duration, trade_time) do |
| 37 | + end_time = start_time + duration * 60 |
| 38 | + trade_time = div(trade_time, 1000) |
| 39 | + start_time <= trade_time && trade_time < end_time |
| 40 | + end |
| 41 | + |
| 42 | + def generate_ohlcs(symbol, price, trade_time) do |
| 43 | + [1, 5, 15, 60, 4 * 60, 24 * 60] |
| 44 | + |> Enum.map( |
| 45 | + &generate_ohlc( |
| 46 | + symbol, |
| 47 | + &1, |
| 48 | + price, |
| 49 | + trade_time |
| 50 | + ) |
| 51 | + ) |
| 52 | + end |
| 53 | + |
| 54 | + def generate_ohlc(symbol, duration, price, trade_time) do |
| 55 | + # start_time = div(div(div(trade_time, 1000), 60), duration) |
| 56 | + start_time = trade_time |> div(1000) |> div(60) |> div(duration) |
| 57 | + |
| 58 | + %__MODULE__{ |
| 59 | + symbol: symbol, |
| 60 | + start_time: start_time, |
| 61 | + duration: duration, |
| 62 | + open: price, |
| 63 | + high: price, |
| 64 | + low: price, |
| 65 | + close: price |
| 66 | + } |
| 67 | + end |
| 68 | + |
| 69 | + defp maybe_broadcast(nil), do: :ok |
| 70 | + |
| 71 | + defp maybe_broadcast(%__MODULE__{} = ohlc) do |
| 72 | + Logger.debug("Broadcasting OHLC: #{inspect(ohlc)}") |
| 73 | + |
| 74 | + @pubsub_client.broadcast( |
| 75 | + Core.PubSub, |
| 76 | + "OHLC:#{ohlc.symbol}", |
| 77 | + ohlc |
| 78 | + ) |
| 79 | + end |
| 80 | +end |
0 commit comments