Within NEO Plasma, I have a synchronous pipeline that needs to merge the errors generated by the Parse node with the errors generated by the Inbound Sync node during inbound processing. How can I merge these records into a single output?
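The Python script below merges two Stream of Record inputs into a single Stream of Record output. In brief, it reads each input in turn, maps every record onto a common set of fields (Error No, Error Description, Enterprise, Item Name, Description), numbers the errors consecutively across both inputs, and yields each mapped record to Output 1: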
import json


def executeNode(inputs):
    iterable_inputs = {}
    outputs = {}

    # Input ports
    iterable_inputs["Input 1"] = inputs["Input 1"]
    # Type = stream.record
    # Record No, Error Message, Raw Record Data
    iterable_inputs["Input 2"] = inputs["Input 2"]
    # Type = stream.record
    # ErrorLineNo, ErrorDescription, ManagingEntName, ItemName, Description

    # Add node logic here
    # Error No is numbered consecutively across both inputs.
    count = 1

    # Input 1 (Parse node errors): the item fields are stored as JSON
    # inside the Raw Record Data column, so parse them out first.
    for ele in iterable_inputs["Input 1"]:
        raw = json.loads(ele['Raw Record Data'])
        # Start from an empty record so only the five output columns are
        # emitted, matching the records built from Input 2.
        err_obj = {}
        err_obj['Error No'] = count
        err_obj['Error Description'] = ele['Error Message']
        err_obj['Enterprise'] = raw['entName']
        err_obj['Item Name'] = raw['productName']
        err_obj['Description'] = raw['description']
        count += 1
        yield {"Output 1": err_obj}

    # Input 2 (Inbound Sync errors): the fields are already separate columns.
    for ele in iterable_inputs["Input 2"]:
        err_obj = {}
        err_obj['Error No'] = count
        err_obj['Error Description'] = ele['ErrorDescription']
        err_obj['Enterprise'] = ele['ManagingEntName']
        err_obj['Item Name'] = ele['ItemName']
        err_obj['Description'] = ele['Description']
        count += 1
        yield {"Output 1": err_obj}

    # Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)
    # Type = stream.record
    # Error No, Error Description, Enterprise, Item Name, Description
    # Records are emitted through the yield statements above, so outputs stays empty.
    return outputs
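One caveat worth considering: records on the Parse node's error output failed to parse, so their Raw Record Data may not always be valid JSON (we have not confirmed what Plasma puts in that column in that case). If json.loads raises inside the node, the whole node fails. The sketch below shows a more forgiving mapping for Input 1; parse_raw_record and map_parse_error are hypothetical helper names, and the sample record is invented.

```python
import json


def parse_raw_record(raw_text):
    """Return Raw Record Data as a dict, or {} if it is not a JSON object."""
    try:
        value = json.loads(raw_text)
    except (TypeError, json.JSONDecodeError):
        # TypeError covers None or other non-string values;
        # JSONDecodeError covers truncated or malformed JSON text.
        return {}
    # A valid JSON array or scalar is not a record, so treat it as empty too.
    return value if isinstance(value, dict) else {}


def map_parse_error(ele, error_no):
    """Map one Parse-node error record onto the merged output columns."""
    raw = parse_raw_record(ele.get("Raw Record Data"))
    return {
        "Error No": error_no,
        "Error Description": ele.get("Error Message"),
        # .get() leaves a column as None instead of raising KeyError.
        "Enterprise": raw.get("entName"),
        "Item Name": raw.get("productName"),
        "Description": raw.get("description"),
    }


# Hypothetical parse error whose raw data was cut off mid-record.
bad = {
    "Record No": 7,
    "Error Message": "Unexpected end of input",
    "Raw Record Data": '{"entName": "Acme"',
}
# The error is still reported; the three raw-data columns come back as None.
print(map_parse_error(bad, 1))
```

In the node, the Input 1 loop would then yield {"Output 1": map_parse_error(ele, count)} for each record, so one malformed row no longer stops the rest of the errors from reaching the output.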
With the script above, two Stream of Record inputs that use different schemas are merged into a single Stream of Record output. The example screenshots below show each input and the merged output:
[Screenshot: Input 1 Stream of Record]
[Screenshot: Input 2 Stream of Record]
[Screenshot: Output 1 Stream of Record]
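If more error sources are added to the pipeline later, writing one loop per input gets repetitive. Below is a sketch of a table-driven alternative, assuming each input record behaves like a dict keyed by column name (as the script above already assumes): every input port gets a small normalizer function that returns the shared output columns, and a single generator numbers and emits the results. NORMALIZERS, merge_streams, the normalizer names, and the sample records are illustrative, not part of Plasma.

```python
import json
from itertools import count


def from_parse_error(ele):
    """Normalize a Parse-node error; item fields live in the Raw Record Data JSON."""
    raw = json.loads(ele["Raw Record Data"])
    return {
        "Error Description": ele["Error Message"],
        "Enterprise": raw.get("entName"),
        "Item Name": raw.get("productName"),
        "Description": raw.get("description"),
    }


def from_sync_error(ele):
    """Normalize an Inbound Sync error; fields are already separate columns."""
    return {
        "Error Description": ele["ErrorDescription"],
        "Enterprise": ele["ManagingEntName"],
        "Item Name": ele["ItemName"],
        "Description": ele["Description"],
    }


# Input port name -> normalizer. Dicts keep insertion order, so all
# "Input 1" errors are numbered before any "Input 2" errors.
NORMALIZERS = {
    "Input 1": from_parse_error,
    "Input 2": from_sync_error,
}


def merge_streams(inputs, normalizers):
    """Yield normalized records from each input in turn, numbered 1, 2, 3, ..."""
    error_no = count(1)
    for port, normalize in normalizers.items():
        for ele in inputs[port]:
            yield {"Error No": next(error_no), **normalize(ele)}


# Hypothetical sample records shaped like the two input schemas.
sample_inputs = {
    "Input 1": [{
        "Record No": 1,
        "Error Message": "Invalid date",
        "Raw Record Data": json.dumps(
            {"entName": "Acme", "productName": "Widget", "description": "Blue widget"}
        ),
    }],
    "Input 2": [{
        "ErrorLineNo": 1,
        "ErrorDescription": "Item already exists",
        "ManagingEntName": "Acme",
        "ItemName": "Gadget",
        "Description": "Red gadget",
    }],
}

merged = list(merge_streams(sample_inputs, NORMALIZERS))
for record in merged:
    print(record)
```

Inside the node, the body of executeNode would reduce to a loop over merge_streams(inputs, NORMALIZERS) that yields {"Output 1": record} for each record. Adding a third error source then means writing one normalizer and adding one registry entry, while Error No keeps counting across all inputs.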