
Conversation

@YanjieGao
Contributor

Hi all,
I want to submit a SkewJoin operator in SparkSql joins.scala.

In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key, then puts the rows with the largest key into their own table RDD. The streamed RDD is split into a mainStreamedTable RDD without the largest key and a maxKeyStreamedTable RDD with only the largest key.
Then, each of the two tables is joined with the build table.
Finally, the two result RDDs are unioned.

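The flow described above can be sketched in plain Scala, with Seq standing in for an RDD; SkewJoinSketch, the Row alias, and the inner join helper are all illustrative names, not the PR's actual implementation:

```scala
object SkewJoinSketch {
  // Hypothetical key-value rows; Seq stands in for an RDD.
  type Row = (String, Int)

  def skewJoin(streamed: Seq[Row], build: Seq[Row]): Seq[(String, (Int, Int))] = {
    // 1. Find the most frequent (largest) key on the streamed side.
    //    (The real operator would sample rather than scan everything.)
    val maxKey = streamed.groupBy(_._1).maxBy(_._2.size)._1
    // 2. Split the streamed side into the skewed rows and the rest.
    val (maxKeyTable, mainTable) = streamed.partition(_._1 == maxKey)
    // 3. Join each part with the build side (naive nested-loop join).
    def join(left: Seq[Row]): Seq[(String, (Int, Int))] =
      for ((k, v) <- left; (k2, v2) <- build if k == k2) yield (k, (v, v2))
    // 4. Union the two results.
    join(mainTable) ++ join(maxKeyTable)
  }
}
```

In the real operator, splitting off the skewed rows lets them be handled with a different physical strategy (e.g. broadcast) while the rest goes through a normal join.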
@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented Jun 20, 2014

@YanjieGao
Contributor Author

Thanks a lot. I will reformat the code to match the Spark coding style.

Reformat the annotation and var name format
@YanjieGao
Contributor Author

Hi rxin, I reformatted it. Can you give me some suggestions? I will try to make it better. Thanks a lot.

@YanjieGao YanjieGao changed the title SparkSQL add SkewJoin [SPARK-2236][SQL]SparkSQL add SkewJoin Jun 22, 2014
Contributor


This can be done like:

val (maxKeyStreamedTable, mainStreamedTable) = streamedTable.partition { row =>
  streamSideKeyGenerator(row).toString == maxrowKey.toString
}
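As a minimal self-contained illustration of the suggestion above, partition splits a collection into matching and non-matching elements in a single pass, avoiding two separate filter scans (the values here are made up for the demo):

```scala
// partition returns (elements matching the predicate, elements that do not),
// preserving the original order within each half.
val rows = Seq("max", "x", "max", "y")
val (maxKeyStreamedTable, mainStreamedTable) = rows.partition(_ == "max")
```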

@chenghao-intel
Contributor

Skew join optimization will be very helpful, but how do we know which keys are the skew join keys? By the way, the code will not take effect unless you put the strategy object into a given context (e.g. SQLContext, HiveContext).

temp += 1 should be wrapped in "{}" and placed on a new line.
@YanjieGao
Contributor Author

Thanks a lot, Chenghao. This code is like a demo. I think we could improve the sample phase and use some strategy to judge which key set is skewed, for example by an absolute or relative frequency threshold. What are your suggestions?
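The relative-rate idea above could be sketched as follows; findSkewKeys, its parameters, and the threshold semantics are all hypothetical, and a Seq of keys stands in for the sampled RDD:

```scala
import scala.util.Random

// Hypothetical sketch: sample the keys, count frequencies, and flag any key
// whose share of the sample exceeds a relative threshold as a skew key.
def findSkewKeys[K](keys: Seq[K], fraction: Double, threshold: Double,
                    seed: Long = 42L): Set[K] = {
  val rng = new Random(seed)
  // Bernoulli sampling, analogous to RDD.sample(withReplacement = false, fraction).
  val sample = keys.filter(_ => rng.nextDouble() < fraction)
  if (sample.isEmpty) Set.empty
  else {
    val total = sample.size.toDouble
    sample.groupBy(identity)
      .collect { case (k, occ) if occ.size / total > threshold => k }
      .toSet
  }
}
```

An absolute threshold (flag keys with more than N sampled rows, as in Hive's skew_key_threshold) would replace the occ.size / total comparison with occ.size > n.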

@YanjieGao
Contributor Author

Hi all, I updated 8 files like the pull request that adds the EXCEPT operator. But when I ran the test, it executed the CartesianProduct case class operator instead. I think there are some mistakes in my code. Can you help me? Thanks a lot!

@YanjieGao
Contributor Author

Hi all, I have resolved the conflict.

Contributor


A key question for this PR is whether we want to model skew as a join type. My first inclination would be no, since it is a hint about how to execute the query for maximum performance and not something that changes the answer of the query.

@marmbrus
Contributor

marmbrus commented Jul 8, 2014

I think there are major questions that will need to be answered before we could merge this PR:

  • Is skew just a hint instead of a join type and how do we propagate that information through?
  • @chenghao-intel asks a valid question about join keys. I'm not sure how this could work without them.
  • I think the current implementation of execute() is going to suffer from serious performance issues. It does many passes over the data, does a lot of unnecessary string manipulation and computes several Cartesian products. You will need to run some performance experiments with large datasets in order to show that this operator actually has benefits.

@YanjieGao
Contributor Author

Thanks Michael,
(1) We could make it a user hint, like Hive does:
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
We could use:
set sparksql.optimize.skewjoin = true
set sparksql.skewjoin.key = skew_key_threshold
(2) We could use sampling to find the relative frequency of each key, and judge which keys exceed the user-set skew_key_threshold.
(3) toString will generate many temporary objects. I will optimize the code in the next step.
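The proposed settings could be read from a simple config map; this is only a sketch mirroring Hive's skew-join options, and SkewJoinConf and the sparksql.* key names are illustrative, not an actual Spark SQL API:

```scala
// Hypothetical configuration holder for the proposed skew-join hint.
case class SkewJoinConf(settings: Map[String, String]) {
  // Whether the skew-join optimization is enabled (default: off).
  def skewJoinEnabled: Boolean =
    settings.getOrElse("sparksql.optimize.skewjoin", "false").toBoolean
  // Row-count threshold above which a key is treated as skewed
  // (default mirrors Hive's 100000).
  def skewKeyThreshold: Long =
    settings.getOrElse("sparksql.skewjoin.key", "100000").toLong
}
```

The planner would then consult these values when deciding whether to substitute the skew-join physical operator for a regular join.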

@YanjieGao
Contributor Author

Hi, I also made a left semi join. I don't know whether this join counts as an optimization of the left semi join or as a standalone join algorithm. I think PR #1127 also has some optimization work left to do. Do you think #1127 is worth merging? Thanks a lot.
#1127

@YanjieGao
Contributor Author

Hi, I rewrote the code and resolved some of the earlier problems.

@marmbrus
Contributor

marmbrus commented Sep 3, 2014

Hi @YanjieGao, as I said in #1127, this will be a great optimization to have after we figure out how to choose join algorithms based on statistics. I think we should close this issue for now and reopen it once we have a design for this.

Thanks for working on it!

@YanjieGao
Contributor Author

Hi marmbrus, I will close it. Best regards.

@YanjieGao YanjieGao closed this Sep 3, 2014
udaynpusa pushed a commit to mapr/spark that referenced this pull request Jan 30, 2024
1. Do not hardcode bc-fips jar version, use '*' wildcard instead
2. Add FIPS-specific options to keystore verification command
mapr-devops pushed a commit to mapr/spark that referenced this pull request May 8, 2025
1. Do not hardcode bc-fips jar version, use '*' wildcard instead
2. Add FIPS-specific options to keystore verification command
