金山竹影几千秋,云索高飞水自流,万里长江飘玉带,一轮银月滚金球,远自湖北三千里,近到江南十六州,美景一时观不透,天缘有分画中游!
需求:在订单表中,根据buyerNick分组,统计每个buyerNick的电话、地址、支付总金额以及总商品数,返回结果是CustomerDetail。
* project:列出所有本次查询的字段,包括查询条件的字段和需要搜索的字段;
* match:搜索条件criteria
* unwind:某一个字段是集合,将该字段分解成数组
* group:分组的字段,以及聚合相关查询
* sum:求和(同sql查询)
* count:数量(同sql查询)
* as:别名(同sql查询)
* addToSet:将符合的字段值添加到一个集合或数组中
* sort:排序
* skip&limit:分页查询
public List<CustomerDetail> customerDetailList(Integer pageNum,String userId,String buyerNick,String itemId,List<String> phones) throws Exception{
Criteria criteria = Criteria.where("userId").is(userId);
Integer pageSize = 10;
Integer startRows = (pageNum - 1) * pageSize;
if(buyerNick != null && !"".equals(buyerNick)){
criteria.and("buyerNick").is(buyerNick);
if(phones != null && phones.size() > 0){
criteria.and("mobile").in(phoneList);
if(itemId != null && !"".equals(itemId)){
criteria.and("orders.numIid").is(itemId);
Aggregation customerAgg = Aggregation.newAggregation(
Aggregation.project("buyerNick","payment","num","tid","userId","address","mobile","orders"),
Aggregation.match(criteria),
Aggregation.unwind("orders"),
Aggregation.group("buyerNick").first("buyerNick").as("buyerNick").first("mobile").as("mobile").
first("address").as("address").sum("payment").as("totalPayment").sum("num").as("itemNum").count().as("orderNum"),
Aggregation.sort(new Sort(new Sort.Order(Sort.Direction.DESC, "totalPayment"))),
Aggregation.skip(startRows),
Aggregation.limit(pageSize)
List<CustomerDetail> customerList = tradeRepository.findAggregateList(new Query(criteria), userId, customerAgg,CustomerDetail.class);
return customerList;
public <T> List<T> findAggregateList(Query query,String userNickName, Aggregation aggregation,Class<T> clazz) {
AggregationResults<T> aggregate = this.mongoTemplate.aggregate(aggregation, collectionName, clazz);
List<T> customerDetails = aggregate.getMappedResults();
return customerDetails;
Trade表:
public class TradeInfo{
private String tid;//订单id
private Double payment;//支付金额
private String buyerNick;//买家昵称
private String address;//地址
private String mobile;//手机号
private Long num;//购买商品数量
private List<order> orders;子订单
CustomerDetail:
public class CustomerDetail{
private String buyerNick;//买家昵称
private Double totalPayment;//订单金额
private Integer orderNum;//订单数
private Integer itemNum;//商品数
private String address;//地址
聚合分组,先过滤字段为type=Constants.DataSetOpt.OPT_VIEW,通过dateSetId进行分组,并计算每组的个数,.first是需要显示的字段。
//浏览分组
private List<Document> flowGroup() {
Criteria criteria = Criteria.where("type").is(Constants.DataSetOpt.OPT_VIEW);
Aggregation agg = Aggregation.newAggregation(
Aggregation.match(criteria),
Aggregation.group("dataSetId")
.first("dataSetId").as("dataSetId")
.count()
.as("dataSetIdCount")
.first("dataSetType").as("dataSetType")
AggregationResults<Document> result = mongoTemplate.aggregate(agg,Access.class ,Document.class);
return result.getMappedResults();
也可以使用TypedAggregation的方式进行聚合分组,如下:
@Test
public void statTest(){
TypedAggregation<Statistics> agg = Aggregation.newAggregation(Statistics.class,
Aggregation.group("month")
.sum("totalVisit")
.as("sumTotalVisit")
.sum("totalDownload")
.as("sumTotalDownload"));
AggregationResults<Document> result = mongoTemplate.aggregate(agg, Document.class);
// return result.getMappedResults();
result.getMappedResults().forEach(document -> System.out.println(document));
***注意,使用Aggregation聚合函数Springboot版本限制,否则不生效****
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
</parent>
Aggregation聚合查询金山竹影几千秋,云索高飞水自流,万里长江飘玉带,一轮银月滚金球,远自湖北三千里,近到江南十六州,美景一时观不透,天缘有分画中游!需求:在订单表中,根据buyerNick分组,统计每个buyerNick的电话、地址、支付总金额以及总商品数,返回结果是CustomerDetail。/* * project:列出所有本次查询的字段,包括查询条件的字...
spark_streaming_aggregation
通过Spark Streaming进行事件聚合。 该示例包括基于Kafka或TCP事件流的事件聚合。 这些说明是但是应该在独立群集上工作。
生成并运行Kafka示例
生成程序集./sbt/sbt package
确保您有正在运行的Spark服务器和Cassandra节点在本地主机上侦听
确保您已在本地主机上运行Kafka服务器,并预先设置了主题events 。
启动Kafka生产者./sbt/sbt "run-main KafkaProducer"
将程序集提交到spark服务器dse spark-submit --class KafkaConsumer ./target/scala-2.10/sparkstreamingaggregation_2.10-0.2.jar
数据将发布到C *列系列demo.event_log和
@Document("Schedule")
@Accessors(chain = true)
public class Schedule extends BaseMongoEntity {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "医院编号")
@Indexed //.
用Mongo的聚合组件Aggregation要用到两个方法skip和limit。skip设置起点(分页的时候不包含起点,从起点的下一行开始),limit设置条数。如:
Aggregation.skip(10),
Aggregation.limit(20)的意思是(10,20)第10条以后取20条数据。
Criteria criteria= new Criteria();
MongoDB是一个文档数据库(以JSON为数据模型),由C++语言编写。MongoDB的数据是存储在硬盘上的,只不过需要操作的数据会被加载到内存中提高效率,所以MongoDB本身很吃内存。(本文章使用4.x版本,自带分布式事务)
MongoDB基于灵活的JSON文档模型,非常适合敏捷式快速开发。与此同时,其与生俱来的高可用、高水平扩展能力使它在处理海量、高并发数据应用时颇具优势。如何考虑是否选择MongoDB?
没有某个业务场景必须要使用MongoDB才能解决,但使用MongoDB通常能让你以更低的成本
T.class为mongo数据库实体类
TypedAggregation<T> noRepeated = Aggregation.newAggregation(
T.class,//数据库的实体类
Aggregation.match(Criteria.where(数据库字段).is(待满足的条件).and(数据库字段).is(待满足的条件)),//匹配的规则,先获取数据中将满足条件的警报
参数说明:sql(Operators)
where ($match) 、group by ($group)、having($match)、select($project)、order by($sort)、limit($limit)
sum($sum)、count($sum)、join($lookup)
* project:列出所有本次查询的字段,包括查询条件的字段和需要搜索的字段;
* match:搜索条件criteria
* un...
db.material.insert([
{ "_id" : "M00001", "name" : "多乐士油漆", "price" : 80},
{ "_id" : "M00002", "name" : "马可波罗瓷砖", "price" : 20 },
{ "_id" : "M00003", "name": "海螺水泥", "price": 150 },
{ "_id" : "M00004", "n.
在Spring Data MongoDB中,可以使用MongoTemplate进行联表查询。具体来说,可以使用MongoTemplate的aggregate方法来执行聚合查询,通过聚合管道操作符实现联表查询。
以下是一个使用MongoTemplate进行联表查询的示例:
```java
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.LookupOperation;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Repository;
import java.util.List;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
@Repository
public class ExampleRepository {
private final MongoTemplate mongoTemplate;
public ExampleRepository(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
public List<Example> findByCategoryId(String categoryId) {
LookupOperation lookupOperation = LookupOperation.newLookup()
.from("category")
.localField("categoryId")
.foreignField("_id")
.as("category");
TypedAggregation<Example> aggregation = newAggregation(Example.class,
match(where("categoryId").is(categoryId)),
lookupOperation,
unwind("category"),
project("title", "category.name").andExclude("_id"));
return mongoTemplate.aggregate(aggregation, Example.class).getMappedResults();
在此示例中,我们使用LookupOperation来连接`Example`和`Category`集合,通过匹配`categoryId`和`_id`字段来联表查询。然后,我们使用unwind操作符展开`category`字段中的数组,以便进行后续的投影操作。最后,我们使用project操作符来选择需要返回的字段并排除`_id`字段。
请注意,在此示例中,我们假设存在一个名为`category`的集合,其包含`_id`和`name`字段。如果你的数据模型与此不同,请相应地更改代码。