-
Notifications
You must be signed in to change notification settings - Fork 115
Expand file tree
/
Copy pathruntimelogs_stub.go
More file actions
155 lines (141 loc) · 4.75 KB
/
runtimelogs_stub.go
File metadata and controls
155 lines (141 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package handler
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapi/model"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapi/rendering"
)
// Error type identifiers placed in the ErrorType field of the responses
// rendered by the stub handlers in this file.
const (
	// logsAPIDisabledErrorType is reported by the Logs API stub handler.
	logsAPIDisabledErrorType = "Logs.NotSupported"
	// telemetryAPIDisabledErrorType is reported by the Telemetry API stub handler.
	telemetryAPIDisabledErrorType = "Telemetry.NotSupported"
)
// runtimeTelemetryBuffering mirrors the "buffering" object of a telemetry
// subscription request body. Its fields are decoded from the JSON keys
// "maxBytes", "maxItems" and "timeoutMs".
// NOTE(review): TimeoutMs is presumably expressed in milliseconds — confirm
// against the Telemetry API specification.
type runtimeTelemetryBuffering struct {
	MaxBytes  int64 `json:"maxBytes"`
	MaxItems  int   `json:"maxItems"`
	TimeoutMs int64 `json:"timeoutMs"`
}
// runtimeTelemetryDestination mirrors the "destination" object of a telemetry
// subscription request body: the URI to deliver to (JSON key "URI") and the
// delivery protocol (JSON key "protocol").
type runtimeTelemetryDestination struct {
	URI      string `json:"URI"`
	Protocol string `json:"protocol"`
}
// runtimeTelemetryRequest is the JSON body of a telemetry subscription
// request. It carries the buffering settings ("buffering"), the delivery
// destination ("destination"), the subscribed stream types ("types") and the
// schema version ("schemaVersion").
type runtimeTelemetryRequest struct {
	Buffering     runtimeTelemetryBuffering   `json:"buffering"`
	Destination   runtimeTelemetryDestination `json:"destination"`
	Types         []string                    `json:"types"`
	SchemaVersion string                      `json:"schemaVersion"`
}
type runtimeLogsStubAPIHandler struct{}
func (h *runtimeLogsStubAPIHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if err := rendering.RenderJSON(http.StatusAccepted, writer, request, &model.ErrorResponse{
ErrorType: logsAPIDisabledErrorType,
ErrorMessage: "Logs API is not supported",
}); err != nil {
log.WithError(err).Warn("Error while rendering response")
http.Error(writer, err.Error(), http.StatusInternalServerError)
}
}
// NewRuntimeLogsAPIStubHandler creates the stub http.Handler that answers
// /runtime/logs requests when no telemetry service implementation is
// available.
func NewRuntimeLogsAPIStubHandler() http.Handler {
	return new(runtimeLogsStubAPIHandler)
}
// runtimeTelemetryAPIStubHandler is the Telemetry API stub. It remembers the
// destination URLs it has been given so that captured log lines can be
// forwarded to them.
//
// destinations is appended to from request handling and read from a
// background goroutine; mu must be held for every access. Because the struct
// embeds a mutex by value it must not be copied after first use.
type runtimeTelemetryAPIStubHandler struct {
	destinations []string
	mu           sync.Mutex
}
// ServeHTTP decodes a telemetry subscription request, remembers its
// destination when the protocol is "HTTP" and a URI is present, and then
// replies with a Telemetry.NotSupported error payload using status 202
// (http.StatusAccepted).
//
// A destination host of the form "sandbox:<port>" is rewritten to
// "localhost:<port>" before it is stored.
//
// Any failure (unreadable body, malformed JSON, unparsable destination URI,
// rendering error) is logged and answered with a 500. The handler returns
// immediately after reporting a failure so that no second response is written
// and no half-parsed value is used.
func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	body, err := ioutil.ReadAll(request.Body)
	if err != nil {
		log.WithError(err).Warn("Error while reading request body")
		http.Error(writer, err.Error(), http.StatusInternalServerError)
		return
	}

	var runtimeReq runtimeTelemetryRequest
	if err := json.Unmarshal(body, &runtimeReq); err != nil {
		log.WithError(err).Warn("Error while unmarshaling request")
		http.Error(writer, err.Error(), http.StatusInternalServerError)
		return
	}

	if len(runtimeReq.Destination.URI) > 0 && runtimeReq.Destination.Protocol == "HTTP" {
		u, err := url.Parse(runtimeReq.Destination.URI)
		if err != nil {
			// u is nil here; bail out instead of dereferencing it below.
			log.WithError(err).Warn("Error while parsing destination URL")
			http.Error(writer, err.Error(), http.StatusInternalServerError)
			return
		}

		// Only a host that is exactly "sandbox" followed by a port is rewritten.
		if sep := strings.IndexRune(u.Host, ':'); sep != -1 && u.Host[:sep] == "sandbox" {
			u.Host = "localhost" + u.Host[sep:]
		}

		h.mu.Lock()
		h.destinations = append(h.destinations, u.String())
		h.mu.Unlock()
	}

	if err := rendering.RenderJSON(http.StatusAccepted, writer, request, &model.ErrorResponse{
		ErrorType:    telemetryAPIDisabledErrorType,
		ErrorMessage: "Telemetry API is not supported",
	}); err != nil {
		log.WithError(err).Warn("Error while rendering response")
		http.Error(writer, err.Error(), http.StatusInternalServerError)
	}
}
// logMessage is one entry of the JSON array posted to telemetry
// destinations: a timestamp string ("time"), the event type ("type") and the
// captured log line ("record").
type logMessage struct {
	Time   string `json:"time"`
	Type   string `json:"type"`
	Record string `json:"record"`
}
// NewRuntimeTelemetryAPIStubHandler returns a new instance of http handler
// for serving Telemetry API subscription requests when a telemetry service
// implementation is absent.
//
// As a side effect it replaces the process-wide os.Stdout and os.Stderr with
// the write end of a pipe and starts a background goroutine that echoes every
// captured line to the original stdout and forwards it, in JSON batches, to
// the destinations registered with the handler. The goroutine runs until the
// pipe reaches end of input. If the pipe cannot be created, stdout and stderr
// are left untouched, the error is reported on stderr and no forwarding takes
// place; the returned handler still serves requests.
func NewRuntimeTelemetryAPIStubHandler() http.Handler {
	handler := &runtimeTelemetryAPIStubHandler{}

	originalStdout := os.Stdout
	r, w, err := os.Pipe()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to create log capture pipe: %s\n", err)
		return handler
	}
	os.Stdout = w
	os.Stderr = w

	go handler.forwardLogs(r, originalStdout)
	return handler
}

// forwardLogs reads lines from src, echoes each one to echo and posts them to
// the registered destinations in batches of up to batchSize messages.
// Diagnostics are written to echo rather than os.Stdout/os.Stderr, which at
// this point refer to the capture pipe.
//
// NOTE(review): as in the original implementation, nothing is read from src
// until at least one destination is registered, a batch is only sent once
// batchSize lines have arrived (Scan blocks in between), and the loop pauses
// pollInterval after every batch. Sustained output may therefore fill the
// pipe and block writers — confirm whether that is acceptable.
func (h *runtimeTelemetryAPIStubHandler) forwardLogs(src, echo *os.File) {
	const (
		batchSize    = 10
		pollInterval = 5 * time.Second
		postTimeout  = 5 * time.Second
	)

	// A dedicated client with a timeout keeps one unresponsive destination
	// from stalling log capture forever.
	client := &http.Client{Timeout: postTimeout}
	scanner := bufio.NewScanner(src)

	for {
		h.mu.Lock()
		subscribed := len(h.destinations) > 0
		h.mu.Unlock()

		if subscribed {
			msgs := make([]logMessage, 0, batchSize)
			// Check the batch size BEFORE calling Scan: Scan consumes a line,
			// so testing it first would silently drop every 11th line.
			for len(msgs) < batchSize && scanner.Scan() {
				line := scanner.Text()
				fmt.Fprintf(echo, "%s\n", line)
				msgs = append(msgs, logMessage{
					// The layout ends in a literal "Z", so the instant must be UTC.
					Time:   time.Now().UTC().Format("2006-01-02T15:04:05.999Z"),
					Type:   "function",
					Record: line,
				})
			}

			// Scan returned false if the batch is short: either the pipe hit
			// end of input or the scanner failed (e.g. an over-long line).
			stopped := len(msgs) < batchSize
			scanErr := scanner.Err()

			if len(msgs) > 0 {
				h.postBatch(client, msgs, echo)
			}

			if stopped {
				if scanErr == nil {
					// End of input: nothing more will ever arrive.
					return
				}
				// A failed Scanner never recovers; start a fresh one so
				// capture continues with whatever follows in the pipe.
				fmt.Fprintf(echo, "%s\n", scanErr)
				scanner = bufio.NewScanner(src)
			}
		}

		time.Sleep(pollInterval)
	}
}

// postBatch serialises msgs as a JSON array and posts it to every destination
// currently registered. Failures are reported on errOut and do not prevent
// delivery to the remaining destinations.
func (h *runtimeTelemetryAPIStubHandler) postBatch(client *http.Client, msgs []logMessage, errOut *os.File) {
	data, err := json.Marshal(msgs)
	if err != nil {
		fmt.Fprintf(errOut, "%s\n", err)
		return
	}

	// Copy under the lock so the request loop below never races with ServeHTTP.
	h.mu.Lock()
	destinations := append([]string(nil), h.destinations...)
	h.mu.Unlock()

	for _, dest := range destinations {
		// Each request needs its own reader; a shared one is exhausted after
		// the first POST and later destinations would receive an empty body.
		resp, err := client.Post(dest, "application/json", bytes.NewReader(data))
		if err != nil {
			fmt.Fprintf(errOut, "%s\n", err)
			continue
		}
		resp.Body.Close()
		if resp.StatusCode >= 300 {
			fmt.Fprintf(errOut, "failed to send logs to destination %q: status %d\n", dest, resp.StatusCode)
		}
	}
}