    .agg(
      max(col("datetime")).as("visitdate"),
      count(col("ipaddress")).as("number_of_records"),
      collect_list("ipaddress").as("ipaddress")
    )
    .select(col("window"), col("visitdate"), col("number_of_records"), explode(col("ipaddress")).as("ipaddress"))
    .join(joinedDf, Seq("ipaddress"))
    .select(
      col("window"),
      col("category").as("category_page_category"),
      col("category"),
      col("calculation1"),
      hour(col("dateTime")).as("hour_label").cast("String"),
      col("dateTime").as("date_label").cast("String"),
      minute(col("dateTime")).as("minute_label").cast("String"),
      col("demography"),
      col("fullname").as("full_name"),
      col("ipaddress"),
      col("number_of_records"),
      col("endpoint").as("pageurl"),
      col("pageurl").as("page_url"),
      col("username"),
      col("visitdate"),
      col("productname").as("product_name")
    ).dropDuplicates().toDF()
No aggregations are performed on this DataFrame before this stage.
I apply an aggregation only once, but I still get the error below:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Multiple streaming aggregations are not supported with streaming
DataFrames/Datasets;