Notifications
Clear all

How can I requeue a large quantity of Messages?

1 Posts
1 Users
1 Likes
533 Views
0
Topic starter

My customer has asked me to re-queue hundreds of Messages for re-processing.  How can I re-queue existing Messages in bulk, *without* having to select and Requeue each one by hand on the Message Queue page?

1 Answer
1
Topic starter

You can use the following Groovy script to re-queue the Messages in bulk.

 

// SQL query that selects the messages to re-queue. Any query may be used as long as each
// returned row exposes a SYS_MESSAGE_ID column: the requeue loop below reads that column
// from every row to load the original message. Replace the sample ids with your own criteria.
msgsToRequeueSql = "select sys_message_id from message where id in ('a7f8-6d95-8b49-4b25-8ce3-7059-745b-7e55', 'b0ad-53f0-2b06-4f23-852f-32b4-252c-128b', '2cb7-3fd2-5b62-4bf2-b721-f350-f56f-c346')";
// Checkpoint interval: a status line is printed after every N requeues to give a sense of progress.
summarizeEveryN = 50;

import java.util.List;

import com.onenetwork.platform.common.Message;
import com.onenetwork.platform.common.usercontext.PlatformUserContext;
import com.onenetwork.platform.common.usercontext.UserContextService;
import com.onenetwork.platform.data.sql.SqlRow;
import com.onenetwork.platform.data.sql.SqlService;
import com.onenetwork.platform.env.servicelocator.Services;
import com.onenetwork.platform.integ.msg.MessageQueueService;
import com.transcendsys.platform.server.cache.manager.ValueChainCacheManager;
import com.transcendsys.platform.server.model.dma.DirectModelAccess;
import com.onenetwork.platform.env.SharedStorageServiceResource;
import com.transcendsys.platform.base.storage.MessageStorageService;
import com.transcendsys.platform.base.io.StreamUtil;

// Look up the platform services the script relies on: user contexts, the message queue, and SQL.
ucs = Services.get(UserContextService.class);
mqService = Services.get(MessageQueueService.class);
sqlService = Services.get(SqlService.class);

// Every read and enqueue below runs under a default value-chain admin context, created for
// the value chain that the cache manager returns from getFirstNonZeroVC().
def firstValueChain = ValueChainCacheManager.getInstance().getFirstNonZeroVC();
ctx = ucs.createDefaultValueChainAdminContext(firstValueChain.getVcId());

/**
 * Duplicates a stored message payload into a newly generated message resource.
 *
 * @param payloadRef reference of the existing payload to copy
 * @param queueName  queue name handed to the storage service when generating the new resource
 * @return the path of the newly generated resource holding the copied payload
 * @throws IOException if opening or transferring the streams fails
 */
def copyPayload(payloadRef, queueName) throws IOException {
  def sourceResource = MessageStorageService.getInstance().getMessageResource(payloadRef);
  // New resources are generated with the "requeue_" argument; presumably a name prefix -- confirm.
  def targetResource = MessageStorageService.getInstance().generateMessageResource(queueName, "requeue_");

  def sourceStream = sourceResource.openInputStream();
  def targetStream = targetResource.openOutputStream();
  // NOTE(review): the trailing 'true' presumably asks StreamUtil to close both streams -- confirm.
  StreamUtil.transfer(sourceStream, targetStream, true);

  return targetResource.getPath();
}
   

// Main requeue loop: for every row returned by msgsToRequeueSql, load the original message,
// build a new message carrying the same routing/interface attributes, give it its own copy of
// the payload, and enqueue it.
rows = sqlService.executeQuery(msgsToRequeueSql).getRows();
System.out.println(">>>>>> Found " + rows.size() + " message(s) to be re-queued");
for (j = 0; j < rows.size(); j++) {
  row = rows.get(j);
  sysMessageId = row.getLongValue("SYS_MESSAGE_ID");
  origMsg = DirectModelAccess.readById(Message.class, sysMessageId, ctx);

  // Start from a fresh message and copy the original's attributes onto it.
  newMsg = (Message) mqService.newMessage();
  newMsg.setClientFileName(origMsg.getClientFileName());
  newMsg.setInboundInterface(origMsg.getInboundInterface());
  newMsg.setInboundInterfaceVersion(origMsg.getInboundInterfaceVersion());
  newMsg.setInboundQueueEnterpriseName(origMsg.getInboundQueueEnterpriseName());
  newMsg.setInboundQueueName(origMsg.getInboundQueueName());
  // Value chain ids are copied only when they are set on the original.
  if (origMsg.isSetInboundQueueValueChainId()) {
    newMsg.setInboundQueueValueChainId(origMsg.getInboundQueueValueChainId());
  }
  newMsg.setMetadata(origMsg.getMetadata());
  newMsg.setOriginalPayloadRef(origMsg.getOriginalPayloadRef());
  newMsg.setOutboundInterface(origMsg.getOutboundInterface());
  newMsg.setOutboundInterfaceVersion(origMsg.getOutboundInterfaceVersion());
  newMsg.setOutboundQueueEnterpriseName(origMsg.getOutboundQueueEnterpriseName());
  newMsg.setOutboundQueueName(origMsg.getOutboundQueueName());
  if (origMsg.isSetOutboundQueueValueChainId()) {
    newMsg.setOutboundQueueValueChainId(origMsg.getOutboundQueueValueChainId());
  }
  newMsg.setOwningQueueEnterpriseName(origMsg.getOwningQueueEnterpriseName());
  newMsg.setOwningQueueName(origMsg.getOwningQueueName());
  newMsg.setSender(origMsg.getSender());
  System.out.println(">>>>>> Requeuing " + origMsg.getId() + " as " + newMsg.getId());

  // Prefer the original payload reference when the message has one; otherwise fall back to
  // its current payload reference.
  payloadRef = origMsg.getOriginalPayloadRef() != null ? origMsg.getOriginalPayloadRef() : origMsg.getPayloadRef();

  // The new message points at its own copy of the payload rather than sharing the original's.
  newMsg.setPayloadRef(copyPayload(payloadRef, newMsg.getOwningQueueRef().toString()));

  mqService.enqueue(newMsg, ctx);

  // Checkpoint every summarizeEveryN messages. j is zero-based, so the number of messages
  // completed at this point is j + 1 (previously the message under-reported by one).
  if ((j + 1) % summarizeEveryN == 0) {
    System.out.println(">>>>>> Finished " + (j + 1) + " of " + rows.size() + " requeue(s)");
  }
}