-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmqtt.coffee
More file actions
140 lines (118 loc) · 3.69 KB
/
mqtt.coffee
File metadata and controls
140 lines (118 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
debug = require('debug')('msgflo:mqtt')
interfaces = require './interfaces'
routing = require './routing'
try
mqtt = require 'mqtt'
catch e
mqtt = e
class Client extends interfaces.MessagingClient
constructor: (@address, @options) ->
@client = null
@subscribers = {} # queueName -> [handler1, ...]
## Broker connection management
connect: (callback) ->
if mqtt.message
return callback mqtt
@client = mqtt.connect @address
# debug
@client.on 'reconnect', () =>
debug 'reconnect'
@client.on 'offline', () =>
debug 'offline'
@client.on 'error', (err) =>
debug 'error', err
if callback
callback err
callback = null
return
onConnected = (connack) =>
debug 'connected'
@client.on 'message', (topic, message) =>
@_onMessage topic, message
if callback
callback null
callback = null
return
@client.once 'connect', onConnected
disconnect: (callback) ->
@client.removeAllListeners 'message'
@client.removeAllListeners 'connect'
@client.removeAllListeners 'reconnect'
@client.removeAllListeners 'offline'
@client.removeAllListeners 'error'
@subscribers = {}
@client.end (err) =>
debug 'disconnected'
@client = null
return callback err
## Manipulating queues
createQueue: (type, queueName, options, callback) ->
if not callback
callback = options
options = {}
# Noop, in MQTT one can send messages on 'topics' at any time
return callback null
removeQueue: (type, queueName, callback) ->
# Noop, in MQTT one can send messages on 'topics' at any time
return callback null
## Sending/Receiving messages
sendTo: (type, queueName, message, callback) ->
published = (err, granted) =>
debug 'published', queueName, err, granted
return callback err if err
return callback null
data = JSON.stringify message
debug 'publishing', queueName, data
@client.publish queueName, data, published
subscribeToQueue: (queueName, handler, callback) ->
debug 'subscribing', queueName
@client.subscribe queueName, (err) =>
debug 'subscribed', queueName, err
return callback err if err
subs = @subscribers[queueName]
if subs then subs.push handler else @subscribers[queueName] = [ handler ]
return callback null
## ACK/NACK messages
ackMessage: (message) ->
return
nackMessage: (message) ->
return
_onMessage: (topic, message) ->
return if not @client
return if not Object.keys(@subscribers).length > 0
msg = null
try
msg = JSON.parse message.toString()
catch e
debug 'failed to parse discovery message', e
msg = message.toString()
handlers = @subscribers[topic]
debug 'message', handlers.length, msg != null
return if not handlers
out =
data: msg
mqtt: message
for handler in handlers
handler out
registerParticipant: (part, callback) ->
msg =
protocol: 'discovery'
command: 'participant'
payload: part
@sendTo 'inqueue', 'fbp', msg, callback
class MessageBroker extends Client
constructor: (address, options) ->
super address, options
routing.binderMixin this
# Participant registration
subscribeParticipantChange: (handler, callback) ->
defaultCallback = (err) ->
if err
console.err "Error in msgflo.mqtt.subscribeParticipantChange, and no callback added", err
callback = defaultCallback if not callback
@createQueue '', 'fbp', (err) =>
return callback err if err
@subscribeToQueue 'fbp', handler, (err) ->
return callback err
exports.Client = Client
exports.MessageBroker = MessageBroker