Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions internal/events/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error)
client := ffresty.NewWithConfig(ctx, *ffrestyConfig)

*wh = WebHooks{
ctx: log.WithLogField(ctx, "webhook", wh.connID),
ctx: log.WithLogField(ctx, "webhook", connID),
capabilities: &events.Capabilities{
BatchDelivery: true,
},
Expand Down Expand Up @@ -139,7 +139,7 @@ func (p *whPayload) firstData() fftypes.JSONObject {
}

func (wh *WebHooks) buildPayload(ctx context.Context, sub *core.Subscription, event *core.CombinedEventDataDelivery) *whPayload {
log.L(wh.ctx).Debugf("Webhook-> %s event %s on subscription %s", sub.Options.URL, event.Event.ID, sub.ID)
log.L(ctx).Debugf("Webhook-> %s event %s", sub.Options.URL, event.Event.ID)
withData := sub.Options.WithData != nil && *sub.Options.WithData
options := sub.Options.TransportOptions()
p := &whPayload{
Expand Down Expand Up @@ -393,7 +393,7 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription,

resp, err := req.r.Execute(req.method, req.url)
if err != nil {
log.L(ctx).Errorf("Webhook<- %s %s on subscription %s failed: %s", req.method, req.url, sub.ID, err)
log.L(ctx).Errorf("Webhook<- %s %s failed: %s", req.method, req.url, err)
return nil, nil, err
}
defer func() { _ = resp.RawBody().Close() }()
Expand All @@ -402,7 +402,7 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription,
Status: resp.StatusCode(),
Headers: fftypes.JSONObject{},
}
log.L(wh.ctx).Debugf("Webhook<- %s %s on subscription %s returned %d", req.method, req.url, sub.ID, res.Status)
log.L(ctx).Debugf("Webhook<- %s %s returned %d", req.method, req.url, res.Status)
header := resp.Header()
for h := range header {
res.Headers[h] = header.Get(h)
Expand Down Expand Up @@ -440,7 +440,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
if gwErr != nil {
// Generate a bad-gateway error response - we always want to send something back,
// rather than just causing timeouts
log.L(wh.ctx).Errorf("Failed to invoke webhook: %s", gwErr)
log.L(ctx).Errorf("Failed to invoke webhook: %s", gwErr)
b, _ := json.Marshal(&fftypes.RESTError{
Error: gwErr.Error(),
})
Expand All @@ -453,7 +453,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
}
}
b, _ := json.Marshal(&res)
log.L(wh.ctx).Tracef("Webhook response: %s", string(b))
log.L(ctx).Tracef("Webhook response: %s", string(b))

// For each event emit a response
for _, combinedEvent := range events {
Expand All @@ -465,7 +465,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
}
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
log.L(ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID)
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Rejected: false,
Expand Down Expand Up @@ -501,12 +501,13 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s
}

func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
ctx = log.WithLogField(log.WithLogField(ctx, "webhook", wh.connID), "sub", sub.ID.String())
reply := sub.Options.TransportOptions().GetBool("reply")
if reply && event.Message != nil && event.Message.Header.CID != nil {
// We cowardly refuse to dispatch a message that is itself a reply, as it's hard for users to
// avoid loops - and there's no way for us to detect here if a user has configured correctly
// to avoid a loop.
log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Expand Down Expand Up @@ -540,6 +541,7 @@ func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *cor
}

func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
ctx = log.WithLogField(log.WithLogField(ctx, "webhook", wh.connID), "sub", sub.ID.String())
reply := sub.Options.TransportOptions().GetBool("reply")
if reply {
nonReplyEvents := []*core.CombinedEventDataDelivery{}
Expand All @@ -549,7 +551,7 @@ func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub
// avoid loops - and there's no way for us to detect here if a user has configured correctly
// to avoid a loop.
if event.Message != nil && event.Message.Header.CID != nil {
log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID)
if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok {
cb.DeliveryResponse(connID, &core.EventDeliveryResponse{
ID: event.ID,
Expand Down
74 changes: 66 additions & 8 deletions internal/events/webhooks/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ import (
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/fftypes"
fflog "github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coreconfig"
"github.com/hyperledger/firefly/mocks/eventsmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -470,14 +472,10 @@ func TestRequestWithBodyReplyEndToEndWithTLS(t *testing.T) {

ctx, cancelCtx := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := server.Shutdown(shutdownContext); err != nil {
return
}
}
<-ctx.Done()
shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = server.Shutdown(shutdownContext)
}()

server.Handler = r
Expand Down Expand Up @@ -1490,3 +1488,63 @@ func TestRequestWithBodyReplyEndToEndWithBatch(t *testing.T) {
func TestFirstDataNeverNil(t *testing.T) {
assert.NotNil(t, (&whPayload{}).firstData())
}

type testHook struct{ entries []*logrus.Entry }

func (h *testHook) Levels() []logrus.Level { return logrus.AllLevels }
func (h *testHook) Fire(e *logrus.Entry) error {
h.entries = append(h.entries, e)
return nil
}

func TestLoggingContextPreserved(t *testing.T) {
wh, cancel := newTestWebHooks(t)
defer cancel()

logger := logrus.StandardLogger()
origHooks := logger.Hooks
hook := &testHook{}
logger.AddHook(hook)
logrus.SetLevel(logrus.DebugLevel)
defer logger.ReplaceHooks(origHooks)

r := mux.NewRouter()
r.HandleFunc("/ping", func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(200)
_, _ = res.Write([]byte(`ok`))
}).Methods(http.MethodPost)
server := httptest.NewServer(r)
defer server.Close()

subID := fftypes.NewUUID()
sub := &core.Subscription{
SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1"},
}
to := sub.Options.TransportOptions()
to["url"] = fmt.Sprintf("http://%s/ping", server.Listener.Addr())
event := &core.EventDelivery{
EnrichedEvent: core.EnrichedEvent{Event: core.Event{ID: fftypes.NewUUID()}},
Subscription: core.SubscriptionRef{ID: subID},
}

parentCtx := fflog.WithLogField(context.Background(), "httpReq", "req-123")

mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks)
mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(resp *core.EventDeliveryResponse) bool {
return !resp.Rejected
})).Return(nil)

err := wh.DeliveryRequest(parentCtx, mock.Anything, sub, event, nil)
assert.NoError(t, err)

found := false
for _, e := range hook.entries {
if e.Data["httpReq"] == "req-123" && e.Data["webhook"] != nil && e.Data["sub"] == subID.String() {
found = true
break
}
}
assert.True(t, found, "expected log entry with preserved httpReq, webhook, and sub fields")

mcb.AssertExpectations(t)
}
Loading