基于员工信息的CRUD操作

/***员工增删改查的应用程序**@authorAdministrator**/publicclassEmployeeCRUDApp{@SuppressWarnings({"unchecked","resource"})publicstaticvoidmain(String[]args)throwsException{//先构建clientSettingssettings=Settings.builder().put("cluster.name","elasticsearch").build();TransportClientclient=newPreBuiltTransportClient(settings).addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("localhost"),9300));//createEmployee(client);//getEmployee(client);//updateEmployee(client);//deleteEmployee(client);client.close();}/***创建员工信息(创建一个document)**@paramclient*/privatestaticvoidcreateEmployee(TransportClientclient)throwsException{IndexResponseresponse=client.prepareIndex("company","employee","1").setSource(XContentFactory.jsonBuilder().startObject().field("name","jack").field("age",27).field("position","technique").field("country","china").field("join_date","2017-01-01").field("salary",10000).endObject()).get();System.out.println(response.getResult());}/***获取员工信息**@paramclient*@throwsException*/privatestaticvoidgetEmployee(TransportClientclient)throwsException{GetResponseresponse=client.prepareGet("company","employee","1").get();System.out.println(response.getSourceAsString());}/***修改员工信息**@paramclient*@throwsException*/privatestaticvoidupdateEmployee(TransportClientclient)throwsException{UpdateResponseresponse=client.prepareUpdate("company","employee","1").setDoc(XContentFactory.jsonBuilder().startObject().field("position","techniquemanager").endObject()).get();System.out.println(response.getResult());}/***删除员工信息**@paramclient*@throwsException*/privatestaticvoiddeleteEmployee(TransportClientclient)throwsException{DeleteResponseresponse=client.prepareDelete("company","employee","1").get();System.out.println(response.getResult());}}

基于员工信息的查询操作

/***员工搜索应用程序**@authorAdministrator**/publicclassEmployeeSearchApp{@SuppressWarnings({"unchecked","resource"})publicstaticvoidmain(String[]args)throwsException{Settingssettings=Settings.builder().put("cluster.name","elasticsearch").build();TransportClientclient=newPreBuiltTransportClient(settings).addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("localhost"),9300));prepareData(client);//executeSearch(client);client.close();}/***执行搜索操作**@paramclient*/privatestaticvoidexecuteSearch(TransportClientclient){SearchResponseresponse=client.prepareSearch("company").setTypes("employee").setQuery(QueryBuilders.matchQuery("position","technique")).setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40)).setFrom(0).setSize(1).get();SearchHit[]searchHits=response.getHits().getHits();for(inti=0;i<searchHits.length;i++){System.out.println(searchHits[i].getSourceAsString());}}/***准备数据**@paramclient*/privatestaticvoidprepareData(TransportClientclient)throwsException{client.prepareIndex("company","employee","1").setSource(XContentFactory.jsonBuilder().startObject().field("name","jack").field("age",27).field("position","techniquesoftware").field("country","china").field("join_date","2017-01-01").field("salary",10000).endObject()).get();client.prepareIndex("company","employee","2").setSource(XContentFactory.jsonBuilder().startObject().field("name","marry").field("age",35).field("position","techniquemanager").field("country","china").field("join_date","2017-01-01").field("salary",12000).endObject()).get();client.prepareIndex("company","employee","3").setSource(XContentFactory.jsonBuilder().startObject().field("name","tom").field("age",32).field("position","seniortechniquesoftware").field("country","china").field("join_date","2016-01-01").field("salary",11000).endObject()).get();client.prepareIndex("company","employee","4").setSource(XContentFactory.jsonBuilder().startObject().field("name","jen").field("age",25).field("position","juniorfinance").field("country",
"usa").field("join_date","2016-01-01").field("salary",7000).endObject()).get();client.prepareIndex("company","employee","5").setSource(XContentFactory.jsonBuilder().startObject().field("name","mike").field("age",37).field("position","financemanager").field("country","usa").field("join_date","2015-01-01").field("salary",15000).endObject()).get();}}

基于员工信息的聚合查询操作

/***员工聚合分析应用程序**@authorAdministrator**/publicclassEmployeeAggrApp{@SuppressWarnings({"unchecked","resource"})publicstaticvoidmain(String[]args)throwsException{Settingssettings=Settings.builder().put("cluster.name","elasticsearch").build();TransportClientclient=newPreBuiltTransportClient(settings).addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("localhost"),9300));SearchResponsesearchResponse=client.prepareSearch("company").addAggregation(AggregationBuilders.terms("group_by_country").field("country").subAggregation(AggregationBuilders.dateHistogram("group_by_join_date").field("join_date").dateHistogramInterval(DateHistogramInterval.YEAR).subAggregation(AggregationBuilders.avg("avg_salary").field("salary")))).execute().actionGet();Map<String,Aggregation>aggrMap=searchResponse.getAggregations().asMap();StringTermsgroupByCountry=(StringTerms)aggrMap.get("group_by_country");Iterator<Bucket>groupByCountryBucketIterator=groupByCountry.getBuckets().iterator();while(groupByCountryBucketIterator.hasNext()){BucketgroupByCountryBucket=groupByCountryBucketIterator.next();System.out.println(groupByCountryBucket.getKey()+":"+groupByCountryBucket.getDocCount());HistogramgroupByJoinDate=(Histogram)groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket>groupByJoinDateBucketIterator=groupByJoinDate.getBuckets().iterator();while(groupByJoinDateBucketIterator.hasNext()){org.elasticsearch.search.aggregations.bucket.histogram.Histogram.BucketgroupByJoinDateBucket=groupByJoinDateBucketIterator.next();System.out.println(groupByJoinDateBucket.getKey()+":"+groupByJoinDateBucket.getDocCount());Avgavg=(Avg)groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");System.out.println(avg.getValue());}}client.close();}}

注:聚合查询的时候可能出现问题:fielddata需要变为true,这个时候需要手动添加mapping

GET /company/_mapping/employee

{
  "company": {
    "mappings": {
      "employee": {
        "properties": {
          "age": { "type": "long" },
          "country": {
            "type": "text",
            "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
          },
          "join_date": { "type": "date" },
          "name": {
            "type": "text",
            "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
          },
          "position": {
            "type": "text",
            "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
          },
          "salary": { "type": "long" }
        }
      }
    }
  }
}

DELETE /company

将上面查出来的mapping截取出来,给country字段加上"fielddata":true,然后重建索引:

PUT /company
{
  "mappings": {
    "employee": {
      "properties": {
        "age": { "type": "long" },
        "country": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } },
          "fielddata": true
        },
        "join_date": { "type": "date" },
        "name": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "position": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "salary": { "type": "long" }
      }
    }
  }
}


elasticsearch-6.0.0获取代码(6.0起InetSocketTransportAddress被移除,改用TransportAddress):

importorg.apache.commons.lang.StringUtils;importorg.elasticsearch.client.Client;importorg.elasticsearch.client.transport.TransportClient;importorg.elasticsearch.common.settings.Settings;importorg.elasticsearch.common.transport.TransportAddress;importorg.elasticsearch.transport.client.PreBuiltTransportClient;importorg.springframework.stereotype.Component;importcom.ad.utils.ConfigUtil;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;importjava.net.InetAddress;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/***Createdbywangyunpengon2017/8/5.*/@ComponentpublicclassElasticSearchClient{@Resource(type=ConfigUtil.class)privateConfigUtilconfigUtil;privateMap<String,Client>clientMap=newConcurrentHashMap<String,Client>();/*1.@PostConstruct说明被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的inti()方法。被@PostConstruct修饰的方法会在构造函数之后,init()方法之前运行。2.@PreConstruct说明被@PreConstruct修饰的方法会在服务器卸载Servlet的时候运行,并且只会被服务器调用一次,类似于Servlet的destroy()方法。被@PreConstruct修饰的方法会在destroy()方法之后运行,在Servlet被彻底卸载之前。*/@PostConstructpublicvoidinit(){init(configUtil.clusterName,configUtil.clusterIpAddress);//init(configUtil.clusterName2,configUtil.clusterIpAddress2);}privatevoidinit(StringclusterName,StringclusterIpAddress){try{Settingssettings=Settings.builder().put("cluster.name",clusterName).build();addClient(settings,getAllAddress(clusterIpAddress));}catch(Exceptione){e.printStackTrace();}}/***得所有的地址端口*@paramips*@return*@throwsException*/publicList<TransportAddress>getAllAddress(Stringips)throwsException{List<TransportAddress>addressList=newArrayList<TransportAddress>();if(StringUtils.isNotBlank(ips)&&ips.contains(",")){String[]ipaddr=ips.split(",");for(inti=0;i<ipaddr.length;i++){addressList.add(newTransportAddress(InetAddress.getByName(ipaddr[i]),configUtil.clusterPort));}}else{addressList.add(newTransportAddress(InetAddress.getByName(ips),configUtil.clusterPort));}returnaddressList;}/***添加es客户端*@paramsetting*
@paramtransportAddress*@throwsException*/publicvoidaddClient(Settingssetting,List<TransportAddress>transportAddress)throwsException{TransportClientclient=newPreBuiltTransportClient(setting);for(inti=0;i<transportAddress.size();i++){client.addTransportAddress(transportAddress.get(i));}clientMap.put(setting.get("cluster.name"),client);}publicClientgetClient(StringclusterName){returnclientMap.get(clusterName);}}

elasticsearch-5.2.0获取代码(5.x使用InetSocketTransportAddress):

importorg.apache.commons.lang.StringUtils;importorg.elasticsearch.client.Client;importorg.elasticsearch.client.transport.TransportClient;importorg.elasticsearch.common.settings.Settings;importorg.elasticsearch.common.transport.InetSocketTransportAddress;importorg.elasticsearch.transport.client.PreBuiltTransportClient;importorg.springframework.stereotype.Component;importcom.ad.utils.ConfigUtil;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;importjava.net.InetAddress;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/***Createdbywangyunpengon2017/8/5.*/@ComponentpublicclassElasticSearchClient{@Resource(type=ConfigUtil.class)privateConfigUtilconfigUtil;privateMap<String,Client>clientMap=newConcurrentHashMap<String,Client>();/*1.@PostConstruct说明被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的inti()方法。被@PostConstruct修饰的方法会在构造函数之后,init()方法之前运行。2.@PreConstruct说明被@PreConstruct修饰的方法会在服务器卸载Servlet的时候运行,并且只会被服务器调用一次,类似于Servlet的destroy()方法。被@PreConstruct修饰的方法会在destroy()方法之后运行,在Servlet被彻底卸载之前。*/@PostConstructpublicvoidinit(){init(configUtil.clusterName,configUtil.clusterIpAddress);//init(configUtil.clusterName2,configUtil.clusterIpAddress2);}privatevoidinit(StringclusterName,StringclusterIpAddress){try{Settingssettings=Settings.builder().put("cluster.name",clusterName).build();addClient(settings,getAllAddress(clusterIpAddress));}catch(Exceptione){e.printStackTrace();}/*try{Settingssettings=TransportClient.builder().build().settings().builder().put("cluster.name",clusterName).build();addClient(settings,getAllAddress(clusterIpAddress));}catch(Exceptione){e.printStackTrace();}*/}/***得所有的地址端口*@paramips*@return*@throwsException*/publicList<InetSocketTransportAddress>getAllAddress(Stringips)throwsException{List<InetSocketTransportAddress>addressList=newArrayList<InetSocketTransportAddress>();if(StringUtils.isNotBlank(ips)&&ips.contains(",")){String[]ipaddr=ips.split(",");for(inti=0;i<ip
addr.length;i++){addressList.add(newInetSocketTransportAddress(InetAddress.getByName(ipaddr[i]),configUtil.clusterPort));}}else{addressList.add(newInetSocketTransportAddress(InetAddress.getByName(ips),configUtil.clusterPort));}returnaddressList;}/***添加es客户端*@paramsetting*@paramtransportAddress*@throwsException*//*TransportClientclient=newPreBuiltTransportClient(settings).addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("localhost"),9300));*/publicvoidaddClient(Settingssetting,List<InetSocketTransportAddress>transportAddress)throwsException{TransportClientclient=newPreBuiltTransportClient(setting);//TransportClientclient=TransportClient.builder().settings(setting).build();for(inti=0;i<transportAddress.size();i++){client.addTransportAddress(transportAddress.get(i));}clientMap.put(setting.get("cluster.name"),client);}publicClientgetClient(StringclusterName){returnclientMap.get(clusterName);}}

maven依赖:

<!-- elasticsearch package -->
<!--
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.2.2</version>
</dependency>
-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.0.0</version>
</dependency>

测试代码:

importjava.io.IOException;importjava.util.UUID;importjavax.annotation.Resource;importorg.elasticsearch.action.index.IndexResponse;importorg.elasticsearch.client.Client;importorg.elasticsearch.common.xcontent.XContentFactory;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Component;importcom.ad.base.AlbumIndexField;importcom.ad.utils.ConfigUtil;importcom.ad.utils.ExceptionUtil;importnet.sf.json.JSONObject;@ComponentpublicclassElasticSaveServiceImplimplementsElasticSaveService{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ElasticSearchServiceImpl.class);@Resource(type=ElasticSearchClient.class)privateElasticSearchClientelasticSearchClient;@Resource(type=ConfigUtil.class)privateConfigUtilconfigUtil;/***@return*/publicClientgetClient(StringclusterName){try{returnelasticSearchClient.getClient(clusterName);}catch(Exceptione){logger.error("ES获取client失败:"+ExceptionUtil.stackTrace(e));returnnull;}}@OverridepublicvoidexecuteSave(StringclusterName,Stringjson1){JSONObjectjson=JSONObject.fromObject(json1);StringdocumentId=(UUID.randomUUID().toString().replaceAll("-",""));IndexResponseresponse;try{response=this.getClient(clusterName).prepareIndex(configUtil.indexName,configUtil.indexType,documentId).setSource(XContentFactory.jsonBuilder().startObject().field(AlbumIndexField.FID,json.getString("fid")).endObject()).get();}catch(IOExceptione){logger.error("===AdInfoConsumerconsumerisexception",e);//e.printStackTrace();}}}