diff --git a/pom.xml b/pom.xml index bb8416b..cc1ffe2 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ junit junit - 4.11 + 4.13.2 test @@ -71,6 +71,20 @@ + + pentaho-public + Pentaho Public + https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/ + + true + always + + + true + always + + + mavex-xx.atalssian.com maven-xx.atalssian.com @@ -107,7 +122,7 @@ mvnrepository - http://repo1.maven.org/maven2 + https://repo1.maven.org/maven2 false @@ -201,10 +216,10 @@ maven-compiler-plugin - 3.0 + 3.6.0 - 1.6 - 1.6 + 1.8 + 1.8 -Xlint:unchecked -Xlint:deprecation @@ -242,4 +257,4 @@ - \ No newline at end of file + diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java index 0f9fda6..9c6dc6e 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java @@ -141,6 +141,7 @@ private void initPluginStep() throws Exception Integer port = null; String body = environmentSubstitute(meta.getBodyField()); + String contentTypeField = environmentSubstitute(meta.getContentTypeField()); String routing = environmentSubstitute(meta.getRouting()); String uri = environmentSubstitute(meta.getUri()); String host = environmentSubstitute(meta.getHost()); @@ -212,6 +213,14 @@ private void initPluginStep() throws Exception } } + if ( ! Const.isEmpty(contentTypeField)) { + data.contentTypeIndex = data.outputRowMeta.indexOfValue(contentTypeField); + + if (data.contentTypeIndex < 0) { + throw new AMQPException("Unable to retrieve ContentType field : " + contentTypeField); + } + } + if (data.bodyFieldIndex < 0) { throw new AMQPException("Unable to retrieve body field : " + body); } diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java index b78f808..0c8a129 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java @@ -36,6 +36,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface public boolean isRequeue = false; public String exchtype = ""; public String body = null; + public String content_type = null; public String routing; public String target; public long amqpTag; @@ -56,6 +57,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface // Delayed confirmation realted public Integer deliveryTagIndex = null; + public Integer contentTypeIndex = null; public boolean activeConfirmation = false; public String ackStepName = null; diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java index 841b540..fd56bbb 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java @@ -160,6 +160,11 @@ public class AMQPPluginDialog extends BaseStepDialog implements StepDialogInterf private FormData formBodyLabel; private FormData formBodyText; + private Label labelContentTypeField; + private TextVar textContentTypeField; + private FormData formContentTypeLabel; + private FormData formContentTypeText; + private Label labelDeliveryTagField; private TextVar textDeliveryTagField; private FormData formDeliveryTagLabel; @@ -600,6 +605,30 @@ public String open() textBodyField.setLayoutData(formBodyText); + // ContentType line + labelContentTypeField = new Label(wConnectionComp, SWT.RIGHT); + labelContentTypeField.setText(getString("AmqpPlugin.ContentType.Label")); + props.setLook(labelContentTypeField); + + formContentTypeLabel = new FormData(); + formContentTypeLabel.left = new FormAttachment(0, 0); + formContentTypeLabel.right = new FormAttachment(middle, -margin); + formContentTypeLabel.top = new FormAttachment(textBodyField , margin); + + labelContentTypeField.setLayoutData(formContentTypeLabel); + + textContentTypeField = new TextVar(transMeta, wConnectionComp, SWT.MULTI | SWT.LEFT | SWT.BORDER); + + props.setLook(textContentTypeField); + textContentTypeField.addModifyListener(modifyListener); + + formContentTypeText = new FormData(); + formContentTypeText.left = new FormAttachment(middle, 0); + formContentTypeText.right = new FormAttachment(100, 0); + formContentTypeText.top = new FormAttachment(textBodyField, margin); + + textContentTypeField.setLayoutData(formContentTypeText); + // DeliveryTag labelDeliveryTagField = new Label(wConnectionComp, SWT.RIGHT); labelDeliveryTagField.setText(getString("AmqpPlugin.DeliveryTag.Label")); @@ -608,7 +637,7 @@ public String open() formDeliveryTagLabel = new FormData(); formDeliveryTagLabel.left = new FormAttachment(0, 0); formDeliveryTagLabel.right = new FormAttachment(middle, -margin); - formDeliveryTagLabel.top = new FormAttachment(textBodyField , margin); + formDeliveryTagLabel.top = new FormAttachment(textContentTypeField , margin); labelDeliveryTagField.setLayoutData(formDeliveryTagLabel); @@ -620,7 +649,7 @@ public String open() formDeliveryTagText = new FormData(); formDeliveryTagText.left = new FormAttachment(middle, 0); formDeliveryTagText.right = new FormAttachment(100, 0); - formDeliveryTagText.top = new FormAttachment(textBodyField, margin); + formDeliveryTagText.top = new FormAttachment(textContentTypeField, margin); textDeliveryTagField.setLayoutData(formDeliveryTagText); @@ -1299,6 +1328,7 @@ public void getData() setFieldText(textUsername, input.getUsername()); setFieldText(textPassword, input.getPassword()); setFieldText(textBodyField, input.getBodyField()); + setFieldText(textContentTypeField, input.getContentTypeField()); setFieldText(textDeliveryTagField, input.getDeliveryTagField()); setFieldText(textAckStepName, input.getAckStepName()); @@ -1352,6 +1382,7 @@ private void ok() input.setUri(getFieldText(textURI)); input.setTarget(getFieldText(textTarget)); input.setBodyField(getFieldText(textBodyField)); + input.setContentTypeField(getFieldText(textContentTypeField)); input.setDeliveryTagField(getFieldText(textDeliveryTagField)); input.setUsername(getFieldText(textUsername)); diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java index ebd69bd..72788a1 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java @@ -37,6 +37,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface { private static final String FIELD_TRANSACTIONAL = "transactional"; private static final String FIELD_BODY_FIELD = "body_field"; + private static final String FIELD_CONTENTTYPE_FIELD ="content_type"; private static final String FIELD_ROUTING = "routing"; private static final String FIELD_DELIVERYTAG_FIELD = "deliverytag_field"; private static final String FIELD_LIMIT = "limit"; @@ -73,6 +74,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface private static final String DEFAULT_BODY_FIELD = "message"; private static final String DEFAULT_DELIVERYTAG_FIELD = "amqpdeliverytag"; + private static final String DEFAULT_CONTENTTYPE_FIELD = "amqpcontenttype"; private static final String DEFAULT_EXCHTYPE = AMQPPluginData.EXCHTYPE_DIRECT; private String uri; @@ -97,6 +99,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface private boolean transactional = false; private String bodyField = DEFAULT_BODY_FIELD; private String deliveryTagField = DEFAULT_DELIVERYTAG_FIELD; + private String contentTypeField = DEFAULT_CONTENTTYPE_FIELD; public String ackStepName = null; public String ackStepDeliveryTagField = null; public String rejectStepName = null; @@ -203,6 +206,7 @@ public String getXML() bufer.append(" ").append(XMLHandler.addTagValue(FIELD_TRANSACTIONAL, isTransactional())); bufer.append(" ").append(XMLHandler.addTagValue(FIELD_BODY_FIELD, getBodyField())); + bufer.append(" ").append(XMLHandler.addTagValue(FIELD_CONTENTTYPE_FIELD, getContentTypeField())); bufer.append(" ").append(XMLHandler.addTagValue(FIELD_DELIVERYTAG_FIELD, getDeliveryTagField())); bufer.append(" ").append(XMLHandler.addTagValue(FIELD_LIMIT, getLimitString())); bufer.append(" ").append(XMLHandler.addTagValue(FIELD_PREFETCHCOUNT, getPrefetchCountString())); @@ -251,6 +255,7 @@ public void loadXML(Node stepnode, List databases, IMetaStore ims) try { setTransactional(XMLHandler.getTagValue(stepnode, FIELD_TRANSACTIONAL)); setBodyField(XMLHandler.getTagValue(stepnode, FIELD_BODY_FIELD)); + setContentTypeField(XMLHandler.getTagValue(stepnode, FIELD_CONTENTTYPE_FIELD)); setDeliveryTagField(XMLHandler.getTagValue(stepnode, FIELD_DELIVERYTAG_FIELD)); setRouting(XMLHandler.getTagValue(stepnode, FIELD_ROUTING)); setTarget(XMLHandler.getTagValue(stepnode, FIELD_TARGET)); @@ -305,6 +310,7 @@ public void readRep(Repository rep, IMetaStore ims, ObjectId idStep, List getStepInjectionMetadataEntries() throws Ket , Entry.AMQP_PORT , Entry.AMQP_VHOST , Entry.BODY + , Entry.CONTENTTYPE , Entry.DELIVERYTAG , Entry.TARGET , Entry.ROUTING @@ -217,6 +219,9 @@ public void injectStepMetadataEntries( List all ) throws case BODY: meta.setBodyField( lookValue ); break; + case CONTENTTYPE: + meta.setContentTypeField( lookValue ); + break; case DELIVERYTAG: meta.setDeliveryTagField( lookValue ); break; @@ -321,6 +326,7 @@ public List extractStepMetadataEntries() throws KettleEx list.add( StepInjectionUtil.getEntry( Entry.AMQP_URI, meta.getUri() ) ); list.add( StepInjectionUtil.getEntry( Entry.BODY, meta.getBodyField() ) ); + list.add( StepInjectionUtil.getEntry( Entry.CONTENTTYPE, meta.getContentTypeField() ) ); list.add( StepInjectionUtil.getEntry( Entry.DELIVERYTAG, meta.getDeliveryTagField() ) ); list.add( StepInjectionUtil.getEntry( Entry.TARGET, meta.getTarget() ) ); list.add( StepInjectionUtil.getEntry( Entry.ROUTING, meta.getRouting() ) ); diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties b/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties index 617a4d4..1b755dd 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties @@ -4,6 +4,7 @@ AmqpPlugin.Type.Label=Mode AmqpPlugin.URI.Label=URI AmqpPlugin.URIMasked.Label = URI (password masked) AmqpPlugin.Body.Label=Body Field +AmqpPlugin.ContentType.Label=Content Type Field AmqpPlugin.DeliveryTag.Label=DeliveryTag Field AmqpPlugin.Target.Label=Exchange/Queue name AmqpPlugin.Exchange.Label=Exchange name diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java index e85f8f1..a2add47 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java @@ -45,11 +45,27 @@ protected String getAmqpBody(final Object[] r) throws KettleStepException return null; } + protected String getAmqpContentType(final Object[] r) throws KettleStepException + { + if (hasAmqpContentType(r)) { + return (r[data.contentTypeIndex] == null) ? null : r[data.contentTypeIndex].toString(); + } + + logUndefinedRow(r, "Invalid content type", "ICAmqpPlugin003"); + + return null; + } + protected boolean hasAmqpRoutingKey(final Object[] r) { return rowContains(r, data.routingIndex); } + protected boolean hasAmqpContentType(final Object[] r) + { + return rowContains(r, data.contentTypeIndex); + } + protected boolean hasAmqpBody(final Object[] r) { return rowContains(r, data.bodyFieldIndex); diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java index 30d57b9..67d4ed6 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java @@ -4,6 +4,7 @@ import com.instaclick.pentaho.plugin.amqp.AMQPPlugin; import com.instaclick.pentaho.plugin.amqp.AMQPPluginData; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.AMQP; import java.io.IOException; import java.util.List; import org.pentaho.di.core.exception.KettleStepException; @@ -31,12 +32,20 @@ public boolean process(Object[] r) throws KettleStepException, IOException } data.body = getAmqpBody(r); + data.content_type = (data.contentTypeIndex != null) + ? getAmqpContentType(r) + : ""; + data.routing = (data.routingIndex != null) ? getAmqpRoutingKey(r) : ""; // publish the current message - channel.basicPublish(data.target, data.routing, null, data.body.getBytes()); + channel.basicPublish(data.target, data.routing, + new AMQP.BasicProperties.Builder() + .contentType(data.content_type) + .build(), + data.body.getBytes()); // put the row to the output row stream plugin.putRow(data.outputRowMeta, r);