Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -71,6 +71,20 @@
</dependency>
</dependencies>
<repositories>
<repository>
<id>pentaho-public</id>
<name>Pentaho Public</name>
<url>https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
<!--
<repository>
<id>repository.pentaho.org</id>
<name>repository.pentaho.org</name>
Expand All @@ -86,6 +100,7 @@
<name>thirdparty.pentaho.org</name>
<url>http://repository.pentaho.org/artifactory/third-party</url>
</repository>
-->
<repository>
<id>mavex-xx.atalssian.com</id>
<name>maven-xx.atalssian.com</name>
Expand All @@ -107,7 +122,7 @@
</repository>
<repository>
<id>mvnrepository</id>
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
Expand Down Expand Up @@ -201,10 +216,10 @@
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<version>3.6.0</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
<compilerArgument>-Xlint:unchecked</compilerArgument>
<compilerArgument>-Xlint:deprecation</compilerArgument>
</configuration>
Expand Down Expand Up @@ -242,4 +257,4 @@
</plugin>
</plugins>
</reporting>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private void initPluginStep() throws Exception

Integer port = null;
String body = environmentSubstitute(meta.getBodyField());
String contentTypeField = environmentSubstitute(meta.getContentTypeField());
String routing = environmentSubstitute(meta.getRouting());
String uri = environmentSubstitute(meta.getUri());
String host = environmentSubstitute(meta.getHost());
Expand Down Expand Up @@ -212,6 +213,14 @@ private void initPluginStep() throws Exception
}
}

if ( ! Const.isEmpty(contentTypeField)) {
data.contentTypeIndex = data.outputRowMeta.indexOfValue(contentTypeField);

if (data.contentTypeIndex < 0) {
throw new AMQPException("Unable to retrieve ContentType field : " + contentTypeField);
}
}

if (data.bodyFieldIndex < 0) {
throw new AMQPException("Unable to retrieve body field : " + body);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface
public boolean isRequeue = false;
public String exchtype = "";
public String body = null;
public String content_type = null;
public String routing;
public String target;
public long amqpTag;
Expand All @@ -56,6 +57,7 @@ public class AMQPPluginData extends BaseStepData implements StepDataInterface

// Delayed confirmation realted
public Integer deliveryTagIndex = null;
public Integer contentTypeIndex = null;

public boolean activeConfirmation = false;
public String ackStepName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ public class AMQPPluginDialog extends BaseStepDialog implements StepDialogInterf
private FormData formBodyLabel;
private FormData formBodyText;

private Label labelContentTypeField;
private TextVar textContentTypeField;
private FormData formContentTypeLabel;
private FormData formContentTypeText;

private Label labelDeliveryTagField;
private TextVar textDeliveryTagField;
private FormData formDeliveryTagLabel;
Expand Down Expand Up @@ -600,6 +605,30 @@ public String open()

textBodyField.setLayoutData(formBodyText);

// ContentType line
labelContentTypeField = new Label(wConnectionComp, SWT.RIGHT);
labelContentTypeField.setText(getString("AmqpPlugin.ContentType.Label"));
props.setLook(labelContentTypeField);

formContentTypeLabel = new FormData();
formContentTypeLabel.left = new FormAttachment(0, 0);
formContentTypeLabel.right = new FormAttachment(middle, -margin);
formContentTypeLabel.top = new FormAttachment(textBodyField , margin);

labelContentTypeField.setLayoutData(formContentTypeLabel);

textContentTypeField = new TextVar(transMeta, wConnectionComp, SWT.MULTI | SWT.LEFT | SWT.BORDER);

props.setLook(textContentTypeField);
textContentTypeField.addModifyListener(modifyListener);

formContentTypeText = new FormData();
formContentTypeText.left = new FormAttachment(middle, 0);
formContentTypeText.right = new FormAttachment(100, 0);
formContentTypeText.top = new FormAttachment(textBodyField, margin);

textContentTypeField.setLayoutData(formContentTypeText);

// DeliveryTag
labelDeliveryTagField = new Label(wConnectionComp, SWT.RIGHT);
labelDeliveryTagField.setText(getString("AmqpPlugin.DeliveryTag.Label"));
Expand All @@ -608,7 +637,7 @@ public String open()
formDeliveryTagLabel = new FormData();
formDeliveryTagLabel.left = new FormAttachment(0, 0);
formDeliveryTagLabel.right = new FormAttachment(middle, -margin);
formDeliveryTagLabel.top = new FormAttachment(textBodyField , margin);
formDeliveryTagLabel.top = new FormAttachment(textContentTypeField , margin);

labelDeliveryTagField.setLayoutData(formDeliveryTagLabel);

Expand All @@ -620,7 +649,7 @@ public String open()
formDeliveryTagText = new FormData();
formDeliveryTagText.left = new FormAttachment(middle, 0);
formDeliveryTagText.right = new FormAttachment(100, 0);
formDeliveryTagText.top = new FormAttachment(textBodyField, margin);
formDeliveryTagText.top = new FormAttachment(textContentTypeField, margin);

textDeliveryTagField.setLayoutData(formDeliveryTagText);

Expand Down Expand Up @@ -1299,6 +1328,7 @@ public void getData()
setFieldText(textUsername, input.getUsername());
setFieldText(textPassword, input.getPassword());
setFieldText(textBodyField, input.getBodyField());
setFieldText(textContentTypeField, input.getContentTypeField());
setFieldText(textDeliveryTagField, input.getDeliveryTagField());

setFieldText(textAckStepName, input.getAckStepName());
Expand Down Expand Up @@ -1352,6 +1382,7 @@ private void ok()
input.setUri(getFieldText(textURI));
input.setTarget(getFieldText(textTarget));
input.setBodyField(getFieldText(textBodyField));
input.setContentTypeField(getFieldText(textContentTypeField));
input.setDeliveryTagField(getFieldText(textDeliveryTagField));

input.setUsername(getFieldText(textUsername));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface
{
private static final String FIELD_TRANSACTIONAL = "transactional";
private static final String FIELD_BODY_FIELD = "body_field";
private static final String FIELD_CONTENTTYPE_FIELD ="content_type";
private static final String FIELD_ROUTING = "routing";
private static final String FIELD_DELIVERYTAG_FIELD = "deliverytag_field";
private static final String FIELD_LIMIT = "limit";
Expand Down Expand Up @@ -73,6 +74,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface

private static final String DEFAULT_BODY_FIELD = "message";
private static final String DEFAULT_DELIVERYTAG_FIELD = "amqpdeliverytag";
private static final String DEFAULT_CONTENTTYPE_FIELD = "amqpcontenttype";
private static final String DEFAULT_EXCHTYPE = AMQPPluginData.EXCHTYPE_DIRECT;

private String uri;
Expand All @@ -97,6 +99,7 @@ public class AMQPPluginMeta extends BaseStepMeta implements StepMetaInterface
private boolean transactional = false;
private String bodyField = DEFAULT_BODY_FIELD;
private String deliveryTagField = DEFAULT_DELIVERYTAG_FIELD;
private String contentTypeField = DEFAULT_CONTENTTYPE_FIELD;
public String ackStepName = null;
public String ackStepDeliveryTagField = null;
public String rejectStepName = null;
Expand Down Expand Up @@ -203,6 +206,7 @@ public String getXML()

bufer.append(" ").append(XMLHandler.addTagValue(FIELD_TRANSACTIONAL, isTransactional()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_BODY_FIELD, getBodyField()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_CONTENTTYPE_FIELD, getContentTypeField()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_DELIVERYTAG_FIELD, getDeliveryTagField()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_LIMIT, getLimitString()));
bufer.append(" ").append(XMLHandler.addTagValue(FIELD_PREFETCHCOUNT, getPrefetchCountString()));
Expand Down Expand Up @@ -251,6 +255,7 @@ public void loadXML(Node stepnode, List<DatabaseMeta> databases, IMetaStore ims)
try {
setTransactional(XMLHandler.getTagValue(stepnode, FIELD_TRANSACTIONAL));
setBodyField(XMLHandler.getTagValue(stepnode, FIELD_BODY_FIELD));
setContentTypeField(XMLHandler.getTagValue(stepnode, FIELD_CONTENTTYPE_FIELD));
setDeliveryTagField(XMLHandler.getTagValue(stepnode, FIELD_DELIVERYTAG_FIELD));
setRouting(XMLHandler.getTagValue(stepnode, FIELD_ROUTING));
setTarget(XMLHandler.getTagValue(stepnode, FIELD_TARGET));
Expand Down Expand Up @@ -305,6 +310,7 @@ public void readRep(Repository rep, IMetaStore ims, ObjectId idStep, List<Databa
try {
setTransactional(rep.getStepAttributeString(idStep, FIELD_TRANSACTIONAL));
setBodyField(rep.getStepAttributeString(idStep, FIELD_BODY_FIELD));
setContentTypeField(rep.getStepAttributeString(idStep, FIELD_CONTENTTYPE_FIELD));
setDeliveryTagField(rep.getStepAttributeString(idStep, FIELD_DELIVERYTAG_FIELD));
setRouting(rep.getStepAttributeString(idStep, FIELD_ROUTING));
setTarget(rep.getStepAttributeString(idStep, FIELD_TARGET));
Expand Down Expand Up @@ -358,6 +364,7 @@ public void saveRep(Repository rep, IMetaStore ims, ObjectId idTransformation, O
try {
rep.saveStepAttribute(idTransformation, idStep, FIELD_TRANSACTIONAL, isTransactional());
rep.saveStepAttribute(idTransformation, idStep, FIELD_BODY_FIELD, getBodyField());
rep.saveStepAttribute(idTransformation, idStep, FIELD_CONTENTTYPE_FIELD, getContentTypeField());
rep.saveStepAttribute(idTransformation, idStep, FIELD_DELIVERYTAG_FIELD, getDeliveryTagField());
rep.saveStepAttribute(idTransformation, idStep, FIELD_LIMIT, getLimitString());
rep.saveStepAttribute(idTransformation, idStep, FIELD_PREFETCHCOUNT, getPrefetchCountString());
Expand Down Expand Up @@ -408,6 +415,7 @@ public void setDefault()
this.mode = AMQPPluginData.MODE_CONSUMER;
this.bodyField = DEFAULT_BODY_FIELD;
this.deliveryTagField = DEFAULT_DELIVERYTAG_FIELD;
this.contentTypeField = DEFAULT_CONTENTTYPE_FIELD;
this.exchtype = DEFAULT_EXCHTYPE;
this.username = "";
this.password = "";
Expand Down Expand Up @@ -690,6 +698,15 @@ public void setRouting(String routing)
this.routing = routing;
}

public String getContentTypeField()
{
return contentTypeField;
}

public void setContentTypeField(String val)
{
this.contentTypeField = val;
}

public String getDeliveryTagField()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Entry implements StepMetaInjectionEntryInterface {

BODY( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.Body.Label") ),
DELIVERYTAG( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.DeliveryTag.Label") ),
CONTENTTYPE( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.ContentType.Label") ),
TARGET( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.Target.Label") ),
ROUTING( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.Routing.Label") ),
LIMIT( ValueMetaInterface.TYPE_STRING, getString("AmqpPlugin.Limit.Label") ),
Expand Down Expand Up @@ -108,6 +109,7 @@ public List<StepInjectionMetaEntry> getStepInjectionMetadataEntries() throws Ket
, Entry.AMQP_PORT
, Entry.AMQP_VHOST
, Entry.BODY
, Entry.CONTENTTYPE
, Entry.DELIVERYTAG
, Entry.TARGET
, Entry.ROUTING
Expand Down Expand Up @@ -217,6 +219,9 @@ public void injectStepMetadataEntries( List<StepInjectionMetaEntry> all ) throws
case BODY:
meta.setBodyField( lookValue );
break;
case CONTENTTYPE:
meta.setContentTypeField( lookValue );
break;
case DELIVERYTAG:
meta.setDeliveryTagField( lookValue );
break;
Expand Down Expand Up @@ -321,6 +326,7 @@ public List<StepInjectionMetaEntry> extractStepMetadataEntries() throws KettleEx
list.add( StepInjectionUtil.getEntry( Entry.AMQP_URI, meta.getUri() ) );

list.add( StepInjectionUtil.getEntry( Entry.BODY, meta.getBodyField() ) );
list.add( StepInjectionUtil.getEntry( Entry.CONTENTTYPE, meta.getContentTypeField() ) );
list.add( StepInjectionUtil.getEntry( Entry.DELIVERYTAG, meta.getDeliveryTagField() ) );
list.add( StepInjectionUtil.getEntry( Entry.TARGET, meta.getTarget() ) );
list.add( StepInjectionUtil.getEntry( Entry.ROUTING, meta.getRouting() ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ AmqpPlugin.Type.Label=Mode
AmqpPlugin.URI.Label=URI
AmqpPlugin.URIMasked.Label = URI (password masked)
AmqpPlugin.Body.Label=Body Field
AmqpPlugin.ContentType.Label=Content Type Field
AmqpPlugin.DeliveryTag.Label=DeliveryTag Field
AmqpPlugin.Target.Label=Exchange/Queue name
AmqpPlugin.Exchange.Label=Exchange name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,27 @@ protected String getAmqpBody(final Object[] r) throws KettleStepException
return null;
}

protected String getAmqpContentType(final Object[] r) throws KettleStepException
{
if (hasAmqpContentType(r)) {
return (r[data.contentTypeIndex] == null) ? null : r[data.contentTypeIndex].toString();
}

logUndefinedRow(r, "Invalid content type", "ICAmqpPlugin003");

return null;
}

protected boolean hasAmqpRoutingKey(final Object[] r)
{
return rowContains(r, data.routingIndex);
}

protected boolean hasAmqpContentType(final Object[] r)
{
return rowContains(r, data.contentTypeIndex);
}

protected boolean hasAmqpBody(final Object[] r)
{
return rowContains(r, data.bodyFieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.instaclick.pentaho.plugin.amqp.AMQPPlugin;
import com.instaclick.pentaho.plugin.amqp.AMQPPluginData;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.util.List;
import org.pentaho.di.core.exception.KettleStepException;
Expand Down Expand Up @@ -31,12 +32,20 @@ public boolean process(Object[] r) throws KettleStepException, IOException
}

data.body = getAmqpBody(r);
data.content_type = (data.contentTypeIndex != null)
? getAmqpContentType(r)
: "";

data.routing = (data.routingIndex != null)
? getAmqpRoutingKey(r)
: "";

// publish the current message
channel.basicPublish(data.target, data.routing, null, data.body.getBytes());
channel.basicPublish(data.target, data.routing,
new AMQP.BasicProperties.Builder()
.contentType(data.content_type)
.build(),
data.body.getBytes());

// put the row to the output row stream
plugin.putRow(data.outputRowMeta, r);
Expand Down