[Solved] How can I use NEO Plasma to merge two separate Stream of Record inputs into a single Stream of Record output?


Within NEO Plasma, I have a synchronous pipeline that needs to merge the errors generated by the Parse node with the errors generated by the Inbound Sync node during inbound processing. How can I combine these records into a single output?

1 Answer

The Python script below merges two Stream of Record inputs into a single Stream of Record output. Here is a step-by-step breakdown of the script:

  • Import the "json" module
  • Create the iterable_inputs dictionary holding both "Input 1" and "Input 2"
  • Create a count variable that numbers the errors across both inputs
  • Iterate over every element of iterable_inputs["Input 1"]
  • Parse the JSON string in 'Raw Record Data' to recover the item fields
  • Map values from the Input 1 schema to the Output 1 schema
    • Error Description = Error Message
    • Enterprise = entName, Item Name = productName, Description = description (all from Raw Record Data)
  • Yield each mapped record (err_obj) to Output 1
  • Iterate over every element of iterable_inputs["Input 2"]
  • Map values from the Input 2 schema to the Output 1 schema
    • Error Description = ErrorDescription
    • Enterprise = ManagingEntName, Item Name = ItemName, Description = Description
  • Yield each mapped record (err_obj) to Output 1
import json

def executeNode(inputs):
  iterable_inputs = {}
  outputs = {}

  # Input ports

  iterable_inputs["Input 1"] = inputs["Input 1"]
  # Type = stream.record
  # Record No, Error Message, Raw Record Data

  iterable_inputs["Input 2"] = inputs["Input 2"]
  # Type = stream.record
  # ErrorLineNo, ErrorDescription, ManagingEntName, ItemName, Description

  # Add node logic here
  # Running error number shared by both inputs, so numbering continues
  # from Input 1 into Input 2
  count = 1

  # Input 1 (Parse node errors): the item fields are stored as a JSON string
  # in 'Raw Record Data', so parse it and build a fresh Output 1 record
  for ele in iterable_inputs["Input 1"]:
    raw = json.loads(ele['Raw Record Data'])
    err_obj = {
      'Error No': count,
      'Error Description': ele['Error Message'],
      'Enterprise': raw.get('entName'),
      'Item Name': raw.get('productName'),
      'Description': raw.get('description'),
    }
    count += 1
    # yield inside the loop so every record is emitted, not just the last one;
    # an empty input stream simply yields nothing
    yield { "Output 1": err_obj }

  # Input 2 (Inbound Sync errors): fields map directly to the Output 1 schema
  for ele in iterable_inputs["Input 2"]:
    err_obj = {
      'Error No': count,
      'Error Description': ele['ErrorDescription'],
      'Enterprise': ele['ManagingEntName'],
      'Item Name': ele['ItemName'],
      'Description': ele['Description'],
    }
    count += 1
    yield { "Output 1": err_obj }

  # Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)

  # Type = stream.record
  # Error No, Error Description, Enterprise, Item Name, Description

  # All records were already emitted with yield above, so outputs stays empty
  return outputs
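To sanity-check the mapping and numbering outside NEO Plasma, the two loops can be expressed as a plain Python generator. This is a minimal sketch, assuming each Stream of Record element behaves like a Python dict (as the script's ele['...'] indexing implies); merge_errors, map_parse_error, map_sync_error, and the sample records are hypothetical and are not part of the NEO Plasma API. itertools.chain walks Input 1 to the end and then Input 2, and enumerate supplies the running Error No.

```python
import json
from itertools import chain


def map_parse_error(ele):
    # Input 1 schema: Record No, Error Message, Raw Record Data (JSON string)
    raw = json.loads(ele["Raw Record Data"])
    return {
        "Error Description": ele["Error Message"],
        "Enterprise": raw.get("entName"),
        "Item Name": raw.get("productName"),
        "Description": raw.get("description"),
    }


def map_sync_error(ele):
    # Input 2 schema: ErrorLineNo, ErrorDescription, ManagingEntName, ItemName, Description
    return {
        "Error Description": ele["ErrorDescription"],
        "Enterprise": ele["ManagingEntName"],
        "Item Name": ele["ItemName"],
        "Description": ele["Description"],
    }


def merge_errors(input1, input2):
    # chain() lazily yields all mapped Input 1 records, then all mapped Input 2 records;
    # enumerate() numbers them so Error No continues across both streams
    mapped = chain(map(map_parse_error, input1), map(map_sync_error, input2))
    for error_no, record in enumerate(mapped, start=1):
        yield {"Error No": error_no, **record}


# Hypothetical sample records, for local testing only
parse_errors = [
    {"Record No": 3, "Error Message": "Missing price",
     "Raw Record Data": json.dumps({"entName": "Acme", "productName": "Widget",
                                    "description": "Blue widget"})},
]
sync_errors = [
    {"ErrorLineNo": 7, "ErrorDescription": "Unknown item",
     "ManagingEntName": "Acme", "ItemName": "Gadget", "Description": "Red gadget"},
]

merged = list(merge_errors(parse_errors, sync_errors))
for row in merged:
    print(row)
```

Assuming the node streams records through yield exactly as the script does, the two loops inside executeNode could then shrink to `for rec in merge_errors(iterable_inputs["Input 1"], iterable_inputs["Input 2"]): yield {"Output 1": rec}`. Keeping the counter in enumerate means neither mapping function has to track state.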

 

With the script above, we took two Stream of Record inputs that use different schemas and merged them into a single Stream of Record output. Example screenshots of each stream are below:

[Screenshot: Input 1 Stream of Record]

[Screenshot: Input 2 Stream of Record]

[Screenshot: Output 1 Stream of Record]
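Because merging streams with different schemas is essentially a per-schema rename, the mapping can also be kept as data instead of hand-written assignments. This is a minimal sketch in plain Python, assuming each record behaves like a dict as in the script; the table names, the remap/from_input1/from_input2 helpers, and the sample values are hypothetical. Both inputs end up with the same Output 1 columns, ready to be numbered and yielded.

```python
import json

# Hypothetical rename tables: Output 1 column -> source field name
OUTPUT_COLUMNS = ["Error Description", "Enterprise", "Item Name", "Description"]
INPUT1_MAP = dict(zip(OUTPUT_COLUMNS, ["Error Message", "entName", "productName", "description"]))
INPUT2_MAP = dict(zip(OUTPUT_COLUMNS, ["ErrorDescription", "ManagingEntName", "ItemName", "Description"]))


def remap(record, field_map):
    # Build an Output 1 record; a missing source field becomes None instead of raising KeyError
    return {out_col: record.get(src) for out_col, src in field_map.items()}


def from_input1(ele):
    # Input 1 keeps the item fields inside the JSON string in 'Raw Record Data',
    # so combine them with the top-level fields (top-level keys win on a name clash)
    return remap({**json.loads(ele["Raw Record Data"]), **ele}, INPUT1_MAP)


def from_input2(ele):
    # Input 2 already carries every field at the top level
    return remap(ele, INPUT2_MAP)


a = from_input1({"Record No": 1, "Error Message": "Bad date",
                 "Raw Record Data": '{"entName": "Acme", "productName": "Bolt", "description": "M6 bolt"}'})
b = from_input2({"ErrorLineNo": 4, "ErrorDescription": "Duplicate item",
                 "ManagingEntName": "Acme", "ItemName": "Nut", "Description": "M6 nut"})
print(a)
print(b)
```

With this layout, adding a third error source would only need another rename table rather than another block of assignments.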