-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrx.go
370 lines (344 loc) · 10.8 KB
/
rx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
package canard
import (
"errors"
"unsafe"
)
// Contains OpenCyphal receive logic. Exported functions first.
// Accept feeds one received CAN frame into the receive pipeline.
//
// The frame is parsed, rejected if it names a destination other than
// ins.NodeID, and then matched by port against the subscriptions registered
// for its transfer kind. On a match the frame is handed to rxAcceptFrame
// together with rti and outTx, and that call's result is returned.
//
// Errors produced here: ErrInvalidArgument when ins, frame or outTx is nil,
// errEmptyPayload when frame.payload is empty, ErrBadDstAddr for a frame
// addressed to another node and ErrNoMatchingSub when no subscription exists
// for the port. Parse errors and other lookup errors are returned unchanged.
func (ins *Instance) Accept(timestamp microsecond, frame *Frame, rti uint8, outTx *Transfer, outSub *Sub) error {
	if ins == nil || outTx == nil || frame == nil {
		return ErrInvalidArgument
	}
	if len(frame.payload) == 0 {
		return errEmptyPayload
	}

	var parsed FrameModel
	if err := rxTryParseFrame(timestamp, frame, &parsed); err != nil {
		return err
	}
	if !parsed.dstNode.IsUnset() && ins.NodeID != parsed.dstNode {
		return ErrBadDstAddr
	}

	// This lookup is the reason the function has a logarithmic time complexity
	// in the number of subscriptions. Note also that this is one of the two
	// variable-complexity operations in the RX pipeline; the other one is the
	// payload copy. Excepting these two cases, the entire RX pipeline contains
	// neither loops nor recursion.
	key := parsed.port
	node, err := search(&ins.rxSub[parsed.txKind], &key, predicateOnPortID, nil)
	switch {
	case errors.Is(err, ErrAVLNilRoot), errors.Is(err, ErrAVLNodeNotFound):
		return ErrNoMatchingSub
	case err != nil:
		return err
	}

	// The tree node is reinterpreted as the Sub that embeds it.
	sub := (*Sub)(unsafe.Pointer(node))
	if outSub != nil {
		// NOTE(review): this only rebinds the local parameter; nothing is
		// written through the caller's pointer, so the caller never learns
		// which subscription matched. Reporting it would need a **Sub (or a
		// returned *Sub) — confirm the intended contract before changing.
		outSub = sub
	}
	switch {
	case sub == nil:
		return ErrNoMatchingSub
	case sub.port != parsed.port:
		return errors.New("TODO sub port not equal to model port")
	}
	return rxAcceptFrame(ins.NodeID, sub, &parsed, rti, outTx)
}
// Subscribe registers outSub as the subscription for (kind, port).
//
// Any subscription already registered for the same kind and port is removed
// first. outSub is then filled with tidTimeout, extent and port, its
// per-source sessions are cleared so that a reused Sub starts from a clean
// state, and it is inserted into the subscription tree for kind. The caller
// keeps ownership of outSub; the tree links to it directly, so it must stay
// alive and must not be copied while subscribed.
//
// It returns ErrInvalidArgument when ins or outSub is nil or extent is
// negative (a negative extent could only surface later as a failed buffer
// allocation), ErrTransferKind for an out-of-range kind, and otherwise any
// error reported by Unsubscribe or by the tree insertion.
func (ins *Instance) Subscribe(kind TxKind, port PortID, extent int, tidTimeout microsecond, outSub *Sub) error {
	switch {
	case ins == nil || outSub == nil || extent < 0:
		return ErrInvalidArgument
	case kind >= numberOfTxKinds:
		return ErrTransferKind
	}
	if err := ins.Unsubscribe(kind, port); err != nil {
		return err
	}

	outSub.tidTimeout = tidTimeout
	outSub.extent = extent
	outSub.port = port
	// Reassembly state left over from a previous use of this Sub must not be
	// applied to frames received under the new subscription.
	for i := range outSub.sessions {
		outSub.sessions[i] = nil
	}

	got, err := search(&ins.rxSub[kind], outSub, predicateOnStruct, avlTrivialFactory)
	if err != nil {
		return err
	}
	if got != &outSub.base {
		// The previous entry was removed above, so the search must have
		// inserted outSub itself; anything else is a broken tree invariant.
		panic("bad search result")
	}
	return nil
}
// Unsubscribe removes the subscription registered for (kind, port), if any.
//
// A missing subscription is not an error: nil is returned when the tree for
// kind is empty or holds no entry for port. When an entry is found it is
// unlinked from the tree and its per-source sessions are dropped, so that the
// same Sub can be subscribed again without resuming stale reassembly state.
//
// It returns ErrTransferKind for an out-of-range kind and passes through any
// other error reported by the tree search.
func (ins *Instance) Unsubscribe(kind TxKind, port PortID) error {
	if kind >= numberOfTxKinds {
		return ErrTransferKind
	}
	key := port
	got, err := search(&ins.rxSub[kind], &key, predicateOnPortID, nil)
	if errors.Is(err, ErrAVLNilRoot) || errors.Is(err, ErrAVLNodeNotFound) {
		return nil // Nothing registered for this port, nothing to remove.
	}
	if err != nil {
		return err
	}
	if got == nil {
		return nil
	}

	// The tree node is reinterpreted as the Sub that embeds it.
	sub := (*Sub)(unsafe.Pointer(got))
	// Verify the lookup before touching the tree, so that a broken invariant
	// is reported without also unlinking the wrong subscription.
	if sub.port != port {
		panic("bad search result")
	}
	remove(&ins.rxSub[kind], &sub.base)
	for i := range sub.sessions {
		sub.sessions[i] = nil
	}
	return nil
}
// GetSubs returns every subscription currently registered for kind, in the
// order in which the subscription tree's traverse method visits them.
//
// It panics with "invalid kind" when kind is out of range. The result is nil
// when the traversal visits no node.
func (ins *Instance) GetSubs(kind TxKind) (subs []*Sub) {
	if kind >= numberOfTxKinds {
		panic("invalid kind")
	}
	// Each visited tree node is reinterpreted as the Sub that embeds it.
	collect := func(node *TreeNode) {
		subs = append(subs, (*Sub)(unsafe.Pointer(node)))
	}
	ins.rxSub[kind].traverse(0, collect)
	return subs
}
// Below is private API.
// internalRxSession holds the reassembly state of the transfer currently being
// received from one source node on one subscription.
type internalRxSession struct {
// Timestamp of the frame that started the transfer in progress.
txTimestamp microsecond
// Number of payload bytes received for the current transfer, counting bytes
// that were dropped because they did not fit into the extent.
totalPayloadSize int
// Number of payload bytes accounted as stored in payload.
payloadSize int
// Reassembly buffer; allocated lazily with the subscription's extent.
payload []byte
// Running CRC over the frames of a multi-frame transfer.
crc CRC
// Transfer-ID a frame must carry to be accepted into this session.
tid TID
// Redundant Transport Index
rti uint8
// Toggle bit the next frame must carry; flipped after every non-last frame.
toggle bool
}
// rxSessionWritePayload appends one frame's payload to the session's
// reassembly buffer.
//
// The first payloadSize bytes of payload are added to rxs.totalPayloadSize,
// but at most extent bytes are stored over the whole transfer; anything
// beyond that is dropped (implicit truncation). The buffer is sized to extent
// on first use, and a buffer whose capacity was retained by a session reset
// is reused instead of being reallocated.
//
// It returns ErrInvalidArgument for a nil session or a payloadSize that is
// negative or larger than payload, errEmptyPayload when payload is empty but
// payloadSize is not, and errTODO when the session claims more stored bytes
// than extent allows or than it has received.
func rxSessionWritePayload(rxs *internalRxSession, extent, payloadSize int, payload []byte) error {
	switch {
	case rxs == nil:
		return ErrInvalidArgument
	case len(payload) == 0 && payloadSize != 0:
		return errEmptyPayload
	case payloadSize < 0 || payloadSize > len(payload):
		return ErrInvalidArgument
	}
	// Nothing received yet means nothing can be stored yet: this is the start
	// of a transfer, so discard a stored-byte count left from an earlier one.
	if rxs.totalPayloadSize == 0 {
		rxs.payloadSize = 0
	}
	// The invariants are on the number of bytes stored, not on the buffer
	// length (the buffer is always extent bytes long once allocated).
	if rxs.payloadSize > extent || rxs.payloadSize > rxs.totalPayloadSize {
		return errTODO
	}
	rxs.totalPayloadSize += payloadSize

	// Provide an extent-sized buffer as late as possible.
	if extent > 0 && len(rxs.payload) < extent {
		if cap(rxs.payload) >= extent {
			rxs.payload = rxs.payload[:extent]
		} else {
			grown := make([]byte, extent)
			copy(grown, rxs.payload)
			rxs.payload = grown
		}
	}

	// Store only what still fits; the remainder is truncated.
	bytesToCopy := payloadSize
	if room := extent - rxs.payloadSize; bytesToCopy > room {
		bytesToCopy = room
	}
	// Append after the bytes already stored. The destination has at least
	// extent-rxs.payloadSize bytes of room, so the copy is never short.
	copy(rxs.payload[rxs.payloadSize:], payload[:bytesToCopy])
	rxs.payloadSize += bytesToCopy
	return nil
}
// rxAcceptFrame routes a parsed frame that matched subscription sub.
//
// A frame whose source node-ID is above NODE_ID_MAX is treated as an
// anonymous transfer: its payload, truncated to sub.extent, is copied into a
// fresh buffer and outTx is filled in immediately, including its metadata.
//
// Any other frame is handled by the per-source session stored in
// sub.sessions. A session is created only when the frame starts a transfer;
// without a session and without a start-of-transfer flag the frame is
// dropped, because the transfer it belongs to cannot be completed anyway.
// When a session exists the result of rxSessionUpdate is returned.
//
// NOTE(review): a dropped frame returns nil, exactly like a delivered
// anonymous transfer, so nil alone does not tell the caller that outTx was
// filled — confirm the intended contract.
//
// Argument errors: ErrInvalidArgument (nil sub, frame or outTx),
// errEmptyPayload, ErrBadTransferID, ErrBadDstAddr (frame addressed to a node
// other than dst) and ErrInvalidNodeID.
func rxAcceptFrame(dst NodeID, sub *Sub, frame *FrameModel, rti uint8, outTx *Transfer) error {
	switch {
	case sub == nil || frame == nil || outTx == nil:
		return ErrInvalidArgument
	case len(frame.payload) == 0:
		return errEmptyPayload
	case frame.tid > TRANSFER_ID_MAX:
		return ErrBadTransferID
	case !frame.dstNode.IsUnset() && dst != frame.dstNode:
		return ErrBadDstAddr
	case !frame.srcNode.IsValid():
		return ErrInvalidNodeID
	}

	if frame.srcNode > NODE_ID_MAX {
		// Anonymous transfer: stateless, so the payload is copied out right
		// away into a buffer owned by the application.
		payloadSize := min(sub.extent, frame.payloadSize)
		payload := make([]byte, payloadSize)
		copy(payload, frame.payload[:payloadSize])
		// Fill the metadata the same way rxSessionAcceptFrame does for
		// session-based transfers; previously it was left untouched here.
		outTx.metadata.fromRxFrame(frame)
		outTx.timestamp = frame.timestamp
		outTx.payloadSize = payloadSize
		outTx.payload = payload
		return nil
	}

	session := sub.sessions[frame.srcNode]
	if session == nil {
		if !frame.txStart {
			return nil
		}
		session = &internalRxSession{
			txTimestamp: frame.timestamp,
			crc:         newCRC(),
			tid:         frame.tid,
			rti:         rti,
			toggle:      true, // INITIAL_TOGGLE_STATE
		}
		sub.sessions[frame.srcNode] = session
	}
	return rxSessionUpdate(session, frame, rti, sub.tidTimeout, sub.extent, outTx)
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// rxSessionUpdate applies one frame to an existing session.
//
// The session is restarted with the frame's transfer-ID and transport index
// when the transfer-ID timeout has expired since the session's start
// timestamp, or when a start-of-transfer frame arrives on the session's own
// transport with a transfer-ID difference greater than one. If the frame that
// triggered the restart is not itself a start of transfer, the session is
// advanced to the next transfer-ID, its buffer is released and errTODO is
// returned.
//
// Otherwise the frame is passed to rxSessionAcceptFrame only if its transport
// index, toggle bit and transfer-ID all match the session; any mismatch yields
// errTODO.
//
// It returns ErrInvalidArgument for nil arguments and ErrBadTransferID when
// either transfer-ID exceeds TRANSFER_ID_MAX.
func rxSessionUpdate(rxs *internalRxSession, frame *FrameModel, rti uint8, txIdTimeout microsecond, extent int, outTx *Transfer) error {
	if rxs == nil || frame == nil || outTx == nil {
		return ErrInvalidArgument
	}
	if rxs.tid > TRANSFER_ID_MAX || frame.tid > TRANSFER_ID_MAX {
		return ErrBadTransferID
	}

	timedOut := frame.timestamp > rxs.txTimestamp && frame.timestamp-rxs.txTimestamp > txIdTimeout
	tidJumped := rxComputeTransferIDDifference(rxs.tid, frame.tid) > 1
	onSessionTransport := rxs.rti == rti

	if timedOut || (onSessionTransport && frame.txStart && tidJumped) {
		rxs.reset(frame.tid, rti)
		if !frame.txStart {
			// Start of transfer was missed; equivalent to rxSessionRestart in
			// libcanard. The transport index is retained.
			rxs.reset((rxs.tid+1)&TRANSFER_ID_MAX, rxs.rti)
			rxs.payload = nil
			return errTODO
		}
	}

	// Evaluated after a possible restart, which may have changed the session.
	if rxs.rti != rti || frame.toggle != rxs.toggle || frame.tid != rxs.tid {
		return errTODO
	}
	return rxSessionAcceptFrame(rxs, frame, extent, outTx)
}
// rxComputeTransferIDDifference returns a-b, wrapped into the non-negative
// range by adding 2^TRANSFER_ID_BIT_LENGTH when the plain difference is
// negative.
func rxComputeTransferIDDifference(a, b TID) uint8 {
	delta := int16(a) - int16(b)
	if delta >= 0 {
		return uint8(delta)
	}
	return uint8(delta + (1 << TRANSFER_ID_BIT_LENGTH))
}
// rxSessionAcceptFrame adds a frame that already matched the session
// (transport, toggle and transfer-ID) to the transfer being reassembled.
//
// For a start-of-transfer frame the session timestamp is taken from the
// frame. Frames of multi-frame transfers are fed into the running CRC, and the
// payload is appended through rxSessionWritePayload.
//
// Return values:
//   - nil: the transfer is complete and outTx has been filled in. Ownership
//     of the payload buffer moves to outTx and the session is reset for the
//     next transfer-ID. For multi-frame transfers the transfer CRC bytes that
//     were stored in the buffer are excluded from outTx.payloadSize.
//   - errTODO: no transfer was produced, either because more frames are
//     expected (the toggle is flipped) or because the CRC of a completed
//     multi-frame transfer did not verify (the session is reset).
//   - ErrInvalidArgument, errEmptyPayload, ErrBadTransferID, or an error from
//     rxSessionWritePayload.
func rxSessionAcceptFrame(rxs *internalRxSession, frame *FrameModel, extent int, outTx *Transfer) error {
	switch {
	case rxs == nil || frame == nil || outTx == nil:
		return ErrInvalidArgument
	case len(frame.payload) == 0:
		return errEmptyPayload
	case frame.tid > TRANSFER_ID_MAX:
		return ErrBadTransferID
	}

	if frame.txStart {
		rxs.txTimestamp = frame.timestamp
	}
	singleFrame := frame.txStart && frame.txEnd
	if !singleFrame {
		// Single-frame transfers carry no transfer CRC.
		rxs.crc = rxs.crc.Add(frame.payload[:frame.payloadSize])
	}
	if err := rxSessionWritePayload(rxs, extent, frame.payloadSize, frame.payload); err != nil {
		return err
	}
	if !frame.txEnd {
		rxs.toggle = !rxs.toggle
		return errTODO // Transfer not complete yet.
	}
	if !singleFrame && rxs.crc != 0 {
		// CRC mismatch: drop the transfer. This must not return nil, because
		// nil tells the caller that outTx holds a complete transfer.
		rxs.reset(rxs.tid+1, rxs.rti)
		return errTODO
	}

	outTx.metadata.fromRxFrame(frame)
	outTx.timestamp = rxs.txTimestamp
	outTx.payloadSize = rxs.payloadSize
	outTx.payload = rxs.payload

	if rxs.totalPayloadSize < rxs.payloadSize {
		panic("OOB rxs payload")
	}
	// Cut off the CRC bytes that ended up in the buffer. Bytes lost to
	// truncation were taken from the end of the transfer, i.e. from the CRC
	// first, so only the part of the CRC that was actually stored is removed.
	const crcSize = 2 // bytes
	truncatedAmount := rxs.totalPayloadSize - rxs.payloadSize
	if !singleFrame && crcSize > truncatedAmount {
		storedCRCBytes := crcSize - truncatedAmount
		if outTx.payloadSize < storedCRCBytes {
			panic("OOB crc not fit in payload")
		}
		outTx.payloadSize -= storedCRCBytes
	}

	// Ownership passed over to the application, nullify to prevent modifying.
	rxs.payload = nil
	rxs.reset(rxs.tid+1, rxs.rti)
	return nil
}
// rxTryParseFrame decodes the extended CAN ID and the tail byte of frame into
// out and validates the result.
//
// frame.payloadSize counts the tail byte; out.payloadSize does not. Note that
// out.payload still refers to the whole of frame.payload, tail byte included,
// so consumers must slice it with out.payloadSize.
//
// It returns ErrInvalidArgument when frame or out is nil or when
// frame.payloadSize exceeds len(frame.payload), errEmptyPayload when the frame
// has no bytes at all, and errInvalidFrame when any validity rule below fails.
// out is written even when the frame turns out to be invalid.
func rxTryParseFrame(ts microsecond, frame *Frame, out *FrameModel) error {
	switch {
	case frame == nil || out == nil:
		return ErrInvalidArgument
	case frame.payloadSize <= 0 || len(frame.payload) == 0:
		return errEmptyPayload
	case frame.payloadSize > len(frame.payload):
		// The tail byte would be read past the end of the slice.
		return ErrInvalidArgument
	}

	valid := false
	canID := frame.extendedCANID
	out.timestamp = ts
	out.prority = Priority(canID>>offset_Priority) & priorityMask
	out.srcNode = NodeID(canID & NODE_ID_MAX)
	if canID&FLAG_SERVICE_NOT_MESSAGE == 0 {
		out.txKind = TxKindMessage
		out.port = PortID(canID>>offset_SubjectID) & SUBJECT_ID_MAX
		if canID&FLAG_ANONYMOUS_MESSAGE != 0 {
			out.srcNode.Unset()
		}
		out.dstNode.Unset()
		// Reserved bits may be unreserved in the future.
		valid = ((canID & FLAG_RESERVED_23) == 0) && ((canID & FLAG_RESERVED_07) == 0)
	} else {
		if canID&FLAG_REQUEST_NOT_RESPONSE != 0 {
			out.txKind = TxKindRequest
		} else {
			out.txKind = TxKindResponse
		}
		out.port = PortID(canID>>offset_ServiceID) & SERVICE_ID_MAX
		out.dstNode = NodeID(canID>>offset_DstNodeID) & NODE_ID_MAX
		// The reserved bit may be unreserved in the future. It may be used to
		// extend the service-ID to 10 bits. Per Specification, source cannot
		// be the same as the destination.
		valid = ((canID & FLAG_RESERVED_23) == 0) && (out.srcNode != out.dstNode)
	}

	// Payload parsing: the size excludes the tail byte, the slice keeps it.
	out.payloadSize = frame.payloadSize - 1
	out.payload = frame.payload

	// Tail byte parsing; the index is in range thanks to the guard above.
	tail := frame.payload[out.payloadSize]
	out.tid = TID(tail & TRANSFER_ID_MAX)
	out.txStart = (tail & TAIL_START_OF_TRANSFER) != 0
	out.txEnd = (tail & TAIL_END_OF_TRANSFER) != 0
	out.toggle = (tail & TAIL_TOGGLE) != 0

	// Final validation.
	// Protocol version check: if SOT is set, then the toggle shall also be set
	// (the initial toggle state is true).
	valid = valid && (!out.txStart || out.toggle)
	// Anonymous transfers can be only single-frame transfers, i.e. a frame is
	// acceptable if it is a complete transfer or its source is NOT anonymous.
	valid = valid && ((out.txStart && out.txEnd) || !out.srcNode.IsUnset())
	// Non-last frames of a multi-frame transfer shall utilize the MTU fully.
	valid = valid && ((out.payloadSize >= MFT_NON_LAST_FRAME_PAYLOAD_MIN) || out.txEnd)
	// A frame that is a part of a multi-frame transfer cannot be empty (tail
	// byte not included).
	valid = valid && (out.payloadSize > 0 || (out.txStart && out.txEnd))
	if !valid {
		return errInvalidFrame
	}
	return nil
}
// reset prepares the session for a new transfer with transfer-ID txid (masked
// to TRANSFER_ID_MAX) on redundant transport rti.
//
// Both byte counters are cleared, the CRC is re-initialised and the toggle is
// put back to its initial state. The payload slice is only truncated to
// length zero, so a buffer that is still attached keeps its capacity.
// txTimestamp is left unchanged.
func (rxs *internalRxSession) reset(txid TID, rti uint8) {
	rxs.totalPayloadSize = 0
	// The stored-byte count must restart together with the received-byte
	// count, otherwise the next transfer would be written at a stale offset.
	rxs.payloadSize = 0
	rxs.payload = rxs.payload[:0]
	rxs.crc = newCRC()
	rxs.tid = txid & TRANSFER_ID_MAX
	rxs.toggle = true // INITIAL TOGGLE STATE
	rxs.rti = rti
}