Skip to content

Commit 837c136

Browse files
authored
Persist gRPC custom error messages (#1468)
1 parent e6be4cf commit 837c136

23 files changed

Lines changed: 238 additions & 87 deletions

File tree

runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
7979
private static final int DATA_FLAG_COMPLETE = 0x03;
8080

8181
private static final String8FW HEADER_NAME_ZILLA_GRPC_STATUS = new String8FW("zilla:status");
82+
private static final String8FW HEADER_NAME_ZILLA_GRPC_MESSAGE = new String8FW("zilla:message");
8283
private static final String16FW HEADER_VALUE_GRPC_OK = new String16FW("0");
8384
private static final String16FW HEADER_VALUE_GRPC_ABORTED = new String16FW("10");
8485
private static final String16FW HEADER_VALUE_GRPC_INTERNAL_ERROR = new String16FW("13");
@@ -102,6 +103,8 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
102103

103104
private final String16FW.Builder statusRW = new
104105
String16FW.Builder().wrap(new UnsafeBuffer(new byte[256], 0, 256), 0, 256);
106+
private final String16FW.Builder messageRW = new
107+
String16FW.Builder().wrap(new UnsafeBuffer(new byte[256], 0, 256), 0, 256);
105108

106109
private final BeginFW.Builder beginRW = new BeginFW.Builder();
107110
private final DataFW.Builder dataRW = new DataFW.Builder();
@@ -454,7 +457,7 @@ private void onGrpcData(
454457
}
455458
else
456459
{
457-
doGrpcReset(traceId, authorization, HEADER_VALUE_GRPC_ABORTED);
460+
doGrpcReset(traceId, authorization, HEADER_VALUE_GRPC_ABORTED, null);
458461
cleanup(traceId, authorization);
459462
}
460463
}
@@ -647,8 +650,8 @@ private void cleanup(
647650
long traceId,
648651
long authorization)
649652
{
650-
doGrpcReset(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR);
651-
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR);
653+
doGrpcReset(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR, null);
654+
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR, null);
652655

653656
fetch.doKafkaAbort(traceId, authorization);
654657
fetch.doKafkaReset(traceId, authorization);
@@ -731,20 +734,25 @@ private void doGrpcData(
731734
private void doGrpcAbort(
732735
long traceId,
733736
long authorization,
734-
String16FW status)
737+
String16FW status,
738+
String16FW message)
735739
{
736740
if (GrpcKafkaState.replyOpened(state) && !GrpcKafkaState.replyClosed(state))
737741
{
738742
replySeq = fetch.replySeq;
739743

740-
final GrpcAbortExFW grpcAbortEx =
744+
final GrpcAbortExFW.Builder grpcAbortEx =
741745
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
742746
.typeId(grpcTypeId)
743-
.status(status)
744-
.build();
747+
.status(status);
748+
749+
if (message != null)
750+
{
751+
grpcAbortEx.message(message);
752+
}
745753

746754
doAbort(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
747-
traceId, authorization, grpcAbortEx);
755+
traceId, authorization, grpcAbortEx.build());
748756
}
749757
state = GrpcKafkaState.closeReply(state);
750758
}
@@ -780,20 +788,25 @@ private void doGrpcWindow(
780788
private void doGrpcReset(
781789
long traceId,
782790
long authorization,
783-
String16FW status)
791+
String16FW status,
792+
String16FW message)
784793
{
785794
if (!GrpcKafkaState.initialClosed(state))
786795
{
787796
state = GrpcKafkaState.closeInitial(state);
788797

789-
final GrpcResetExFW grpcResetEx =
798+
final GrpcResetExFW.Builder grpcResetEx =
790799
grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
791800
.typeId(grpcTypeId)
792-
.status(status)
793-
.build();
801+
.status(status);
802+
803+
if (message != null)
804+
{
805+
grpcResetEx.message(message);
806+
}
794807

795808
doReset(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax,
796-
traceId, authorization, grpcResetEx);
809+
traceId, authorization, grpcResetEx.build());
797810
}
798811
}
799812
}
@@ -1441,7 +1454,19 @@ protected void onKafkaData(
14411454
String16FW status = statusRW
14421455
.set(value.buffer(), value.offset(), value.sizeof())
14431456
.build();
1444-
doGrpcAbort(traceId, authorization, status);
1457+
1458+
String16FW message = null;
1459+
KafkaHeaderFW grpcMessage = kafkaDataEx.merged().fetch().headers()
1460+
.matchFirst(h -> HEADER_NAME_ZILLA_GRPC_MESSAGE.value().equals(h.name().value()));
1461+
if (grpcMessage != null)
1462+
{
1463+
OctetsFW messageValue = grpcMessage.value();
1464+
message = messageRW
1465+
.set(messageValue.buffer(), messageValue.offset(), messageValue.sizeof())
1466+
.build();
1467+
}
1468+
1469+
doGrpcAbort(traceId, authorization, status, message);
14451470
}
14461471
else
14471472
{
@@ -1493,7 +1518,7 @@ private void cleanup(
14931518
long authorization)
14941519
{
14951520
doGrpcReset(traceId, authorization);
1496-
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR);
1521+
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR, null);
14971522

14981523
producer.doKafkaAbort(traceId, authorization);
14991524
producer.doKafkaReset(traceId, authorization);
@@ -1583,21 +1608,26 @@ private void doGrpcData(
15831608
private void doGrpcAbort(
15841609
long traceId,
15851610
long authorization,
1586-
String16FW status)
1611+
String16FW status,
1612+
String16FW message)
15871613
{
15881614
if (GrpcKafkaState.replyOpening(state) &&
15891615
!GrpcKafkaState.replyClosed(state))
15901616
{
15911617
replySeq = correlater.replySeq;
15921618

1593-
final GrpcAbortExFW grpcAbortEx =
1619+
final GrpcAbortExFW.Builder grpcAbortEx =
15941620
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
15951621
.typeId(grpcTypeId)
1596-
.status(status)
1597-
.build();
1622+
.status(status);
1623+
1624+
if (message != null)
1625+
{
1626+
grpcAbortEx.message(message);
1627+
}
15981628

15991629
doAbort(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
1600-
traceId, authorization, grpcAbortEx);
1630+
traceId, authorization, grpcAbortEx.build());
16011631
}
16021632
state = GrpcKafkaState.closeReply(state);
16031633
}
@@ -1931,7 +1961,7 @@ private void cleanup(
19311961
long authorization)
19321962
{
19331963
doGrpcReset(traceId, authorization);
1934-
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR);
1964+
doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR, null);
19351965

19361966
delegate.doKafkaAbort(traceId, authorization);
19371967
delegate.doKafkaReset(traceId, authorization);
@@ -1970,19 +2000,24 @@ private void doGrpcData(
19702000
private void doGrpcAbort(
19712001
long traceId,
19722002
long authorization,
1973-
String16FW status)
2003+
String16FW status,
2004+
String16FW message)
19742005
{
19752006
if (GrpcKafkaState.replyOpening(state) &&
19762007
!GrpcKafkaState.replyClosed(state))
19772008
{
1978-
final GrpcAbortExFW grpcAbortEx =
2009+
final GrpcAbortExFW.Builder grpcAbortEx =
19792010
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
19802011
.typeId(grpcTypeId)
1981-
.status(status)
1982-
.build();
2012+
.status(status);
2013+
2014+
if (message != null)
2015+
{
2016+
grpcAbortEx.message(message);
2017+
}
19832018

19842019
doAbort(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
1985-
traceId, authorization, grpcAbortEx);
2020+
traceId, authorization, grpcAbortEx.build());
19862021
}
19872022
state = GrpcKafkaState.closeReply(state);
19882023
}

runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class GrpcClientFactory implements GrpcStreamFactory
6969
private static final String8FW HTTP_HEADER_CONTENT_TYPE = new String8FW("content-type");
7070
private static final String8FW HTTP_HEADER_TE = new String8FW("te");
7171
private static final String8FW HTTP_HEADER_GRPC_STATUS = new String8FW("grpc-status");
72+
private static final String8FW HTTP_HEADER_GRPC_MESSAGE = new String8FW("grpc-message");
7273

7374
private static final String16FW HTTP_HEADER_VALUE_METHOD_POST = new String16FW("POST");
7475
private static final String16FW HTTP_HEADER_VALUE_STATUS_200 = new String16FW("200");
@@ -90,6 +91,7 @@ public class GrpcClientFactory implements GrpcStreamFactory
9091
private final AbortFW abortRO = new AbortFW();
9192
private final WindowFW windowRO = new WindowFW();
9293
private final ResetFW resetRO = new ResetFW();
94+
private final String16FW messageRO = new String16FW();
9395

9496
private final BeginFW.Builder beginRW = new BeginFW.Builder();
9597
private final DataFW.Builder dataRW = new DataFW.Builder();
@@ -242,6 +244,7 @@ private final class GrpcClient
242244

243245
private int state;
244246
private String grpcStatus;
247+
private String grpcMessage;
245248

246249
private GrpcClient(
247250
MessageConsumer application,
@@ -480,9 +483,11 @@ private void doAppData(
480483
private void doAppAbortDeferring(
481484
long traceId,
482485
long authorization,
483-
String16FW grpcStatus)
486+
String16FW grpcStatus,
487+
String16FW grpcMessage)
484488
{
485489
this.grpcStatus = grpcStatus != null ? grpcStatus.asString() : null;
490+
this.grpcMessage = grpcMessage != null ? grpcMessage.asString() : null;
486491
this.state = GrpcState.closingReply(state);
487492

488493
if (GrpcState.replyOpened(state))
@@ -495,17 +500,24 @@ private void doAppAbortDeferred(
495500
long traceId,
496501
long authorization)
497502
{
498-
GrpcAbortExFW abortEx = grpcStatus != null
499-
? grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
500-
.typeId(grpcTypeId)
501-
.status(grpcStatus)
502-
.build()
503-
: grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
504-
.typeId(grpcTypeId)
505-
.status(HEADER_VALUE_GRPC_INTERNAL_ERROR)
506-
.build();
507-
508-
doAppAbort(traceId, authorization, abortEx);
503+
GrpcAbortExFW.Builder abortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
504+
.typeId(grpcTypeId);
505+
506+
if (grpcStatus != null)
507+
{
508+
abortEx.status(grpcStatus);
509+
}
510+
else
511+
{
512+
abortEx.status(HEADER_VALUE_GRPC_INTERNAL_ERROR);
513+
}
514+
515+
if (grpcMessage != null)
516+
{
517+
abortEx.message(grpcMessage);
518+
}
519+
520+
doAppAbort(traceId, authorization, abortEx.build());
509521
}
510522

511523
private void doAppAbort(
@@ -770,13 +782,15 @@ private void onNetBegin(
770782

771783
String16FW status = HTTP_HEADER_VALUE_STATUS_200;
772784
String16FW grpcStatus = null;
785+
String16FW grpcMessage = null;
773786

774787
helper.visit(httpBeginEx);
775788

776789
if (httpBeginEx != null)
777790
{
778791
status = helper.status;
779792
grpcStatus = helper.grpcStatus;
793+
grpcMessage = helper.grpcMessage;
780794
}
781795

782796
assert acknowledge <= sequence;
@@ -799,7 +813,7 @@ private void onNetBegin(
799813
if (!HTTP_HEADER_VALUE_STATUS_200.equals(status) ||
800814
grpcStatus != null && !HEADER_VALUE_GRPC_OK.equals(grpcStatus))
801815
{
802-
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus);
816+
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus, grpcMessage);
803817
doNetReset(traceId, authorization);
804818
}
805819
}
@@ -875,6 +889,10 @@ private void onNetEnd(
875889

876890
final HttpEndExFW endEx = end.extension().get(endExRO::tryWrap);
877891
final Array32FW<HttpHeaderFW> trailers = endEx != null ? endEx.trailers() : TRAILERS_EMPTY;
892+
final HttpHeaderFW grpcMessage = trailers.matchFirst(t -> t.name().equals(HTTP_HEADER_GRPC_MESSAGE));
893+
final String16FW message = grpcMessage != null
894+
? messageRO.wrap(grpcMessage.value().buffer(), grpcMessage.value().offset(), grpcMessage.value().limit())
895+
: null;
878896
final HttpHeaderFW grpcStatus = trailers.matchFirst(t -> t.name().equals(HTTP_HEADER_GRPC_STATUS));
879897

880898
if (grpcStatus != null && HEADER_VALUE_GRPC_OK.equals(grpcStatus.value()))
@@ -884,7 +902,7 @@ private void onNetEnd(
884902
else
885903
{
886904
delegate.doAppAbortDeferring(traceId, authorization,
887-
grpcStatus != null ? grpcStatus.value() : HEADER_VALUE_GRPC_INTERNAL_ERROR);
905+
grpcStatus != null ? grpcStatus.value() : HEADER_VALUE_GRPC_INTERNAL_ERROR, message);
888906
}
889907
}
890908

0 commit comments

Comments
 (0)