-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdirect.coffee
More file actions
137 lines (109 loc) · 3.53 KB
/
direct.coffee
File metadata and controls
137 lines (109 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
debug = require('debug')('msgflo:direct')
EventEmitter = require('events').EventEmitter
interfaces = require './interfaces'
routing = require './routing'
# Registry of in-process brokers, keyed by address.
# MessageBroker.connect adds an entry, MessageBroker.disconnect removes it,
# and Client.connect looks its broker up here.
brokers = {}
# Messaging client for the in-process 'direct' transport.
#
# Every operation is forwarded to the MessageBroker instance that is
# registered in the module-level `brokers` table under the same address.
class Client extends interfaces.MessagingClient
  # address: key used to look up the broker in `brokers` on connect()
  # options: stored on the instance; not used elsewhere in this class
  constructor: (@address, @options) ->
    @broker = null

  ## Broker connection management

  # Looks up the broker registered for @address, then calls `callback null`.
  # No error is reported here when no broker is registered; later operations
  # will then fail with "no broker connected".
  connect: (callback) ->
    debug 'client connect'
    @broker = brokers[@address]
    return callback null

  # Drops the broker reference, then calls `callback null`.
  disconnect: (callback) ->
    debug 'client disconnect'
    @broker = null
    return callback null

  # Calls `callback` with a "no broker connected" Error when there is no
  # broker; does nothing when a broker is present.
  # Callers must stop after the error has been reported.
  _assertBroker: (callback) ->
    err = new Error "no broker connected #{@address}" if not @broker
    return callback err if err

  ## Manipulating queues

  # Creates `queueName` on the broker.
  # `options` may be omitted, in which case the third argument is the callback.
  # `options` is accepted for interface compatibility but not forwarded.
  createQueue: (type, queueName, options, callback) ->
    if not callback
      callback = options
      options = {}
    # Stop here when disconnected: the error has been delivered to the
    # callback, and touching @broker would throw on null
    return @_assertBroker callback if not @broker
    @broker.createQueue type, queueName, callback

  # Removes `queueName` from the broker.
  removeQueue: (type, queueName, callback) ->
    return @_assertBroker callback if not @broker
    @broker.removeQueue type, queueName, callback

  ## Sending/Receiving messages

  # Sends `message` to `queueName` via the broker.
  sendTo: (type, queueName, message, callback) ->
    debug 'client sendTo', type, queueName
    return @_assertBroker callback if not @broker
    @broker.sendTo type, queueName, message, callback

  # Registers `handler` for messages arriving on `queueName`.
  subscribeToQueue: (queueName, handler, callback) ->
    return @_assertBroker callback if not @broker
    @broker.subscribeToQueue queueName, handler, callback

  ## ACK/NACK messages

  # No-op in this transport.
  ackMessage: (message) ->
    return

  # No-op in this transport.
  nackMessage: (message) ->
    return

  # Participant discovery

  # Announces participant `part` by sending a discovery message on the
  # 'fbp' queue, creating that queue first.
  registerParticipant: (part, callback) ->
    @createQueue '', 'fbp', (err) =>
      # Do not attempt to send when the queue could not be created
      return callback err if err
      msg =
        protocol: 'discovery'
        command: 'participant'
        payload: part
      @sendTo 'outqueue', 'fbp', msg, callback
# Minimal in-memory queue built on EventEmitter.
#
# A sent message is emitted as a 'message' event to the listeners registered
# at that moment. Nothing is buffered, so a message sent while there are no
# listeners is dropped.
class Queue extends EventEmitter
  constructor: () ->
    # Run EventEmitter's constructor so the emitter state is initialised;
    # this also keeps the class valid when compiled to ES2015 classes
    super()

  # Delivers `msg` to the subscribers of this queue.
  send: (msg) ->
    @_emitSend msg

  # Emits `msg` as a 'message' event.
  _emitSend: (msg) ->
    @emit 'message', msg
# Message broker for the in-process 'direct' transport.
#
# Keeps a table of named Queue objects and, while connected, is reachable by
# Client instances through the module-level `brokers` table.
class MessageBroker extends interfaces.MessageBroker
  # address: key under which this broker registers itself on connect()
  constructor: (@address) ->
    routing.binderMixin this
    @queues = {}

  ## Broker connection management

  # Registers this broker under @address, then calls `callback null`.
  connect: (callback) ->
    debug 'broker connect'
    brokers[@address] = this
    return callback null

  # Removes the registration for @address, then calls `callback null`.
  disconnect: (callback) ->
    debug 'broker disconnect'
    delete brokers[@address]
    return callback null

  ## Manipulating queues

  # Creates `queueName` unless it already exists. `type` is not used.
  createQueue: (type, queueName, callback) ->
    @queues[queueName] = new Queue if not @queues[queueName]?
    return callback null

  # Deletes `queueName` (a no-op if it does not exist). `type` is not used.
  removeQueue: (type, queueName, callback) ->
    delete @queues[queueName]
    return callback null

  ## Sending/Receiving messages

  # Sends `message` to `queueName`. `type` is not used.
  # Calls `callback` with an Error when the queue does not exist.
  sendTo: (type, queueName, message, callback) ->
    debug 'broker sendTo', queueName
    queue = @queues[queueName]
    # Report an unknown queue through the callback rather than throwing a
    # TypeError from the property access
    return callback new Error "no such queue #{queueName}" if not queue?
    queue.send message
    return callback null

  # Subscribes `handler` to `queueName`, creating the queue if needed.
  # The handler receives an object `{ direct: null, data: <sent message> }`.
  subscribeToQueue: (queueName, handler, callback) ->
    @queues[queueName] = new Queue if not @queues[queueName]?
    @queues[queueName].on 'message', (data) ->
      out =
        direct: null
        data: data
      return handler out
    return callback null

  ## ACK/NACK messages

  # No-op in this transport.
  ackMessage: (message) ->
    return

  # No-op in this transport.
  nackMessage: (message) ->
    return

  # Subscribes `handler` to the 'fbp' queue, creating that queue first.
  # `callback` is optional; without it, errors are written to stderr.
  subscribeParticipantChange: (handler, callback) ->
    defaultCallback = (err) ->
      if err
        console.error "Error in msgflo.direct.subscribeParticipantChange, and no callback added", err
    callback = defaultCallback if not callback
    @createQueue '', 'fbp', (err) =>
      return callback err if err
      @subscribeToQueue 'fbp', handler, (err) ->
        return callback err
# Public API of this module: the broker and client classes defined above.
exports.MessageBroker = MessageBroker
exports.Client = Client