import BaseComponent from './basecomponent'
import ExchangeMessage from './messages/exchangemessage'
import QueueMessage from './messages/queuemessage'
class Queue extends BaseComponent {
/**
* Queue class represents a component which keeps the messages until they are consumed.
*
* @param {number} x - x position of the queue
* @param {number} y - y position of the queue
* @param {string} name - optional identifier
* @param {number} ttl - `x-message-ttl` Argument of the queue
* @param {Exchange} dlx - Exchange object as `x-dead-letter-exchange` argument
* @param {number} maxLength - `x-max-length` argument of the queue
* @extends BaseComponent
*/
constructor(x, y, name, ttl, dlx, dlxrk, maxLength) {
super(x, y)
this.name = name
this.msgTtl = ttl || ''
this.dlx = dlx
this.dlxrk = dlxrk || ''
this.maxLength = maxLength || ''
this.radius = 20
this.binding = null
this.bindings = []
this.consumers = []
this.messages = []
}
/**
* Adds a consumer to the queue.
*
* @param {Consumer} consumer - Consumer object
*/
addConsumer(consumer) {
if (this.consumers.findIndex((c) => c === consumer) === -1) {
this.consumers.push(consumer)
}
// consumer added then deliver messages if present
if (this.consumers.length > 0 && this.messages.length > 0) {
this.messages.forEach((val) => {
val.msg.setConsumer(this.consumers[0])
})
this.messages = []
}
}
/**
* Removes a consumer from the queue.
*
* @param {Consumer} consumer - Consumer object
*/
removeConsumer(consumer) {
const consumerIndex = this.consumers.findIndex((c) => c === consumer)
if (consumerIndex !== -1) {
if (this.consumers.length === 1) {
this.consumers = []
} else {
this.consumers.splice(consumerIndex, 1)
}
}
}
/**
* Handler for arriving messages.
*
* @param {QueueMessage} msg - QueueMessage object
*/
messageArrived(msg) {
const { fillStyle } = msg
if (msg.constructor.name === 'RejectMessage') {
if (this.dlx) {
new ExchangeMessage(
this.x,
this.y,
this.dlx,
this.dlxrk,
msg.message,
true,
undefined,
fillStyle
).addToScene(this.scene)
} else {
this.scene.lostMessages += 1
}
} else {
// no consumer, message stays in the queue
if (this.consumers.length === 0) {
this.messages.push({
ts: Date.now(),
msg: new QueueMessage(
this.x,
this.y,
this,
null,
fillStyle
).addToScene(this.scene)
})
} else {
// deliver message random to one of the consumer
new QueueMessage(
this.x,
this.y,
this,
this.consumers[Math.floor(Math.random() * this.consumers.length)],
fillStyle
).addToScene(this.scene)
}
// max-length
if (this.maxLength !== '' && this.maxLength < this.messages.length) {
this.messages.pop()
if (this.dlx) {
new ExchangeMessage(
this.x,
this.y,
this.dlx,
this.dlxrk,
msg.message,
fillStyle
).addToScene(this.scene)
}
}
}
this.scene.removeActor(msg)
}
update() {
if (this.msgTtl > 0) {
const now = Date.now()
const msgToRemove = []
this.messages.forEach((val, i) => {
if (now - val.ts >= this.msgTtl) {
msgToRemove.push(i)
}
})
msgToRemove.forEach((val) => {
//console.log(val)
if (this.dlx) {
new ExchangeMessage(this.x, this.y, this.dlx, this.dlxrk).addToScene(
this.scene
)
} else {
this.scene.lostMessages += 1
}
this.messages.splice(val, 1)
})
}
}
render() {
// shadow
this.ctx.globalAlpha = 0.4
this.ctx.beginPath()
this.ctx.fillStyle = '#000'
this.ctx.roundRect(this.x - 23, this.y - 12, 50, 30, [10])
this.ctx.fill()
this.ctx.globalAlpha = 1.0
this.ctx.beginPath()
// this.ctx.fillStyle = gradient;
this.ctx.fillStyle = '#ccc'
this.ctx.setLineDash([])
this.ctx.roundRect(this.x - 25, this.y - 15, 50, 30, [10])
this.ctx.fill()
if (this.dragged) {
this.ctx.stroke()
}
if (this.hover) {
this.ctx.stroke()
}
if (this.dlx) {
this.ctx.beginPath()
this.ctx.strokeStyle = '#000'
this.ctx.setLineDash([3, 3])
this.ctx.lineWidth = 1
this.ctx.moveTo(this.x, this.y)
this.ctx.lineTo(this.dlx.x, this.dlx.y)
this.ctx.stroke()
}
this.ctx.font = '10px Arial'
this.ctx.fillStyle = '#000'
this.ctx.fillText(
this.name,
this.x - this.radius,
this.y + this.radius + 10
)
this.ctx.fillText(
`${this.messages.length} msgs`,
this.x - `${this.messages.length} msgs`.length,
this.y + this.radius + 20
)
}
}
export default Queue