Skip to content
16 changes: 16 additions & 0 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
}
}

// Generate unique id per row. Grafana's virtualized log panel uses
// LogRowModel.uid (derived from the "id" field) as a cache key for
// row height measurements. Without unique ids, all rows share the
// same cache key, causing an infinite resetAfterIndex loop.
if _, hasId := doc["id"]; !hasId {
doc["id"] = fmt.Sprintf("%d", hitIdx)
}

docs[hitIdx] = doc
}

propNames["id"] = true
sortedPropNames := sortPropNames(propNames, configuredFields, true)
fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

Expand Down Expand Up @@ -1074,18 +1083,25 @@ func flatten(target map[string]interface{}) map[string]interface{} {
// if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
hasTimeField := false
hasLogMessageField := false

var sortedPropNames []string
for k := range propNames {
if configuredFields.TimeField != "" && k == configuredFields.TimeField {
hasTimeField = true
} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
hasLogMessageField = true
} else {
sortedPropNames = append(sortedPropNames, k)
}
}

sort.Strings(sortedPropNames)

if hasLogMessageField {
sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
}

if hasTimeField {
sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
}
Expand Down
41 changes: 5 additions & 36 deletions src/datasource/processResponse.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DataFrame, DataLink, DataQueryRequest, DataQueryResponse, Field, FieldType } from "@grafana/data";
import { DataFrame, DataLink, DataQueryRequest, DataQueryResponse } from "@grafana/data";
import { getDataSourceSrv } from "@grafana/runtime";
import { BaseQuickwitDataSource } from './base';
import { DataLinkConfig, ElasticsearchQuery } from "../types";
Expand All @@ -16,48 +16,16 @@ export function getQueryResponseProcessor(datasource: BaseQuickwitDataSource, re
}
};
}
function getCustomFieldName(fieldname: string) { return `$qw_${fieldname}`; }

export function processLogsDataFrame(datasource: BaseQuickwitDataSource, dataFrame: DataFrame) {
// Ignore log volume dataframe, no need to add links or a displayed message field.
// Ignore log volume dataframe, no need to add links.
if (!dataFrame.refId || dataFrame.refId.startsWith('log-volume')) {
return;
}
// Skip empty dataframes
if (dataFrame.length===0 || dataFrame.fields.length === 0) {
return;
}
if (datasource.logMessageField) {
const messageFields = datasource.logMessageField.split(',');
let field_idx_list = [];
for (const messageField of messageFields) {
const field_idx = dataFrame.fields.findIndex((field) => field.name === messageField);
if (field_idx !== -1) {
field_idx_list.push(field_idx);
}
}
const displayedMessages = Array(dataFrame.length);
for (let idx = 0; idx < dataFrame.length; idx++) {
let displayedMessage = "";
// If we have only one field, we assume the field name is obvious for the user and we don't need to show it.
if (field_idx_list.length === 1) {
displayedMessage = `${dataFrame.fields[field_idx_list[0]].values[idx]}`;
} else {
for (const field_idx of field_idx_list) {
displayedMessage += ` ${dataFrame.fields[field_idx].name}=${dataFrame.fields[field_idx].values[idx]}`;
}
}
displayedMessages[idx] = displayedMessage.trim();
}

const newField: Field = {
name: getCustomFieldName('message'),
type: FieldType.string,
config: {},
values: displayedMessages,
};
const [timestamp, ...rest] = dataFrame.fields;
dataFrame.fields = [timestamp, newField, ...rest];
}

if (!datasource.dataLinks.length) {
return;
Expand All @@ -71,9 +39,10 @@ export function processLogsDataFrame(datasource: BaseQuickwitDataSource, dataFra
}

field.config = field.config || {};
field.config.links = [...(field.config.links || [], linksToApply.map(generateDataLink))];
field.config.links = [...(field.config.links || []), ...linksToApply.map(generateDataLink)];
}
}

function generateDataLink(linkConfig: DataLinkConfig): DataLink {
const dataSourceSrv = getDataSourceSrv();

Expand Down
211 changes: 15 additions & 196 deletions src/datasource/supplementaryQueries.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
import {
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceJsonData,
DataSourceWithSupplementaryQueriesSupport,
FieldColorModeId,
FieldType,
LoadingState,
LogLevel,
LogsVolumeCustomMetaData,
LogsVolumeType,
SupplementaryQueryType,
} from '@grafana/data';
import { BarAlignment, DataQuery, GraphDrawStyle, StackingMode } from "@grafana/schema";
import { colors } from "@grafana/ui";
import { getIntervalInfo } from '@/utils/time';
import { cloneDeep, groupBy } from "lodash";
import { Observable, isObservable, from } from 'rxjs';
import { cloneDeep } from "lodash";
import { BucketAggregation, ElasticsearchQuery } from '@/types';
import { BaseQuickwitDataSourceConstructor } from './base';

export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-';

export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstructor> ( Base: T ){
return class DSWithSupplementaryQueries extends Base implements DataSourceWithSupplementaryQueriesSupport<ElasticsearchQuery> {

/**
* Returns an observable that will be used to fetch supplementary data based on the provided
* supplementary query type and original request.
* Returns a DataQueryRequest for the supplementary query type.
* Grafana's Explore layer handles the Observable lifecycle.
*/
getDataProvider(
getSupplementaryRequest(
type: SupplementaryQueryType,
request: DataQueryRequest<ElasticsearchQuery>
): Observable<DataQueryResponse> | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(type)) {
return undefined;
}
): DataQueryRequest<ElasticsearchQuery> | undefined {
switch (type) {
case SupplementaryQueryType.LogsVolume:
return this.getLogsVolumeDataProvider(request);
return this.getLogsVolumeRequest(request);
default:
return undefined;
}
Expand All @@ -55,18 +39,15 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
* Returns a supplementary query to be used to fetch supplementary data based on the provided type and original query.
* If provided query is not suitable for provided supplementary query type, undefined should be returned.
*/
// FIXME: options should be of type SupplementaryQueryOptions but this type is not public.
getSupplementaryQuery(options: any, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
getSupplementaryQuery(options: { type: SupplementaryQueryType }, query: ElasticsearchQuery): ElasticsearchQuery | undefined {
if (!this.getSupportedSupplementaryQueryTypes().includes(options.type)) {
return undefined;
}

let isQuerySuitable = false;

switch (options.type) {
case SupplementaryQueryType.LogsVolume:
case SupplementaryQueryType.LogsVolume: {
// it has to be a logs-producing range-query
isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
const isQuerySuitable = !!(query.metrics?.length === 1 && query.metrics[0].type === 'logs');
if (!isQuerySuitable) {
return undefined;
}
Expand Down Expand Up @@ -103,13 +84,16 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
metrics: [{ type: 'count', id: '1' }],
bucketAggs,
};
}

default:
return undefined;
}
}

getLogsVolumeDataProvider(request: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> | undefined {
private getLogsVolumeRequest(
request: DataQueryRequest<ElasticsearchQuery>
): DataQueryRequest<ElasticsearchQuery> | undefined {
const logsVolumeRequest = cloneDeep(request);
const targets = logsVolumeRequest.targets
.map((target) => this.getSupplementaryQuery({ type: SupplementaryQueryType.LogsVolume }, target))
Expand All @@ -119,172 +103,7 @@ export function withSupplementaryQueries<T extends BaseQuickwitDataSourceConstru
return undefined;
}

return queryLogsVolume(
this,
{ ...logsVolumeRequest, targets },
{
range: request.range,
targets: request.targets,
extractLevel: (dataFrame: any) => getLogLevelFromKey(dataFrame || ''),
}
);
}
};
}

// Copy/pasted from grafana/data as it is deprecated there.
function getLogLevelFromKey(dataframe: DataFrame): LogLevel {
const name = dataframe.fields[1].config.displayNameFromDS || ``;
const level = (LogLevel as any)[name.toString().toLowerCase()];
if (level) {
return level;
}
return LogLevel.unknown;
}

/**
* Creates an observable, which makes requests to get logs volume and aggregates results.
*/

export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataSourceJsonData>(
datasource: DataSourceApi<TQuery, TOptions>,
logsVolumeRequest: DataQueryRequest<TQuery>,
options: any
): Observable<DataQueryResponse> {
const timespan = options.range.to.valueOf() - options.range.from.valueOf();
const intervalInfo = getIntervalInfo(timespan, 400);

logsVolumeRequest.interval = intervalInfo.interval;
logsVolumeRequest.scopedVars.__interval = { value: intervalInfo.interval, text: intervalInfo.interval };

if (intervalInfo.intervalMs !== undefined) {
logsVolumeRequest.intervalMs = intervalInfo.intervalMs;
logsVolumeRequest.scopedVars.__interval_ms = { value: intervalInfo.intervalMs, text: intervalInfo.intervalMs };
return { ...logsVolumeRequest, targets };
}

logsVolumeRequest.hideFromInspector = true;

return new Observable((observer) => {
let logsVolumeData: DataFrame[] = [];
observer.next({
state: LoadingState.Loading,
error: undefined,
data: [],
});

const queryResponse = datasource.query(logsVolumeRequest);
const queryObservable = isObservable(queryResponse) ? queryResponse : from(queryResponse);

const subscription = queryObservable.subscribe({
complete: () => {
observer.complete();
},
next: (dataQueryResponse: DataQueryResponse) => {
const { error } = dataQueryResponse;
if (error !== undefined) {
observer.next({
state: LoadingState.Error,
error,
data: [],
});
observer.error(error);
} else {
const framesByRefId = groupBy(dataQueryResponse.data, 'refId');
logsVolumeData = dataQueryResponse.data.map((dataFrame) => {
let sourceRefId = dataFrame.refId || '';
if (sourceRefId.startsWith('log-volume-')) {
sourceRefId = sourceRefId.substr('log-volume-'.length);
}

const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = {
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
datasourceName: datasource.name,
sourceQuery: options.targets.find((dataQuery: any) => dataQuery.refId === sourceRefId)!,
};

dataFrame.meta = {
...dataFrame.meta,
custom: {
...dataFrame.meta?.custom,
...logsVolumeCustomMetaData,
},
};
return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1);
});

observer.next({
state: dataQueryResponse.state,
error: undefined,
data: logsVolumeData,
});
}
},
error: (error: any) => {
observer.next({
state: LoadingState.Error,
error: error,
data: [],
});
observer.error(error);
},
});
return () => {
subscription?.unsubscribe();
};
});
}
const updateLogsVolumeConfig = (
dataFrame: DataFrame,
extractLevel: (dataFrame: DataFrame) => LogLevel,
oneLevelDetected: boolean
): DataFrame => {
dataFrame.fields = dataFrame.fields.map((field) => {
if (field.type === FieldType.number) {
field.config = {
...field.config,
...getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected),
};
}
return field;
});
return dataFrame;
};
const LogLevelColor = {
[LogLevel.critical]: colors[7],
[LogLevel.warning]: colors[1],
[LogLevel.error]: colors[4],
[LogLevel.info]: colors[0],
[LogLevel.debug]: colors[5],
[LogLevel.trace]: colors[2],
[LogLevel.unknown]: '#8e8e8e' // or '#bdc4cd',
};
/**
* Returns field configuration used to render logs volume bars
*/
function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
const name = oneLevelDetected && level === LogLevel.unknown ? 'logs' : level;
const color = LogLevelColor[level];
return {
displayNameFromDS: name,
color: {
mode: FieldColorModeId.Fixed,
fixedColor: color,
},
custom: {
drawStyle: GraphDrawStyle.Bars,
barAlignment: BarAlignment.Center,
lineColor: color,
pointColor: color,
fillColor: color,
lineWidth: 1,
fillOpacity: 100,
stacking: {
mode: StackingMode.Normal,
group: 'A',
},
},
};
}


Loading