Notifications
Clear all

[Solved] How can I use NEO Plasma to merge Pipeline Errors and Inbound Errors under the Inbound Interface Error Schema?

1 Posts
1 Users
2 Likes
10.8 K Views
1
Topic starter

I have an Inbound Asynchronous Pipeline with error handling set up to catch parsing errors. Within the Sink node, these errors come in under the Default/Error schema, which doesn't match the error file schema generated by the SCC.Item_IB. How can I merge these two files together under the SCC.Item_IB error schema using the Error Handling Script within the Sink node?

Default/Error Schema

image

 

SCC.Item_IB Error Schema

image

 

1 Answer
1
Topic starter

The most efficient way to handle this scenario is to create a new Script node that accepts Stream of Record within your error-handling path. Here we can map the Default/Error schema to the Inbound Interface error schema. You can create the Inbound Interface error schema by uploading the Inbound Interface error file within a parse node.

Once the Inbound Interface error schema is created, you can use a Script node within your error-handling path to map the values and output a Stream of Records that now use the Inbound Interface error schema

image

The Script node located within our error-handling path will do the following:

  • Iterate over Default/Error schema records coming from Input 1
  • Pull Raw Record Data into err_obj variable
  • Yield Output 1, mapping values from Default/Error schema to our SCC.Item_IB Error schema
    • record["Record No"] -> ErrorLineNo
    • err_obj['entName'] -> ManagingEntName
import json

def executeNode(inputs):
  iterable_inputs = {}
  outputs = {}
  
  # Input ports
  
  iterable_inputs["Input 1"] = inputs["Input 1"]
  # Type = stream.record
  # Record No, Error Message, Raw Record Data
  
  # Add node logic here
  for record in iterable_inputs["Input 1"]:
    err_obj = json.loads(record['Raw Record Data'])
    yield {
      "Output 1": {
        "ErrorLineNo": record["Record No"],
        "ErrorDescription": record["Error Message"],
        "ManagingEntName": err_obj['entName'],
        "ItemName": err_obj['productName'],
        "Description": err_obj['description'],
        "ManufacturerPrice": err_obj["msrp"],
        "Price": err_obj["msrp"],
        "Currency": err_obj["currency"],
        "CaseUPC": err_obj["upc"],
        "PackageUPC": err_obj["upc"],
        "Weight": err_obj["weight"],
        "WeightUOM": err_obj["weightUom"]
      }
    }
  
  # Activate and set outputs (omit a port to prevent execution of nodes that depend on that port)

  # Type = stream.record
  
  return outputs

 

At this point, the Output from this Script node can be passed to a Format node to be formatted as a CSV file. This CSV file can then be passed to the Sink node, which will now be receiving two error files using the same schema.

image

 

Within the Sink node, we can use our python script to merge both files together. Our python will do the following:

  • Create an output_file
  • Open output_file
  • Create csv_wrtier
  • Open our inbound errors file
  • Create a csv_reader to read our inbound errors file
  • For every row within our inbound errors file, write that row to our file
    • This includes our Column Header row
  • Open pipeline error file
  • Create a csv_reader for our pipeline errors file
  • For every row except the column header row, write that row to our file
  • Add "Error Files Merged" line at end of file
import csv

def processErrors(inbound_errors_file, pipeline_errors_file):
  output_file = '/temp/formattedOutputFile'  #Formatted Error output file
  
  # Input ports
  #inbound_errors_file: Errors generated while processing inbound
  #pipeline_errors_file: Errors generated while executing pipeline nodes
  
  # Add node logic here to format
  with open(output_file, "a+") as merged_file:
    csv_writer = csv.writer(merged_file, delimiter = ',')
    with open(inbound_errors_file) as inbound_errors: #Add Inbound Error Rows to Output File, Including Header Row
      csv_reader = csv.reader(inbound_errors, delimiter=',')
      for row in csv_reader:
        csv_writer.writerow(row)
    with open(pipeline_errors_file) as pipeline_errors: #Add Pipeline Error Rows to Output File, Excluding Header Row
      csv_reader = csv.reader(pipeline_errors, delimiter=',')
      pipeline_header = None
      for row in csv_reader:
        if not pipeline_header:
          pipeline_header=row
        else:
          csv_writer.writerow(row)
    merged_file.write('\n')
    merged_file.write('Error Files Merged') 
  
  # Return the formatted error output file
  
  return output_file

 

Default/Error schema Stream of Record

image

SCC.Item Error Schema Stream of Record

image

Final CSV Error File

image