pull:初次提交

This commit is contained in:
Yep_Q
2025-09-08 04:48:28 +08:00
parent 5c0619656d
commit f64f498365
11751 changed files with 1953723 additions and 0 deletions

View File

@@ -0,0 +1,103 @@
import type { INodeProperties, INodePropertyCollection, INodePropertyOptions } from 'n8n-workflow';
export const rabbitDefaultOptions: Array<
INodePropertyOptions | INodeProperties | INodePropertyCollection
> = [
{
displayName: 'Arguments',
name: 'arguments',
placeholder: 'Add Argument',
description: 'Arguments to add',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'argument',
displayName: 'Argument',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
{
displayName: 'Headers',
name: 'headers',
placeholder: 'Add Header',
description: 'Headers to add',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'header',
displayName: 'Header',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
{
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
description: 'Whether the queue will be deleted when the number of consumers drops to zero',
},
{
displayName: 'Assert Exchange',
name: 'assertExchange',
type: 'boolean',
default: true,
description: 'Whether to assert the exchange exists before sending',
},
{
displayName: 'Assert Queue',
name: 'assertQueue',
type: 'boolean',
default: true,
description: 'Whether to assert the queue exists before sending',
},
{
displayName: 'Durable',
name: 'durable',
type: 'boolean',
default: true,
description: 'Whether the queue will survive broker restarts',
},
{
displayName: 'Exclusive',
name: 'exclusive',
type: 'boolean',
default: false,
description: 'Whether to scope the queue to the connection',
},
];

View File

@@ -0,0 +1,269 @@
import * as amqplib from 'amqplib';
import type {
IDeferredPromise,
IExecuteResponsePromiseData,
IDataObject,
IExecuteFunctions,
INodeExecutionData,
IRun,
ITriggerFunctions,
} from 'n8n-workflow';
import { jsonParse, sleep } from 'n8n-workflow';
import { formatPrivateKey } from '@utils/utilities';
import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types';
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const;
export async function rabbitmqConnect(
credentials: RabbitMQCredentials,
): Promise<amqplib.Connection> {
const credentialData = credentialKeys.reduce((acc, key) => {
acc[key] = credentials[key] === '' ? undefined : credentials[key];
return acc;
}, {} as IDataObject) as amqplib.Options.Connect;
const optsData: IDataObject = {};
if (credentials.ssl) {
credentialData.protocol = 'amqps';
optsData.ca =
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))];
if (credentials.passwordless) {
optsData.cert =
credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert));
optsData.key =
credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key));
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
}
return await amqplib.connect(credentialData, optsData);
}
export async function rabbitmqCreateChannel(
this: IExecuteFunctions | ITriggerFunctions,
): Promise<amqplib.Channel> {
const credentials = await this.getCredentials<RabbitMQCredentials>('rabbitmq');
return await new Promise(async (resolve, reject) => {
try {
const connection = await rabbitmqConnect(credentials);
// TODO: why is this error handler being added here?
connection.on('error', reject);
const channel = await connection.createChannel();
resolve(channel);
} catch (error) {
reject(error);
}
});
}
export async function rabbitmqConnectQueue(
this: IExecuteFunctions | ITriggerFunctions,
queue: string,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const channel = await rabbitmqCreateChannel.call(this);
return await new Promise(async (resolve, reject) => {
try {
if (options.assertQueue) {
await channel.assertQueue(queue, options);
} else {
await channel.checkQueue(queue);
}
if ('binding' in options && options.binding?.bindings.length) {
options.binding.bindings.forEach(async (binding) => {
await channel.bindQueue(queue, binding.exchange, binding.routingKey);
});
}
resolve(channel);
} catch (error) {
reject(error);
}
});
}
export async function rabbitmqConnectExchange(
this: IExecuteFunctions | ITriggerFunctions,
exchange: string,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType;
const channel = await rabbitmqCreateChannel.call(this);
return await new Promise(async (resolve, reject) => {
try {
if (options.assertExchange) {
await channel.assertExchange(exchange, exchangeType, options);
} else {
await channel.checkExchange(exchange);
}
resolve(channel);
} catch (error) {
reject(error);
}
});
}
export class MessageTracker {
messages: number[] = [];
isClosing = false;
received(message: amqplib.Message) {
this.messages.push(message.fields.deliveryTag);
}
answered(message: amqplib.Message) {
if (this.messages.length === 0) {
return;
}
const index = this.messages.findIndex((value) => value !== message.fields.deliveryTag);
this.messages.splice(index);
}
unansweredMessages() {
return this.messages.length;
}
async closeChannel(channel: amqplib.Channel, consumerTag?: string) {
if (this.isClosing) {
return;
}
this.isClosing = true;
// Do not accept any new messages
if (consumerTag) {
await channel.cancel(consumerTag);
}
let count = 0;
let unansweredMessages = this.unansweredMessages();
// Give currently executing messages max. 5 minutes to finish before
// the channel gets closed. If we would not do that, it would not be possible
// to acknowledge messages anymore for which the executions were already running
// when for example a new version of the workflow got saved. That would lead to
// them getting delivered and processed again.
while (unansweredMessages !== 0 && count++ <= 300) {
await sleep(1000);
unansweredMessages = this.unansweredMessages();
}
await channel.close();
await channel.connection.close();
}
}
export const parsePublishArguments = (options: Options) => {
const additionalArguments: IDataObject = {};
if (options.arguments?.argument.length) {
options.arguments.argument.forEach((argument) => {
additionalArguments[argument.key] = argument.value;
});
}
return additionalArguments as amqplib.Options.Publish;
};
export const parseMessage = async (
message: amqplib.Message,
options: TriggerOptions,
helpers: ITriggerFunctions['helpers'],
): Promise<INodeExecutionData> => {
if (options.contentIsBinary) {
const { content } = message;
message.content = undefined as unknown as Buffer;
return {
binary: {
data: await helpers.prepareBinaryData(content),
},
json: message as unknown as IDataObject,
};
} else {
let content: IDataObject | string = message.content.toString();
if ('jsonParseBody' in options && options.jsonParseBody) {
content = jsonParse(content);
}
if ('onlyContent' in options && options.onlyContent) {
return { json: content as IDataObject };
} else {
message.content = content as unknown as Buffer;
return { json: message as unknown as IDataObject };
}
}
};
export async function handleMessage(
this: ITriggerFunctions,
message: amqplib.Message,
channel: amqplib.Channel,
messageTracker: MessageTracker,
acknowledgeMode: string,
options: TriggerOptions,
) {
try {
if (acknowledgeMode !== 'immediately') {
messageTracker.received(message);
}
const item = await parseMessage(message, options, this.helpers);
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined = undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook = this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise.then(async (data: IRun) => {
if (data.data.resultData.error) {
// The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') {
channel.nack(message);
messageTracker.answered(message);
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise.then(() => {
channel.ack(message);
messageTracker.answered(message);
});
} else {
// Acknowledge message directly
channel.ack(message);
}
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
if (acknowledgeMode !== 'immediately') {
messageTracker.answered(message);
}
this.logger.error(
`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
}

View File

@@ -0,0 +1,18 @@
{
"node": "n8n-nodes-base.rabbitmq",
"nodeVersion": "1.0",
"codexVersion": "1.0",
"categories": ["Development"],
"resources": {
"credentialDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/credentials/rabbitMQ/"
}
],
"primaryDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.rabbitmq/"
}
]
}
}

View File

@@ -0,0 +1,550 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import type * as amqplib from 'amqplib';
import type {
IExecuteFunctions,
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
INodeCredentialTestResult,
INodeExecutionData,
INodeType,
INodeTypeDescription,
JsonObject,
} from 'n8n-workflow';
import { NodeApiError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import {
parsePublishArguments,
rabbitmqConnect,
rabbitmqConnectExchange,
rabbitmqConnectQueue,
} from './GenericFunctions';
import type { Options, RabbitMQCredentials } from './types';
export class RabbitMQ implements INodeType {
description: INodeTypeDescription = {
displayName: 'RabbitMQ',
name: 'rabbitmq',
icon: 'file:rabbitmq.svg',
group: ['transform'],
version: [1, 1.1],
description: 'Sends messages to a RabbitMQ topic',
defaults: {
name: 'RabbitMQ',
},
usableAsTool: true,
inputs: [NodeConnectionTypes.Main],
outputs: [NodeConnectionTypes.Main],
credentials: [
{
name: 'rabbitmq',
required: true,
testedBy: 'rabbitmqConnectionTest',
},
],
properties: [
{
displayName: 'Operation',
name: 'operation',
type: 'hidden',
noDataExpression: true,
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1],
},
},
// To remove when action view is fixed
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1.1],
},
},
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName:
'Will delete an item from the queue triggered earlier in the workflow by a RabbitMQ Trigger node',
name: 'deleteMessage',
type: 'notice',
default: '',
displayOptions: {
show: {
operation: ['deleteMessage'],
},
},
},
{
displayName: 'Mode',
name: 'mode',
type: 'options',
displayOptions: {
hide: {
operation: ['deleteMessage'],
},
},
options: [
{
name: 'Queue',
value: 'queue',
description: 'Publish data to queue',
},
{
name: 'Exchange',
value: 'exchange',
description: 'Publish data to exchange',
},
],
default: 'queue',
description: 'To where data should be moved',
},
// ----------------------------------
// Queue
// ----------------------------------
{
displayName: 'Queue / Topic',
name: 'queue',
type: 'string',
displayOptions: {
show: {
mode: ['queue'],
},
hide: {
operation: ['deleteMessage'],
},
},
default: '',
placeholder: 'queue-name',
description: 'Name of the queue to publish to',
},
// ----------------------------------
// Exchange
// ----------------------------------
{
displayName: 'Exchange',
name: 'exchange',
type: 'string',
displayOptions: {
show: {
mode: ['exchange'],
},
},
default: '',
placeholder: 'exchange-name',
description: 'Name of the exchange to publish to',
},
{
displayName: 'Type',
name: 'exchangeType',
type: 'options',
displayOptions: {
show: {
mode: ['exchange'],
},
},
options: [
{
name: 'Direct',
value: 'direct',
description: 'Direct exchange type',
},
{
name: 'Topic',
value: 'topic',
description: 'Topic exchange type',
},
{
name: 'Headers',
value: 'headers',
description: 'Headers exchange type',
},
{
name: 'Fanout',
value: 'fanout',
description: 'Fanout exchange type',
},
],
default: 'fanout',
description: 'Type of exchange',
},
{
displayName: 'Routing Key',
name: 'routingKey',
type: 'string',
displayOptions: {
show: {
mode: ['exchange'],
},
},
default: '',
placeholder: 'routing-key',
description: 'The routing key for the message',
},
// ----------------------------------
// Default
// ----------------------------------
{
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
default: true,
description: 'Whether to send the data the node receives as JSON',
},
{
displayName: 'Message',
name: 'message',
type: 'string',
displayOptions: {
show: {
sendInputData: [false],
},
},
default: '',
description: 'The message to be sent',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
placeholder: 'Add option',
options: [
{
displayName: 'Alternate Exchange',
name: 'alternateExchange',
type: 'string',
displayOptions: {
show: {
'/mode': ['exchange'],
},
},
default: '',
description:
'An exchange to send messages to if this exchange cant route them to any queues',
},
{
displayName: 'Arguments',
name: 'arguments',
placeholder: 'Add Argument',
description:
'Arguments to add, See <a href="https://amqp-node.github.io/amqplib/channel_api.html#channel_publish" target="_blank">here</a> for valid options',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'argument',
displayName: 'Argument',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
{
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
description:
'Whether the queue will be deleted when the number of consumers drops to zero',
},
{
displayName: 'Durable',
name: 'durable',
type: 'boolean',
default: true,
description: 'Whether the queue will survive broker restarts',
},
{
displayName: 'Exclusive',
name: 'exclusive',
type: 'boolean',
displayOptions: {
show: {
'/mode': ['queue'],
},
},
default: false,
description: 'Whether to scope the queue to the connection',
},
{
displayName: 'Headers',
name: 'headers',
placeholder: 'Add Header',
description: 'Headers to add',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'header',
displayName: 'Header',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
],
},
],
};
methods = {
credentialTest: {
async rabbitmqConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
try {
const connection = await rabbitmqConnect(credential.data as RabbitMQCredentials);
await connection.close();
} catch (error) {
return {
status: 'Error',
message: error.message,
};
}
return {
status: 'OK',
message: 'Connection successful!',
};
},
},
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let channel: amqplib.Channel | undefined;
try {
const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0);
if (operation === 'deleteMessage') {
this.sendResponse(items[0].json);
return [items];
}
const mode = (this.getNodeParameter('mode', 0) as string) || 'queue';
const returnItems: INodeExecutionData[] = [];
if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;
const options = this.getNodeParameter('options', 0, {}) as Options;
channel = await rabbitmqConnectQueue.call(this, queue, options);
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
let message: string;
const queuePromises = [];
for (let i = 0; i < items.length; i++) {
if (sendInputData) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
}
let headers: IDataObject = {};
if (
options.headers &&
((options.headers as IDataObject).header! as IDataObject[]).length
) {
const itemOptions = this.getNodeParameter('options', i, {});
const additionalHeaders: IDataObject = {};
((itemOptions.headers as IDataObject).header as IDataObject[]).forEach(
(header: IDataObject) => {
additionalHeaders[header.key as string] = header.value;
},
);
headers = additionalHeaders;
}
queuePromises.push(
channel.sendToQueue(queue, Buffer.from(message), {
headers,
...parsePublishArguments(options),
}),
);
}
const promisesResponses = await Promise.allSettled(queuePromises);
// @ts-ignore
promisesResponses.forEach((response: JsonObject) => {
if (response.status !== 'fulfilled') {
if (!this.continueOnFail()) {
throw new NodeApiError(this.getNode(), response);
} else {
// Return the actual reason as error
returnItems.push({
json: {
error: response.reason,
},
});
return;
}
}
returnItems.push({
json: {
success: response.value,
},
});
});
await channel.close();
await channel.connection.close();
} else if (mode === 'exchange') {
const exchange = this.getNodeParameter('exchange', 0) as string;
const routingKey = this.getNodeParameter('routingKey', 0) as string;
const options = this.getNodeParameter('options', 0, {}) as Options;
channel = await rabbitmqConnectExchange.call(this, exchange, options);
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
let message: string;
const exchangePromises = [];
for (let i = 0; i < items.length; i++) {
if (sendInputData) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
}
let headers: IDataObject = {};
if (
options.headers &&
((options.headers as IDataObject).header! as IDataObject[]).length
) {
const itemOptions = this.getNodeParameter('options', i, {});
const additionalHeaders: IDataObject = {};
((itemOptions.headers as IDataObject).header as IDataObject[]).forEach(
(header: IDataObject) => {
additionalHeaders[header.key as string] = header.value;
},
);
headers = additionalHeaders;
}
exchangePromises.push(
channel.publish(exchange, routingKey, Buffer.from(message), {
headers,
...parsePublishArguments(options),
}),
);
}
const promisesResponses = await Promise.allSettled(exchangePromises);
// @ts-ignore
promisesResponses.forEach((response: JsonObject) => {
if (response.status !== 'fulfilled') {
if (!this.continueOnFail()) {
throw new NodeApiError(this.getNode(), response);
} else {
// Return the actual reason as error
returnItems.push({
json: {
error: response.reason,
},
});
return;
}
}
returnItems.push({
json: {
success: response.value,
},
});
});
await channel.close();
await channel.connection.close();
} else {
throw new NodeOperationError(this.getNode(), `The operation "${mode}" is not known!`);
}
return [returnItems];
} catch (error) {
if (channel) {
await channel.close();
await channel.connection.close();
}
throw error;
}
}
}

View File

@@ -0,0 +1,18 @@
{
"node": "n8n-nodes-base.rabbitmqTrigger",
"nodeVersion": "1.0",
"codexVersion": "1.0",
"categories": ["Development"],
"resources": {
"credentialDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/credentials/rabbitMQ/"
}
],
"primaryDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/trigger-nodes/n8n-nodes-base.rabbitmqtrigger/"
}
]
}
}

View File

@@ -0,0 +1,302 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import type { Message } from 'amqplib';
import type {
INodeProperties,
INodeType,
INodeTypeDescription,
ITriggerFunctions,
ITriggerResponse,
} from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import { rabbitDefaultOptions } from './DefaultOptions';
import { MessageTracker, rabbitmqConnectQueue, handleMessage } from './GenericFunctions';
import type { TriggerOptions } from './types';
export class RabbitMQTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'RabbitMQ Trigger',
name: 'rabbitmqTrigger',
icon: 'file:rabbitmq.svg',
group: ['trigger'],
version: 1,
description: 'Listens to RabbitMQ messages',
eventTriggerDescription: '',
defaults: {
name: 'RabbitMQ Trigger',
},
triggerPanel: {
header: '',
executionsHelp: {
inactive:
"<b>While building your workflow</b>, click the 'execute step' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
active:
"<b>While building your workflow</b>, click the 'execute step' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
},
activationHint:
"Once youve finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just wont see those executions here).",
},
inputs: [],
outputs: [NodeConnectionTypes.Main],
credentials: [
{
name: 'rabbitmq',
required: true,
},
],
properties: [
{
displayName: 'Queue / Topic',
name: 'queue',
type: 'string',
default: '',
placeholder: 'queue-name',
description: 'The name of the queue to read from',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
placeholder: 'Add option',
options: [
{
displayName: 'Content Is Binary',
name: 'contentIsBinary',
type: 'boolean',
default: false,
description: 'Whether to save the content as binary',
},
{
displayName: 'Delete From Queue When',
name: 'acknowledge',
type: 'options',
options: [
{
name: 'Execution Finishes',
value: 'executionFinishes',
description:
'After the workflow execution finished. No matter if the execution was successful or not.',
},
{
name: 'Execution Finishes Successfully',
value: 'executionFinishesSuccessfully',
description: 'After the workflow execution finished successfully',
},
{
name: 'Immediately',
value: 'immediately',
description: 'As soon as the message got received',
},
{
name: 'Specified Later in Workflow',
value: 'laterMessageNode',
description: 'Using a RabbitMQ node to remove the item from the queue',
},
],
default: 'immediately',
description: 'When to acknowledge the message',
},
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
displayOptions: {
hide: {
contentIsBinary: [true],
},
},
default: false,
description: 'Whether to parse the body to an object',
},
{
displayName: 'Only Content',
name: 'onlyContent',
type: 'boolean',
displayOptions: {
hide: {
contentIsBinary: [true],
},
},
default: false,
description: 'Whether to return only the content property',
},
{
displayName: 'Parallel Message Processing Limit',
name: 'parallelMessages',
type: 'number',
default: -1,
displayOptions: {
hide: {
acknowledge: ['immediately'],
},
},
description: 'Max number of executions at a time. Use -1 for no limit.',
},
{
displayName: 'Binding',
name: 'binding',
placeholder: 'Add Binding',
description: 'Add binding to queu',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'bindings',
displayName: 'Binding',
values: [
{
displayName: 'Exchange',
name: 'exchange',
type: 'string',
default: '',
placeholder: 'exchange',
},
{
displayName: 'RoutingKey',
name: 'routingKey',
type: 'string',
default: '',
placeholder: 'routing-key',
},
],
},
],
},
...rabbitDefaultOptions,
].sort((a, b) => {
if (
(a as INodeProperties).displayName.toLowerCase() <
(b as INodeProperties).displayName.toLowerCase()
) {
return -1;
}
if (
(a as INodeProperties).displayName.toLowerCase() >
(b as INodeProperties).displayName.toLowerCase()
) {
return 1;
}
return 0;
}) as INodeProperties[],
},
{
displayName:
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation",
name: 'laterMessageNode',
type: 'notice',
displayOptions: {
show: {
'/options.acknowledge': ['laterMessageNode'],
},
},
default: '',
},
],
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const queue = this.getNodeParameter('queue') as string;
const options = this.getNodeParameter('options', {}) as TriggerOptions;
const channel = await rabbitmqConnectQueue.call(this, queue, options);
const messageTracker = new MessageTracker();
let acknowledgeMode = options.acknowledge ?? 'immediately';
let closeGotCalled = false;
let consumerTag: string | undefined;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
const closeFunction = async () => {
closeGotCalled = true;
try {
return await messageTracker.closeChannel(channel, consumerTag);
} catch (error) {
const workflow = this.getWorkflow();
const node = this.getNode();
this.logger.error(
`There was a problem closing the RabbitMQ Trigger node connection "${node.name}" in workflow "${workflow.id}": "${error.message}"`,
{
node: node.name,
workflowId: workflow.id,
},
);
}
};
if (this.getMode() === 'manual') {
const manualTriggerFunction = async () => {
// Do only catch a single message when executing manually, else messages will leak
await channel.prefetch(1);
const processMessage = async (message: Message | null) => {
if (message !== null) {
void handleMessage.call(
this,
message,
channel,
messageTracker,
acknowledgeMode,
options,
);
} else {
this.emitError(new Error('Connection got closed unexpectedly'));
}
};
const existingMessage = await channel.get(queue);
if (existingMessage) {
await processMessage(existingMessage);
} else {
const consumerInfo = await channel.consume(queue, processMessage);
consumerTag = consumerInfo.consumerTag;
}
};
return {
closeFunction,
manualTriggerFunction,
};
}
const parallelMessages = options.parallelMessages ?? -1;
if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) {
throw new NodeOperationError(
this.getNode(),
'Parallel message processing limit must be a number greater than zero (or -1 for no limit)',
);
}
if (parallelMessages !== -1 && acknowledgeMode === 'immediately') {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes';
}
if (parallelMessages !== -1) {
await channel.prefetch(parallelMessages);
}
channel.on('close', () => {
if (!closeGotCalled) {
this.emitError(new Error('Connection got closed unexpectedly'));
}
});
const consumerInfo = await channel.consume(queue, async (message) => {
if (message !== null) {
void handleMessage.call(this, message, channel, messageTracker, acknowledgeMode, options);
}
});
consumerTag = consumerInfo.consumerTag;
return {
closeFunction,
};
}
}

View File

@@ -0,0 +1,3 @@
<svg width="40" height="40" viewBox="0 0 40 40" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M37.2582 16H24.688C24.0575 16 23.5455 15.488 23.5455 14.8575V1.75709C23.5455 0.786909 22.7585 0 21.7891 0H17.3018C16.3316 0 15.5455 0.786909 15.5455 1.75709V14.7782C15.5455 15.4538 14.9993 16.0029 14.3236 16.0058L10.2044 16.0255C9.52291 16.0291 8.96946 15.4764 8.97091 14.7956L8.99636 1.76C8.99855 0.788364 8.21164 0 7.24 0H2.75636C1.78618 0 1 0.786909 1 1.75709V38.44C1 39.3018 1.69818 40 2.55927 40H37.2582C38.12 40 38.8182 39.3018 38.8182 38.44V17.56C38.8182 16.6982 38.12 16 37.2582 16ZM31.3636 29.7731C31.3636 30.9025 30.448 31.8182 29.3185 31.8182H25.7724C24.6429 31.8182 23.7273 30.9025 23.7273 29.7731V26.2269C23.7273 25.0975 24.6429 24.1818 25.7724 24.1818H29.3185C30.448 24.1818 31.3636 25.0975 31.3636 26.2269V29.7731Z" fill="#FF6600"/>
</svg>

After

Width:  |  Height:  |  Size: 860 B

View File

@@ -0,0 +1,437 @@
import type { Channel, Connection, ConsumeMessage, Message } from 'amqplib';
import { mock, mockDeep } from 'jest-mock-extended';
import type { INode, IRun, ITriggerFunctions, IWorkflowMetadata } from 'n8n-workflow';
const mockChannel = mock<Channel>();
const mockConnection = mock<Connection>({ createChannel: async () => mockChannel });
mockChannel.connection = mockConnection;
const connect = jest.fn().mockReturnValue(mockConnection);
jest.mock('amqplib', () => ({ connect }));
import {
parseMessage,
rabbitmqConnect,
rabbitmqConnectExchange,
rabbitmqConnectQueue,
rabbitmqCreateChannel,
MessageTracker,
handleMessage,
} from '../GenericFunctions';
import type { TriggerOptions } from '../types';
describe('RabbitMQ GenericFunctions', () => {
const credentials = {
hostname: 'some.host',
port: 5672,
username: 'user',
password: 'pass',
vhost: '/',
};
const context = mockDeep<ITriggerFunctions>();
beforeEach(() => jest.clearAllMocks());
describe('parseMessage', () => {
const helpers = mock<ITriggerFunctions['helpers']>();
it('should handle binary data', async () => {
const message = mock<Message>();
const content = Buffer.from('test');
message.content = content;
const options = mock<TriggerOptions>({ contentIsBinary: true });
helpers.prepareBinaryData.mockResolvedValue(mock());
const item = await parseMessage(message, options, helpers);
expect(item.json).toBe(message);
expect(item.binary?.data).toBeDefined();
expect(helpers.prepareBinaryData).toHaveBeenCalledWith(content);
expect(message.content).toBeUndefined();
});
it('should handle JSON data', async () => {
const message = mock<Message>();
const content = Buffer.from(JSON.stringify({ test: 'test' }));
message.content = content;
const options = mock<TriggerOptions>({
contentIsBinary: false,
jsonParseBody: true,
onlyContent: false,
});
const item = await parseMessage(message, options, helpers);
expect(item.json).toBe(message);
expect(item.binary).toBeUndefined();
expect(helpers.prepareBinaryData).not.toHaveBeenCalled();
expect(message.content).toEqual({ test: 'test' });
});
it('should return only content, when requested', async () => {
const message = mock<Message>();
const content = Buffer.from(JSON.stringify({ test: 'test' }));
message.content = content;
const options = mock<TriggerOptions>({
contentIsBinary: false,
jsonParseBody: false,
onlyContent: true,
});
const item = await parseMessage(message, options, helpers);
expect(item.json).toBe(content.toString());
expect(item.binary).toBeUndefined();
expect(helpers.prepareBinaryData).not.toHaveBeenCalled();
expect(message.content).toEqual(content);
});
});
describe('rabbitmqConnect', () => {
it('should connect to RabbitMQ', async () => {
const connection = await rabbitmqConnect({ ...credentials, ssl: false });
expect(connect).toHaveBeenCalledWith(credentials, {});
expect(connection).toBe(mockConnection);
});
it('should connect to RabbitMQ over SSL', async () => {
const connection = await rabbitmqConnect({
...credentials,
ssl: true,
ca: 'ca',
passwordless: false,
});
expect(connect).toHaveBeenCalledWith(
{ ...credentials, protocol: 'amqps' },
{ ca: [Buffer.from('ca')] },
);
expect(connection).toBe(mockConnection);
});
});
describe('rabbitmqCreateChannel', () => {
it('should create a channel', async () => {
context.getCredentials.mockResolvedValue(credentials);
const channel = await rabbitmqCreateChannel.call(context);
expect(channel).toBe(mockChannel);
});
});
describe('rabbitmqConnectQueue', () => {
it('should assert a queue', async () => {
context.getCredentials.mockResolvedValue(credentials);
const options = mock<TriggerOptions>({ assertQueue: true });
await rabbitmqConnectQueue.call(context, 'queue', options);
expect(mockChannel.assertQueue).toHaveBeenCalledWith('queue', options);
expect(mockChannel.checkQueue).not.toHaveBeenCalled();
expect(mockChannel.bindQueue).not.toHaveBeenCalled();
});
it('should check a queue', async () => {
context.getCredentials.mockResolvedValue(credentials);
const options = mock<TriggerOptions>({ assertQueue: false });
await rabbitmqConnectQueue.call(context, 'queue', options);
expect(mockChannel.assertQueue).not.toHaveBeenCalled();
expect(mockChannel.checkQueue).toHaveBeenCalledWith('queue');
expect(mockChannel.bindQueue).not.toHaveBeenCalled();
});
});
describe('rabbitmqConnectExchange', () => {
it('should assert a queue', async () => {
context.getCredentials.mockResolvedValue(credentials);
context.getNodeParameter.calledWith('exchangeType', 0).mockReturnValue('topic');
const options = mock<TriggerOptions>({ assertExchange: true });
await rabbitmqConnectExchange.call(context, 'exchange', options);
expect(mockChannel.assertExchange).toHaveBeenCalledWith('exchange', 'topic', options);
expect(mockChannel.checkExchange).not.toHaveBeenCalled();
});
it('should check a queue', async () => {
context.getCredentials.mockResolvedValue(credentials);
const options = mock<TriggerOptions>({ assertExchange: false });
await rabbitmqConnectExchange.call(context, 'exchange', options);
expect(mockChannel.assertExchange).not.toHaveBeenCalled();
expect(mockChannel.checkExchange).toHaveBeenCalledWith('exchange');
});
});
describe('MessageTracker', () => {
let messageTracker: MessageTracker;
beforeEach(() => {
messageTracker = new MessageTracker();
});
it('should track received messages', () => {
const message = { fields: { deliveryTag: 1 } } as ConsumeMessage;
messageTracker.received(message);
expect(messageTracker.messages).toContain(1);
});
it('should track answered messages', () => {
const message = { fields: { deliveryTag: 1 } } as ConsumeMessage;
messageTracker.received(message);
messageTracker.answered(message);
expect(messageTracker.messages).not.toContain(1);
});
it('should return the number of unanswered messages', () => {
const message = { fields: { deliveryTag: 1 } } as ConsumeMessage;
messageTracker.received(message);
expect(messageTracker.unansweredMessages()).toBe(1);
});
it('should close the channel and connection', async () => {
await messageTracker.closeChannel(mockChannel, 'consumerTag');
expect(mockChannel.cancel).toHaveBeenCalledWith('consumerTag');
expect(mockChannel.close).toHaveBeenCalled();
expect(mockConnection.close).toHaveBeenCalled();
});
});
describe('handleMessage', () => {
const mockChannel = mockDeep<Channel>();
const messageTracker = mock<MessageTracker>();
const message = {
content: {
foo: 'bar',
},
} as unknown as Message;
const item = { json: message };
const options = {} as TriggerOptions;
it('should ack a message with "acknowledgeMode" set to "immediately"', async () => {
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'immediately',
options,
);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, undefined);
expect(mockChannel.ack).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "executionFinishesSuccessfully"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should nack a message with "acknowledgeMode" set to "executionFinishesSuccessfully" when there is an error', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.nack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: new Error('Some error'),
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.nack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "executionFinishes"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishes',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], undefined, deferredPromise);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should ack a message with "acknowledgeMode" set to "laterMessageNode"', async () => {
let resolvePromise: (data: IRun) => void = () => {};
const deferredPromise = {
promise: new Promise<IRun>((resolve) => {
resolvePromise = resolve;
}),
resolve: jest.fn(),
reject: jest.fn(),
};
context.helpers.createDeferredPromise.mockReturnValue(deferredPromise);
const handleMessagePromise = handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'laterMessageNode',
options,
);
await Promise.resolve(); // yield control to let handleMessage run
expect(messageTracker.received).toHaveBeenCalledWith(message);
expect(context.emit).toHaveBeenCalledWith([[item]], deferredPromise, undefined);
expect(mockChannel.ack).not.toHaveBeenCalled();
expect(messageTracker.answered).not.toHaveBeenCalled();
resolvePromise({
data: {
resultData: {
error: undefined,
},
},
} as IRun);
await handleMessagePromise;
expect(mockChannel.ack).toHaveBeenCalledWith(message);
expect(messageTracker.answered).toHaveBeenCalledWith(message);
});
it('should handle error when "acknowledgeMode" is set to "immediately"', async () => {
mockChannel.ack.mockImplementation(() => {
throw new Error('Test error');
});
context.getWorkflow.mockReturnValue({
id: '123',
} as IWorkflowMetadata);
context.getNode.mockReturnValue({
name: 'Test node',
} as INode);
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'immediately',
options,
);
expect(context.logger.error).toHaveBeenCalledWith(
'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"',
{
node: 'Test node',
workflowId: '123',
},
);
});
it('should handle error when "acknowledgeMode" is set to something other than "immediately"', async () => {
context.helpers.createDeferredPromise.mockImplementation(() => {
throw new Error('Test error');
});
context.getWorkflow.mockReturnValue({
id: '123',
} as IWorkflowMetadata);
context.getNode.mockReturnValue({
name: 'Test node',
} as INode);
await handleMessage.call(
context,
message,
mockChannel,
messageTracker,
'executionFinishesSuccessfully',
options,
);
expect(context.logger.error).toHaveBeenCalledWith(
'There was a problem with the RabbitMQ Trigger node "Test node" in workflow "123": "Test error"',
{
node: 'Test node',
workflowId: '123',
},
);
});
});
});

View File

@@ -0,0 +1,118 @@
import { mockDeep } from 'jest-mock-extended';
import type { ITriggerFunctions } from 'n8n-workflow';
import * as GenericFunctions from '../GenericFunctions';
import type { Channel, GetMessage } from 'amqplib';
import { RabbitMQTrigger } from '../RabbitMQTrigger.node';
describe('RabbitMQTrigger node', () => {
const trigger = new RabbitMQTrigger();
const mockTriggerFunctions = mockDeep<ITriggerFunctions>();
const connectSpy = jest.spyOn(GenericFunctions, 'rabbitmqConnectQueue');
const handleMessageSpy = jest.spyOn(GenericFunctions, 'handleMessage');
const mockChannel = mockDeep<Channel>();
beforeEach(() => {
jest.resetAllMocks();
});
describe('manual execution', () => {
it('should get a message from the queue', async () => {
const message = {
content: {
foo: 'bar',
},
fields: {
deliveryTag: 1,
},
};
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('manual');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.get.mockResolvedValue(message as unknown as GetMessage);
const { closeFunction, manualTriggerFunction } =
await trigger.trigger.call(mockTriggerFunctions);
await manualTriggerFunction!();
expect(mockChannel.prefetch).toHaveBeenCalledWith(1);
expect(mockChannel.get).toHaveBeenCalledWith('testQueue');
expect(handleMessageSpy).toHaveBeenCalledWith(
message,
mockChannel,
expect.anything(),
'immediately',
options,
);
expect(mockChannel.consume).not.toHaveBeenCalled();
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
it('should listen for a message from the queue', async () => {
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('manual');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.consume.mockResolvedValue({
consumerTag: 'testConsumerTag',
});
const { closeFunction, manualTriggerFunction } =
await trigger.trigger.call(mockTriggerFunctions);
await manualTriggerFunction!();
expect(mockChannel.prefetch).toHaveBeenCalledWith(1);
expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything());
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
});
describe('regular execution', () => {
it('should listen for a message from the queue', async () => {
const options = { acknowledge: 'immediately' };
mockTriggerFunctions.getMode.mockReturnValue('trigger');
mockTriggerFunctions.getNodeParameter.mockImplementation((parameterName) => {
switch (parameterName) {
case 'queue':
return 'testQueue';
case 'options':
return options;
}
return undefined;
});
connectSpy.mockResolvedValue(mockChannel);
mockChannel.consume.mockResolvedValue({
consumerTag: 'testConsumerTag',
});
const { closeFunction } = await trigger.trigger.call(mockTriggerFunctions);
expect(mockChannel.prefetch).not.toHaveBeenCalled();
expect(mockChannel.consume).toHaveBeenCalledWith('testQueue', expect.anything());
expect(mockChannel.get).not.toHaveBeenCalled();
expect(mockChannel.close).not.toHaveBeenCalled();
await closeFunction!();
expect(mockChannel.close).toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,71 @@
type Argument = {
key: string;
value?: string;
};
type Binding = {
exchange: string;
routingKey: string;
};
type Header = {
key: string;
value?: string;
};
export type Options = {
autoDelete: boolean;
assertExchange: boolean;
assertQueue: boolean;
durable: boolean;
exclusive: boolean;
arguments: {
argument: Argument[];
};
headers: {
header: Header[];
};
};
type ContentOptions =
| {
contentIsBinary: true;
}
| {
contentIsBinary: false;
jsonParseBody: boolean;
onlyContent: boolean;
};
export type TriggerOptions = Options & {
acknowledge:
| 'executionFinishes'
| 'executionFinishesSuccessfully'
| 'immediately'
| 'laterMessageNode';
parallelMessages: number;
binding: {
bindings: Binding[];
};
} & ContentOptions;
export type RabbitMQCredentials = {
hostname: string;
port: number;
username: string;
password: string;
vhost: string;
} & (
| { ssl: false }
| ({ ssl: true; ca: string } & (
| { passwordless: false }
| {
passwordless: true;
cert: string;
key: string;
passphrase: string;
}
))
);
export type ExchangeType = 'direct' | 'topic' | 'headers' | 'fanout';