Skip to content

Commit 9bcb952

Browse files
committed
feat: NiFi 2.x upgrade, ABAC enforcement processor, and passthrough mode
- Upgrade NiFi to 2.7.0 and Java 21; migrate ExpressionLanguageScope from VARIABLE_REGISTRY to ENVIRONMENT across all processors and controller service (NiFi 2.x breaking change) - Add ABACEnforcement processor: calls DSP GetDecisions to make ABAC permit/deny decisions on any flow file with tdf_attribute set; routes to permit/deny/failure relationships; designed for binary protocol flows (e.g. Link 16/JREAP-C) that cannot be TDF-wrapped - Add Enable Encryption property to ConvertToZTDF: when false, passes flow files through without TDF wrapping; mirrors the behavior of GATEWAY_ABAC_ENCRYPT_EMAIL=0 in the Virtru Gateway (tag-only mode) Signed-off-by: Joel C. <143835810+JBCongdon@users.noreply.github.com>
1 parent e761410 commit 9bcb952

8 files changed

Lines changed: 333 additions & 15 deletions

File tree

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
package io.opentdf.nifi;
2+
3+
import io.opentdf.platform.authorization.AuthorizationServiceGrpc;
4+
import io.opentdf.platform.authorization.DecisionRequest;
5+
import io.opentdf.platform.authorization.DecisionResponse;
6+
import io.opentdf.platform.authorization.Entity;
7+
import io.opentdf.platform.authorization.EntityChain;
8+
import io.opentdf.platform.authorization.GetDecisionsRequest;
9+
import io.opentdf.platform.authorization.GetDecisionsResponse;
10+
import io.opentdf.platform.authorization.ResourceAttribute;
11+
import io.opentdf.platform.policy.Action;
12+
import io.opentdf.platform.sdk.SDK;
13+
import org.apache.nifi.annotation.behavior.ReadsAttribute;
14+
import org.apache.nifi.annotation.behavior.ReadsAttributes;
15+
import org.apache.nifi.annotation.behavior.WritesAttribute;
16+
import org.apache.nifi.annotation.behavior.WritesAttributes;
17+
import org.apache.nifi.annotation.documentation.CapabilityDescription;
18+
import org.apache.nifi.annotation.documentation.Tags;
19+
import org.apache.nifi.components.AllowableValue;
20+
import org.apache.nifi.components.PropertyDescriptor;
21+
import org.apache.nifi.expression.ExpressionLanguageScope;
22+
import org.apache.nifi.flowfile.FlowFile;
23+
import org.apache.nifi.processor.ProcessContext;
24+
import org.apache.nifi.processor.ProcessSession;
25+
import org.apache.nifi.processor.Relationship;
26+
import org.apache.nifi.processor.exception.ProcessException;
27+
import org.apache.nifi.processor.util.StandardValidators;
28+
29+
import java.util.Arrays;
30+
import java.util.Collections;
31+
import java.util.HashSet;
32+
import java.util.List;
33+
import java.util.Set;
34+
import java.util.concurrent.TimeUnit;
35+
36+
/**
37+
* Calls the DSP Authorization Service (GetDecisions) to make an ABAC permit/deny
38+
* decision for the flow file. Routes to "permit" or "deny" relationship based on
39+
* the decision response.
40+
*
41+
* Designed to work downstream of ParseJREAPC — uses the tdf_attribute flow file
42+
* attribute set by the parser as the resource attribute FQN list.
43+
*
44+
* NiFi flow:
45+
* [Source] → [ParseJREAPC] → [ABACEnforcement] → [permit] → forward
46+
* → [deny] → drop/audit
47+
* → [failure] → error handling
48+
*/
49+
@CapabilityDescription("Calls the DSP Authorization Service GetDecisions endpoint to make an ABAC " +
50+
"permit/deny decision for the flow file. Routes to 'permit' or 'deny' based on the response. " +
51+
"Intended for use with JREAP-C and other binary protocol flows that require policy enforcement " +
52+
"without TDF encryption.")
53+
@Tags({"ABAC", "authorization", "DSP", "OpenTDF", "policy", "JREAP-C", "enforcement", "permit", "deny"})
54+
@ReadsAttributes({
55+
@ReadsAttribute(attribute = "tdf_attribute",
56+
description = "Comma-separated resource attribute FQNs used as the resource context " +
57+
"for the authorization decision. Set automatically by ParseJREAPC when " +
58+
"'Classification Attribute Namespace' is configured."),
59+
@ReadsAttribute(attribute = "jreapc.classification",
60+
description = "Used in the abac.decision_reason attribute on the output flow file."),
61+
})
62+
@WritesAttributes({
63+
@WritesAttribute(attribute = "abac.decision",
64+
description = "PERMIT or DENY"),
65+
@WritesAttribute(attribute = "abac.entity_id",
66+
description = "The entity ID used in the authorization request"),
67+
@WritesAttribute(attribute = "abac.resource_attributes",
68+
description = "Comma-separated resource attribute FQNs that were evaluated"),
69+
@WritesAttribute(attribute = "abac.processing_time_ms",
70+
description = "Time taken for the GetDecisions call in milliseconds"),
71+
})
72+
public class ABACEnforcement extends AbstractTDFProcessor {
73+
74+
// ─── Relationships ────────────────────────────────────────────────────────
75+
76+
static final Relationship REL_PERMIT = new Relationship.Builder()
77+
.name("permit")
78+
.description("Authorization service returned DECISION_PERMIT")
79+
.build();
80+
81+
static final Relationship REL_DENY = new Relationship.Builder()
82+
.name("deny")
83+
.description("Authorization service returned DECISION_DENY")
84+
.build();
85+
86+
@Override
87+
public Set<Relationship> getRelationships() {
88+
return new HashSet<>(Arrays.asList(REL_PERMIT, REL_DENY, REL_FAILURE));
89+
}
90+
91+
// ─── Properties ──────────────────────────────────────────────────────────
92+
93+
static final AllowableValue ENTITY_TYPE_CLIENT_ID = new AllowableValue("CLIENT_ID", "Client ID");
94+
static final AllowableValue ENTITY_TYPE_EMAIL = new AllowableValue("EMAIL", "Email Address");
95+
static final AllowableValue ENTITY_TYPE_USERNAME = new AllowableValue("USERNAME", "Username");
96+
97+
static final PropertyDescriptor ENTITY_ID = new PropertyDescriptor.Builder()
98+
.name("Entity ID")
99+
.displayName("Entity ID")
100+
.description("The entity (user, service account, or client) making the data access request. " +
101+
"Used as the subject in the GetDecisions call. Supports Expression Language to " +
102+
"read from flow file attributes (e.g. ${jwt.sub}).")
103+
.required(true)
104+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
105+
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
106+
.build();
107+
108+
static final PropertyDescriptor ENTITY_TYPE = new PropertyDescriptor.Builder()
109+
.name("Entity Type")
110+
.displayName("Entity Type")
111+
.description("How to interpret the Entity ID value.")
112+
.required(true)
113+
.allowableValues(ENTITY_TYPE_CLIENT_ID, ENTITY_TYPE_EMAIL, ENTITY_TYPE_USERNAME)
114+
.defaultValue("CLIENT_ID")
115+
.build();
116+
117+
static final PropertyDescriptor DEFAULT_RESOURCE_ATTRIBUTES = new PropertyDescriptor.Builder()
118+
.name("Default Resource Attribute FQNs")
119+
.displayName("Default Resource Attribute FQNs")
120+
.description("Comma-separated list of attribute value FQNs to use when the 'tdf_attribute' " +
121+
"flow file attribute is not set. Leave blank to require tdf_attribute on every message. " +
122+
"Example: https://classification.example.org/attr/level/value/secret")
123+
.required(false)
124+
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
125+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
126+
.build();
127+
128+
static final PropertyDescriptor DECISION_TIMEOUT_SECONDS = new PropertyDescriptor.Builder()
129+
.name("Decision Timeout (seconds)")
130+
.displayName("Decision Timeout (seconds)")
131+
.description("Maximum time to wait for a GetDecisions response from the authorization service.")
132+
.required(true)
133+
.defaultValue("5")
134+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
135+
.build();
136+
137+
static final PropertyDescriptor FAIL_OPEN = new PropertyDescriptor.Builder()
138+
.name("Fail Open")
139+
.displayName("Fail Open on Authorization Error")
140+
.description("When true, routes to 'permit' if the authorization service is unreachable " +
141+
"or returns an error. When false (default), routes to 'failure'.")
142+
.required(true)
143+
.allowableValues("true", "false")
144+
.defaultValue("false")
145+
.build();
146+
147+
@Override
148+
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
149+
List<PropertyDescriptor> props = new java.util.ArrayList<>(super.getSupportedPropertyDescriptors());
150+
props.add(ENTITY_ID);
151+
props.add(ENTITY_TYPE);
152+
props.add(DEFAULT_RESOURCE_ATTRIBUTES);
153+
props.add(DECISION_TIMEOUT_SECONDS);
154+
props.add(FAIL_OPEN);
155+
return Collections.unmodifiableList(props);
156+
}
157+
158+
// ─── Processor logic ─────────────────────────────────────────────────────
159+
160+
@Override
161+
void processFlowFiles(ProcessContext ctx, ProcessSession session, List<FlowFile> flowFiles)
162+
throws ProcessException {
163+
164+
SDK sdk = getTDFSDK(ctx);
165+
AuthorizationServiceGrpc.AuthorizationServiceFutureStub authStub =
166+
sdk.getServices().authorization();
167+
168+
int timeoutSeconds = ctx.getProperty(DECISION_TIMEOUT_SECONDS).asInteger();
169+
Boolean failOpenVal = ctx.getProperty(FAIL_OPEN).asBoolean();
170+
if (failOpenVal == null) {
171+
throw new ProcessException("Fail Open property did not resolve to 'true' or 'false'");
172+
}
173+
boolean failOpen = failOpenVal;
174+
175+
String defaultAttrFqns = ctx.getProperty(DEFAULT_RESOURCE_ATTRIBUTES).isSet()
176+
? ctx.getProperty(DEFAULT_RESOURCE_ATTRIBUTES).evaluateAttributeExpressions().getValue()
177+
: null;
178+
179+
for (FlowFile flowFile : flowFiles) {
180+
long startMs = System.currentTimeMillis();
181+
try {
182+
// Resolve entity ID
183+
String entityId = ctx.getProperty(ENTITY_ID)
184+
.evaluateAttributeExpressions(flowFile).getValue();
185+
String entityType = ctx.getProperty(ENTITY_TYPE).getValue();
186+
187+
// Resolve resource attributes from tdf_attribute or default
188+
String attrFqnsCsv = flowFile.getAttribute("tdf_attribute");
189+
if (attrFqnsCsv == null || attrFqnsCsv.isBlank()) {
190+
attrFqnsCsv = defaultAttrFqns;
191+
}
192+
if (attrFqnsCsv == null || attrFqnsCsv.isBlank()) {
193+
throw new ProcessException("No resource attributes: set tdf_attribute on flow file " +
194+
"or configure 'Default Resource Attribute FQNs'");
195+
}
196+
197+
// Build entity
198+
Entity.Builder entityBuilder = Entity.newBuilder().setId("entity-0");
199+
switch (entityType) {
200+
case "EMAIL" -> entityBuilder.setEmailAddress(entityId);
201+
case "USERNAME" -> entityBuilder.setUserName(entityId);
202+
default -> entityBuilder.setClientId(entityId);
203+
}
204+
Entity entity = entityBuilder.build();
205+
206+
// Build entity chain
207+
EntityChain entityChain = EntityChain.newBuilder()
208+
.setId("ec-0")
209+
.addEntities(entity)
210+
.build();
211+
212+
// Build resource attributes
213+
ResourceAttribute.Builder raBuilder = ResourceAttribute.newBuilder()
214+
.setResourceAttributesId("ra-0");
215+
for (String fqn : attrFqnsCsv.split(",")) {
216+
String trimmed = fqn.trim();
217+
if (!trimmed.isEmpty()) raBuilder.addAttributeValueFqns(trimmed);
218+
}
219+
ResourceAttribute resourceAttribute = raBuilder.build();
220+
221+
// Build action (TRANSMIT for data forwarding)
222+
Action action = Action.newBuilder()
223+
.setStandard(Action.StandardAction.STANDARD_ACTION_TRANSMIT)
224+
.build();
225+
226+
// Build and fire GetDecisions request
227+
DecisionRequest decisionRequest = DecisionRequest.newBuilder()
228+
.addActions(action)
229+
.addEntityChains(entityChain)
230+
.addResourceAttributes(resourceAttribute)
231+
.build();
232+
233+
GetDecisionsRequest request = GetDecisionsRequest.newBuilder()
234+
.addDecisionRequests(decisionRequest)
235+
.build();
236+
237+
GetDecisionsResponse response = authStub.getDecisions(request)
238+
.get(timeoutSeconds, TimeUnit.SECONDS);
239+
240+
long elapsedMs = System.currentTimeMillis() - startMs;
241+
242+
// Evaluate decision — empty response is not a permit
243+
if (response.getDecisionResponsesList().isEmpty()) {
244+
throw new ProcessException("Authorization service returned no decisions");
245+
}
246+
DecisionResponse.Decision overallDecision = DecisionResponse.Decision.DECISION_PERMIT;
247+
for (DecisionResponse dr : response.getDecisionResponsesList()) {
248+
if (dr.getDecision() != DecisionResponse.Decision.DECISION_PERMIT) {
249+
overallDecision = DecisionResponse.Decision.DECISION_DENY;
250+
break;
251+
}
252+
}
253+
254+
String decisionLabel = overallDecision == DecisionResponse.Decision.DECISION_PERMIT
255+
? "PERMIT" : "DENY";
256+
257+
flowFile = session.putAttribute(flowFile, "abac.decision", decisionLabel);
258+
flowFile = session.putAttribute(flowFile, "abac.entity_id", entityId);
259+
flowFile = session.putAttribute(flowFile, "abac.resource_attributes", attrFqnsCsv);
260+
flowFile = session.putAttribute(flowFile, "abac.processing_time_ms",
261+
String.valueOf(elapsedMs));
262+
263+
getLogger().info("ABAC decision: {} | attrs={} | {}ms",
264+
decisionLabel, attrFqnsCsv, elapsedMs);
265+
getLogger().debug("ABAC subject: {}", entityId);
266+
267+
Relationship rel = overallDecision == DecisionResponse.Decision.DECISION_PERMIT
268+
? REL_PERMIT : REL_DENY;
269+
session.transfer(flowFile, rel);
270+
271+
} catch (ProcessException pe) {
272+
// Local validation failures (missing attributes, bad config) are never
273+
// fail-open — unclassified or malformed flow files must not bypass policy.
274+
getLogger().error("ABAC request validation failed for FlowFile {}: {}",
275+
flowFile.getId(), pe.getMessage());
276+
session.transfer(flowFile, REL_FAILURE);
277+
} catch (Exception e) {
278+
// Remote call failures (network, timeout, service unavailable) respect failOpen.
279+
getLogger().error("ABAC authorization call failed for FlowFile {}", flowFile.getId(), e);
280+
if (failOpen) {
281+
flowFile = session.putAttribute(flowFile, "abac.decision", "PERMIT");
282+
flowFile = session.putAttribute(flowFile, "abac.error", e.getMessage());
283+
session.transfer(flowFile, REL_PERMIT);
284+
} else {
285+
session.transfer(flowFile, REL_FAILURE);
286+
}
287+
}
288+
}
289+
}
290+
}

nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public AbstractTDFProcessor() {
4343
.name("FlowFile queue pull limit")
4444
.description("FlowFile queue pull size limit")
4545
.required(true)
46-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
46+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
4747
.defaultValue("10")
4848
.addValidator(StandardValidators.INTEGER_VALIDATOR)
4949
.build();

nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public AbstractToProcessor() {
3333
*/
3434
public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
3535
.name("KAS URL")
36-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
36+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
3737
.description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
3838
.required(false)
3939
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)

nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,26 @@ public ConvertToZTDF() {
6868
* - Required: false
6969
* - Default Value: false
7070
* - Allowable Values: true, false
71-
* - Expression Language Supported: {@link ExpressionLanguageScope#VARIABLE_REGISTRY}
71+
* - Expression Language Supported: {@link ExpressionLanguageScope#ENVIRONMENT}
7272
*/
73+
public static final PropertyDescriptor ENABLE_ENCRYPTION = new org.apache.nifi.components.PropertyDescriptor.Builder()
74+
.name("Enable Encryption")
75+
.description("When false, the flow file passes through without TDF encryption. " +
76+
"Use this for ABAC policy enforcement only (tag-only mode), mirroring the " +
77+
"GATEWAY_ABAC_ENCRYPT_EMAIL=0 behavior in the Virtru Gateway.")
78+
.required(true)
79+
.defaultValue("true")
80+
.allowableValues("true", "false")
81+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
82+
.build();
83+
7384
public static final PropertyDescriptor SIGN_ASSERTIONS = new org.apache.nifi.components.PropertyDescriptor.Builder()
7485
.name("Sign Assertions")
7586
.description("sign assertions")
7687
.required(false)
7788
.defaultValue("false")
7889
.allowableValues("true", "false")
79-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
90+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
8091
.build();
8192

8293
/**
@@ -94,7 +105,7 @@ public ConvertToZTDF() {
94105
.required(true)
95106
.identifiesControllerService(PrivateKeyService.class)
96107
.dependsOn(SIGN_ASSERTIONS, new AllowableValue("true"))
97-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
108+
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
98109
.build();
99110

100111

@@ -118,6 +129,7 @@ PrivateKeyService getPrivateKeyService(ProcessContext processContext) {
118129
@Override
119130
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
120131
List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
132+
propertyDescriptors.add(ENABLE_ENCRYPTION);
121133
propertyDescriptors.add(PRIVATE_KEY_CONTROLLER_SERVICE);
122134
propertyDescriptors.add(SIGN_ASSERTIONS);
123135
return Collections.unmodifiableList(propertyDescriptors);
@@ -195,6 +207,11 @@ private void populateFieldFromMap(Map<?, ?> sourceMap, String key, Map<?, ?> des
195207
*/
196208
@Override
197209
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
210+
Boolean encryptionEnabled = processContext.getProperty(ENABLE_ENCRYPTION).evaluateAttributeExpressions().asBoolean();
211+
if (encryptionEnabled == null) {
212+
throw new ProcessException("Enable Encryption property did not resolve to 'true' or 'false'");
213+
}
214+
198215
SDK sdk = getTDFSDK(processContext);
199216
for (final FlowFile flowFile : flowFiles) {
200217
try {
@@ -203,6 +220,9 @@ void processFlowFiles(ProcessContext processContext, ProcessSession processSessi
203220
//build baseline TDF Config options
204221
List<Consumer<TDFConfig>> configurationOptions = new ArrayList<>(Arrays.asList(Config.withKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
205222
Config.withDataAttributes(dataAttributes.toArray(new String[0]))));
223+
if (!encryptionEnabled) {
224+
configurationOptions.add(cfg -> cfg.enableEncryption = false);
225+
}
206226
List<String> nifiAssertionAttributeKeys = flowFile.getAttributes().keySet().stream().filter(x->x.startsWith(TDF_ASSERTION_PREFIX)).toList();
207227
for(String nifiAssertionAttributeKey: nifiAssertionAttributeKeys) {
208228
getLogger().debug(String.format("Adding assertion for NiFi attribute = %s", nifiAssertionAttributeKey));

0 commit comments

Comments
 (0)