Skip to content

Commit 0a17a58

Browse files
author
pemrouz
committed
add request-response semantics
1 parent e08dee8 commit 0a17a58

File tree

3 files changed

+130
-98
lines changed

3 files changed

+130
-98
lines changed

dist/index.js

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,16 @@ function sync(ripple, server) {
7171
/* istanbul ignore next */
7272
if (!_client2.default && !server) return;
7373
/* istanbul ignore next */
74-
if (!_client2.default) ripple.to = clean(ripple.to), (0, _values2.default)(ripple.types).map(headers(ripple));
74+
if (!_client2.default) ripple.to = clean(ripple.to), (0, _values2.default)(ripple.types).map(function (type) {
75+
return type.parse = headers(ripple)(type.parse);
76+
});
7577

7678
ripple.stream = stream(ripple);
79+
ripple.respond = respond(ripple);
7780
ripple.io = io(server);
7881
ripple.on('change.stream', ripple.stream()); // both - broadcast change to everyone
7982
ripple.io.on('change', consume(ripple)); // client - receive change
83+
ripple.io.on('response', response(ripple)); // client - receive response
8084
ripple.io.on('connection', function (s) {
8185
return s.on('change', consume(ripple));
8286
}); // server - receive change
@@ -87,6 +91,27 @@ function sync(ripple, server) {
8791
return ripple;
8892
}
8993

94+
var respond = function respond(ripple) {
95+
return function (socket, name, time) {
96+
return function (reply) {
97+
socket.emit('response', [name, time, reply]);
98+
};
99+
};
100+
};
101+
102+
var response = function response(ripple) {
103+
return function (_ref) {
104+
/* istanbul ignore next */
105+
var _ref2 = _slicedToArray(_ref, 3);
106+
107+
var name = _ref2[0];
108+
var time = _ref2[1];
109+
var reply = _ref2[2];
110+
111+
ripple.resources[name].body.emit('response._' + time, reply);
112+
};
113+
};
114+
90115
// send diff to all or some sockets
91116
var stream = function stream(ripple) {
92117
return function (sockets) {
@@ -143,28 +168,29 @@ var to = function to(ripple, res, change) {
143168

144169
// incoming transforms
145170
var consume = function consume(ripple) {
146-
return function (_ref) {
171+
return function (_ref3) {
147172
/* istanbul ignore next */
148-
var _ref2 = _slicedToArray(_ref, 3);
173+
var _ref4 = _slicedToArray(_ref3, 3);
149174

150-
var name = _ref2[0];
151-
var change = _ref2[1];
152-
var _ref2$ = _ref2[2];
153-
var req = _ref2$ === undefined ? {} : _ref2$;
175+
var name = _ref4[0];
176+
var change = _ref4[1];
177+
var _ref4$ = _ref4[2];
178+
var req = _ref4$ === undefined ? {} : _ref4$;
154179

155180
log('receiving', name);
156181

157182
var res = ripple.resources[name],
158183
xall = ripple.from,
159-
xtype = type(ripple)(res).from || type(ripple)(req).from,
184+
xtype = type(ripple)(res).from || type(ripple)(req).from // is latter needed?
185+
,
160186
xres = (0, _header2.default)('from')(res),
161-
types = ripple.types,
162187
next = (0, _set2.default)(change),
163-
silent = silence(this);
188+
silent = silence(this),
189+
respond = ripple.respond(this, name, change.time);
164190

165-
return xall && !xall.call(this, req, change) ? debug('skip all', name) // rejected - by xall
166-
: xtype && !xtype.call(this, req, change) ? debug('skip type', name) // rejected - by xtype
167-
: xres && !xres.call(this, req, change) ? debug('skip res', name) // rejected - by xres
191+
return xall && !xall.call(this, req, change, respond) ? debug('skip all', name) // rejected - by xall
192+
: xtype && !xtype.call(this, req, change, respond) ? debug('skip type', name) // rejected - by xtype
193+
: xres && !xres.call(this, req, change, respond) ? debug('skip res', name) // rejected - by xres
168194
: !change ? ripple(silent(req)) // accept - replace (new)
169195
: !change.key ? ripple(silent({ name: name, body: change.value })) // accept - replace at root
170196
: (silent(res), next(res.body)); // accept - deep change
@@ -178,16 +204,14 @@ var count = function count(total, name) {
178204
};
179205

180206
var headers = function headers(ripple) {
181-
return function (type) {
182-
/* istanbul ignore next */
183-
var parse = type.parse || _noop2.default;
184-
type.parse = function (res) {
207+
return function (next) {
208+
return function (res) {
185209
var existing = ripple.resources[res.name],
186210
from = (0, _header2.default)('from')(res) || (0, _header2.default)('from')(existing),
187211
to = (0, _header2.default)('to')(res) || (0, _header2.default)('to')(existing);
188212
if (from) res.headers.from = from;
189213
if (to) res.headers.to = to;
190-
return parse.apply(this, arguments), res;
214+
return next ? next(res) : res;
191215
};
192216
};
193217
};
@@ -206,10 +230,10 @@ var setIP = function setIP(socket, next) {
206230
};
207231

208232
var clean = function clean(next) {
209-
return function (_ref3, change) {
210-
var name = _ref3.name;
211-
var body = _ref3.body;
212-
var headers = _ref3.headers;
233+
return function (_ref5, change) {
234+
var name = _ref5.name;
235+
var body = _ref5.body;
236+
var headers = _ref5.headers;
213237

214238
if (change) return next ? next.apply(this, arguments) : true;
215239

src/index.js

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,28 @@ export default function sync(ripple, server){
77
if (!client && !server) return
88
if (!client)
99
ripple.to = clean(ripple.to)
10-
, values(ripple.types).map(headers(ripple))
10+
, values(ripple.types).map(type => type.parse = headers(ripple)(type.parse))
1111

1212
ripple.stream = stream(ripple)
13+
ripple.respond = respond(ripple)
1314
ripple.io = io(server)
1415
ripple.on('change.stream', ripple.stream()) // both - broadcast change to everyone
1516
ripple.io.on('change', consume(ripple)) // client - receive change
17+
ripple.io.on('response', response(ripple)) // client - receive response
1618
ripple.io.on('connection', s => s.on('change', consume(ripple))) // server - receive change
1719
ripple.io.on('connection', s => ripple.stream(s)()) // server - send all resources to new client
1820
ripple.io.use(setIP)
1921
return ripple
2022
}
2123

24+
const respond = ripple => (socket, name, time) => reply => {
25+
socket.emit('response', [ name, time, reply ])
26+
}
27+
28+
const response = ripple => function([ name, time, reply ]) {
29+
ripple.resources[name].body.emit('response._' + time, reply)
30+
}
31+
2232
// send diff to all or some sockets
2333
const stream = ripple => sockets => (name, change) => {
2434
if (!name) return values(ripple.resources)
@@ -71,17 +81,17 @@ const to = (ripple, res, change) => socket => {
7181
const consume = ripple => function([name, change, req = {}]) {
7282
log('receiving', name)
7383

74-
const res = ripple.resources[name]
75-
, xall = ripple.from
76-
, xtype = type(ripple)(res).from || type(ripple)(req).from
77-
, xres = header('from')(res)
78-
, types = ripple.types
79-
, next = set(change)
80-
, silent = silence(this)
81-
82-
return xall && !xall.call(this, req, change) ? debug('skip all' , name) // rejected - by xall
83-
: xtype && !xtype.call(this, req, change) ? debug('skip type', name) // rejected - by xtype
84-
: xres && !xres.call(this, req, change) ? debug('skip res' , name) // rejected - by xres
84+
const res = ripple.resources[name]
85+
, xall = ripple.from
86+
, xtype = type(ripple)(res).from || type(ripple)(req).from // is latter needed?
87+
, xres = header('from')(res)
88+
, next = set(change)
89+
, silent = silence(this)
90+
, respond = ripple.respond(this, name, change.time)
91+
92+
return xall && !xall.call(this, req, change, respond) ? debug('skip all' , name) // rejected - by xall
93+
: xtype && !xtype.call(this, req, change, respond) ? debug('skip type', name) // rejected - by xtype
94+
: xres && !xres.call(this, req, change, respond) ? debug('skip res' , name) // rejected - by xres
8595
: !change ? ripple(silent(req)) // accept - replace (new)
8696
: !change.key ? ripple(silent({ name, body: change.value })) // accept - replace at root
8797
: (silent(res), next(res.body)) // accept - deep change
@@ -93,16 +103,13 @@ const count = (total, name) => tally => debug(
93103
, 'sending', name
94104
)
95105

96-
const headers = ripple => type => {
97-
const parse = type.parse || noop
98-
type.parse = function(res){
99-
const existing = ripple.resources[res.name]
100-
, from = header('from')(res) || header('from')(existing)
101-
, to = header('to')(res) || header('to')(existing)
102-
if (from) res.headers.from = from
103-
if (to) res.headers.to = to
104-
return parse.apply(this, arguments), res
105-
}
106+
const headers = ripple => next => res => {
107+
const existing = ripple.resources[res.name]
108+
, from = header('from')(res) || header('from')(existing)
109+
, to = header('to')(res) || header('to')(existing)
110+
if (from) res.headers.from = from
111+
if (to) res.headers.to = to
112+
return next ? next(res) : res
106113
}
107114

108115
const io = opts => {

0 commit comments

Comments
 (0)