Skip to content

Commit a6f6312

Browse files
committed
fix(plc4go): make BACnet/IP driver actually round-trip against bacpypes3
End-to-end integration testing against bacpypes3 0.0.102 surfaced a stack of bugs that masked each other; this commit fixes them and adds the regression tests (unit + dockerized integration) that pin the behavior down. Driver bugs - MessageCodec.handleCustomMessage unconditionally returned true, swallowing every parsed BVLC before HandleMessages could match it. Read/Write/Subscribe responses arrived on the wire but never woke their expectations. Replace with a no-op keepReceiveLoopActive that returns false so the default expectation-matching path runs while still keeping the receive worker awake when expectations are empty (needed so unsolicited COV notifications get drained). - Writer's WritePropertyMultiple path used wrong BACnet context tag numbers inside BACnetPropertyWriteDefinition (2/3/3 instead of 0/1/2), causing bacpypes3 REJECT(INVALID_TAG). Fix the tag numbers and parameterize constructedDataFromAppTag so single-write keeps its [3] wrapper. - ValueDecoder fell through to PlcSTRING for BACnet *Tagged enum wrappers (BACnetBinaryPVTagged etc.), making BV reads unusable. Add a reflection- based taggedEnumToPlcValue that surfaces them as PlcUDINT/PlcLINT/PlcBOOL. - Subscriber.dispatchNotification only inspected ApplicationTag on COV elements, dropping ConstructedData-framed values to PlcNULL. Add the ConstructedData branch. - ValueEncoder sent Boolean application tags for bool writes; Binary PresentValue requires Enumerated and BACnet stacks REJECT otherwise. Extend hintForProperty to take object type and return hintEnumerated for Binary/* PRESENT_VALUE; encoder maps bool -> Enumerated(0|1) under it. - Driver was binding to an ephemeral UDP source port; spec-conformant peers reply to UDP/47808 regardless of source, so responses got dropped. Bind local port to 47808 by default, overridable via the local-port driver option for processes that need multiple BACnet connections. - UDP transport's WriteToUDP rejected DialUDP-connected sockets with ErrWriteToConnected. Discriminate on udpConn.RemoteAddr() and use plain Write for connected sockets, WriteToUDP for ListenUDP sockets. Cross-driver fix - spi/model/DefaultPlcUnsubscriptionRequestBuilder.AddHandles appended to the local parameter (`subscriptionHandles = append(subscriptionHandles, subscriptionHandles...)`) and never stored anything on the builder, so every caller got back an empty request. Fix to append to d.subscriptionHandles. API additions on the BACnet Connection - Wire up UnsubscriptionRequestBuilder (was inherited as panicking stub). - Make plcTag a PlcSubscriptionTag (GetPlcSubscriptionType, GetDuration) and ValueHandler override NewPlcValue so written values survive the type-switch. - Add Framing.wrapAPDU helper: APDU -> NPDU -> BVLCOriginalUnicastNPDU, required because MessageCodec.Send hard-casts to BVLC. Reader/Writer/ Subscriber now wrap their requests via wrapAPDU instead of passing raw APDUs that triggered a panic. Dockerized integration suite (new pattern for plc4go) - Two-container compose on a private bridge so both sides bind UDP/47808 in their own network namespace. bacnet-device runs bacpypes3 (with a monkey-patched do_WritePropertyMultipleRequest because 0.0.102 ships a raise-UnrecognizedService stub); test-runner runs `go test -tags integration`. - 16 integration tests cover Discover, Read (AV/BV/MSV/CharacterString/ UnknownObject), Write (AV/BV/MSV/ReadOnly), WritePropertyMultiple, multi-tag Read, concurrent Read, Subscribe (initial + sawtooth-driven COV), and Unsubscribe. - `make integration-bacnetip` builds, runs, and tears down with the test-runner's exit code so a failed test fails the make target. Regression unit tests - MessageCodec_test pins Send/Receive behavior and the keepReceiveLoopActive==false invariant — flipping it back to true is what hid the bug originally. - Framing_test locks wrapAPDU's BVLC type, protocolVersion=1, and the no-routing local-scope contract. - ValueDecoder_test exercises BACnetBinaryPVTagged and every reflection Kind branch in taggedEnumToPlcValue. - Subscriber_test adds a ConstructedData-branch COV notification. - Writer_test asserts the WPM context tag numbers (0/1/2) and the single-write tag numbers (1/2/3) directly, so a future helper-sharing refactor can't silently swap them.
1 parent d666d65 commit a6f6312

25 files changed

Lines changed: 1680 additions & 176 deletions

plc4go/Makefile

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,15 @@ test: compile
6161
test-readable: compile
6262
@GOPATH=$(GOPATH) GOBIN=$(GOBIN) go tool -modfile=tools.mod gotestsum ./...
6363

64-
## integration-bacnetip: Bring up the bacpypes3 simulator + run the BACnet/IP integration tests.
65-
## Tears the container down on exit even if a test fails.
64+
## integration-bacnetip: Build + run the bacpypes3 simulator + Go test-runner in their own
65+
## network namespaces on a shared docker bridge, then tear everything down. The compose exit
66+
## code follows the test-runner so `make integration-bacnetip` mirrors a normal go-test failure.
6667
integration-bacnetip:
67-
@docker compose -f tests/integration/bacnetip/docker-compose.yml up -d --build
68-
@trap 'docker compose -f tests/integration/bacnetip/docker-compose.yml down' EXIT; \
69-
BACNET_IT=1 go test -tags integration ./tests/integration/bacnetip/... -v -count=1 -timeout 120s
68+
@docker compose -f tests/integration/bacnetip/docker-compose.yml up \
69+
--build --abort-on-container-exit --exit-code-from test-runner; \
70+
status=$$?; \
71+
docker compose -f tests/integration/bacnetip/docker-compose.yml down; \
72+
exit $$status
7073

7174
test-readable-mvn: compile
7275
$(MVNBIN) mvn-golang-wrapper:custom@readable-test

plc4go/internal/bacnetip/Connection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,14 @@ func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionReques
231231
)
232232
}
233233

234+
func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
235+
// The default request implementation dispatches each handle's
236+
// Unsubscribe back through the embedded Subscriber, so we don't need
237+
// to pass our own here — the SubscriptionHandles created by Subscribe
238+
// already carry the Subscriber reference.
239+
return spiModel.NewDefaultPlcUnsubscriptionRequestBuilder()
240+
}
241+
234242
func (c *Connection) addSubscriber(subscriber *Subscriber) {
235243
if slices.Contains(c.subscribers, subscriber) {
236244
c.log.Debug().Interface("subscriber", subscriber).Msg("Subscriber already added")

plc4go/internal/bacnetip/Driver.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package bacnetip
2222
import (
2323
"context"
2424
"math"
25+
"net"
2526
"net/url"
2627
"strconv"
2728
"time"
@@ -36,6 +37,7 @@ import (
3637
"github.com/apache/plc4x/plc4go/spi/options"
3738
"github.com/apache/plc4x/plc4go/spi/transactions"
3839
"github.com/apache/plc4x/plc4go/spi/transports"
40+
"github.com/apache/plc4x/plc4go/spi/transports/udp"
3941
"github.com/apache/plc4x/plc4go/spi/utils"
4042
)
4143

@@ -87,16 +89,44 @@ func (d *Driver) GetConnection(ctx context.Context, transportUrl url.URL, transp
8789
if _, ok := driverOptions["so-reuse"]; !ok {
8890
driverOptions["so-reuse"] = []string{"true"}
8991
}
90-
// Have the transport create a new transport-instance.
91-
transportInstance, err := transport.CreateTransportInstance(
92+
// BACnet/IP uses port 47808 on both sides of a conversation; spec-conformant
93+
// peers (bacpypes3, EcoStruxure, Niagara, ...) send unsolicited messages
94+
// and responses back to the well-known port regardless of the request's
95+
// source port. The generic transport.CreateTransportInstance dials with
96+
// LocalAddress=nil, which gives us an ephemeral source — fine for protocols
97+
// that reply to the source port, but for BACnet that means responses get
98+
// dropped by the kernel.
99+
//
100+
// Use CreateTransportInstanceForLocalAddress with a fixed 0.0.0.0:47808
101+
// bind. Callers that need to co-locate multiple BACnet connections in one
102+
// process can override via the "local-port" driver option (uint), or 0
103+
// for explicit ephemeral.
104+
localPort := int(model.BacnetConstants_BACNETUDPDEFAULTPORT)
105+
if val, ok := driverOptions["local-port"]; ok && len(val) > 0 {
106+
if parsed, parseErr := strconv.Atoi(val[0]); parseErr != nil {
107+
connectionLog.Warn().Err(parseErr).Str("local-port", val[0]).Msg("ignoring invalid local-port option")
108+
} else {
109+
localPort = parsed
110+
}
111+
}
112+
localAddress := &net.UDPAddr{IP: net.IPv4zero, Port: localPort}
113+
connectionLog.Info().Stringer("localAddress", localAddress).Msg("BACnet driver binding local UDP")
114+
115+
udpTransport, ok := transport.(*udp.Transport)
116+
if !ok {
117+
return nil, errors.Errorf("BACnet/IP requires the udp transport; got %T", transport)
118+
}
119+
transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(
92120
transportUrl,
93121
driverOptions,
122+
localAddress,
94123
append(d._options, options.WithCustomLogger(connectionLog))...,
95124
)
96125
if err != nil {
97126
connectionLog.Error().
98127
Stringer("transportUrl", &transportUrl).
99128
Strs("defaultUdpPort", driverOptions["defaultUdpPort"]).
129+
Int("localPort", localPort).
100130
Msg("We couldn't create a transport instance for port")
101131
return nil, errors.Wrapf(err, "couldn't initialize transport configuration for given transport url %s", transportUrl.String())
102132
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package bacnetip
21+
22+
import (
23+
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
24+
)
25+
26+
// wrapAPDU encapsulates an APDU in the BACnet/IP NPDU + BVLC layers expected
27+
// by MessageCodec.Send (which type-asserts to model.BVLC). Without this
28+
// wrapping the codec panics on the cast and the request silently dies.
29+
//
30+
// expectingReply is set for confirmed requests; unconfirmed requests pass
31+
// false. The NPDU is intentionally local-only (no DNET/SNET) because routed
32+
// addressing happens at the Tag layer in Phase 6.
33+
func wrapAPDU(apdu model.APDU, expectingReply bool) model.BVLC {
34+
control := model.NewNPDUControl(
35+
false, // messageTypeFieldPresent
36+
false, // destinationSpecified
37+
false, // sourceSpecified
38+
expectingReply,
39+
model.NPDUNetworkPriority_NORMAL_MESSAGE,
40+
)
41+
npdu := model.NewNPDU(
42+
1, // protocolVersionNumber
43+
control,
44+
nil, // destinationNetworkAddress
45+
nil, // destinationLength
46+
nil, // destinationAddress
47+
nil, // sourceNetworkAddress
48+
nil, // sourceLength
49+
nil, // sourceAddress
50+
nil, // hopCount
51+
nil, // nlm
52+
apdu,
53+
)
54+
return model.NewBVLCOriginalUnicastNPDU(npdu)
55+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package bacnetip
21+
22+
import (
23+
"testing"
24+
25+
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
28+
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
29+
)
30+
31+
func newWhoIsAPDU(t *testing.T) readWriteModel.APDU {
32+
t.Helper()
33+
whoIs := readWriteModel.NewBACnetUnconfirmedServiceRequestWhoIs(nil, nil)
34+
return readWriteModel.NewAPDUUnconfirmedRequest(whoIs)
35+
}
36+
37+
func TestWrapAPDU_ProducesBVLCOriginalUnicastNPDU(t *testing.T) {
38+
bvlc := wrapAPDU(newWhoIsAPDU(t), false)
39+
require.NotNil(t, bvlc)
40+
// MessageCodec.Send type-asserts to BVLCOriginalUnicastNPDU on the
41+
// send-path; wrapAPDU must produce exactly that type.
42+
_, ok := bvlc.(readWriteModel.BVLCOriginalUnicastNPDU)
43+
assert.True(t, ok, "wrapAPDU must return a BVLCOriginalUnicastNPDU, got %T", bvlc)
44+
}
45+
46+
func TestWrapAPDU_NPDUProtocolVersionIs1(t *testing.T) {
47+
// BACnet stacks reject NPDUs with a wrong protocol version. Pin it
48+
// to 1 (the only spec-valid value) so an accidental change shows up
49+
// as a test failure rather than a wire-protocol incompatibility.
50+
bvlc := wrapAPDU(newWhoIsAPDU(t), false).(readWriteModel.BVLCOriginalUnicastNPDU)
51+
assert.Equal(t, uint8(1), bvlc.GetNpdu().GetProtocolVersionNumber())
52+
}
53+
54+
func TestWrapAPDU_ExpectingReplyPropagatesToControl(t *testing.T) {
55+
cases := []struct {
56+
name string
57+
expectingReply bool
58+
}{
59+
{"confirmed-request-sets-flag", true},
60+
{"unconfirmed-broadcast-clears-flag", false},
61+
}
62+
for _, tc := range cases {
63+
t.Run(tc.name, func(t *testing.T) {
64+
bvlc := wrapAPDU(newWhoIsAPDU(t), tc.expectingReply).(readWriteModel.BVLCOriginalUnicastNPDU)
65+
control := bvlc.GetNpdu().GetControl()
66+
assert.Equal(t, tc.expectingReply, control.GetExpectingReply(),
67+
"NPDU control.expectingReply must reflect the wrapAPDU argument")
68+
})
69+
}
70+
}
71+
72+
func TestWrapAPDU_LocalScope_NoRouting(t *testing.T) {
73+
// We only support local (same-network) addressing in Phase 6's tag layer.
74+
// The NPDU control fields for routing must all be off so bacpypes3/Niagara
75+
// don't interpret the message as routed.
76+
bvlc := wrapAPDU(newWhoIsAPDU(t), true).(readWriteModel.BVLCOriginalUnicastNPDU)
77+
control := bvlc.GetNpdu().GetControl()
78+
assert.False(t, control.GetMessageTypeFieldPresent())
79+
assert.False(t, control.GetDestinationSpecified())
80+
assert.False(t, control.GetSourceSpecified())
81+
// Routing fields (DNET/DLEN/DADR + SNET/SLEN/SADR + HopCount + NLM) must
82+
// be nil — otherwise a peer treats it as a routed frame.
83+
npdu := bvlc.GetNpdu()
84+
assert.Nil(t, npdu.GetDestinationNetworkAddress())
85+
assert.Nil(t, npdu.GetDestinationLength())
86+
assert.Nil(t, npdu.GetDestinationAddress())
87+
assert.Nil(t, npdu.GetSourceNetworkAddress())
88+
assert.Nil(t, npdu.GetSourceLength())
89+
assert.Nil(t, npdu.GetSourceAddress())
90+
assert.Nil(t, npdu.GetHopCount())
91+
assert.Nil(t, npdu.GetNlm())
92+
}
93+
94+
func TestWrapAPDU_PreservesAPDU(t *testing.T) {
95+
apdu := newWhoIsAPDU(t)
96+
bvlc := wrapAPDU(apdu, false).(readWriteModel.BVLCOriginalUnicastNPDU)
97+
// Same APDU identity should be reachable through the wrapper —
98+
// MessageCodec.Receive parses BVLC → NPDU → APDU and Reader/Writer
99+
// match expectations by walking that chain.
100+
assert.Equal(t, apdu, bvlc.GetNpdu().GetApdu())
101+
}
102+
103+
func TestWrapAPDU_SerializesToValidBVLC(t *testing.T) {
104+
// End-to-end sanity: the wrapped message round-trips through the
105+
// model serializer. Catches accidental nil-required-field changes
106+
// in wrapAPDU that would only show up at runtime under Send().
107+
bvlc := wrapAPDU(newWhoIsAPDU(t), false)
108+
raw, err := bvlc.Serialize()
109+
require.NoError(t, err, "wrapAPDU output must serialize")
110+
// First byte is BVLC type 0x81; second is function 0x0a (OriginalUnicastNPDU).
111+
require.GreaterOrEqual(t, len(raw), 4)
112+
assert.Equal(t, byte(0x81), raw[0], "BVLC magic byte")
113+
assert.Equal(t, byte(0x0a), raw[1], "BVLC function = OriginalUnicastNPDU")
114+
}

plc4go/internal/bacnetip/MessageCodec.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,26 @@ var (
4646

4747
func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
4848
codec := &MessageCodec{}
49-
codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, append(_options, _default.WithCustomMessageHandler(codec.handleCustomMessage))...)
49+
// Register a no-op (always-false) custom handler so the codec's receive
50+
// loop keeps polling even when there are zero outstanding expectations.
51+
// The default loop skips reading the transport when both `expectations`
52+
// is empty AND `customMessageHandling` is nil — that would mean unsolicited
53+
// COV notifications arrive in the kernel buffer but nobody drains them.
54+
// Returning false here lets the default expectation-matching path run
55+
// first, then falls through to defaultIncomingMessageChannel for the
56+
// Connection's COV-notification poller.
57+
codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, append(_options, _default.WithCustomMessageHandler(keepReceiveLoopActive))...)
5058
return codec
5159
}
5260

61+
// keepReceiveLoopActive is a no-op CustomMessageHandler. Its only purpose is
62+
// to set m.customMessageHandling to non-nil so the codec's receive worker
63+
// doesn't park when expectations drain to zero. Returning false defers all
64+
// real handling to HandleMessages → defaultIncomingMessageChannel.
65+
func keepReceiveLoopActive(_ context.Context, _ _default.DefaultCodecRequirements, _ spi.Message) bool {
66+
return false
67+
}
68+
5369
func (m *MessageCodec) GetCodec() spi.MessageCodec {
5470
return m
5571
}
@@ -112,11 +128,3 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) {
112128
return nil, nil
113129
}
114130

115-
func (m *MessageCodec) handleCustomMessage(ctx context.Context, _ _default.DefaultCodecRequirements, message spi.Message) bool {
116-
// For now, we just put them in the incoming channel
117-
select {
118-
case m.GetDefaultIncomingMessageChannel() <- message:
119-
case <-ctx.Done():
120-
}
121-
return true
122-
}

0 commit comments

Comments
 (0)