Skip to content

Commit 0190cc4

Browse files
author
ymahajan
committed
Merge branch 'master' into spark_2.3_merge
Conflicts: core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala
2 parents 7f9c6b3 + 73838ca commit 0190cc4

20 files changed

Lines changed: 3118 additions & 534 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,7 @@ task checkAll {
11051105
dependsOn ":gemfire-connector:check"
11061106
}
11071107
if (!rootProject.hasProperty('smoke.skip')) {
1108-
dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:scalaTest"
1108+
dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:check"
11091109
}
11101110
mustRunAfter buildAll
11111111
}

cluster/src/test/scala/org/apache/spark/sql/SnappySQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ class SnappySQLQuerySuite extends SnappyFunSuite {
253253
"(exists (select col1 from r2 where r2.col1=r1.col1) " +
254254
"or exists(select col1 from r3 where r3.col1=r1.col1))")
255255

256-
val result = df.collect()
256+
df.collect()
257257
checkAnswer(df, Seq(Row(1, "1", "1", 100),
258258
Row(2, "2", "2", 2), Row(4, "4", "4", 4) ))
259259
}

cluster/src/test/scala/org/apache/spark/sql/store/BugTest.scala

Lines changed: 235 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616
*/
1717
package org.apache.spark.sql.store
1818

19-
import java.util.Properties
20-
2119
import com.pivotal.gemfirexd.Attribute
2220
import com.pivotal.gemfirexd.security.{LdapTestServer, SecurityTestUtils}
23-
import io.snappydata.util.TestUtils
24-
import io.snappydata.{Constant, PlanTest, SnappyFunSuite}
21+
import io.snappydata.{Constant, SnappyFunSuite}
22+
import org.junit.Assert.assertEquals
2523
import org.scalatest.BeforeAndAfterAll
26-
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
2724

2825
import org.apache.spark.SparkConf
2926

@@ -33,11 +30,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
3330
override def beforeAll(): Unit = {
3431
this.stopAll()
3532
}
36-
33+
3734
protected override def newSparkConf(addOn: (SparkConf) => SparkConf): SparkConf = {
3835
val ldapProperties = SecurityTestUtils.startLdapServerAndGetBootProperties(0, 0, sysUser,
3936
getClass.getResource("/auth.ldif").getPath)
40-
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
37+
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
4138
for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
4239
System.setProperty(k, ldapProperties.getProperty(k))
4340
}
@@ -63,11 +60,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
6360
if (ldapServer.isServerStarted) {
6461
ldapServer.stopService()
6562
}
66-
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
63+
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
6764
for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
6865
System.clearProperty(k)
6966
System.clearProperty("gemfirexd." + k)
70-
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
67+
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
7168
}
7269
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.USERNAME_ATTR)
7370
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.PASSWORD_ATTR)
@@ -89,8 +86,7 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
8986

9087
// TODO : Use the actual connection pool limit
9188
val limit = 500
92-
93-
for (i <- 1 to limit) {
89+
for (_ <- 1 to limit) {
9490
val snc2 = snc.newSession()
9591
snc2.snappySession.conf.set(Attribute.USERNAME_ATTR, user2)
9692
snc2.snappySession.conf.set(Attribute.PASSWORD_ATTR, user2)
@@ -100,4 +96,232 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
10096
assertEquals(1, rs.length)
10197
}
10298
}
99+
100+
test("SNAP-2342 nested query involving joins & union throws Exception") {
101+
val user1 = "gemfire1"
102+
val session = snc.newSession()
103+
session.snappySession.conf.set(Attribute.USERNAME_ATTR, user1)
104+
session.snappySession.conf.set(Attribute.PASSWORD_ATTR, user1)
105+
106+
session.sql(s"create table ujli ( " +
107+
"aagmaterial string," +
108+
"accountassignmentgroup string," +
109+
"accounttype string," +
110+
"allocationcycle string," +
111+
"allocationsegment string," +
112+
"asset string," +
113+
"billingdocument string," +
114+
"billingdocumentitem string," +
115+
"bravoequitycode string," +
116+
"bravominorcode string," +
117+
"bsegdocumentlinenumber string," +
118+
"businessplace string," +
119+
"businesstransaction string," +
120+
"controllingarea string," +
121+
"copadocumentnumber string," +
122+
"copaobjectnumber string," +
123+
"costcenter string," +
124+
"costelement string," +
125+
"countryofshiptocustomer string," +
126+
"createdby string," +
127+
"creationtime string," +
128+
"customer string," +
129+
"customergroup string," +
130+
"debitcreditindicator string," +
131+
"distributionchannel string," +
132+
"division string," +
133+
"documentdate string," +
134+
"documentheadertext string," +
135+
"documentlinenumberinsourcesystem string," +
136+
"documentnumberinsourcesystem string," +
137+
"documenttype string," +
138+
"edgcreateditemindoc string," +
139+
"entrydate string," +
140+
"errorstatus string," +
141+
"fidocumentquantity string," +
142+
"fiscalperiod string," +
143+
"fiscalyear string," +
144+
"fsid string," +
145+
"functionalareacode string," +
146+
"glaccountcode string," +
147+
"hleamount string," +
148+
"indexfromcopa string," +
149+
"itemcategory string," +
150+
"itemtext string," +
151+
"kitmaterial string," +
152+
"kittype string," +
153+
"leamount string," +
154+
"lebillingtype string," +
155+
"lecode string," +
156+
"lecurrencycode string," +
157+
"lesalesqty string," +
158+
"lesalesqtyuom string," +
159+
"ledgercode string," +
160+
"localcompanycode string," +
161+
"localdocumenttype string," +
162+
"localfiscalperiod string," +
163+
"localfiscalyear string," +
164+
"localfunctionalareacode string," +
165+
"localglaccountcode string," +
166+
"locallecurrencycode string," +
167+
"localledgercode string," +
168+
"localmrccode string," +
169+
"localprofitcenter string," +
170+
"localsku string," +
171+
"localversioncode string," +
172+
"mrccode string," +
173+
"parentdocumentnumberinsourcesystem string," +
174+
"partnercostcenter string," +
175+
"partnerfunctionalarea string," +
176+
"partnerprofitcenter string," +
177+
"partnersegment string," +
178+
"payer string," +
179+
"pcadocnumber string," +
180+
"pcaitemnumber string," +
181+
"plant string," +
182+
"postingdate string," +
183+
"postingkey string," +
184+
"producthierarchy string," +
185+
"psegment string," +
186+
"rclnt string," +
187+
"reference string," +
188+
"referencedocument string," +
189+
"referencetransaction string," +
190+
"regionofshiptocustomer string," +
191+
"salesdoctype string," +
192+
"salesgroup string," +
193+
"salesoffice string," +
194+
"salesorder string," +
195+
"salesorderitem string," +
196+
"salesorganization string," +
197+
"sectorproductgroup string," +
198+
"shipto string," +
199+
"sleamount string," +
200+
"sourcesystemid string," +
201+
"tradingpartner string," +
202+
"transactioncode string," +
203+
"transactioncurrencyamount string," +
204+
"transactioncurrencycode string," +
205+
"transactiontype string," +
206+
"ujlkey string," +
207+
"valuefieldfromcopa string," +
208+
"vendor string," +
209+
"versioncode string )")
210+
211+
session.sql("create table ujs (" +
212+
"uuid string," +
213+
"bravoequitycode string," +
214+
"controllingarea string," +
215+
"costcenter string," +
216+
"creationtime string," +
217+
"debitcreditindicator string," +
218+
"errstatus string," +
219+
"fiscalyear string," +
220+
"fsid string," +
221+
"functionalareacode string," +
222+
"glaccountcode string," +
223+
"hleamount string," +
224+
"leamount string," +
225+
"lecode string," +
226+
"lecostelement string," +
227+
"lecurrencycode string," +
228+
"leplant string," +
229+
"ledgercode string," +
230+
"localcompanycode string," +
231+
"localfiscalyear string," +
232+
"localfunctionalareacode string," +
233+
"localglaccountcode string," +
234+
"locallecurrencycode string," +
235+
"localledgercode string," +
236+
"localmrccode string," +
237+
"localprofitcenter string," +
238+
"localversioncode string," +
239+
"mrccode string," +
240+
"partnerfunctionalarea string," +
241+
"partnerprofitcenter string," +
242+
"partnersegment string," +
243+
"referencetransaction string," +
244+
"sleamount string," +
245+
"sourceadditionalkey string," +
246+
"sourcesystemid string," +
247+
"tradingpartner string," +
248+
"transactioncurrencyamount string," +
249+
"transactioncurrencycode string," +
250+
"transactiontype string," +
251+
"versioncode string)")
252+
253+
session.sql("create table gfs (" +
254+
"gfs string, " +
255+
" gfsdescription string, " +
256+
" globalfunctionalarea string )")
257+
258+
session.sql("create table bravo (" +
259+
" bravo string," +
260+
"bravodescription string," +
261+
" gfs string, " +
262+
" gfsdescription string)")
263+
264+
session.sql("create table gtw (" +
265+
"gfs string," +
266+
"gfsdescription string," +
267+
"gtw string," +
268+
"gtwdescription string)")
269+
270+
session.sql("create table coa (" +
271+
"accounttype string," +
272+
"errorcode string," +
273+
"errormessage string," +
274+
"errorstatus string," +
275+
"gfs string," +
276+
"gfsdescription string," +
277+
"globalfunctionalarea string," +
278+
"indicevalue string," +
279+
"localfunctionalarea string," +
280+
"localgl string," +
281+
"localgldescription string)")
282+
283+
session.sql(s"create or replace view TrialBalance as " +
284+
s"( select leUniversal,gfs,first(gfsDescription) as gfsDescription, " +
285+
s"first(bravo) as bravo, " +
286+
s"first(bravoDescription) as bravoDescription, first(gtw) as gtw, " +
287+
s"first(gtwDescription) as gtwDescription, " +
288+
s"first(globalFunctionalArea) as globalFunctionalArea," +
289+
s"format_number(sum(credit),2) as credit," +
290+
s" format_number(sum(debit),2) as debit,format_number(sum(total),2) as total from" +
291+
s" ( select a.leCode as leUniversal,a.localCompanyCode as leLocal," +
292+
s" a.mrcCode as mrcUniversal," +
293+
s" a.sourceSystemId as sourceSystem,a.glAccountCode as gfs," +
294+
s"a.localGlAccountCode as localGl," +
295+
s" SUM(hleAmount) as debit,SUM(sleAmount) as credit,SUM(leAmount) as total," +
296+
s" first(b.gfsDescription) as gfsDescription," +
297+
s" first(b.globalFunctionalArea) as globalFunctionalArea," +
298+
s" first((case when a.sourceSystemId='project_one' then e.localGlDescription " +
299+
s" when a.sourceSystemId='btb_latam' then b.gfsDescription else '' end)) " +
300+
s" as localGlDescription ," +
301+
s" first(c.bravoDescription) as bravoDescription," +
302+
s"first(d.gtwDescription) as gtwDescription, " +
303+
s" first(c.bravo) as bravo, first(d.gtw) as gtw from ( select ledgerCode,leCode," +
304+
s" localCompanyCode,mrcCode,fiscalYear,sourceSystemId,localGlAccountCode," +
305+
s" glAccountCode,last(localFunctionalAreaCode),SUM(leAmount) as leAmount," +
306+
s" SUM(hleAmount) as hleAmount,SUM(sleAmount) as sleAmount, glAccountCode ," +
307+
s" 'Local GL' as accountType,localGlAccountCode as localGl from " +
308+
s" ( select ledgerCode,leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
309+
s" localGlAccountCode,glAccountCode,localFunctionalAreaCode,leAmount,hleAmount,sleAmount" +
310+
s" from ujli where ledgerCode='0L' and leCode='7600' " +
311+
s" AND fiscalYear='2017' and fiscalPeriod<=3 AND sourceSystemId='btb_latam' union all" +
312+
s" select ledgerCode,leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
313+
s" localGlAccountCode,glAccountCode,localFunctionalAreaCode,leAmount,hleAmount," +
314+
s" sleAmount from ujs where ledgerCode='0L' and leCode='7600'" +
315+
s" AND fiscalYear='2017' AND sourceSystemId='btb_latam' ) group by ledgerCode," +
316+
s" leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
317+
s" localGlAccountCode,glAccountCode ) a" +
318+
s" left join gfs b on (a.glAccountCode=b.gfs) left join " +
319+
s" bravo c " +
320+
s" on (a.glAccountCode=c.gfs) left join gtw d on (a.glAccountCode=d.gfs)" +
321+
s" left join coa e on(a.accountType=e.accountType and " +
322+
s" a.glAccountCode = e.gfs and a.localGl = e.localGl ) group by a.leCode," +
323+
s"a.localCompanyCode," +
324+
s" a.mrcCode,a.sourceSystemId,a.glAccountCode,a.localGlAccountCode," +
325+
s"c.bravo,d.gtw) group by leUniversal,gfs)")
326+
}
103327
}

core/src/main/scala/org/apache/spark/memory/StoreUnifiedManager.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ object MemoryManagerCallback extends Logging {
209209
}
210210
allocator.allocate(size, owner).order(ByteOrder.LITTLE_ENDIAN)
211211
}
212+
213+
/** release and accounting for byte buffer allocated by [[allocateExecutionMemory]] */
214+
def releaseExecutionMemory(buffer: ByteBuffer, owner: String, releaseBuffer: Boolean): Unit = {
215+
if (releaseBuffer) BufferAllocator.releaseBuffer(buffer)
216+
if (buffer.hasArray) {
217+
StoreCallbacksImpl.releaseStorageMemory(owner, buffer.capacity(), offHeap = false)
218+
}
219+
}
212220
}
213221

214222
final class DefaultMemoryConsumer(taskMemoryManager: TaskMemoryManager,

core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBatch.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import scala.util.control.NonFatal
2626

2727
import com.gemstone.gemfire.cache.EntryDestroyedException
2828
import com.gemstone.gemfire.internal.cache.{BucketRegion, GemFireCacheImpl, LocalRegion, NonLocalRegionEntry, PartitionedRegion, RegionEntry, TXStateInterface}
29-
import com.gemstone.gemfire.internal.shared.FetchRequest
30-
import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder
29+
import com.gemstone.gemfire.internal.shared.{BufferAllocator, FetchRequest}
3130
import com.koloboke.function.IntObjPredicate
3231
import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer
3332
import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection
3433
import io.snappydata.collection.IntObjectHashMap
3534
import io.snappydata.thrift.common.BufferedBlob
3635

36+
import org.apache.spark.memory.MemoryManagerCallback
3737
import org.apache.spark.sql.execution.columnar.encoding.{ColumnDecoder, ColumnDeleteDecoder, ColumnEncoding, UpdatedColumnDecoder, UpdatedColumnDecoderBase}
3838
import org.apache.spark.sql.execution.columnar.impl._
3939
import org.apache.spark.sql.execution.row.PRValuesIterator
@@ -333,7 +333,7 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
333333
val result = CompressionUtils.codecDecompressIfRequired(
334334
buffer.order(ByteOrder.LITTLE_ENDIAN), allocator)
335335
if (result ne buffer) {
336-
UnsafeHolder.releaseIfDirectBuffer(buffer)
336+
BufferAllocator.releaseBuffer(buffer)
337337
// decompressed buffer will be ordered by LITTLE_ENDIAN while non-decompressed
338338
// is returned with BIG_ENDIAN in order to distinguish the two cases
339339
result
@@ -404,11 +404,11 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
404404
override def test(col: Int, buffer: ByteBuffer): Boolean = {
405405
// release previous set of buffers immediately
406406
if (buffer ne null) {
407-
if (buffer.isDirect) UnsafeHolder.releaseDirectBuffer(buffer)
408407
// release from accounting if decompressed buffer
409-
else if (buffer.order() eq ByteOrder.LITTLE_ENDIAN) {
410-
StoreCallbacksImpl.releaseStorageMemory(CompressionUtils.DECOMPRESSION_OWNER,
411-
buffer.capacity(), offHeap = false)
408+
if (!BufferAllocator.releaseBuffer(buffer) &&
409+
(buffer.order() eq ByteOrder.LITTLE_ENDIAN)) {
410+
MemoryManagerCallback.releaseExecutionMemory(buffer,
411+
CompressionUtils.DECOMPRESSION_OWNER, releaseBuffer = false)
412412
}
413413
}
414414
true

0 commit comments

Comments
 (0)