I have an Inbound Asynchronous Pipeline with error handling set up to catch parsing errors. Within the Sink node, these errors come in under the Default/Error schema, which doesn't match the error file schema generated by the SCC.Item_IB. How can I merge these two files together under the SCC.Item_IB error schema using the Error Handling Script within the Sink node?
Default/Error Schema
SCC.Item_IB Error Schema
The most efficient way to handle this scenario is to add a new Script node that accepts a Stream of Record to your error-handling path. In that node you can map the Default/Error schema to the Inbound Interface error schema. To create the Inbound Interface error schema, upload the Inbound Interface error file within a Parse node.
Once the Inbound Interface error schema is created, you can use the Script node within your error-handling path to map the values and output a Stream of Record that uses the Inbound Interface error schema.
The Script node located within our error-handling path will do the following:
    import json

    def executeNode(inputs):
        iterable_inputs = {}
        outputs = {}

        # Input ports
        iterable_inputs["Input 1"] = inputs["Input 1"]
        # Type = stream.record
        # Record No, Error Message, Raw Record Data

        # Add node logic here
        for record in iterable_inputs["Input 1"]:
            # The raw record is the original JSON payload that failed in the pipeline
            err_obj = json.loads(record['Raw Record Data'])
            yield {
                "Output 1": {
                    "ErrorLineNo": record["Record No"],
                    "ErrorDescription": record["Error Message"],
                    "ManagingEntName": err_obj['entName'],
                    "ItemName": err_obj['productName'],
                    "Description": err_obj['description'],
                    "ManufacturerPrice": err_obj["msrp"],
                    "Price": err_obj["msrp"],
                    "Currency": err_obj["currency"],
                    "CaseUPC": err_obj["upc"],
                    "PackageUPC": err_obj["upc"],
                    "Weight": err_obj["weight"],
                    "WeightUOM": err_obj["weightUom"]
                }
            }

        # Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)
        # Type = stream.record
        return outputs
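One thing to watch for: records land in the error path because something went wrong with them, so the raw data may be incomplete or may not be valid JSON at all. The following is a minimal sketch of a more defensive mapping that can be tried locally before pasting the logic into the Script node. The helper name `map_error_record`, the `FIELD_MAP` table, and the choice to fall back to empty strings are assumptions made for illustration and are not part of the platform; only the field names come from the mapping above.

```python
import json

# SCC.Item_IB error column -> key in the raw JSON record (taken from the mapping above)
FIELD_MAP = {
    "ManagingEntName": "entName",
    "ItemName": "productName",
    "Description": "description",
    "ManufacturerPrice": "msrp",
    "Price": "msrp",
    "Currency": "currency",
    "CaseUPC": "upc",
    "PackageUPC": "upc",
    "Weight": "weight",
    "WeightUOM": "weightUom",
}


def map_error_record(record):
    """Hypothetical helper: map one Default/Error record to the Item_IB error layout."""
    try:
        err_obj = json.loads(record["Raw Record Data"])
    except (TypeError, ValueError):
        # Raw data is missing or is not valid JSON; keep the error, leave item fields blank
        err_obj = {}
    if not isinstance(err_obj, dict):
        err_obj = {}

    mapped = {
        "ErrorLineNo": record["Record No"],
        "ErrorDescription": record["Error Message"],
    }
    for target, source in FIELD_MAP.items():
        # Assumption: a missing key becomes an empty string instead of raising KeyError
        mapped[target] = err_obj.get(source, "")
    return mapped


# Made-up sample record, for illustration only
sample = {
    "Record No": 3,
    "Error Message": "Invalid price",
    "Raw Record Data": '{"entName": "Acme", "productName": "Widget", "msrp": "9.99"}',
}
print(map_error_record(sample))
```

Inside the node, the body of the loop would then yield `{"Output 1": map_error_record(record)}` for each record. Whether blank fields are acceptable in the error file depends on how the file is consumed downstream, so treat the fallback as a starting point.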
At this point, the Output from this Script node can be passed to a Format node to be formatted as a CSV file. This CSV file can then be passed to the Sink node, which will now be receiving two error files using the same schema.
Within the Sink node, we can use a Python script to merge both files together. The script will do the following:
    import csv

    def processErrors(inbound_errors_file, pipeline_errors_file):
        output_file = '/temp/formattedOutputFile'  # Formatted Error output file

        # Input ports
        # inbound_errors_file: Errors generated while processing inbound
        # pipeline_errors_file: Errors generated while executing pipeline nodes

        # Add node logic here to format
        with open(output_file, "a+") as merged_file:
            csv_writer = csv.writer(merged_file, delimiter=',')

            with open(inbound_errors_file) as inbound_errors:
                # Add Inbound Error Rows to Output File, Including Header Row
                csv_reader = csv.reader(inbound_errors, delimiter=',')
                for row in csv_reader:
                    csv_writer.writerow(row)

            with open(pipeline_errors_file) as pipeline_errors:
                # Add Pipeline Error Rows to Output File, Excluding Header Row
                csv_reader = csv.reader(pipeline_errors, delimiter=',')
                pipeline_header = None
                for row in csv_reader:
                    if not pipeline_header:
                        pipeline_header = row
                    else:
                        csv_writer.writerow(row)

            merged_file.write('\n')
            merged_file.write('Error Files Merged')

        # Return the formatted error output file
        return output_file
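To check the merge behavior outside the platform, a small standalone version can be run against temporary files. This is only a sketch: `merge_error_files`, `write_rows`, and `demo` are hypothetical names, the output path is passed in as a parameter instead of being fixed, the trailing "Error Files Merged" marker line is left out so the result stays a pure CSV, and the sample rows are made up. Files are opened with `newline=""`, which is what the Python `csv` module documentation recommends.

```python
import csv
import os
import tempfile


def merge_error_files(inbound_path, pipeline_path, output_path):
    """Hypothetical local variant: all inbound rows, then pipeline rows without their header."""
    with open(output_path, "w", newline="") as merged_file:
        writer = csv.writer(merged_file)
        with open(inbound_path, newline="") as inbound:
            # Inbound file is copied in full, header row included
            writer.writerows(csv.reader(inbound))
        with open(pipeline_path, newline="") as pipeline:
            reader = csv.reader(pipeline)
            next(reader, None)  # skip the pipeline file's header row
            writer.writerows(reader)
    return output_path


def write_rows(path, rows):
    """Write a list of rows to a CSV file (test helper)."""
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)


def demo():
    """Merge two small made-up error files and return the merged rows."""
    header = ["ErrorLineNo", "ErrorDescription", "ItemName"]
    with tempfile.TemporaryDirectory() as workdir:
        inbound = os.path.join(workdir, "inbound_errors.csv")
        pipeline = os.path.join(workdir, "pipeline_errors.csv")
        merged = os.path.join(workdir, "merged_errors.csv")
        write_rows(inbound, [header, ["1", "Invalid currency", "Widget"]])
        write_rows(pipeline, [header, ["7", "Unparseable record", "Gadget"]])
        merge_error_files(inbound, pipeline, merged)
        with open(merged, newline="") as handle:
            return list(csv.reader(handle))


if __name__ == "__main__":
    for row in demo():
        print(row)
```

The merged result should contain a single header row followed by the inbound rows and then the pipeline rows. This relies on both files using the same column order, which is why the Script node maps the pipeline errors to the Inbound Interface error schema first.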
Default/Error schema Stream of Record
SCC.Item_IB Error Schema Stream of Record
Final CSV Error File