Skip to content

Commit 2f1f55a

Browse files
committed
fix(messagev2): adjust schema per feedback
1 parent 1ccf2b0 commit 2f1f55a

6 files changed

Lines changed: 78 additions & 31 deletions

File tree

message/ipldbind/message.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,19 @@ func (gse GraphSyncExtensions) ToExtensionsList() []graphsync.ExtensionData {
4545
type GraphSyncRequest struct {
4646
Id []byte
4747
RequestType graphsync.RequestType
48-
Priority graphsync.Priority
48+
Priority *graphsync.Priority
4949
Root *cid.Cid
5050
Selector *datamodel.Node
51-
Extensions GraphSyncExtensions
51+
Extensions *GraphSyncExtensions
5252
}
5353

5454
// GraphSyncResponse is an struct to capture data on a response sent back
5555
// in a GraphSyncMessage.
5656
type GraphSyncResponse struct {
5757
Id []byte
5858
Status graphsync.ResponseStatusCode
59-
Metadata []message.GraphSyncLinkMetadatum
60-
Extensions GraphSyncExtensions
59+
Metadata *[]message.GraphSyncLinkMetadatum
60+
Extensions *GraphSyncExtensions
6161
}
6262

6363
// GraphSyncBlock is a container for representing extension data for bindnode,

message/ipldbind/schema.ipldsch

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1-
type GraphSyncExtensions {String:nullable Any}
1+
################################################################################
2+
### GraphSync messaging protocol version 2 ###
3+
################################################################################
4+
5+
# UUID bytes
26
type GraphSyncRequestID bytes
7+
38
type GraphSyncPriority int
49

10+
# Extensions as a name:data map where the data is any arbitrary, valid Node
11+
type GraphSyncExtensions { String : nullable Any }
12+
513
type GraphSyncLinkAction enum {
614
# Present means the linked block was present on this machine, and is included
715
# in this message
@@ -18,6 +26,9 @@ type GraphSyncLinkAction enum {
1826
# TODO: | DuplicateDAGSkipped ("s")
1927
} representation string
2028

29+
# Metadata for each "link" in the DAG being communicated, each block gets one of
30+
# these and missing blocks also get one, with an explanation as per
31+
# GraphSyncLinkAction
2132
type GraphSyncMetadatum struct {
2233
link Link
2334
action GraphSyncLinkAction
@@ -57,38 +68,47 @@ type GraphSyncRequestType enum {
5768
| Cancel ("c")
5869
# Update means the extensions contain an update about this request
5970
| Update ("u")
60-
# Restart means restart this request from the begging, respecting the any DoNotSendCids/DoNotSendBlocks contained
61-
# in the extensions -- essentially a cancel followed by a new
71+
# Restart means restart this request from the begging, respecting the any
72+
# DoNotSendCids/DoNotSendBlocks contained in the extensions--essentially a
73+
# cancel followed by a new
6274
# TODO: | Restart ("r")
6375
} representation string
6476

6577
type GraphSyncRequest struct {
66-
id GraphSyncRequestID # unique id set on the requester side
67-
requestType GraphSyncRequestType # the request type
68-
priority GraphSyncPriority # the priority (normalized). default to 1
69-
root nullable Link # a CID for the root node in the query
70-
selector nullable Any # see https://github.com/ipld/specs/blob/master/selectors/selectors.md
71-
extensions GraphSyncExtensions # side channel information
72-
} representation tuple
78+
id GraphSyncRequestID (rename "id") # unique id set on the requester side
79+
requestType GraphSyncRequestType (rename "type") # the request type
80+
priority optional GraphSyncPriority (rename "pri") # the priority (normalized). default to 1
81+
root optional Link (rename "root") # a CID for the root node in the query
82+
selector optional Any (rename "sel") # see https://github.com/ipld/specs/blob/master/selectors/selectors.md
83+
extensions optional GraphSyncExtensions (rename "ext") # side channel information
84+
} representation map
7385

7486
type GraphSyncResponse struct {
75-
id GraphSyncRequestID # the request id we are responding to
76-
status GraphSyncResponseStatusCode # a status code.
77-
metadata GraphSyncMetadata # metadata about response
78-
extensions GraphSyncExtensions # side channel information
79-
} representation tuple
87+
id GraphSyncRequestID (rename "reqid") # the request id we are responding to
88+
status GraphSyncResponseStatusCode (rename "stat") # a status code.
89+
metadata optional GraphSyncMetadata (rename "meta") # metadata about response
90+
extensions optional GraphSyncExtensions (rename "ext") # side channel information
91+
} representation map
8092

93+
# Block data and CID prefix that can be used to reconstruct the entire CID from
94+
# the hash of the bytes
8195
type GraphSyncBlock struct {
8296
prefix Bytes # CID prefix (cid version, multicodec and multihash prefix (type + length)
8397
data Bytes
8498
} representation tuple
8599

100+
# We expect each message to contain at least one of the fields, typically either
101+
# just requests, or responses and possibly blocks with it
86102
type GraphSyncMessage struct {
87103
requests optional [GraphSyncRequest] (rename "req")
88104
responses optional [GraphSyncResponse] (rename "rsp")
89105
blocks optional [GraphSyncBlock] (rename "blk")
90106
} representation map
91107

108+
# Parent keyed union to hold the message, the root of the structure that can be
109+
# used to version the messaging format outside of the protocol and makes the
110+
# data itself more self-descriptive (i.e. `{"gs2":...` will appear at the front
111+
# of every msg)
92112
type GraphSyncMessageRoot union {
93113
| GraphSyncMessage "gs2"
94114
} representation keyed

message/v2/message.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ func (mh *MessageHandler) toIPLD(gsm message.GraphSyncMessage) (*ipldbind.GraphS
7070
if root == cid.Undef {
7171
rootPtr = nil
7272
}
73+
priority := request.Priority()
74+
ext := ipldbind.NewGraphSyncExtensions(request)
7375
ibmRequests = append(ibmRequests, ipldbind.GraphSyncRequest{
7476
Id: request.ID().Bytes(),
7577
Root: rootPtr,
7678
Selector: selPtr,
77-
Priority: request.Priority(),
79+
Priority: &priority,
7880
RequestType: request.Type(),
79-
Extensions: ipldbind.NewGraphSyncExtensions(request),
81+
Extensions: &ext,
8082
})
8183
}
8284
ibm.Requests = &ibmRequests
@@ -90,11 +92,13 @@ func (mh *MessageHandler) toIPLD(gsm message.GraphSyncMessage) (*ipldbind.GraphS
9092
if !ok {
9193
return nil, fmt.Errorf("unexpected metadata type")
9294
}
95+
md := glsm.RawMetadata()
96+
ext := ipldbind.NewGraphSyncExtensions(response)
9397
ibmResponses = append(ibmResponses, ipldbind.GraphSyncResponse{
9498
Id: response.RequestID().Bytes(),
9599
Status: response.Status(),
96-
Metadata: glsm.RawMetadata(),
97-
Extensions: ipldbind.NewGraphSyncExtensions(response),
100+
Metadata: &md,
101+
Extensions: &ext,
98102
})
99103
}
100104
ibm.Responses = &ibmResponses
@@ -161,8 +165,13 @@ func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessageRoot) (message.
161165
continue
162166
}
163167

168+
var ext []graphsync.ExtensionData
169+
if req.Extensions != nil {
170+
ext = req.Extensions.ToExtensionsList()
171+
}
172+
164173
if req.RequestType == graphsync.RequestTypeUpdate {
165-
requests[id] = message.NewUpdateRequest(id, req.Extensions.ToExtensionsList()...)
174+
requests[id] = message.NewUpdateRequest(id, ext...)
166175
continue
167176
}
168177

@@ -176,7 +185,12 @@ func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessageRoot) (message.
176185
selector = *req.Selector
177186
}
178187

179-
requests[id] = message.NewRequest(id, root, selector, graphsync.Priority(req.Priority), req.Extensions.ToExtensionsList()...)
188+
var priority graphsync.Priority
189+
if req.Priority != nil {
190+
priority = graphsync.Priority(*req.Priority)
191+
}
192+
193+
requests[id] = message.NewRequest(id, root, selector, priority, ext...)
180194
}
181195
}
182196

@@ -188,10 +202,18 @@ func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessageRoot) (message.
188202
if err != nil {
189203
return message.GraphSyncMessage{}, err
190204
}
191-
responses[id] = message.NewResponse(id,
192-
graphsync.ResponseStatusCode(res.Status),
193-
res.Metadata,
194-
res.Extensions.ToExtensionsList()...)
205+
206+
var md []message.GraphSyncLinkMetadatum
207+
if res.Metadata != nil {
208+
md = *res.Metadata
209+
}
210+
211+
var ext []graphsync.ExtensionData
212+
if res.Extensions != nil {
213+
ext = res.Extensions.ToExtensionsList()
214+
}
215+
216+
responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), md, ext...)
195217
}
196218
}
197219

message/v2/message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func TestAppendingRequests(t *testing.T) {
5454
require.NoError(t, err)
5555

5656
gsrIpld := (*gsmIpld.Gs2.Requests)[0]
57-
require.Equal(t, priority, gsrIpld.Priority)
57+
require.Equal(t, priority, *gsrIpld.Priority)
5858
require.Equal(t, request.Type(), graphsync.RequestTypeNew)
5959
require.Equal(t, root, *gsrIpld.Root)
6060
require.Equal(t, selector, *gsrIpld.Selector)

responsemanager/responsemanager_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ func TestCancellationQueryInProgress(t *testing.T) {
136136

137137
td.assertRequestCleared()
138138

139+
td.cancel()
140+
select {
141+
case <-td.ctx.Done():
142+
}
143+
139144
tracing := td.collectTracing(t)
140145
traceStrings := tracing.TracesToStrings()
141146
require.Contains(t, traceStrings, "processRequests(0)")

responsemanager/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
5252
}
5353
rm.connManager.Unprotect(key.p, key.requestID.Tag())
5454
delete(rm.inProgressResponses, key)
55-
ipr.cancelFn()
5655
ipr.span.End()
56+
ipr.cancelFn()
5757
}
5858

5959
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {

0 commit comments

Comments
 (0)