2022-11-23 16:57:19
Topic starter
My customer has asked me to re-queue hundreds of Messages for re-processing. How can I re-queue existing Messages in bulk, *without* having to select each one on the Message Queue page and Requeue it manually?
You can use the following Groovy script to achieve this.
/*
 * Bulk message re-queue script (Groovy).
 *
 * Runs the SQL in msgsToRequeueSql, and for every SYS_MESSAGE_ID it returns:
 *   1. reads the original Message,
 *   2. builds a new Message carrying over the original's routing fields,
 *   3. copies the original payload to a new storage resource, and
 *   4. enqueues the new Message.
 * Progress is written to standard output.
 */

import java.util.List;

import com.onenetwork.platform.common.Message;
import com.onenetwork.platform.common.usercontext.PlatformUserContext;
import com.onenetwork.platform.common.usercontext.UserContextService;
import com.onenetwork.platform.data.sql.SqlRow;
import com.onenetwork.platform.data.sql.SqlService;
import com.onenetwork.platform.env.SharedStorageServiceResource;
import com.onenetwork.platform.env.servicelocator.Services;
import com.onenetwork.platform.integ.msg.MessageQueueService;
import com.transcendsys.platform.base.io.StreamUtil;
import com.transcendsys.platform.base.storage.MessageStorageService;
import com.transcendsys.platform.server.cache.manager.ValueChainCacheManager;
import com.transcendsys.platform.server.model.dma.DirectModelAccess;

// ---------------------------------------------------------------------------
// Configuration - edit these two values before running.
// ---------------------------------------------------------------------------

// Provide any query which returns a list of SYS_MESSAGE_ID of messages which need to be re-queued
msgsToRequeueSql = "select sys_message_id from message where id in ('a7f8-6d95-8b49-4b25-8ce3-7059-745b-7e55', 'b0ad-53f0-2b06-4f23-852f-32b4-252c-128b', '2cb7-3fd2-5b62-4bf2-b721-f350-f56f-c346')";

// We will print a checkpoint every N requeues to give you a sense of status
summarizeEveryN = 50;

// ---------------------------------------------------------------------------
// Service lookup
// ---------------------------------------------------------------------------

ucs = Services.get(UserContextService.class);
mqService = Services.get(MessageQueueService.class);
sqlService = Services.get(SqlService.class);

// Every read and enqueue below uses this context, built from the first non-zero value chain.
ctx = ucs.createDefaultValueChainAdminContext(ValueChainCacheManager.getInstance().getFirstNonZeroVC().getVcId());

/**
 * Copies an existing message payload into a newly generated message resource.
 *
 * @param payloadRef reference to the existing payload to copy
 * @param queueName  queue name passed to the storage service when generating
 *                   the new resource (the new resource uses the "requeue_" prefix)
 * @return the path of the newly created storage resource
 * @throws IOException if the payload transfer fails
 */
def copyPayload(payloadRef, queueName) throws IOException {
    SharedStorageServiceResource existStorageResource =
        MessageStorageService.getInstance().getMessageResource(payloadRef);
    SharedStorageServiceResource newStorageResource =
        MessageStorageService.getInstance().generateMessageResource(queueName, "requeue_");
    // NOTE(review): the trailing 'true' presumably tells StreamUtil to close both
    // streams once the transfer is done - confirm against StreamUtil.transfer.
    StreamUtil.transfer(existStorageResource.openInputStream(), newStorageResource.openOutputStream(), true);
    return newStorageResource.getPath();
}

// ---------------------------------------------------------------------------
// Main requeue loop
// ---------------------------------------------------------------------------

rows = sqlService.executeQuery(msgsToRequeueSql).getRows();
System.out.println(">>>>>> Found " + rows.size() + " message(s) to be re-queued");

for (j = 0; j < rows.size(); j++) {
    row = rows.get(j);
    sysMessageId = row.getLongValue("SYS_MESSAGE_ID");
    origMsg = DirectModelAccess.readById(Message.class, sysMessageId, ctx);

    // Build a new message and carry over the original's routing fields.
    newMsg = (Message) mqService.newMessage();
    newMsg.setClientFileName(origMsg.getClientFileName());
    newMsg.setInboundInterface(origMsg.getInboundInterface());
    newMsg.setInboundInterfaceVersion(origMsg.getInboundInterfaceVersion());
    newMsg.setInboundQueueEnterpriseName(origMsg.getInboundQueueEnterpriseName());
    newMsg.setInboundQueueName(origMsg.getInboundQueueName());
    // The value-chain ids are only copied when they are set on the original.
    if (origMsg.isSetInboundQueueValueChainId()) {
        newMsg.setInboundQueueValueChainId(origMsg.getInboundQueueValueChainId());
    }
    newMsg.setMetadata(origMsg.getMetadata());
    newMsg.setOriginalPayloadRef(origMsg.getOriginalPayloadRef());
    newMsg.setOutboundInterface(origMsg.getOutboundInterface());
    newMsg.setOutboundInterfaceVersion(origMsg.getOutboundInterfaceVersion());
    newMsg.setOutboundQueueEnterpriseName(origMsg.getOutboundQueueEnterpriseName());
    newMsg.setOutboundQueueName(origMsg.getOutboundQueueName());
    if (origMsg.isSetOutboundQueueValueChainId()) {
        newMsg.setOutboundQueueValueChainId(origMsg.getOutboundQueueValueChainId());
    }
    newMsg.setOwningQueueEnterpriseName(origMsg.getOwningQueueEnterpriseName());
    newMsg.setOwningQueueName(origMsg.getOwningQueueName());
    newMsg.setSender(origMsg.getSender());

    System.out.println(">>>>>> Requeuing " + origMsg.getId() + " as " + newMsg.getId());

    // Prefer the original payload when one is recorded; otherwise fall back to
    // the message's current payload reference.
    payloadRef = origMsg.getOriginalPayloadRef() != null ? origMsg.getOriginalPayloadRef() : origMsg.getPayloadRef();
    newMsg.setPayloadRef(copyPayload(payloadRef, newMsg.getOwningQueueRef().toString()));

    mqService.enqueue(newMsg, ctx);

    // j is zero-based, so (j + 1) messages are complete at this point; report
    // that count rather than the raw index.
    if ((j + 1) % summarizeEveryN == 0) {
        System.out.println(">>>>>> Finished " + (j + 1) + " of " + rows.size() + " requeue(s)");
    }
}