-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathamqp.coffee
More file actions
240 lines (212 loc) · 8 KB
/
amqp.coffee
File metadata and controls
240 lines (212 loc) · 8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
debug = require('debug')('msgflo:amqp')
async = require 'async'
interfaces = require './interfaces'
uuid = require 'uuid'
try
amqp = require 'amqplib/callback_api'
catch e
amqp = e
# AMQP implementation of the msgflo MessagingClient interface, built on the
# callback API of amqplib. Each client holds one connection and one channel.
class Client extends interfaces.MessagingClient
  # address: broker address handed to amqp.connect
  # options.prefetch: value passed to channel.prefetch, defaults to 2
  constructor: (@address, @options={}) ->
    @connection = null
    @channel = null
    @options.prefetch = 2 if not @options.prefetch?

  ## Broker connection management
  # Opens the connection and a channel on it.
  # callback(err) is called once the channel is set up, or with the first error.
  connect: (callback) ->
    debug 'connect', @address
    # If requiring amqplib failed, `amqp` holds the caught exception instead of the module
    if amqp.message
      return callback amqp

    amqp.connect @address, (err, conn) =>
      debug 'connected', err
      return callback err if err
      @connection = conn
      conn.createChannel (err, ch) =>
        debug 'channel created', err
        return callback err if err
        @channel = ch
        debug 'setting prefetch', @options.prefetch
        @channel.prefetch @options.prefetch
        @channel.on 'close', () ->
          debug 'channel closed'
        @channel.on 'error', (err) ->
          # Channel errors are not recoverable here, so they are rethrown
          throw err if err
        return callback null

  # Closes the channel (if any) and then the connection.
  # callback(err) receives the error from closing the connection, if any.
  disconnect: (callback) ->
    debug 'disconnect'
    return callback null if not @connection

    closeConnection = =>
      @connection.close (err) =>
        debug 'connection closed'
        @connection = null
        return callback err

    # connect() stores the connection before the channel exists, so a failed
    # createChannel leaves a connection without channel. It must still be closed.
    return closeConnection() if not @channel

    @channel.close (err) =>
      debug 'channel closed', err
      @channel = null
      closeConnection()

  ## Manipulating queues
  # Declares the broker entities for a port.
  # type 'inqueue': a queue, plus a fanout exchange of the same name bound to it.
  # any other type: only a fanout exchange named queueName.
  # options is optional; options.persistent == false makes the entities
  # non-durable and auto-deleted.
  createQueue: (type, queueName, options, callback) ->
    if not callback
      callback = options
      options = {}
    return callback new Error 'msgflo.amqp.createQueue(): Not connected' if not @channel

    debug 'create queue', type, queueName, options
    queueOptions =
      deadLetterExchange: 'dead-'+queueName # if not existing, messages will be dropped
    exchangeOptions = {}
    exchangeName = queueName
    if options.persistent? and not options.persistent
      queueOptions.durable = false
      queueOptions.autoDelete = true
      exchangeOptions.durable = false
      exchangeOptions.autoDelete = true

    if type == 'inqueue'
      @channel.assertQueue queueName, queueOptions, (err) =>
        # Do not go on declaring the exchange when the queue could not be declared
        return callback err if err
        # HACK: to make inqueue==outqueue work without binding.
        # Has side-effect of creating an implicit exchange.
        # Better than implicit queue, since a queue holds messages forever if no one is subscribed
        @channel.assertExchange exchangeName, 'fanout', exchangeOptions, (err) =>
          return callback err if err
          @channel.bindQueue exchangeName, queueName, '', {}, callback
    else
      @channel.assertExchange exchangeName, 'fanout', exchangeOptions, callback

  # Deletes the queue (type 'inqueue') or the exchange (any other type) named queueName.
  removeQueue: (type, queueName, callback) ->
    return callback new Error 'msgflo.amqp.removeQueue(): Not connected' if not @channel
    debug 'remove queue', type, queueName
    if type == 'inqueue'
      @channel.deleteQueue queueName, {}, callback
    else
      exchangeName = queueName
      @channel.deleteExchange exchangeName, {}, callback

  ## Sending/Receiving messages
  # Publishes `message` serialized as JSON.
  # type 'inqueue': straight to the queue `name` via the default exchange.
  # any other type: to the fanout exchange `name`.
  # callback(err) is called right after handing the message to the channel.
  sendTo: (type, name, message, callback) ->
    return callback new Error 'msgflo.amqp.sendTo(): Not connected' if not @channel
    # queue must exist
    # Buffer.from replaces the deprecated `new Buffer` constructor
    data = Buffer.from JSON.stringify message
    # Only show the start of large payloads in the debug output
    showLimit = 80
    dataShow = if data.length > showLimit then data.slice(0, showLimit)+'...' else data
    debug 'sendTo', type, name, dataShow
    if type == 'inqueue'
      # direct to queue
      exchange = ''
      routingKey = name
    else
      # to fanout exchange
      exchange = name
      routingKey = ''
    @channel.publish exchange, routingKey, data
    return callback null

  # Starts consuming from queueName.
  # handler is called with { amqp: <raw message>, data: <payload> } where
  # payload is the parsed JSON content, or the content as string if it is not JSON.
  # callback(err) is called without waiting for the broker to confirm the consumer.
  subscribeToQueue: (queueName, handler, callback) ->
    return callback new Error 'msgflo.amqp.subscribeToQueue(): Not connected' if not @channel
    debug 'subscribe', queueName
    # queue must exist
    deserialize = (message) =>
      # amqplib calls the consumer with null when the broker cancels it
      if not message
        debug 'consumer cancelled by broker', queueName
        return
      debug 'receive on queue', queueName, message.fields.deliveryTag
      data = null
      try
        data = JSON.parse message.content.toString()
      catch e
        # Not JSON, pass the raw string on
        data = message.content.toString()
      out =
        amqp: message
        data: data
      return handler out
    @channel.consume queueName, deserialize
    debug 'subscribed', queueName
    return callback null

  ## ACK/NACK messages
  # Acknowledges a message as delivered by subscribeToQueue (needs message.amqp).
  # Silently does nothing when not connected.
  ackMessage: (message) ->
    return if not @channel
    fields = message.amqp.fields
    debug 'ACK', fields.routingKey, fields.deliveryTag
    # NOTE: server will only give us new message after this
    # Second argument is allUpTo=false: only this one message
    @channel.ack message.amqp, false

  # Rejects a message as delivered by subscribeToQueue (needs message.amqp).
  # Silently does nothing when not connected.
  nackMessage: (message) ->
    return if not @channel
    fields = message.amqp.fields
    debug 'NACK', fields.routingKey, fields.deliveryTag
    # allUpTo=false, requeue=false
    @channel.nack message.amqp, false, false

  # Participant registration
  # Announces `part` with a discovery message on the 'fbp' fanout exchange,
  # declaring the exchange first.
  registerParticipant: (part, callback) ->
    return callback new Error 'msgflo.amqp.registerParticipant(): Not connected' if not @channel
    msg =
      protocol: 'discovery'
      command: 'participant'
      payload: part
    exchangeName = 'fbp'
    @channel.assertExchange exchangeName, 'fanout', {}, (err) =>
      return callback err if err
      data = Buffer.from JSON.stringify msg
      @channel.publish exchangeName, '', data
      return callback null
# Client extended with the broker-side operations: bindings between
# queues/exchanges and subscription to participant discovery messages.
class MessageBroker extends Client
  constructor: (address, options) ->
    super address, options

  # Connects binding.src to binding.tgt.
  # binding.type 'pubsub': binds queue tgt directly to exchange src.
  # binding.type 'roundrobin': routes src through a direct exchange 'out-'+src
  #   to queue tgt, and if binding.deadletter is set, binds that queue to the
  #   'dead-'+tgt exchange.
  # Any other type gives an error in callback.
  addBinding: (binding, callback) ->
    debug 'Broker.addBinding', binding
    return callback new Error 'msgflo.amqp.addBinding(): Not connected' if not @channel

    if binding.type == 'pubsub'
      @channel.bindQueue binding.tgt, binding.src, '', {}, callback
    else if binding.type == 'roundrobin'
      pattern = ''
      bindSrcTgt = (done) =>
        # TODO: avoid creating the direct exchange?
        debug 'binding src to tgt', binding.src, binding.tgt
        directExchange = 'out-'+binding.src
        directOptions = {}
        @channel.assertExchange directExchange, 'direct', directOptions, (err) =>
          return done err if err
          # bind input
          # The arguments object must be passed explicitly, so that the
          # function lands in the callback position and receives the error
          @channel.bindExchange directExchange, binding.src, pattern, {}, (err) =>
            return done err if err
            # bind output
            @channel.bindQueue binding.tgt, directExchange, pattern, {}, (err) =>
              return done err
      bindDeadLetter = (done) =>
        # Setup the deadletter exchange, bind to deadletter queue
        debug 'binding deadletter queue', binding.deadletter, binding.tgt
        deadLetterExchange = 'dead-'+binding.tgt
        deadLetterOptions = {}
        @channel.assertExchange deadLetterExchange, 'fanout', deadLetterOptions, (err) =>
          return done err if err
          @channel.bindQueue binding.deadletter, deadLetterExchange, pattern, {}, done

      # Only run the steps that have the fields they need
      steps = []
      steps.push bindSrcTgt if binding.src and binding.tgt
      steps.push bindDeadLetter if binding.deadletter and binding.tgt
      async.series steps, callback
    else
      return callback new Error 'Unsupported binding type: '+binding.type

  removeBinding: (binding, callback) -> # FIXME: implement
    return callback null

  listBindings: (from, callback) -> # FIXME: implement
    return callback null, []

  # Data subscriptions
  subscribeData: (binding, datahandler, callback) -> # TODO: implement
    return callback null

  unsubscribeData: (binding, datahandler, callback) -> # TODO: implement
    return callback null

  # Participant registration
  # Listens for messages on the 'fbp' fanout exchange through a freshly
  # created, uniquely named queue.
  # handler is called with { amqp: <raw message>, data: <parsed JSON or null> }.
  # callback(err) is optional; without it errors are written to console.error.
  subscribeParticipantChange: (handler, callback) ->
    defaultCallback = (err) ->
      if err
        console.error "Error in msgflo.amqp.subscribeParticipantChange, and no callback added", err
    callback = defaultCallback if not callback
    return callback new Error 'msgflo.amqp.subscribeParticipantChange(): Not connected' if not @channel

    deserialize = (message) =>
      # amqplib calls the consumer with null when the broker cancels it
      if not message
        debug 'consumer cancelled by broker on fbp'
        return
      debug 'receive on fbp', message.fields.deliveryTag
      data = null
      try
        data = JSON.parse message.content.toString()
      catch e
        # Handler still gets called, with data: null
        debug 'JSON exception:', e
      out =
        amqp: message
        data: data
      return handler out

    exchangeName = 'fbp'
    @channel.assertExchange exchangeName, 'fanout', {}, (err) =>
      return callback err if err
      subscribeQueue = '.fbp-subscribe-' + uuid.v4()
      # The queue name is unique per subscription, so it must not outlive its
      # consumer. `persistent` is not a queue option; durable/autoDelete are.
      subscribeOptions =
        durable: false
        autoDelete: true
      @channel.assertQueue subscribeQueue, subscribeOptions, (err) =>
        return callback err if err
        @channel.bindQueue subscribeQueue, exchangeName, '', {}, (err) =>
          return callback err if err
          @channel.consume subscribeQueue, deserialize
          debug 'subscribed to', subscribeQueue, exchangeName
          return callback null
# Public API of this module
exports.Client = Client
exports.MessageBroker = MessageBroker