Skip to content

Commit c97aa38

Browse files
authored
Fix Tacview continuation-line parsing in telemetry stream (#693)
## Summary This fixes telemetry parsing for ACMI streams containing multi-line continuation records, such as long global `Comments` values in Tacview exports and realtime streams. ## Root Cause `pkg/telemetry/streamer.go` was reading from the same `bufio.Reader` in two different places: - the background goroutine in `handleLines()` - `handleLine()` when it detected a continuation line ending with `\` That allowed reads to interleave. When a multi-line global property was present, part of the continuation block and the following record could be spliced together, leading to parse errors like: `strconv.ParseUint: parsing "====================0": invalid syntax` ## Changes - make the background reader goroutine the sole owner of ACMI stream reads - add `readACMILine()` to assemble full logical ACMI records, including continuation lines, before parsing - simplify `handleLine()` so it only parses an already-assembled record - add regression tests for a sample-like multi-line `Comments` record followed by the next global update ## Validation - `make test`
1 parent 458af9d commit c97aa38

2 files changed

Lines changed: 80 additions & 16 deletions

File tree

pkg/telemetry/streamer.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader)
218218
go func() {
219219
defer close(lines)
220220
for {
221-
line, err := reader.ReadString('\n')
221+
line, err := readACMILine(reader)
222222
select {
223223
case lines <- result{line, err}:
224224
case <-ctx.Done():
@@ -252,29 +252,37 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader)
252252
}
253253
return fmt.Errorf("error reading line: %w", r.err)
254254
}
255-
if err := c.handleLine(r.line, reader); err != nil {
255+
if err := c.handleLine(r.line); err != nil {
256256
return fmt.Errorf("error reading ACMI stream: %w", err)
257257
}
258258
}
259259
}
260260
}
261261

262-
func (c *streamingClient) handleLine(line string, reader *bufio.Reader) error {
263-
if strings.HasSuffix(line, "\\\n") {
264-
var sb strings.Builder
265-
sb.WriteString(line[:len(line)-2])
266-
for {
267-
next, err := reader.ReadString('\n')
268-
if err != nil {
269-
return fmt.Errorf("error reading continuation line: %w", err)
270-
}
271-
sb.WriteString(next)
272-
if !strings.HasSuffix(next, "\\\n") {
273-
break
274-
}
262+
func readACMILine(reader *bufio.Reader) (string, error) {
263+
line, err := reader.ReadString('\n')
264+
if err != nil {
265+
return line, err
266+
}
267+
if !strings.HasSuffix(line, "\\\n") {
268+
return line, nil
269+
}
270+
271+
var sb strings.Builder
272+
sb.WriteString(line[:len(line)-2])
273+
for {
274+
next, readErr := reader.ReadString('\n')
275+
if readErr != nil {
276+
return "", fmt.Errorf("error reading continuation line: %w", readErr)
277+
}
278+
sb.WriteString(next)
279+
if !strings.HasSuffix(next, "\\\n") {
280+
return sb.String(), nil
275281
}
276-
line = sb.String()
277282
}
283+
}
284+
285+
func (c *streamingClient) handleLine(line string) error {
278286
line = strings.TrimSpace(line)
279287
if line == "" {
280288
return nil

pkg/telemetry/streamer_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package telemetry
2+
3+
import (
4+
"bufio"
5+
"strings"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestReadACMILineContinuationPreservesNextRecord(t *testing.T) {
14+
t.Parallel()
15+
16+
reader := bufio.NewReader(strings.NewReader(strings.Join([]string{
17+
"0,Comments=DCS Retribution Turn 6\\",
18+
"====================\\",
19+
"Most briefing information can be found on your kneeboard.",
20+
"0,ReferenceTime=1989-09-13T10:08:31Z",
21+
}, "\n") + "\n"))
22+
23+
first, err := readACMILine(reader)
24+
require.NoError(t, err)
25+
assert.Equal(
26+
t,
27+
"0,Comments=DCS Retribution Turn 6====================\\\nMost briefing information can be found on your kneeboard.\n",
28+
first,
29+
)
30+
31+
second, err := readACMILine(reader)
32+
require.NoError(t, err)
33+
assert.Equal(t, "0,ReferenceTime=1989-09-13T10:08:31Z\n", second)
34+
}
35+
36+
func TestHandleLineAcceptsContinuationRecordBeforeNextObject(t *testing.T) {
37+
t.Parallel()
38+
39+
client := newStreamingClient(time.Second)
40+
reader := bufio.NewReader(strings.NewReader(strings.Join([]string{
41+
"0,Comments=DCS Retribution Turn 6\\",
42+
"====================\\",
43+
"Most briefing information can be found on your kneeboard.",
44+
"0,ReferenceTime=1989-09-13T10:08:31Z",
45+
}, "\n") + "\n"))
46+
47+
line, err := readACMILine(reader)
48+
require.NoError(t, err)
49+
require.NoError(t, client.handleLine(line))
50+
51+
line, err = readACMILine(reader)
52+
require.NoError(t, err)
53+
require.NoError(t, client.handleLine(line))
54+
55+
assert.Equal(t, time.Date(1989, 9, 13, 10, 8, 31, 0, time.UTC), client.Time())
56+
}

0 commit comments

Comments
 (0)