|
| 1 | +""" |
| 2 | +prometheus.py |
| 3 | +
|
| 4 | +A simple python script that pulls data from Prometheus's API, and |
| 5 | +stores it in a Deephaven table. |
| 6 | +
|
| 7 | +This is expected to be run within Deephaven's application mode https://deephaven.io/core/docs/how-to-guides/app-mode/. |
| 8 | +
|
| 9 | +After launching, there will be 2 tables within the "Panels" section of the Deephaven UI. |
| 10 | +One will be a static table and the other will be continually updating with real data. |
| 11 | +
|
| 12 | +@author Jake Mulford |
| 13 | +@copyright Deephaven Data Labs LLC |
| 14 | +""" |
| 15 | +from deephaven.TableTools import newTable, stringCol, dateTimeCol, doubleCol |
| 16 | +from deephaven import DynamicTableWriter |
| 17 | +from deephaven.DBTimeUtils import millisToTime |
| 18 | +import deephaven.Types as dht |
| 19 | +from typing import Callable |
| 20 | + |
| 21 | +import requests |
| 22 | + |
| 23 | +import threading |
| 24 | +import time |
| 25 | + |
| 26 | +PROMETHEUS_QUERIES = ["up", "go_memstats_alloc_bytes"] #Edit this and add your queries here |
| 27 | +BASE_URL = "{base}/api/v1/query".format(base="http://prometheus:9090") #Edit this to your base URL if you're not using a local Prometheus instance |
| 28 | + |
| 29 | +ApplicationState = jpy.get_type("io.deephaven.appmode.ApplicationState") |
| 30 | + |
| 31 | +def make_prometheus_request(prometheus_query, query_url): |
| 32 | + """ |
| 33 | + A helper method that makes a request on the Prometheus API with the given |
| 34 | + query, and returns a list of results containing the timestamp, job, instance, and value for the query. |
| 35 | + The data returned by this method will be stored in a Deephaven table. |
| 36 | +
|
| 37 | + This assumes that the query is going to return a "vector" type from the Prometheus API. |
| 38 | + https://prometheus.io/docs/prometheus/latest/querying/api/#instant-vectors |
| 39 | +
|
| 40 | + Args: |
| 41 | + prometheus_query (str): The Prometheus query to execute with the API request. |
| 42 | + query_url (str): The URL of the query endpoint. |
| 43 | + Returns: |
| 44 | + list[(date-time, str, str, float)]: List of the timestamps, jobs, instances, and values from the API response. |
| 45 | + """ |
| 46 | + results = [] |
| 47 | + query_parameters = { |
| 48 | + "query": prometheus_query |
| 49 | + } |
| 50 | + response = requests.get(query_url, params=query_parameters) |
| 51 | + response_json = response.json() |
| 52 | + |
| 53 | + if "data" in response_json.keys(): |
| 54 | + if "resultType" in response_json["data"] and response_json["data"]["resultType"] == "vector": |
| 55 | + for result in response_json["data"]["result"]: |
| 56 | + #Prometheus timestamps are in seconds. We multiply by 1000 to convert it to |
| 57 | + #milliseconds, then cast to an int() to use the millisToTime() method |
| 58 | + timestamp = millisToTime(int(result["value"][0] * 1000)) |
| 59 | + job = result["metric"]["job"] |
| 60 | + instance = result["metric"]["instance"] |
| 61 | + value = float(result["value"][1]) |
| 62 | + results.append((timestamp, job, instance, value)) |
| 63 | + return results |
| 64 | + |
| 65 | +def start_dynamic(app: ApplicationState): |
| 66 | + """ |
| 67 | + Deephaven Application Mode method that starts the dynamic data collector. |
| 68 | + """ |
| 69 | + column_names = ["DateTime", "PrometheusQuery", "Job", "Instance", "Value"] |
| 70 | + column_types = [dht.datetime, dht.string, dht.string, dht.string, dht.double] |
| 71 | + |
| 72 | + table_writer = DynamicTableWriter( |
| 73 | + column_names, |
| 74 | + column_types |
| 75 | + ) |
| 76 | + |
| 77 | + result = table_writer.getTable() |
| 78 | + |
| 79 | + def thread_func(): |
| 80 | + while True: |
| 81 | + for prometheus_query in PROMETHEUS_QUERIES: |
| 82 | + values = make_prometheus_request(prometheus_query, BASE_URL) |
| 83 | + |
| 84 | + for (date_time, job, instance, value) in values: |
| 85 | + table_writer.logRow(date_time, prometheus_query, job, instance, value) |
| 86 | + time.sleep(2) |
| 87 | + |
| 88 | + app.setField("result_dynamic", result) |
| 89 | + thread = threading.Thread(target = thread_func) |
| 90 | + thread.start() |
| 91 | + |
| 92 | +def start_static(app: ApplicationState, query_count=5): |
| 93 | + """ |
| 94 | + Deephaven Application Mode method that starts the static data collector. |
| 95 | +
|
| 96 | + query_count sets the number of requests to make. It is recommended to keep this number low, |
| 97 | + since it delays how long the Deephaven UI takes to become accessible. |
| 98 | + """ |
| 99 | + date_time_list = [] |
| 100 | + prometheus_query_list = [] |
| 101 | + job_list = [] |
| 102 | + instance_list = [] |
| 103 | + value_list = [] |
| 104 | + |
| 105 | + for i in range(query_count): |
| 106 | + for prometheus_query in PROMETHEUS_QUERIES: |
| 107 | + values = make_prometheus_request(prometheus_query, BASE_URL) |
| 108 | + |
| 109 | + for (date_time, job, instance, value) in values: |
| 110 | + date_time_list.append(date_time) |
| 111 | + prometheus_query_list.append(prometheus_query) |
| 112 | + job_list.append(job) |
| 113 | + instance_list.append(instance) |
| 114 | + value_list.append(value) |
| 115 | + time.sleep(2) |
| 116 | + |
| 117 | + result = newTable( |
| 118 | + dateTimeCol("DateTime", date_time_list), |
| 119 | + stringCol("PrometheusQuery", prometheus_query_list), |
| 120 | + stringCol("Job", job_list), |
| 121 | + stringCol("Instance", instance_list), |
| 122 | + doubleCol("Value", value_list) |
| 123 | + ) |
| 124 | + app.setField("result_static", result) |
| 125 | + |
| 126 | +def update(app: ApplicationState): |
| 127 | + """ |
| 128 | + Deephaven Application Mode method that does various updates on the initial tables. |
| 129 | +
|
| 130 | + You can throw any Deehaven Query in here. The ones in here are simply examples. |
| 131 | + """ |
| 132 | + #Get the tables from the app |
| 133 | + result_static = app.getField("result_static").value() |
| 134 | + result_dynamic = app.getField("result_dynamic").value() |
| 135 | + |
| 136 | + #Perform the desired queries, and set the results as new fields |
| 137 | + result_static_update = result_static.by("PrometheusQuery") |
| 138 | + app.setField("result_static_update", result_static_update) |
| 139 | + |
| 140 | + result_static_average = result_static.dropColumns("DateTime", "Job", "Instance").avgBy("PrometheusQuery") |
| 141 | + app.setField("result_static_average", result_static_average) |
| 142 | + |
| 143 | + result_dynamic_update = result_dynamic.by("PrometheusQuery") |
| 144 | + app.setField("result_dynamic_update", result_dynamic_update) |
| 145 | + |
| 146 | + result_dynamic_average = result_dynamic.dropColumns("DateTime", "Job", "Instance").avgBy("PrometheusQuery") |
| 147 | + app.setField("result_dynamic_average", result_dynamic_average) |
| 148 | + |
| 149 | +def initialize(func: Callable[[ApplicationState], None]): |
| 150 | + """ |
| 151 | + Deephaven Application Mode initialization method. |
| 152 | + """ |
| 153 | + app = jpy.get_type("io.deephaven.appmode.ApplicationContext").get() |
| 154 | + func(app) |
| 155 | + |
| 156 | +#Start the static and dynamic data collectors |
| 157 | +initialize(start_static) |
| 158 | +initialize(start_dynamic) |
| 159 | +#Run the table updates |
| 160 | +initialize(update) |
0 commit comments