Within NEO Plasma, I have a synchronous pipeline that needs to merge the errors generated by the Parse node with the errors generated by the Inbound Sync node during inbound processing. How can I merge these records into a single output?
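The Python script below merges two Stream of Record inputs into a single Stream of Record output. In short, it walks Input 1 and then Input 2, maps each record's fields to a common set of column names, numbers the errors with one running counter, and yields every record on Output 1: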
import json

def executeNode(inputs):
    iterable_inputs = {}
    outputs = {}

    # Input ports
    iterable_inputs["Input 1"] = inputs["Input 1"]
    # Type = stream.record
    # Record No, Error Message, Raw Record Data
    iterable_inputs["Input 2"] = inputs["Input 2"]
    # Type = stream.record
    # ErrorLineNo, ErrorDescription, ManagingEntName, ItemName, Description

    # Add node logic here
    count = 1
    for ele in iterable_inputs["Input 1"]:
        err_obj = json.loads(ele['Raw Record Data'])
        err_obj['Error No'] = count
        err_obj['Error Description'] = ele['Error Message']
        err_obj['Enterprise'] = err_obj['entName']
        err_obj['Item Name'] = err_obj['productName']
        err_obj['Description'] = err_obj['description']
        count += 1
        if len(err_obj) != 0:
            yield { "Output 1": err_obj }

    for ele in iterable_inputs["Input 2"]:
        err_obj = {}
        err_obj['Error No'] = count
        err_obj['Error Description'] = ele['ErrorDescription']
        err_obj['Enterprise'] = ele['ManagingEntName']
        err_obj['Item Name'] = ele['ItemName']
        err_obj['Description'] = ele['Description']
        print(err_obj)
        count += 1
        if len(err_obj) != 0:
            yield { "Output 1": err_obj }

    # Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)
    # Type = stream.record
    # Error No, Error Description, Enterprise, Item Name, Raw Data
    return outputs
With the above script, we have taken two Stream of Record inputs that use different schemas and merged them into a single Stream of Record output. Below are screenshots of each stream from this example:
Input 1 Stream of Record
Input 2 Stream of Record
Output 1 Stream of Record
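To try the merge logic outside of a pipeline, here is a minimal standalone sketch. It is not the node script itself: the function name `merge_error_streams` and all sample values are hypothetical, and plain Python lists stand in for the two Stream of Record inputs. It also differs in one way from the script above: it builds a fresh dictionary for each Input 1 record, so that both inputs produce exactly the same set of fields.

```python
import json


def merge_error_streams(parse_errors, sync_errors):
    """Yield records from both inputs using one shared set of field names."""
    count = 1  # running error number shared across both inputs

    # Parse-style errors: the useful fields live inside a JSON string.
    for rec in parse_errors:
        raw = json.loads(rec["Raw Record Data"])
        yield {
            "Error No": count,
            "Error Description": rec["Error Message"],
            "Enterprise": raw.get("entName"),
            "Item Name": raw.get("productName"),
            "Description": raw.get("description"),
        }
        count += 1

    # Sync-style errors: the fields are already top-level columns.
    for rec in sync_errors:
        yield {
            "Error No": count,
            "Error Description": rec["ErrorDescription"],
            "Enterprise": rec["ManagingEntName"],
            "Item Name": rec["ItemName"],
            "Description": rec["Description"],
        }
        count += 1


# Hypothetical sample records, shaped like the two input schemas.
parse_errors = [
    {
        "Record No": 1,
        "Error Message": "Invalid date format",
        "Raw Record Data": json.dumps(
            {"entName": "Acme", "productName": "Widget A", "description": "Red widget"}
        ),
    }
]
sync_errors = [
    {
        "ErrorLineNo": 2,
        "ErrorDescription": "Item not found",
        "ManagingEntName": "Acme",
        "ItemName": "Widget B",
        "Description": "Blue widget",
    }
]

merged = list(merge_error_streams(parse_errors, sync_errors))
for row in merged:
    print(row)
```

Because this sketch reads the parsed fields with `dict.get`, a raw record that lacks one of the keys produces an empty value for that field instead of raising a `KeyError`; whether that is preferable depends on how strictly the downstream node validates its input.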