
Commit 1948fe2

Repartition is now done based on size
1 parent 45adc19 commit 1948fe2

File tree (5 files changed: +39 -11 lines changed)

.gitignore
build.sbt
project/plugins.sbt
src/main/scala/com/springml/spark/salesforce/Utils.scala
src/test/scala/com/springml/spark/salesforce/TestUtils.scala


.gitignore (+3)

@@ -4,6 +4,9 @@
 .classpath
 .project
 .settings/
+.cache-main
+.cache-tests
 target/
 project/target
 dependency-reduced-pom.xml
+/bin/

build.sbt (+1 -1)

@@ -21,7 +21,7 @@ resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositori
 resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"

 libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
-
+libraryDependencies += "com.madhukaraphatak" %% "java-sizeof" % "0.1"


 //unmanagedJars in Compile += file("lib/partner.jar")
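The java-sizeof dependency added above provides the SizeEstimator used in Utils.scala below to measure row sizes. A minimal standalone sketch of the call, with illustrative values (the object name and sample row are ours, not part of the commit):

import com.madhukaraphatak.sizeof.SizeEstimator

object SizeEstimateSketch {
  def main(args: Array[String]): Unit = {
    // Build a CSV-style string from a row's values, mirroring what
    // getRDDSize in Utils.scala does for each Spark Row.
    val row = Seq(1, "madhu", 1000.5).map(_.toString).mkString(",")
    // estimate returns the approximate in-memory size of the object in bytes.
    println(SizeEstimator.estimate(row))
  }
}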

project/plugins.sbt (+1)

@@ -1,2 +1,3 @@
 addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.2")
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

src/main/scala/com/springml/spark/salesforce/Utils.scala (+33 -9)

@@ -2,6 +2,7 @@ package com.springml.spark.salesforce

 import com.sforce.soap.partner.{SaveResult, Connector, PartnerConnection}
 import com.sforce.ws.ConnectorConfig
+import com.madhukaraphatak.sizeof.SizeEstimator
 import org.apache.log4j.Logger
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
@@ -10,7 +11,7 @@ import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
 /**
  * Created by madhu on 9/7/15.
  */
-object Utils extends Serializable{
+object Utils extends Serializable {


   private def fieldJson(fieldName:String,datasetName:String) = {
@@ -70,18 +71,41 @@ object Utils extends Serializable{
     })
   }

-  def repartition(rdd: RDD[Row]): RDD[Row] = {
+  def repartition(rdd: RDD[Row]): RDD[Row] = {
+    val totalDataSize = getTotalSize(rdd)
+    val maxBundleSize = 1024 * 1024 * 10l;
+    var partitions = 1
+    if (totalDataSize > maxBundleSize) {
+      partitions = Math.round(totalDataSize / maxBundleSize) + 1
+    }

-    val NO_OF_ROWS_PARTITION = 500
-    val totalRows = rdd.count()
-    val partitions = Math.round(totalRows / NO_OF_ROWS_PARTITION) + 1
-    //val noPartitions = Math.max(rdd.partitions.length, partititons)
     val shuffle = rdd.partitions.length < partitions
     rdd.coalesce(partitions.toInt, shuffle)
   }

+  def getTotalSize(rdd: RDD[Row]): Long = {
+    // This can be fetched as optional parameter
+    val NO_OF_SAMPLE_ROWS = 10l;
+    val totalRows = rdd.count();
+    var totalSize = 0l
+    if (totalRows > NO_OF_SAMPLE_ROWS) {
+      val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
+      val sampleRDDSize = getRDDSize(sampleRDD)
+      totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
+    } else {
+      totalSize = getRDDSize(rdd)
+    }
+
+    totalSize
+  }

-
-
-
+  def getRDDSize(rdd: RDD[Row]) : Long = {
+    var rddSize = 0l
+    val rows = rdd.collect()
+    for (i <- 0 until rows.length) {
+      rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.toString() }.mkString(","))
+    }
+
+    rddSize
+  }
 }
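Taken together, the change swaps the old fixed rule (roughly 500 rows per partition) for a size target: getTotalSize samples up to NO_OF_SAMPLE_ROWS rows, measures them with SizeEstimator, and extrapolates totalSize = sampleRDDSize * totalRows / NO_OF_SAMPLE_ROWS; repartition then aims for bundles of roughly 10 MB. A sketch of that arithmetic with assumed numbers (all values and the object name are illustrative, not from the commit):

object RepartitionMathSketch {
  def main(args: Array[String]): Unit = {
    // Step 1: extrapolate total size from a sample, as getTotalSize does.
    val NO_OF_SAMPLE_ROWS = 10L
    val sampleRDDSize = 10L * 1024   // assume the 10 sampled rows measure ~10 KB
    val totalRows = 100000L          // assume rdd.count() returned 100,000
    val totalDataSize = sampleRDDSize * totalRows / NO_OF_SAMPLE_ROWS // 102,400,000 bytes (~98 MB)

    // Step 2: derive the partition count, as repartition does.
    val maxBundleSize = 1024 * 1024 * 10L // 10 MB per bundle
    var partitions = 1
    if (totalDataSize > maxBundleSize) {
      // The commit computes Math.round(totalDataSize / maxBundleSize) + 1;
      // the Long division already truncates: 102,400,000 / 10,485,760 = 9, plus 1 = 10.
      partitions = (totalDataSize / maxBundleSize).toInt + 1
    }
    println(partitions) // 10
  }
}

With ~1 KB rows this yields 10 partitions of roughly 10 MB each, whereas the old rule would have produced totalRows / 500 + 1 = 201 partitions regardless of row size.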

src/test/scala/com/springml/spark/salesforce/TestUtils.scala (+1 -1)

@@ -43,7 +43,7 @@ class TestUtils extends FunSuite {

     val repartitionDF = Utils.repartition(inMemoryRDD)

-    assert(repartitionDF.partitions.length == 5)
+    assert(repartitionDF.partitions.length >= 30)


  }
