Setting an attribute with a dynamic property name

Blog

Setting an attribute with a dynamic property name

  • 28 April, 2021
  • By Dave Cassel
  • No Comments
blog-image

The NiFi EvaluateJsonPath processor lets you specify a JSON path, which will be applied to the content of a flow file. For most situations that works great, but what if you want to retrieve a value where the property you want to read depends on a flow file attribute? EvaluateJsonPath doesn’t support the expression language on dynamic attributes. As an example, suppose I have a flow that gets JSON content with different structures. Somewhere in my flow I’ll set an attribute saying which JSON property to read to get a value. We won’t be able to use EvaluateJsonPath, since the path has to be static. ExecuteScript to the rescue! The ExecuteScript processor is a great way to handle situations that the out-of-the-box processors don’t support. (You also have the option to write a custom processor, but that’s much more complicated.) Here’s my setup for testing:

The GenerateFlowFile processor has some JSON for the content:
{
  "keyProperty": 1,
  "anotherKeyProperty": "some value"
}
It also sets two attributes:
  • propName with the value of “keyProperty”
  • anotherProp with the value of “anotherKeyProperty”
The ExecuteScript processor uses ECMAScript and the following code:
/* Add objects to the mapping to indicate which attribute is to 
 * be read (sourceAttribute) and where that value is to be 
 * written (targetAttribute). 
 */
var mapping = [
  { 
    sourceAttribute: 'propName', 
    targetAttribute: 'myNewProperty' 
  },
  { 
    sourceAttribute: 'anotherProp', 
    targetAttribute: 'someOtherProperty' 
  }
]; 


var InputStreamCallback =    
  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = 
  Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
  try {
    var json = null;
    // Create a new InputStreamCallback, passing in a function  // to define the interface method  session.read(flowFile,    new InputStreamCallback(function(inputStream) {      json = JSON.parse(IOUtils.toString(inputStream, StandardCharsets.UTF_8));   }) );   var attrMap = {};   mapping.forEach(function(attr) {       var value = json[flowFile.getAttribute(attr.sourceAttribute)];       attrMap[attr.targetAttribute] = value ? value.toString() : "";  });     flowFile = session.putAllAttributes(flowFile, attrMap);    session.transfer(flowFile, REL_SUCCESS); } catch (e) { flowFile = session.putAttribute(flowFile, 'dynamicAttributeError', e); session.transfer(flowFile, REL_FAILURE); } } 
The mappings array in the first few lines is how you configure the processor, defining what attributes should be set.

quote
The NiFi EvaluateJsonPath processor lets you specify a JSON path, which will be applied to the content of a flow...
0 0 votes
Article Rating
Subscribe
Notify of
0 Comments
Inline Feedbacks
View all comments
cta-bg

Looking Forward to Building a Partnership!

Let's discuss how we can help your organization