@@ -74,7 +74,9 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
7474 go func (ev * v2.Event ) {
7575 defer wg .Done ()
7676
77- ctx := context .Background ()
77+ base := context .WithoutCancel (ctx )
78+ pubCtx , cancel := context .WithTimeout (base , p .options .PublishTimeout )
79+ defer cancel ()
7880
7981 logger := log .WithField ("subject" , ev .Subject ()).
8082 WithField ("id" , ev .ID ())
@@ -99,12 +101,17 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
99101 "ce_id" : ev .ID (),
100102 "ce_source" : ev .Source (),
101103 "ce_type" : ev .Type (),
102- "content-type" : ev .DataContentType (),
103- "ce_time" : ev .Time ().String (),
104+ "ce_time" : ev .Time ().UTC ().Format (time .RFC3339 ),
104105 "ce_path" : "/" ,
105106 "ce_subject" : ev .Subject (),
106107 }
107108
109+ if ct := ev .DataContentType (); ct != "" {
110+ attrs ["content-type" ] = ct
111+ } else {
112+ attrs ["content-type" ] = "application/json"
113+ }
114+
108115 msg := & pubsub.Message {ID : ev .ID (), Data : raw , Attributes : attrs , PublishTime : time .Now ()}
109116 if p .options .OrderingKey {
110117 if pk , err := p .getPartitionKey (ev ); err == nil {
@@ -115,8 +122,8 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
115122 topic := p .getTopic (ev .Subject ())
116123 err = try .Do (func (attempt int ) (bool , error ) {
117124 logger .Tracef ("publishing to topic %s, attempt %d" , ev .Subject (), attempt )
118- r := topic .Publish (ctx , msg )
119- if _ , err := r .Get (ctx ); err != nil {
125+ r := topic .Publish (pubCtx , msg )
126+ if _ , err := r .Get (pubCtx ); err != nil {
120127 log .Error (err )
121128 return attempt < 5 , errors .NewInternal (err , "Pub/Sub publish failed" )
122129 }
0 commit comments