@@ -2,22 +2,28 @@ package daemon
22
33import (
44 "bufio"
5+ "context"
56 "encoding/json"
67 "fmt"
78 "io"
8- "net"
99 "os"
1010 "strconv"
1111 "strings"
1212 "time"
1313
1414 "github.com/devforth/OnLogs/app/agent"
1515 "github.com/devforth/OnLogs/app/containerdb"
16+ "github.com/devforth/OnLogs/app/docker"
1617 "github.com/devforth/OnLogs/app/util"
1718 "github.com/devforth/OnLogs/app/vars"
19+ "github.com/docker/docker/api/types/container"
1820 "github.com/syndtr/goleveldb/leveldb"
1921)
2022
23+ type DaemonService struct {
24+ DockerClient * docker.DockerService
25+ }
26+
2127func createLogMessage (db * leveldb.DB , host string , container string , message string ) string {
2228 datetime := strings .Replace (strings .Split (time .Now ().UTC ().String (), " +" )[0 ], " " , "T" , 1 )
2329 if len (datetime ) < 29 {
@@ -40,25 +46,6 @@ func validateMessage(message string) (string, bool) {
4046 return message , true
4147}
4248
43- func createConnection (containerName string ) net.Conn {
44- connection , _ := net .Dial ("unix" , os .Getenv ("DOCKER_SOCKET_PATH" ))
45- fmt .Fprintf (
46- connection ,
47- "GET /containers/" + containerName + "/logs?stdout=true&stderr=true×tamps=true&follow=true&since=" + strconv .FormatInt (time .Now ().Add (- 5 * time .Second ).Unix (), 10 )+ " HTTP/1.0\r \n \r \n " ,
48- )
49- return connection
50- }
51-
52- func readHeader (reader bufio.Reader ) {
53- for { // reading resp header
54- tmp , _ := reader .ReadString ('\n' )
55- if tmp [:len (tmp )- 2 ] == "" {
56- reader .ReadString ('\n' )
57- break
58- }
59- }
60- }
61-
6249func closeActiveStream (containerName string ) {
6350 newDaemonStreams := []string {}
6451 for _ , stream := range vars .Active_Daemon_Streams {
@@ -73,19 +60,29 @@ func closeActiveStream(containerName string) {
7360 vars .Active_Daemon_Streams = newDaemonStreams
7461}
7562
76- func CreateDaemonToHostStream (containerName string ) {
77- connection := createConnection (containerName )
78- reader := bufio .NewReader (connection )
79- readHeader (* reader )
63+ func (h * DaemonService ) CreateDaemonToHostStream (ctx context.Context , containerName string ) {
64+ rc , err := h .DockerClient .Client .ContainerLogs (ctx , containerName , container.LogsOptions {ShowStdout : true , ShowStderr : true , Timestamps : true , Follow : true , Since : strconv .FormatInt (time .Now ().Add (- 5 * time .Second ).Unix (), 10 )})
65+ if err != nil {
66+ closeActiveStream (containerName )
67+ return
68+ }
69+ defer rc .Close ()
70+
71+ reader := bufio .NewReader (rc )
8072
8173 host := util .GetHost ()
8274 token := os .Getenv ("ONLOGS_TOKEN" )
8375 agent .SendLogMessage (token , containerName , strings .SplitN (createLogMessage (nil , host , containerName , "ONLOGS: Container listening started!" ), " " , 2 ))
8476
8577 lastSleep := time .Now ().Unix ()
8678 for { // reading body
87- logLine , get_string_error := reader .ReadString ('\n' ) // TODO read bytes instead of strings
79+ logLine , get_string_error := reader .ReadString ('\n' )
8880 if get_string_error != nil {
81+ if get_string_error == io .EOF {
82+ closeActiveStream (containerName )
83+ agent .SendLogMessage (token , containerName , strings .SplitN (createLogMessage (nil , host , containerName , "ONLOGS: Container listening stopped! (EOF)" ), " " , 2 ))
84+ return
85+ }
8986 closeActiveStream (containerName )
9087 agent .SendLogMessage (token , containerName , strings .SplitN (createLogMessage (nil , host , containerName , "ONLOGS: Container listening stopped! (" + get_string_error .Error ()+ ")" ), " " , 2 ))
9188 return
@@ -106,10 +103,15 @@ func CreateDaemonToHostStream(containerName string) {
106103}
107104
108105// creates stream that writes logs from every docker container to leveldb
109- func CreateDaemonToDBStream (containerName string ) {
110- connection := createConnection (containerName )
111- reader := bufio .NewReader (connection )
112- readHeader (* reader )
106+ func (h * DaemonService ) CreateDaemonToDBStream (ctx context.Context , containerName string ) {
107+ rc , err := h .DockerClient .Client .ContainerLogs (ctx , containerName , container.LogsOptions {ShowStdout : true , ShowStderr : true , Timestamps : true , Follow : true , Since : strconv .FormatInt (time .Now ().Add (- 5 * time .Second ).Unix (), 10 )})
108+ if err != nil {
109+ closeActiveStream (containerName )
110+ return
111+ }
112+ defer rc .Close ()
113+
114+ reader := bufio .NewReader (rc )
113115
114116 host := util .GetHost ()
115117 current_db := util .GetDB (host , containerName , "logs" )
@@ -119,6 +121,11 @@ func CreateDaemonToDBStream(containerName string) {
119121 for { // reading body
120122 logLine , get_string_error := reader .ReadString ('\n' )
121123 if get_string_error != nil {
124+ if get_string_error == io .EOF {
125+ closeActiveStream (containerName )
126+ createLogMessage (current_db , host , containerName , "ONLOGS: Container listening stopped! (EOF)" )
127+ return
128+ }
122129 closeActiveStream (containerName )
123130 createLogMessage (current_db , host , containerName , "ONLOGS: Container listening stopped! (" + get_string_error .Error ()+ ")" )
124131 return
@@ -150,27 +157,12 @@ func CreateDaemonToDBStream(containerName string) {
150157 }
151158}
152159
153- // make request to docker socket
154- func makeSocketRequest ( path string ) [] byte {
155- connection , err := net . Dial ( "unix" , os . Getenv ( "DOCKER_SOCKET_PATH" ) )
160+ // returns list of names of docker containers from docker daemon
161+ func ( h * DaemonService ) GetContainersList ( ctx context. Context ) [] string {
162+ result , err := h . DockerClient . GetContainerNames ( ctx )
156163 if err != nil {
157164 panic (err )
158165 }
159- fmt .Fprintf (connection , "GET /" + path + " HTTP/1.0\r \n \r \n " )
160-
161- body , _ := io .ReadAll (connection )
162-
163- connection .Close ()
164- return body
165- }
166-
167- // returns list of names of docker containers from docker daemon
168- func GetContainersList () []string {
169- var result []map [string ]any
170-
171- body := string (makeSocketRequest ("containers/json" ))
172- body = strings .Split (body , "\r \n \r \n " )[1 ]
173- json .Unmarshal ([]byte (body ), & result )
174166
175167 var names []string
176168
@@ -184,9 +176,9 @@ func GetContainersList() []string {
184176 }
185177 containersMetaDB = vars .ContainersMeta_DBs [util .GetHost ()]
186178
187- for i := 0 ; i < len ( result ); i ++ {
188- name := fmt . Sprintf ( "%v" , result [i ][ "Names" ].([] interface {})[ 0 ].( string ))[ 1 :]
189- id := result [i ][ "Id" ].( string )
179+ for i := range result {
180+ name := result [i ]. Name
181+ id := result [i ]. ID
190182
191183 names = append (names , name )
192184 containersMetaDB .Put ([]byte (name ), []byte (id ), nil )
@@ -195,16 +187,11 @@ func GetContainersList() []string {
195187 return names
196188}
197189
198- func GetContainerImageNameByContainerID (containerID string ) string {
199- body := string (makeSocketRequest ("containers/" + containerID + "/json" ))
200- body = strings .Split (body , "\r \n \r \n " )[1 ]
201- var result map [string ]any
202- json .Unmarshal ([]byte (body ), & result )
203-
204- if result ["Config" ] == nil {
190+ func (h * DaemonService ) GetContainerImageNameByContainerID (ctx context.Context , containerID string ) string {
191+ result , err := h .DockerClient .GetContainerImageNameByContainerID (ctx , containerID )
192+ if err != nil {
205193 return ""
206194 }
207195
208- imageName := fmt .Sprintf ("%v" , result ["Config" ].(map [string ]any )["Image" ])
209- return imageName
196+ return result
210197}
0 commit comments