11import {
2- testConfig as testConfigImpl ,
3- importStages ,
2+ authBegin ,
3+ authCodeResponse ,
4+ configureContext ,
5+ dataSourceFns ,
46 defaultApiLimits ,
5- initialPagingContext ,
6- reportImportProblem ,
7- dataSourceFns
7+ importStages ,
8+ initialPagingContext , reportImportProblem ,
9+ testConfig as testConfigImpl
810} from './handlerConfig.js' ;
911
12+ export const HandlerFunctionEnum = Object . freeze ( {
13+ testConfig : 'testConfig' ,
14+ importObjects : 'importObjects' ,
15+ readDataSource : 'readDataSource' ,
16+ oAuth2 : 'oAuth2'
17+ } ) ;
18+
1019// ============================================================================
1120//
1221// testConfig
1322//
1423export async function testConfig ( event , api ) {
1524 const { pluginConfig } = event ;
16- const { log, report, patchConfig } = api ;
25+ const { log, report, patchConfig, runtimeContext } = api ;
1726
1827 const context = {
1928 pluginConfig,
2029
21- log,
22- report,
23- patchConfig
30+ log, report, patchConfig, runtimeContext
2431 } ;
2532
33+ await configureContext ( context , HandlerFunctionEnum . testConfig ) ;
34+
2635 return testConfigImpl ( context ) ;
2736}
2837
@@ -32,83 +41,87 @@ export async function testConfig(event, api) {
3241//
3342export async function importObjects ( event , api ) {
3443 const { pluginConfig, pagingContext } = event ;
35- const { log, report, patchConfig } = api ;
44+ const { log, report, patchConfig, runtimeContext } = api ;
3645
3746 const context = {
38- vertices : [ ] ,
39- edges : [ ] ,
47+ vertices : [ ] , edges : [ ] ,
4048
41- pluginConfig,
42- pagingContext,
49+ pluginConfig, pagingContext,
4350
44- log,
45- report,
46- patchConfig,
51+ log, report, patchConfig, runtimeContext,
4752
4853 apiLimits : Object . assign ( { } , defaultApiLimits , pluginConfig . testSettings ?. apiLimits ?? { } )
4954 } ;
5055 const pageAPI = ( context ) => {
5156 return {
5257 get : ( key ) => context . pagingContext [ key ] ,
53- set : ( key , value ) => {
54- context . pagingContext [ key ] = value ;
55- } ,
56- clear : ( ) => {
57- context . pagingContext = { } ;
58- }
58+ set : ( key , value ) => { context . pagingContext [ key ] = value ; } ,
59+ clear : ( ) => { context . pagingContext = { } ; }
5960 } ;
6061 } ;
6162 context . pageAPI = pageAPI ( context ) ;
6263 context . reportImportProblem = reportImportProblem ( context ) ;
6364
64- if ( ! context . pageAPI . get ( 'squaredUp_isInit' ) ) {
65- // Set initial paging context values
66- context . pageAPI . set ( 'squaredUp_stage' , 0 ) ;
67- for ( const [ key , value ] of Object . entries ( initialPagingContext ) ) {
68- context . pageAPI . set ( key , value ) ;
65+ await configureContext ( context , HandlerFunctionEnum . importObjects ) ;
66+
67+ if ( Array . isArray ( importStages ) && importStages . length > 0 ) {
68+
69+ if ( ! context . pageAPI . get ( 'squaredUp_isInit' ) ) {
70+ // Set initial paging context values
71+ context . pageAPI . set ( 'squaredUp_stage' , 0 ) ;
72+ for ( const [ key , value ] of Object . entries ( initialPagingContext ) ) {
73+ context . pageAPI . set ( key , value ) ;
74+ }
75+ context . pageAPI . set ( 'squaredUp_isInit' , true ) ;
6976 }
70- context . pageAPI . set ( 'squaredUp_isInit' , true ) ;
71- }
7277
73- // Run through the appropriate stages until we've been running for 10 minutes or we've created results larger than 2MB.
74- const maxElapsedTimeMSecs = pluginConfig . testSettings ?. maxElapsedTimeMSecs ?? 10 * 60 * 1000 ;
75- const maxPayloadSize = pluginConfig . testSettings ?. maxPayloadSize ?? 2 * 1024 * 1024 ;
76- let stage = context . pageAPI . get ( 'squaredUp_stage' ) ;
77- context . log . debug (
78- 'importObjects starts: ' +
78+ // Run through the appropriate stages until we've been running for 10 minutes or we've created results larger than 2MB.
79+ const maxElapsedTimeMSecs = pluginConfig . testSettings ?. maxElapsedTimeMSecs ?? 10 * 60 * 1000 ;
80+ const maxPayloadSize = pluginConfig . testSettings ?. maxPayloadSize ?? 2 * 1024 * 1024 ;
81+ let stage = context . pageAPI . get ( 'squaredUp_stage' ) ;
82+ context . log . debug ( 'importObjects starts: ' +
7983 `stage=${ stage } , ` +
8084 `apiLimits=${ JSON . stringify ( context . apiLimits ) } , ` +
8185 `maxElapsedTimeMSecs=${ maxElapsedTimeMSecs } , ` +
82- `maxPayloadSize=${ maxPayloadSize } `
83- ) ;
84- const start = Date . now ( ) ;
85- let elapsed ;
86- let payloadSize ;
87- do {
88- if ( await importStages [ stage ] ( context ) ) {
89- // Stage reported it has finished... step to the next one
90- stage ++ ;
91- context . pageAPI . set ( 'squaredUp_stage' , stage ) ;
92-
93- if ( stage >= importStages . length ) {
94- // No more stages, so set pagingContext to an empty object to
95- // indicate import is complete
96- context . pageAPI . clear ( ) ;
97- break ;
86+ `maxPayloadSize=${ maxPayloadSize } ` ) ;
87+ const start = Date . now ( ) ;
88+ let elapsed ;
89+ let payloadSize ;
90+ let rateLimited = false ;
91+ do {
92+ context . pageAPI . set ( 'rateLimitDelay' , undefined ) ;
93+ if ( await importStages [ stage ] ( context ) ) {
94+ // Stage reported it has finished... step to the next one
95+ stage ++ ;
96+ context . pageAPI . set ( 'squaredUp_stage' , stage ) ;
97+
98+ if ( stage >= importStages . length ) {
99+ // No more stages, so set pagingContext to an empty object to
100+ // indicate import is complete
101+ context . pageAPI . clear ( ) ;
102+ break ;
103+ }
98104 }
99- }
100- elapsed = Date . now ( ) - start ;
101- const pagingContextSize = JSON . stringify ( context . pagingContext ) . length ;
102- payloadSize = JSON . stringify ( {
103- vertices : context . vertices ,
104- edges : context . edges ,
105- pagingContext : context . pagingContext
106- } ) . length ;
107- context . log . debug (
108- `importObjects looping: elapsed = ${ elapsed } , payloadSize=${ payloadSize } , pagingContextSize=${ pagingContextSize } `
109- ) ;
110- } while ( elapsed < maxElapsedTimeMSecs && payloadSize < maxPayloadSize ) ;
111- context . log . debug ( 'importObjects loop ends' ) ;
105+ elapsed = Date . now ( ) - start ;
106+ const pagingContextSize = JSON . stringify ( context . pagingContext ) . length ;
107+ payloadSize = JSON . stringify ( { vertices : context . vertices , edges : context . edges , pagingContext : context . pagingContext } ) . length ;
108+ const rateLimitDelay = context . pageAPI . get ( 'rateLimitDelay' ) ?? 0 ;
109+ if ( rateLimitDelay ) {
110+ // Stage reported it was rate limited, so wait synchronously before continuing if we have time, otherwise
111+ // end this page of import and return the results so far.
112+ if ( elapsed + rateLimitDelay < maxElapsedTimeMSecs && payloadSize < maxPayloadSize ) {
113+ context . log . debug ( `importObjects rate limited: elapsed = ${ elapsed } , synchronously delaying ${ rateLimitDelay } msecs` ) ;
114+ await new Promise ( ( resolve ) => setTimeout ( resolve , rateLimitDelay ) ) ;
115+ elapsed = Date . now ( ) - start ;
116+ } else {
117+ context . log . debug ( `importObjects rate limited: elapsed = ${ elapsed } , ending page early` ) ;
118+ rateLimited = true ;
119+ }
120+ }
121+ context . log . debug ( `importObjects looping: elapsed = ${ elapsed } , payloadSize=${ payloadSize } , pagingContextSize=${ pagingContextSize } ` ) ;
122+ } while ( ! rateLimited && elapsed < maxElapsedTimeMSecs && payloadSize < maxPayloadSize ) ;
123+ context . log . debug ( 'importObjects loop ends' ) ;
124+ }
112125
113126 // Return the results
114127 const result = {
@@ -117,6 +130,7 @@ export async function importObjects(event, api) {
117130 pagingContext : context . pagingContext
118131 } ;
119132 return result ;
133+
120134}
121135
122136// ============================================================================
@@ -125,23 +139,47 @@ export async function importObjects(event, api) {
125139//
126140export async function readDataSource ( event , api ) {
127141 const { pluginConfig, dataSource, dataSourceConfig, targetNodes, timeframe } = event ;
128- const { log, report, patchConfig } = api ;
142+ const { log, report, patchConfig, runtimeContext } = api ;
129143
130144 const context = {
131- pluginConfig,
132- dataSource,
133- dataSourceConfig,
134- targetNodes,
135- timeframe,
136- log,
137- report,
138- patchConfig
145+ pluginConfig, dataSource, dataSourceConfig, targetNodes, timeframe,
146+ log, report, patchConfig, runtimeContext
139147 } ;
140148
149+ await configureContext ( context , HandlerFunctionEnum . readDataSource ) ;
150+
141151 const dataSourceFn = dataSourceFns [ dataSource . name ] ;
142152 if ( ! dataSourceFn ) {
143153 throw new Error ( `No data source function was found for data source ${ dataSource . name } ` ) ;
144154 }
145155
146156 return dataSourceFn ( context ) ;
147157}
158+
159+ // ============================================================================
160+ //
161+ // oAuth2
162+ //
163+ export async function oAuth2 ( { pluginConfig, dataSourceConfig, oAuth2Config } , { log, report, patchConfig } ) {
164+ const context = {
165+ pluginConfig,
166+ dataSourceConfig,
167+ oAuth2Config,
168+ log,
169+ report,
170+ patchConfig
171+ } ;
172+
173+ await configureContext ( context , HandlerFunctionEnum . oAuth2 ) ;
174+
175+ switch ( dataSourceConfig . oAuth2Stage ) {
176+ case 'oAuth2Begin' :
177+ return authBegin ( context ) ;
178+
179+ case 'oAuth2CodeResponse' :
180+ return authCodeResponse ( context ) ;
181+
182+ default :
183+ throw new Error ( `Invalid oAuth2Stage: "${ dataSourceConfig . oAuth2Stage } "` ) ;
184+ }
185+ }
0 commit comments