44package com .microsoft .azure .synapse .ml .nbtest .SynapseExtension
55
66import com .microsoft .azure .synapse .ml .Secrets
7- import com .microsoft .azure .synapse .ml .Secrets .getSynapseExtensionSecret
87import com .microsoft .azure .synapse .ml .build .BuildInfo
9- import com .microsoft .azure .synapse .ml .core .env .PackageUtils .{ SparkMavenPackageList , SparkMavenRepositoryList }
8+ import com .microsoft .azure .synapse .ml .core .env .PackageUtils .SparkMavenRepositoryList
109import com .microsoft .azure .synapse .ml .io .http .RESTHelpers
1110import com .microsoft .azure .synapse .ml .io .http .RESTHelpers ._
1211import com .microsoft .azure .synapse .ml .nbtest .SharedNotebookE2ETestUtilities ._
@@ -23,6 +22,7 @@ import java.time.LocalDateTime
2322import java .time .format .DateTimeFormatter
2423import scala .annotation .tailrec
2524import scala .collection .JavaConverters ._
25+ import scala .collection .immutable .HashMap
2626import scala .concurrent .{ExecutionContext , Future , TimeoutException , blocking }
2727
2828object SynapseExtensionUtilities {
@@ -31,40 +31,84 @@ object SynapseExtensionUtilities {
3131
3232 object Environment extends Enumeration {
3333 type Environment = Value
34- val Dev, Daily, Weekly = Value
34+ val EDog, Daily, DXT, MSIT = Value
35+
3536 def withNameOpt (s : String ): Option [Value ] = values.find(_.toString.toLowerCase == s.toLowerCase)
3637 }
3738
38- lazy val TimeoutInMillis : Int = 30 * 60 * 1000
39-
40- lazy val BaseUri : String = s " $SSPHost/metadata "
41- lazy val ArtifactsUri : String = s " $BaseUri/workspaces/ $WorkspaceId/artifacts "
42-
43- lazy val AadAccessTokenResource : String = Secrets .AadResource
44- lazy val AadAccessTokenClientId : String = " 1950a258-227b-4e31-a9cf-717495945fc2"
39+ object Resource extends Enumeration {
40+ type Resource = Value
41+ val SSPHost, WorkspaceId, UxHost, TenantId, Password, AadAccessTokenResource, Login = Value
4542
46- lazy val DefaultEnvironment = Environment .Daily
47- lazy val SynapseEnvironment = getWorkingEnvironment(DefaultEnvironment )
48-
49- lazy val EnvironmentString = SynapseEnvironment match {
50- case Environment .Dev => " dev"
51- case Environment .Daily => " daily"
52- case Environment .Weekly => " weekly"
43+ def withNameOpt (s : String ): Option [Value ] = values.find(_.toString.toLowerCase == s.toLowerCase)
5344 }
5445
55- lazy val SSPHost : String = getSynapseExtensionSecret(EnvironmentString , " ssp-host" )
56- lazy val WorkspaceId : String = getSynapseExtensionSecret(EnvironmentString , " workspace-id" )
57- lazy val UxHost : String = getSynapseExtensionSecret(EnvironmentString , " ux-host" )
58- lazy val TenantId : String = getSynapseExtensionSecret(EnvironmentString , " tenant-id" )
59- lazy val Password : String = getSynapseExtensionSecret(EnvironmentString , " password" )
60-
61- lazy val Folder : String = s " build_ ${BuildInfo .version}/synapseextension/notebooks "
62- lazy val StorageAccount : String = " mmlsparkbuildsynapse"
63- lazy val StorageContainer : String = " synapse-extension"
46+ val TimeoutInMillis : Int = 60 * 60 * 1000
47+ val DefaultLogin = " login.microsoftonline.com"
48+ val PpeLogin = " login.windows-ppe.net"
49+
50+ // TODO: Edog is not yet available.
51+ // Details: 401 Unauthorized upon creating the lakehouse
52+ lazy val EdogResources = HashMap (
53+ Resource .SSPHost -> Secrets .SynapseExtensionEdogSspHost ,
54+ Resource .WorkspaceId -> Secrets .SynapseExtensionEdogWorkspaceId ,
55+ Resource .UxHost -> Secrets .SynapseExtensionEdogUxHost ,
56+ Resource .TenantId -> Secrets .SynapseExtensionEdogTenantId ,
57+ Resource .Password -> Secrets .SynapseExtensionEdogPassword ,
58+ Resource .AadAccessTokenResource -> " https://analysis.windows-int.net/powerbi/api" ,
59+ Resource .Login -> PpeLogin )
60+
61+ lazy val DailyResources = HashMap (
62+ Resource .SSPHost -> Secrets .SynapseExtensionDailySspHost ,
63+ Resource .WorkspaceId -> Secrets .SynapseExtensionDailyWorkspaceId ,
64+ Resource .UxHost -> Secrets .SynapseExtensionDailyUxHost ,
65+ Resource .TenantId -> Secrets .SynapseExtensionDailyTenantId ,
66+ Resource .Password -> Secrets .SynapseExtensionDailyPassword ,
67+ Resource .AadAccessTokenResource -> Secrets .AadResource ,
68+ Resource .Login -> DefaultLogin )
69+
70+ lazy val DxtResources = HashMap (
71+ Resource .SSPHost -> Secrets .SynapseExtensionDxtSspHost ,
72+ Resource .WorkspaceId -> Secrets .SynapseExtensionDxtWorkspaceId ,
73+ Resource .UxHost -> Secrets .SynapseExtensionDxtUxHost ,
74+ Resource .TenantId -> Secrets .SynapseExtensionDxtTenantId ,
75+ Resource .Password -> Secrets .SynapseExtensionDxtPassword ,
76+ Resource .AadAccessTokenResource -> Secrets .AadResource ,
77+ Resource .Login -> DefaultLogin )
78+
79+ // TODO: MSIT is not yet available.
80+ // Details: We get PowerBiFeatureDisabled and a 404 upon creating the lakehouse
81+ lazy val MsitResources = HashMap (
82+ Resource .SSPHost -> Secrets .SynapseExtensionMsitSspHost ,
83+ Resource .WorkspaceId -> Secrets .SynapseExtensionMsitWorkspaceId ,
84+ Resource .UxHost -> Secrets .SynapseExtensionMsitUxHost ,
85+ Resource .TenantId -> Secrets .SynapseExtensionMsitTenantId ,
86+ Resource .Password -> Secrets .SynapseExtensionMsitPassword ,
87+ Resource .AadAccessTokenResource -> Secrets .AadResource ,
88+ Resource .Login -> DefaultLogin )
89+
90+ val DefaultEnvironment = Environment .Daily
91+ val ResourceMap = getResources(DefaultEnvironment )
92+ val SSPHost : String = ResourceMap (Resource .SSPHost )
93+ val WorkspaceId : String = ResourceMap (Resource .WorkspaceId )
94+ val UxHost : String = ResourceMap (Resource .UxHost )
95+ val TenantId : String = ResourceMap (Resource .TenantId )
96+ val Password : String = ResourceMap (Resource .Password )
97+ val AadAccessTokenResource : String = ResourceMap (Resource .AadAccessTokenResource )
98+ val Login : String = ResourceMap (Resource .Login )
99+
100+ val BaseUri : String = s " $SSPHost/metadata "
101+ val ArtifactsUri : String = s " $BaseUri/workspaces/ $WorkspaceId/artifacts? "
102+
103+ val AadAccessTokenClientId : String = " 1950a258-227b-4e31-a9cf-717495945fc2"
104+
105+ val Folder : String = s " build_ ${BuildInfo .version}/synapseextension/notebooks "
106+ val StorageAccount : String = " mmlsparkbuildsynapse"
107+ val StorageContainer : String = " synapse-extension"
64108
65109 lazy val AccessToken : String = getAccessToken
66110
67- lazy val Platform = Secrets .Platform .toUpperCase
111+ val Platform : String = Secrets .Platform .toUpperCase
68112
69113 def createSJDArtifact (path : String ): String = {
70114 createSJDArtifact(path, " SparkJobDefinition" )
@@ -73,27 +117,22 @@ object SynapseExtensionUtilities {
73117 def updateSJDArtifact (path : String , artifactId : String , storeId : String ): Artifact = {
74118 val eTag = getETagFromArtifact(artifactId)
75119 val store = Secrets .ArtifactStore .capitalize
76- val excludes : String = " org.scala-lang:scala-reflect," +
77- " org.apache.spark:spark-tags_2.12," +
78- " org.scalatest:scalatest_2.12," +
79- " org.slf4j:slf4j-api"
120+ val sparkVersion = " 3.4"
121+ val packages : String = " com.microsoft.azure:synapseml_2.12:" + BuildInfo .version
80122
81123 val workloadPayload =
82124 s """
83125 |"{
84126 | 'Default ${store}ArtifactId': ' $storeId',
85127 | 'ExecutableFile': ' $path',
86- | 'SparkVersion':'3.4 ',
128+ | 'SparkVersion': ' $sparkVersion ',
87129 | 'SparkSettings': {
88- | 'spark.jars.packages' : ' $SparkMavenPackageList ',
130+ | 'spark.jars.packages' : ' $packages ',
89131 | 'spark.jars.repositories' : ' $SparkMavenRepositoryList',
90- | 'spark.jars.excludes': ' $excludes',
91- | 'spark.dynamicAllocation.enabled': 'false',
92- | 'spark.yarn.user.classpath.first': 'true',
93132 | 'spark.executorEnv.IS_ $Platform': 'true',
94- | 'spark.sql.parquet.outputwriter':
95- | 'org.apache. spark.sql.execution.datasources.parquet.ParquetOutputWriter ',
96- | 'spark.sql.parquet.vorder.enabled ': 'false'
133+ | 'spark.sql.extensions': 'com.microsoft.azure.synapse.ml.predict.PredictExtension',
134+ | 'spark.synapse.ml.predict.enabled': 'true ',
135+ | 'spark.executor.heartbeatInterval ': '60s',
97136 | }
98137 |}"
99138 """ .stripMargin
@@ -124,6 +163,7 @@ object SynapseExtensionUtilities {
124163 |}
125164 | """ .stripMargin
126165 val response = postRequest(ArtifactsUri , reqBody).asJsObject().convertTo[Artifact ]
166+ println(s " Created SJD for $runName: ${getSparkJobDefinitionLink(response.objectId)}" )
127167 response.objectId
128168 }
129169
@@ -278,23 +318,33 @@ object SynapseExtensionUtilities {
278318 }
279319
280320 def getAccessToken : String = {
281- val createRequest = new HttpPost (s " https://login.microsoftonline.com / $TenantId/oauth2/token " )
321+ val createRequest = new HttpPost (s " https:// $Login / $TenantId/oauth2/token " )
282322 createRequest.setHeader(" Content-Type" , " application/x-www-form-urlencoded" )
283323 createRequest.setEntity(
284324 new UrlEncodedFormEntity (
285325 List (
286- (" resource" , s " $ AadAccessTokenResource" ),
287- (" client_id" , s " $ AadAccessTokenClientId" ),
326+ (" resource" , AadAccessTokenResource ),
327+ (" client_id" , AadAccessTokenClientId ),
288328 (" grant_type" , " password" ),
289- (" username" , s " SynapseMLE2ETestUser @ $TenantId" ),
290- (" password" , s " $ Password" ),
329+ (" username" , s " AdminUser @ $TenantId" ),
330+ (" password" , Password ),
291331 (" scope" , " openid" )
292332 ).map(p => new BasicNameValuePair (p._1, p._2)).asJava, " UTF-8" )
293333 )
294334 " Bearer " + RESTHelpers .sendAndParseJson(createRequest).asJsObject()
295335 .fields(" access_token" ).convertTo[String ]
296336 }
297337
338+ def getResources (defaultEnv : Environment .Value ): HashMap [Resource .Value , String ] = {
339+ val environment = getWorkingEnvironment(defaultEnv)
340+ environment match {
341+ case Environment .EDog => EdogResources
342+ case Environment .Daily => DailyResources
343+ case Environment .DXT => DxtResources
344+ case Environment .MSIT => MsitResources
345+ }
346+ }
347+
298348 def getWorkingEnvironment (defaultEnv : Environment .Value ): Environment .Value = {
299349 val undefined = " "
300350 val varName = " SYNAPSE_ENVIRONMENT"
@@ -305,14 +355,14 @@ object SynapseExtensionUtilities {
305355 }
306356 val result = if (envValue != None ) envValue.get else defaultEnv
307357 println(s " Using environment ${result.toString}" )
308-
309358 result
310359 }
311360}
312361
313362object SynapseJsonProtocol extends DefaultJsonProtocol {
314363 implicit object LocalDateTimeFormat extends RootJsonFormat [LocalDateTime ] {
315364 def write (dt : LocalDateTime ): JsValue = JsString (dt.format(DateTimeFormatter .ISO_LOCAL_DATE_TIME ))
365+
316366 def read (value : JsValue ): LocalDateTime =
317367 LocalDateTime .parse(value.toString().replaceAll(" ^\" +|\" +$" , " " ),
318368 DateTimeFormatter .ISO_LOCAL_DATE_TIME )
@@ -322,4 +372,5 @@ object SynapseJsonProtocol extends DefaultJsonProtocol {
322372 jsonFormat3(Artifact .apply)
323373 implicit val SparkJobDefinitionExecutionResponseFormat : RootJsonFormat [SparkJobDefinitionExecutionResponse ] =
324374 jsonFormat3(SparkJobDefinitionExecutionResponse .apply)
375+
325376}
0 commit comments