技术

ES实现分组查询Java API

需要对ES中的数据先过滤查询,然后分组统计。现将本人的示例代码贴在下面,希望对你有用(在搜索中加入聚合)。

/**
 * 组合查询条件:
 * 类似sql语句 select deviceId, count(1) from table where code = '0' group by deviceId
 */
private BoolQueryBuilder getSearchBuilder(Log log) {
	BoolQueryBuilder searchBuilder = transportClient.prepareSearch();
	BoolQueryBuilder builder = QueryBuilders.boolQuery();
	//类型sql的where条件
	if (!StringUtils.isBlank(log.getDeviceId())) {
        builder.must(QueryBuilders.matchQuery("deviceId",log.getDeviceId()));
    }
    if (!StringUtils.isBlank(log.getCode())) {
        if (SUCCESS.equals(log.getCode())) {
            //查询所有的请求成功的数据                		
            builder.must(QueryBuilders.matchQuery(“code”,log.getCode()));
        } else {
            //查询所有的失败的数据
            builder.mustNot(QueryBuilders.matchQuery("code", SUCCESS));
        }
    }
    //请求开始时间和请求结束时间
    String stmStart = log.getStmStart();
    String stmEnd = log.getStmEnd();

    if (!StringUtils.isBlank(stmStart) || !StringUtils.isBlank(stmEnd)) {
        stmStart = StringUtils.isBlank(stmStart) ? stmStart : stmStart + ZERO;
        stmEnd = StringUtils.isBlank(stmEnd) ? stmEnd : stmEnd + ZERO;
        rangeBuilder rangeBuilder = QueryBuilders.rangeQuery(STM);
        rangeBuilder.from(stmStart);
        rangeBuilder.to(stmEnd);
        builder.must(rangeBuilder);
    }

    searchBuilder.setQuery(builder);
    //添加分组,deviceCount为别名,deviceId为要分组的字段
    //200000为最大的分组显示条数,默认为10条
    SearchRequestBuilder termsBuilder = AggregationBuilders.terms("deviceCount").field(“deviceId”).size(200000);
    searchBuilder.addAggregation(termsBuilder);

    //此处用于使用聚合函数分页无效,需手动分页
    /*
    Integer pageNo = log.getPage();
    Integer limit = log.getLimit();
    if (null != pageNo && null != limit) {
    searchBuilder.setFrom((pageNo -1) * limit).setSize(limit);
    }
    */
    return searchBuilder;
}

 

获取查询分组后数据

/**
 * Executes the filtered, grouped query and returns one page of per-device counts.
 *
 * <p>The buckets of the {@code deviceCount} aggregation are paged manually by
 * slicing the bucket list. When either the page number or the limit is absent,
 * the first 10 buckets (or fewer, if fewer exist) are returned. A page number
 * below 1 is treated as page 1, a negative limit as 0, and a page past the end
 * yields an empty list instead of an exception.
 *
 * @param log filter criteria plus the optional page number and page size
 * @return the requested page of device counts, the total number of buckets,
 *         and the page / limit values echoed back from {@code log}
 */
public CountLogRespDto getBoxCountListLogs(Log log) {
    // Called for its side effect: makes sure the shared client is initialised.
    getTransportClient();
    SearchRequestBuilder searchBuilder = getSearchBuilder(log);
    SearchResponse response = searchBuilder.execute().actionGet();

    // Fetch the grouped data by its aggregation name.
    Terms terms = response.getAggregations().get("deviceCount");
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    int total = buckets.size();

    // Manual paging: clamp both indexes into [0, total] so subList cannot throw.
    Integer pageNo = log.getPage();
    Integer limit = log.getLimit();
    int startIndex = 0;
    int endIndex = Math.min(10, total);
    if (null != pageNo && null != limit) {
        // long arithmetic so that page * size cannot overflow int
        long page = Math.max(1, pageNo);
        long pageSize = Math.max(0, limit);
        startIndex = (int) Math.min((page - 1) * pageSize, total);
        endIndex = (int) Math.min(page * pageSize, total);
    }

    List<LogDto> listDtos = new ArrayList<>(endIndex - startIndex);
    for (Terms.Bucket bucket : buckets.subList(startIndex, endIndex)) {
        LogDto dto = new LogDto();
        dto.setCount(bucket.getDocCount());
        dto.setDeviceId(String.valueOf(bucket.getKey()));
        listDtos.add(dto);
    }

    // NOTE(review): the declared return type is CountLogRespDto; presumably
    // LogRespDto is a subtype of it — confirm against the DTO definitions.
    LogRespDto respDto = new LogRespDto();
    respDto.setLogDtoList(listDtos);
    respDto.setTotal((long) total);
    respDto.setPage(log.getPage());
    respDto.setLimit(log.getLimit());
    return respDto;
}

连接ElasticSearch集群

private static TransportClient transportClient = null;

/**
 * Returns the shared Elasticsearch transport client, creating it on first use.
 *
 * <p>Connection settings are read from {@code init.properties} on the classpath:
 * {@code elasticsearch.url} is a comma-separated list of {@code host:port}
 * entries and {@code elasticsearch.cluster.name} is the cluster name.
 *
 * <p>The method is synchronized so that concurrent first calls create only one
 * client. The static field is assigned only after every address has been added;
 * if setup fails, the partially built client is closed and the next call retries.
 *
 * @return the initialised client, never {@code null}
 * @throws IllegalStateException if {@code init.properties} is missing, cannot
 *         be read, or lacks one of the two required keys
 * @throws IllegalArgumentException if an address entry is not {@code host:port}
 */
public static synchronized TransportClient getTransportClient() {
    if (null != transportClient) {
        return transportClient;
    }

    Properties properties = new Properties();
    try (InputStream inputStream =
            EsSearchLogImpl.class.getClassLoader().getResourceAsStream("init.properties")) {
        if (inputStream == null) {
            throw new IllegalStateException("init.properties not found on the classpath");
        }
        properties.load(inputStream);
    } catch (IOException e) {
        throw new IllegalStateException("Failed to read init.properties", e);
    }

    String elasticsearchUrl = properties.getProperty("elasticsearch.url");
    String clusterName = properties.getProperty("elasticsearch.cluster.name");
    if (elasticsearchUrl == null || clusterName == null) {
        throw new IllegalStateException(
                "init.properties must define elasticsearch.url and elasticsearch.cluster.name");
    }

    // The cluster name must be set explicitly when it differs from the default
    // ("elasticsearch").
    Settings settings = Settings.builder().put("cluster.name", clusterName)
            // Setting client.transport.sniff to true lets the client sniff the
            // cluster state and add the other nodes' addresses automatically.
            //.put("client.transport.sniff", true)
            .build();

    TransportClient client = null;
    try {
        client = new PreBuiltTransportClient(settings);
        for (String url : elasticsearchUrl.split(",")) {
            String[] urlArray = url.trim().split(":");
            if (urlArray.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid elasticsearch.url entry (expected host:port): " + url);
            }
            client.addTransportAddress(new InetSocketTransportAddress(
                    new InetSocketAddress(urlArray[0].trim(), Integer.parseInt(urlArray[1].trim()))));
        }
    } catch (RuntimeException e) {
        logger.error(e.getMessage(), e);
        // Release the half-configured client instead of caching it.
        if (client != null) {
            client.close();
        }
        throw e;
    }

    transportClient = client;
    logger.info("已连成功连接上es搜索引擎");
    return transportClient;
}

init.properties内容如下

elasticsearch.url=192.168.1.12:9305
elasticsearch.cluster.name=es-test

备注:官方API

发表评论

电子邮件地址不会被公开。 必填项已用*标注