Setting an attribute with a dynamic property name

  • 28 April, 2021
  • By Dave Cassel

The NiFi EvaluateJsonPath processor lets you specify a JsonPath expression, which is applied to the content of a flow file. For most situations that works great, but what if the property you want to read depends on a flow file attribute? EvaluateJsonPath doesn’t support the Expression Language in its dynamic properties, so the path can’t refer to an attribute.

As an example, suppose I have a flow that gets JSON content with different structures. Somewhere in my flow I’ll set an attribute saying which JSON property to read to get a value. We won’t be able to use EvaluateJsonPath, since the path has to be static. ExecuteScript to the rescue!

The ExecuteScript processor is a great way to handle situations that the out-of-the-box processors don’t support. (You also have the option to write a custom processor, but that’s much more complicated.)

Here’s my setup for testing:

The GenerateFlowFile processor has some JSON for the content:

{
  "keyProperty": 1,
  "anotherKeyProperty": "some value"
}

It also sets two attributes:

  • propName with the value of “keyProperty”
  • anotherProp with the value of “anotherKeyProperty”
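
Before moving this into NiFi, the lookup itself can be sketched in plain JavaScript. This is only an illustration under assumptions: the function name readDynamicProperty and the plain attributes object are hypothetical stand-ins for the flow file content and its attributes, not part of the NiFi API.

```javascript
// Hypothetical helper, not part of NiFi: reads one property from JSON
// content, where the property name comes from an attribute value.
function readDynamicProperty(content, attributes, attributeName) {
  var json = JSON.parse(content);
  // The attribute holds the name of the JSON property to read
  var propertyName = attributes[attributeName];
  // Bracket notation lets the key be chosen at run time
  return json[propertyName];
}

// Stand-ins for the GenerateFlowFile content and attributes described above
var content = '{"keyProperty": 1, "anotherKeyProperty": "some value"}';
var attributes = { propName: 'keyProperty', anotherProp: 'anotherKeyProperty' };

var first = readDynamicProperty(content, attributes, 'propName');
var second = readDynamicProperty(content, attributes, 'anotherProp');
```

Here first holds the number 1 and second holds the string "some value". NiFi attribute values are strings, so the numeric value has to be converted to a string before it can be stored as an attribute.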

The ExecuteScript processor uses ECMAScript and the following code:

/* Add objects to the mapping to indicate which attribute is to 
 * be read (sourceAttribute) and where that value is to be 
 * written (targetAttribute). 
 */
var mapping = [
  { 
    sourceAttribute: 'propName', 
    targetAttribute: 'myNewProperty' 
  },
  { 
    sourceAttribute: 'anotherProp', 
    targetAttribute: 'someOtherProperty' 
  }
]; 


var InputStreamCallback =    
  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = 
  Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
  try {
    var json = null;
    // Create a new InputStreamCallback, passing in a function
    // to define the interface method
    session.read(flowFile,
      new InputStreamCallback(function(inputStream) {
        json = JSON.parse(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
      })
    );
    var attrMap = {};
    mapping.forEach(function(attr) {
      var value = json[flowFile.getAttribute(attr.sourceAttribute)];
      // Compare against null so that falsy values such as 0 or false are kept
      attrMap[attr.targetAttribute] = (value != null) ? String(value) : "";
    });
    flowFile = session.putAllAttributes(flowFile, attrMap);
    session.transfer(flowFile, REL_SUCCESS);
  }
  catch (e) { 
    // Attribute values must be strings, so convert the error explicitly
    flowFile = session.putAttribute(flowFile, 'dynamicAttributeError', String(e));
    session.transfer(flowFile, REL_FAILURE);
  }
}

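The mapping array at the top of the script is how you configure the processor. Each entry names the attribute that holds the JSON property name (sourceAttribute) and the attribute that will receive the value (targetAttribute).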
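
To check a mapping before deploying it, the loop can be tried outside NiFi, for example with Node.js. The sketch below is an assumption-laden stand-in, not the processor script itself: buildAttributes is a hypothetical function, plain objects replace the flow file content and attributes, and the third mapping entry (missingProp, unmatchedProperty) is invented to show what happens when an attribute is not set.

```javascript
// Hypothetical stand-in for the loop in the script: plain objects replace
// the parsed flow file content (json) and the flow file attributes.
function buildAttributes(mapping, json, attributes) {
  var attrMap = {};
  mapping.forEach(function (attr) {
    var value = json[attributes[attr.sourceAttribute]];
    // Explicit null check, so 0 and false are kept as "0" and "false"
    attrMap[attr.targetAttribute] = (value != null) ? String(value) : '';
  });
  return attrMap;
}

var mapping = [
  { sourceAttribute: 'propName', targetAttribute: 'myNewProperty' },
  { sourceAttribute: 'anotherProp', targetAttribute: 'someOtherProperty' },
  // Invented entry for illustration: 'missingProp' is not set anywhere
  { sourceAttribute: 'missingProp', targetAttribute: 'unmatchedProperty' }
];

var json = { keyProperty: 1, anotherKeyProperty: 'some value' };
var attributes = { propName: 'keyProperty', anotherProp: 'anotherKeyProperty' };

var result = buildAttributes(mapping, json, attributes);
console.log(JSON.stringify(result));
// prints {"myNewProperty":"1","someOtherProperty":"some value","unmatchedProperty":""}
```

With the test setup above, myNewProperty ends up as "1" and someOtherProperty as "some value". An entry whose source attribute or JSON property is missing produces an empty string rather than an error.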


