diff --git a/pom.xml b/pom.xml
index bb8416b..cc1ffe2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
junit
junit
- 4.11
+ 4.13.2
test
@@ -71,6 +71,20 @@
+
+ pentaho-public
+ Pentaho Public
+ https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/
+
+ true
+ always
+
+
+ true
+ always
+
+
+
mavex-xx.atalssian.com
maven-xx.atalssian.com
@@ -107,7 +122,7 @@
mvnrepository
- http://repo1.maven.org/maven2
+ https://repo1.maven.org/maven2
false
@@ -201,10 +216,10 @@
maven-compiler-plugin
- 3.0
+ 3.6.0
- 1.6
- 1.6
+ 1.8
+ 1.8
-Xlint:unchecked
-Xlint:deprecation
@@ -242,4 +257,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java
index 0f9fda6..9c6dc6e 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java
@@ -141,6 +141,7 @@ private void initPluginStep() throws Exception
Integer port = null;
String body = environmentSubstitute(meta.getBodyField());
+ String contentTypeField = environmentSubstitute(meta.getContentTypeField());
String routing = environmentSubstitute(meta.getRouting());
String uri = environmentSubstitute(meta.getUri());
String host = environmentSubstitute(meta.getHost());
@@ -212,6 +213,14 @@ private void initPluginStep() throws Exception
}
}
+ if ( ! Const.isEmpty(contentTypeField)) {
+ data.contentTypeIndex = data.outputRowMeta.indexOfValue(contentTypeField);
+
+ if (data.contentTypeIndex < 0) {
+ throw new AMQPException("Unable to retrieve ContentType field : " + contentTypeField);
+ }
+ }
+
if (data.bodyFieldIndex < 0) {
throw new AMQPException("Unable to retrieve body field : " + body);
}
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java
index b78f808..0c8a129 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginData.java
@@ -36,6 +36,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface
public boolean isRequeue = false;
public String exchtype = "";
public String body = null;
+ public String content_type = null;
public String routing;
public String target;
public long amqpTag;
@@ -56,6 +57,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface
// Delayed confirmation realted
public Integer deliveryTagIndex = null;
+ public Integer contentTypeIndex = null;
public boolean activeConfirmation = false;
public String ackStepName = null;
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java
index 841b540..fd56bbb 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginDialog.java
@@ -160,6 +160,11 @@ public class AMQPPluginDialog extends BaseStepDialog implements StepDialogInterf
private FormData formBodyLabel;
private FormData formBodyText;
+ private Label labelContentTypeField;
+ private TextVar textContentTypeField;
+ private FormData formContentTypeLabel;
+ private FormData formContentTypeText;
+
private Label labelDeliveryTagField;
private TextVar textDeliveryTagField;
private FormData formDeliveryTagLabel;
@@ -600,6 +605,30 @@ public String open()
textBodyField.setLayoutData(formBodyText);
+ // ContentType line
+ labelContentTypeField = new Label(wConnectionComp, SWT.RIGHT);
+ labelContentTypeField.setText(getString("AmqpPlugin.ContentType.Label"));
+ props.setLook(labelContentTypeField);
+
+ formContentTypeLabel = new FormData();
+ formContentTypeLabel.left = new FormAttachment(0, 0);
+ formContentTypeLabel.right = new FormAttachment(middle, -margin);
+ formContentTypeLabel.top = new FormAttachment(textBodyField , margin);
+
+ labelContentTypeField.setLayoutData(formContentTypeLabel);
+
+ textContentTypeField = new TextVar(transMeta, wConnectionComp, SWT.MULTI | SWT.LEFT | SWT.BORDER);
+
+ props.setLook(textContentTypeField);
+ textContentTypeField.addModifyListener(modifyListener);
+
+ formContentTypeText = new FormData();
+ formContentTypeText.left = new FormAttachment(middle, 0);
+ formContentTypeText.right = new FormAttachment(100, 0);
+ formContentTypeText.top = new FormAttachment(textBodyField, margin);
+
+ textContentTypeField.setLayoutData(formContentTypeText);
+
// DeliveryTag
labelDeliveryTagField = new Label(wConnectionComp, SWT.RIGHT);
labelDeliveryTagField.setText(getString("AmqpPlugin.DeliveryTag.Label"));
@@ -608,7 +637,7 @@ public String open()
formDeliveryTagLabel = new FormData();
formDeliveryTagLabel.left = new FormAttachment(0, 0);
formDeliveryTagLabel.right = new FormAttachment(middle, -margin);
- formDeliveryTagLabel.top = new FormAttachment(textBodyField , margin);
+ formDeliveryTagLabel.top = new FormAttachment(textContentTypeField , margin);
labelDeliveryTagField.setLayoutData(formDeliveryTagLabel);
@@ -620,7 +649,7 @@ public String open()
formDeliveryTagText = new FormData();
formDeliveryTagText.left = new FormAttachment(middle, 0);
formDeliveryTagText.right = new FormAttachment(100, 0);
- formDeliveryTagText.top = new FormAttachment(textBodyField, margin);
+ formDeliveryTagText.top = new FormAttachment(textContentTypeField, margin);
textDeliveryTagField.setLayoutData(formDeliveryTagText);
@@ -1299,6 +1328,7 @@ public void getData()
setFieldText(textUsername, input.getUsername());
setFieldText(textPassword, input.getPassword());
setFieldText(textBodyField, input.getBodyField());
+ setFieldText(textContentTypeField, input.getContentTypeField());
setFieldText(textDeliveryTagField, input.getDeliveryTagField());
setFieldText(textAckStepName, input.getAckStepName());
@@ -1352,6 +1382,7 @@ private void ok()
input.setUri(getFieldText(textURI));
input.setTarget(getFieldText(textTarget));
input.setBodyField(getFieldText(textBodyField));
+ input.setContentTypeField(getFieldText(textContentTypeField));
input.setDeliveryTagField(getFieldText(textDeliveryTagField));
input.setUsername(getFieldText(textUsername));
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java
index ebd69bd..72788a1 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPluginMeta.java
@@ -37,6 +37,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface
{
private static final String FIELD_TRANSACTIONAL = "transactional";
private static final String FIELD_BODY_FIELD = "body_field";
+ private static final String FIELD_CONTENTTYPE_FIELD ="content_type";
private static final String FIELD_ROUTING = "routing";
private static final String FIELD_DELIVERYTAG_FIELD = "deliverytag_field";
private static final String FIELD_LIMIT = "limit";
@@ -73,6 +74,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface
private static final String DEFAULT_BODY_FIELD = "message";
private static final String DEFAULT_DELIVERYTAG_FIELD = "amqpdeliverytag";
+ private static final String DEFAULT_CONTENTTYPE_FIELD = "amqpcontenttype";
private static final String DEFAULT_EXCHTYPE = AMQPPluginData.EXCHTYPE_DIRECT;
private String uri;
@@ -97,6 +99,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface
private boolean transactional = false;
private String bodyField = DEFAULT_BODY_FIELD;
private String deliveryTagField = DEFAULT_DELIVERYTAG_FIELD;
+ private String contentTypeField = DEFAULT_CONTENTTYPE_FIELD;
public String ackStepName = null;
public String ackStepDeliveryTagField = null;
public String rejectStepName = null;
@@ -203,6 +206,7 @@ public String getXML()
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_TRANSACTIONAL, isTransactional()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_BODY_FIELD, getBodyField()));
+ bufer.append(" ").append(XMLHandler.addTagValue(FIELD_CONTENTTYPE_FIELD, getContentTypeField()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_DELIVERYTAG_FIELD, getDeliveryTagField()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_LIMIT, getLimitString()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_PREFETCHCOUNT, getPrefetchCountString()));
@@ -251,6 +255,7 @@ public void loadXML(Node stepnode, List databases, IMetaStore ims)
try {
setTransactional(XMLHandler.getTagValue(stepnode, FIELD_TRANSACTIONAL));
setBodyField(XMLHandler.getTagValue(stepnode, FIELD_BODY_FIELD));
+ setContentTypeField(XMLHandler.getTagValue(stepnode, FIELD_CONTENTTYPE_FIELD));
setDeliveryTagField(XMLHandler.getTagValue(stepnode, FIELD_DELIVERYTAG_FIELD));
setRouting(XMLHandler.getTagValue(stepnode, FIELD_ROUTING));
setTarget(XMLHandler.getTagValue(stepnode, FIELD_TARGET));
@@ -305,6 +310,7 @@ public void readRep(Repository rep, IMetaStore ims, ObjectId idStep, List getStepInjectionMetadataEntries() throws Ket
, Entry.AMQP_PORT
, Entry.AMQP_VHOST
, Entry.BODY
+ , Entry.CONTENTTYPE
, Entry.DELIVERYTAG
, Entry.TARGET
, Entry.ROUTING
@@ -217,6 +219,9 @@ public void injectStepMetadataEntries( List all ) throws
case BODY:
meta.setBodyField( lookValue );
break;
+ case CONTENTTYPE:
+ meta.setContentTypeField( lookValue );
+ break;
case DELIVERYTAG:
meta.setDeliveryTagField( lookValue );
break;
@@ -321,6 +326,7 @@ public List extractStepMetadataEntries() throws KettleEx
list.add( StepInjectionUtil.getEntry( Entry.AMQP_URI, meta.getUri() ) );
list.add( StepInjectionUtil.getEntry( Entry.BODY, meta.getBodyField() ) );
+ list.add( StepInjectionUtil.getEntry( Entry.CONTENTTYPE, meta.getContentTypeField() ) );
list.add( StepInjectionUtil.getEntry( Entry.DELIVERYTAG, meta.getDeliveryTagField() ) );
list.add( StepInjectionUtil.getEntry( Entry.TARGET, meta.getTarget() ) );
list.add( StepInjectionUtil.getEntry( Entry.ROUTING, meta.getRouting() ) );
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties b/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties
index 617a4d4..1b755dd 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/messages/messages_en_US.properties
@@ -4,6 +4,7 @@ AmqpPlugin.Type.Label=Mode
AmqpPlugin.URI.Label=URI
AmqpPlugin.URIMasked.Label = URI (password masked)
AmqpPlugin.Body.Label=Body Field
+AmqpPlugin.ContentType.Label=Content Type Field
AmqpPlugin.DeliveryTag.Label=DeliveryTag Field
AmqpPlugin.Target.Label=Exchange/Queue name
AmqpPlugin.Exchange.Label=Exchange name
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java
index e85f8f1..a2add47 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java
@@ -45,11 +45,27 @@ protected String getAmqpBody(final Object[] r) throws KettleStepException
return null;
}
+ protected String getAmqpContentType(final Object[] r) throws KettleStepException
+ {
+ if (hasAmqpContentType(r)) {
+ return (r[data.contentTypeIndex] == null) ? null : r[data.contentTypeIndex].toString();
+ }
+
+ logUndefinedRow(r, "Invalid content type", "ICAmqpPlugin003");
+
+ return null;
+ }
+
protected boolean hasAmqpRoutingKey(final Object[] r)
{
return rowContains(r, data.routingIndex);
}
+ protected boolean hasAmqpContentType(final Object[] r)
+ {
+ return rowContains(r, data.contentTypeIndex);
+ }
+
protected boolean hasAmqpBody(final Object[] r)
{
return rowContains(r, data.bodyFieldIndex);
diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java
index 30d57b9..67d4ed6 100644
--- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java
+++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/ProducerProcessor.java
@@ -4,6 +4,7 @@
import com.instaclick.pentaho.plugin.amqp.AMQPPlugin;
import com.instaclick.pentaho.plugin.amqp.AMQPPluginData;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.util.List;
import org.pentaho.di.core.exception.KettleStepException;
@@ -31,12 +32,20 @@ public boolean process(Object[] r) throws KettleStepException, IOException
}
data.body = getAmqpBody(r);
+ data.content_type = (data.contentTypeIndex != null)
+ ? getAmqpContentType(r)
+ : "";
+
data.routing = (data.routingIndex != null)
? getAmqpRoutingKey(r)
: "";
// publish the current message
- channel.basicPublish(data.target, data.routing, null, data.body.getBytes());
+ channel.basicPublish(data.target, data.routing,
+ new AMQP.BasicProperties.Builder()
+ .contentType(data.content_type)
+ .build(),
+ data.body.getBytes());
// put the row to the output row stream
plugin.putRow(data.outputRowMeta, r);