Setting an attribute with a dynamic property name

The NiFi EvaluateJsonPath processor lets you specify a JSON path, which will be applied to the content of a flow file. For most situations that works great, but what if the property you want to read depends on a flow file attribute? EvaluateJsonPath defines its paths in dynamic properties, and those properties don’t support the Expression Language, so a path can’t refer to an attribute.

As an example, suppose I have a flow that gets JSON content with different structures. Somewhere in my flow I’ll set an attribute saying which JSON property to read to get a value. We won’t be able to use EvaluateJsonPath, since the path has to be static. ExecuteScript to the rescue!

The ExecuteScript processor is a great way to handle situations that the out-of-the-box processors don’t support. (You also have the option to write a custom processor, but that’s much more complicated.)

Here’s my setup for testing:

The GenerateFlowFile processor has some JSON for the content:

{
  "keyProperty": 1,
  "anotherKeyProperty": "some value"
}

It also sets two attributes:

  • propName with the value of “keyProperty”
  • anotherProp with the value of “anotherKeyProperty”
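
To make the goal concrete, here is a minimal sketch of the lookup in plain JavaScript, outside NiFi. The content and attributes variables are stand-ins I made up for the flow file content and attributes; they are not NiFi APIs.

```javascript
// Stand-ins for the flow file content and attributes described above.
// These are plain values for illustration, not NiFi APIs.
var content = '{ "keyProperty": 1, "anotherKeyProperty": "some value" }';
var attributes = {
  propName: 'keyProperty',
  anotherProp: 'anotherKeyProperty'
};

var json = JSON.parse(content);

// The attribute value names the JSON property to read.
var first = json[attributes.propName];
var second = json[attributes.anotherProp];

console.log(first);  // 1
console.log(second); // some value
```

The bracket lookup is the part a static JSON path can’t express, because the key is only known once the attribute has been read.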

The ExecuteScript processor uses ECMAScript and the following code:

/* Add objects to the mapping to indicate which attribute is to 
 * be read (sourceAttribute) and where that value is to be 
 * written (targetAttribute). 
 */
var mapping = [
  { 
    sourceAttribute: 'propName', 
    targetAttribute: 'myNewProperty' 
  },
  { 
    sourceAttribute: 'anotherProp', 
    targetAttribute: 'someOtherProperty' 
  }
]; 


var InputStreamCallback =    
  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = 
  Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
  try {
    var json = null;
    // Create a new InputStreamCallback, passing in a function 
    // to define the interface method 
    session.read(flowFile,
      new InputStreamCallback(function(inputStream) {
        json = JSON.parse(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
      })
    );

    var attrMap = {};
    mapping.forEach(function(attr) {
      var value = json[flowFile.getAttribute(attr.sourceAttribute)];
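      // Compare against null/undefined so values like 0 and false are kept
      attrMap[attr.targetAttribute] = (value !== undefined && value !== null) ? String(value) : "";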
    });
    flowFile = session.putAllAttributes(flowFile, attrMap); 
    session.transfer(flowFile, REL_SUCCESS);
  } catch (e) {
    // Attribute values must be strings, so convert the error explicitly
    flowFile = session.putAttribute(flowFile, 'dynamicAttributeError', String(e));
    session.transfer(flowFile, REL_FAILURE);
  }
}

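The mapping array in the first few lines is how you configure the script: each entry names the attribute that holds the JSON property name (sourceAttribute) and the attribute that receives the value (targetAttribute).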
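
If you want to try out a mapping without running a flow, the loop can be pulled into a plain function. This is only a sketch: buildAttributeMap is a hypothetical helper name, and the attributes object stands in for flowFile.getAttribute(). Here, missing or null values become an empty string and everything else is converted with String().

```javascript
// Hypothetical helper mirroring the forEach loop in the script.
// `attributes` is a plain object standing in for the flow file attributes.
function buildAttributeMap(json, attributes, mapping) {
  var attrMap = {};
  mapping.forEach(function (attr) {
    var value = json[attributes[attr.sourceAttribute]];
    // Missing or null values become an empty string; 0 and false are kept.
    attrMap[attr.targetAttribute] =
      (value !== undefined && value !== null) ? String(value) : '';
  });
  return attrMap;
}

var mapping = [
  { sourceAttribute: 'propName', targetAttribute: 'myNewProperty' },
  { sourceAttribute: 'anotherProp', targetAttribute: 'someOtherProperty' }
];
var json = { keyProperty: 1, anotherKeyProperty: 'some value' };
var attributes = { propName: 'keyProperty', anotherProp: 'anotherKeyProperty' };

var result = buildAttributeMap(json, attributes, mapping);
console.log(JSON.stringify(result));
// {"myNewProperty":"1","someOtherProperty":"some value"}
```

With the test setup above, the flow file should end up with myNewProperty set to “1” and someOtherProperty set to “some value”.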

Hat tip to mburgess for his ExecuteScript Cookbook posts!
