77 "strings"
88 "time"
99
10+ "github.com/optim-kazuhiro-seida/go-advance-type/wrapper"
11+
1012 . "github.com/optim-corp/cios-cli/cli"
1113 "github.com/optim-corp/cios-cli/models"
1214 "github.com/optim-corp/cios-cli/utils"
@@ -27,6 +29,7 @@ func GetDataStoreCommand() *cli.Command {
2729 deleteDataStore (),
2830 listDataStore (),
2931 embezzleDateStore (),
32+ saveDataStore (),
3033 },
3134 }
3235}
@@ -94,84 +97,68 @@ func listDataStore() *cli.Command {
9497 Aliases : models .ALIAS_LIST ,
9598 Flags : []cli.Flag {
9699 & cli.StringFlag {Name : "channel_id" , Aliases : []string {"c" }},
97- & cli.StringFlag {Name : "packer_format" , Aliases : []string {"pf" }},
98- & cli.StringFlag {Name : "timestamp_range" , Aliases : []string {"tr" }},
99- & cli.StringFlag {Name : "save_dir" , Aliases : []string {"out" }},
100+ & cli.StringFlag {Name : "packer_format" , Aliases : []string {"pf" }, Value : "payload_only" },
101+ & cli.StringFlag {Name : "timestamp_range" , Aliases : []string {"tr" }, DefaultText : "Now Time" , Value : ":" + convert .MustStr (time .Now ().UnixNano ())},
100102 & cli.StringFlag {Name : "label" , Aliases : []string {"lb" }},
101103 & cli.BoolFlag {Name : "data" , Aliases : []string {"d" }},
102- & cli.BoolFlag {Name : "save" , Aliases : []string {"s" }},
103104 & cli.Int64Flag {Name : "limit" , Aliases : []string {"l" }, DefaultText : "30" , Value : 30 },
104105 & cli.Int64Flag {Name : "offset" , Aliases : []string {"o" }, DefaultText : "0" , Value : 0 },
105106 & cli.StringFlag {Name : "resource_owner_id" , Aliases : []string {"r" , "ro" }},
106- & cli.StringFlag {Name : "change_channel_id" , Aliases : []string {"chg-c" }},
107- & cli.StringFlag {Name : "data_replace" , Aliases : []string {"replace" , "rep" }, Usage : "" },
108107 },
109108 Action : func (c * cli.Context ) error {
110109 var (
111110 channelID = c .String ("channel_id" )
112111 packerFormat = c .String ("packer_format" )
113- outputDir = c .String ("save_dir" )
114112 limit = c .Int64 ("limit" )
115113 offset = c .Int64 ("offset" )
116- timestampRange = is ( c .String ("timestamp_range" ) != "" ). T ( c . String ( "timestamp_range" )). F ( ":" + convert . MustStr ( time . Now (). UnixNano ())). Value .( string )
114+ timestampRange = c .String ("timestamp_range" )
117115 resourceOwnerID = c .String ("resource_owner_id" )
118- dataFlag = c .Bool ("data" )
119- saveFlag = c .Bool ("save" )
120116 label = c .String ("label" )
121- replace = c .String ( "data_replace " )
117+ dataFlag = c .Bool ( "data " )
122118 channelsMap , _ , _ = Client .PubSub .GetChannelsMapByID (ciossdk .MakeGetChannelsOpts (), context .Background ())
123119 )
124120
125- printObj := func (channel cios.Channel , limit int64 ) {
126- if dataFlag {
127- stageDSDir := datastoreDir + "/" + models .GetStage ()
128- channelDir := datastoreDir + "/" + models .GetStage () + "/" + channelsMap [channel .Id ].Name + "___" + channel .Id
129- data , err := Client .PubSub .GetStreamAll (channel .Id , ciossdk .MakeGetStreamOpts ().Limit (limit ).PackerFormat (packerFormat ).TimestampRange (timestampRange ).Label (label ).Offset (offset ), context .Background ())
130- assert (err ).Log ()
131- fPrintf ("\n |Channel ID| : %s \n |Channel Name|: %s\n \n " , channel .Id , channelsMap [channel .Id ].Name )
132- if saveFlag {
133- path (datastoreDir ).CreateDir ()
134- path (stageDSDir ).CreateDir ()
135- path (channelDir ).CreateDir ()
136- }
137-
138- for count , val := range data {
139- if replace != "" {
140- splitInComma := strings .Split (replace , "," )
141- for _ , _val := range splitInComma {
142- splitVal := strings .Split (_val , ":" )
143- if len (splitVal ) >= 2 {
144- val = strings .Replace (val , splitVal [0 ], splitVal [1 ], - 1 )
145- } else {
146- log .Warn ("Replace Missing format string" )
147- }
148- }
149- }
150- if saveFlag {
151- if outputDir != "" {
152- channelDir = outputDir
153- }
154- filePrefix := strings .Repeat ("0" , len (strconv .Itoa (len (data )))- len (strconv .Itoa (count )))
155- _path := channelDir + "/" + filePrefix + strconv .Itoa (count ) + ".txt"
156- assert (path (_path ).WriteFileAsString (val )).Log ()
157- fPrintln (val )
121+ printObject := func (channelId string ) {
122+ objects , _ , err := Client .PubSub .GetObjectsAll (channelId , ciossdk .MakeGetObjectsOpts ().
123+ Limit (limit ).
124+ TimestampRange (timestampRange ).
125+ Label (label ).
126+ Offset (offset ),
127+ context .Background ())
128+ if len (objects ) == 0 || resourceOwnerID != "" && channelsMap [channelId ].ResourceOwnerId != resourceOwnerID {
129+ return
130+ }
131+ fPrintf ("\n |Channel ID| : %s \n |Channel Name|: %s\n \n " , channelId , channelsMap [channelId ].Name )
132+ fPrintln ("\t |ID|\t \t |Timestamp|\t |Mime Type|" )
133+ assert (err ).
134+ Log ().
135+ NoneErr (func () {
136+ for _ , obj := range objects {
137+ fPrintf ("%s %s %s\n " , obj .Id , obj .Timestamp , obj .MimeType )
158138 }
159- fPrintln (val )
160- }
161- } else {
162- objects , _ , err := Client .PubSub .GetObjectsAll (channel .Id , ciossdk .MakeGetObjectsOpts ().Limit (limit ).TimestampRange (timestampRange ).Label (label ).Offset (offset ), context .Background ())
163- if len (objects ) == 0 || resourceOwnerID != "" && channelsMap [channel .Id ].ResourceOwnerId != resourceOwnerID {
164- return
165- }
166- fPrintf ("\n |Channel ID| : %s \n |Channel Name|: %s\n \n " , channel .Id , channelsMap [channel .Id ].Name )
167- fPrintln ("\t |ID|\t \t |Timestamp|\t |Mime Type|" )
168- assert (err ).
169- Log ().
170- NoneErr (func () {
171- for _ , obj := range objects {
172- fPrintf ("%s %s %s\n " , obj .Id , obj .Timestamp , obj .MimeType )
173- }
174- })
139+ })
140+
141+ }
142+ printData := func (channelId string ) []string {
143+ data , err := Client .PubSub .GetStreamAll (channelId , ciossdk .MakeGetStreamOpts ().
144+ Limit (limit ).Offset (offset ).
145+ PackerFormat (packerFormat ).
146+ TimestampRange (timestampRange ).
147+ Label (label ),
148+ context .Background ())
149+ assert (err ).Log ()
150+ fPrintf ("\n |Channel ID| : %s \n |Channel Name|: %s\n \n " , channelId , channelsMap [channelId ].Name )
151+ for _ , val := range data {
152+ fPrintln (val )
153+ }
154+ return data
155+ }
156+ printJob := func (channel cios.Channel , limit int64 ) {
157+ switch {
158+ case dataFlag :
159+ printData (channel .Id )
160+ default :
161+ printObject (channel .Id )
175162 }
176163 assert (out .Flush ()).Log ()
177164 }
@@ -181,14 +168,14 @@ func listDataStore() *cli.Command {
181168 channel , _ , err := Client .PubSub .GetChannel (channelID , nil , nil , context .Background ())
182169 assert (err ).
183170 Log ().
184- NoneErr (func () { printObj (channel , limit ) })
171+ NoneErr (func () { printJob (channel , limit ) })
185172 } else {
186173 channels , _ , err := Client .PubSub .GetChannelsAll (ciossdk .MakeGetChannelsOpts ().ResourceOwnerId (resourceOwnerID ), context .Background ())
187174 assert (err ).
188175 Log ().
189176 NoneErr (func () {
190177 for _ , channel := range channels {
191- printObj (channel , limit )
178+ printJob (channel , limit )
192179 }
193180 })
194181 }
@@ -197,6 +184,120 @@ func listDataStore() *cli.Command {
197184 },
198185 }
199186}
187+ func saveDataStore () * cli.Command {
188+ return & cli.Command {
189+ Name : "save" ,
190+ Aliases : []string {"download" , "get" , "sv" , "dl" },
191+ Flags : []cli.Flag {
192+ & cli.StringFlag {Name : "channel_id" , Aliases : []string {"c" }},
193+ & cli.StringFlag {Name : "packer_format" , Aliases : []string {"pf" }, Value : "payload_only" },
194+ & cli.StringFlag {Name : "timestamp_range" , Aliases : []string {"tr" }, DefaultText : "Now Time" , Value : ":" + convert .MustStr (time .Now ().UnixNano ())},
195+ & cli.StringFlag {Name : "save_dir" , Aliases : []string {"out" }},
196+ & cli.StringFlag {Name : "label" , Aliases : []string {"lb" }},
197+ & cli.BoolFlag {Name : "indent" , Aliases : []string {"idt" , "idnt" , "i" }},
198+ & cli.BoolFlag {Name : "collective" , Aliases : []string {"compact" , "coll" , "collect" }},
199+ & cli.Int64Flag {Name : "limit" , Aliases : []string {"l" }, DefaultText : "30" , Value : 30 },
200+ & cli.Int64Flag {Name : "offset" , Aliases : []string {"o" }, DefaultText : "0" , Value : 0 },
201+ & cli.StringFlag {Name : "resource_owner_id" , Aliases : []string {"r" , "ro" }},
202+ & cli.StringFlag {Name : "replace_save_data_channel" , Aliases : []string {"replace" , "rep" }, Usage : "-replace <channel id>" },
203+ },
204+ Action : func (c * cli.Context ) error {
205+ var (
206+ channelID = c .String ("channel_id" )
207+ packerFormat = c .String ("packer_format" )
208+ limit = c .Int64 ("limit" )
209+ offset = c .Int64 ("offset" )
210+ timestampRange = c .String ("timestamp_range" )
211+ resourceOwnerID = c .String ("resource_owner_id" )
212+ label = c .String ("label" )
213+ outputDir = wrapper .AsString (c .String ("save_dir" ))
214+ replaced = wrapper .AsString (c .String ("replace_save_data_channel" ))
215+ indent = c .Bool ("indent" )
216+ collective = c .Bool ("collective" )
217+ channelsMap , _ , _ = Client .PubSub .GetChannelsMapByID (ciossdk .MakeGetChannelsOpts (), context .Background ())
218+ stageDSDir = fmt .Sprintf ("%s/%s" , datastoreDir , models .GetStage ())
219+ )
220+ replaceChannelId := func (data string ) string {
221+ var jsonFormat cios.PackerFormatJson
222+ assert (convert .UnMarshalJson ([]byte (data ), & jsonFormat )).Log ()
223+ jsonFormat .Header .ChannelId = replaced .Str ()
224+ return convert .MustCompactJson (jsonFormat )
225+ }
226+ indentJson := func (data string ) string {
227+ var jsonFormat interface {}
228+ assert (convert .UnMarshalJson ([]byte (data ), & jsonFormat )).Log ()
229+ return convert .MustIndentJson (jsonFormat )
230+ }
231+ job := func (channel cios.Channel , limit int64 ) {
232+ switch {
233+ case replaced != "" :
234+ packerFormat = "json"
235+ fallthrough
236+ case outputDir == "" :
237+ outputDir = wrapper .String (fmt .Sprintf ("%s/%s/%s___%s" , datastoreDir , models .GetStage (), channelsMap [channel .Id ].Name , channel .Id ))
238+ fallthrough
239+ default :
240+ data , err := Client .PubSub .GetStreamAll (channel .Id , ciossdk .MakeGetStreamOpts ().
241+ Limit (limit ).Offset (offset ).
242+ PackerFormat (packerFormat ).
243+ TimestampRange (timestampRange ).
244+ Label (label ),
245+ context .Background ())
246+ assert (err ).Log ()
247+ for idx , val := range data {
248+ switch {
249+ case replaced .IsPreset ():
250+ data [idx ] = replaceChannelId (val )
251+ fallthrough
252+ case indent :
253+ data [idx ] = indentJson (val )
254+ }
255+ }
256+ path (datastoreDir ).CreateDir ()
257+ path (stageDSDir ).CreateDir ()
258+ path (outputDir .Str ()).CreateDir ()
259+ switch {
260+ case collective :
261+ fileName := fmt .Sprintf ("%s/%s_%s.txt" ,
262+ outputDir ,
263+ packerFormat ,
264+ strings .Replace (timestampRange , ":" , "-" , - 1 ))
265+ assert (path (fileName ).WriteFileAsString ("[\n " + strings .Join (data , "," )+ "]\n " )).
266+ Log ().
267+ NoneErrPrintln ("Completed " , fileName )
268+ default :
269+ for idx , val := range data {
270+ allLength := len (str (len (data )))
271+ currentLength := len (str (idx ))
272+ filePrefixZero := strings .Repeat ("0" , allLength - currentLength )
273+ fileName := fmt .Sprintf ("%s/%s%d.txt" , outputDir , filePrefixZero , idx )
274+ assert (path (fileName ).WriteFileAsString (val )).
275+ Log ().
276+ NoneErrPrintln ("Completed " , fileName )
277+ }
278+ }
279+ }
280+ }
281+
282+ if channelID != "" {
283+ channel , _ , err := Client .PubSub .GetChannel (channelID , nil , nil , context .Background ())
284+ assert (err ).
285+ Log ().
286+ NoneErr (func () { job (channel , limit ) })
287+ } else {
288+ channels , _ , err := Client .PubSub .GetChannelsAll (ciossdk .MakeGetChannelsOpts ().ResourceOwnerId (resourceOwnerID ), context .Background ())
289+ assert (err ).
290+ Log ().
291+ NoneErr (func () {
292+ for _ , channel := range channels {
293+ job (channel , limit )
294+ }
295+ })
296+ }
297+ return nil
298+ },
299+ }
300+ }
200301func embezzleDateStore () * cli.Command {
201302 return & cli.Command {
202303 Name : "embezzle" ,
0 commit comments