实战 | 使用Spring Boot + Elasticsearch + Logstash 实现图书查询服务
mhr18 2024-11-30 12:30 17 浏览 0 评论
前面我们介绍了Spring Boot 整合 Elasticsearch 实现数据查询检索的功能,在实际项目中,我们的数据一般存储在数据库中,而且随着业务的发送,数据也会随时变化。
那么如何保证数据库中的数据与Elasticsearch存储的索引数据保持一致呢? 最原始的方案就是:当数据发生增删改操作时同步更新Elasticsearch。但是这样的设计耦合太高。接下来我们介绍一种非常简单的数据同步方式:Logstash 数据同步。
一、Logstash简介
什么是Logstash
logstash是一个开源的服务器端数据处理工具。简单来说,就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
Logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用。
2.Logstash的架构原理
Logstash的基本流程架构:input=》 filter =》 output 。
- input(输入):采集各种样式,大小和来源数据,从各个服务器中收集数据。常用的有:jdbc、file、syslog、redis等。
- filter(过滤器)负责数据处理与转换。主要是将event通过output发出之前对其实现的某些处理功能。
- output(输出):将我们过滤出的数据保存到那些数据库和相关存储中,。
3.Logstash如何与Elasticsearch数据同步
实际项目中,我们不可能通过手动添加的方式将数据插入索引库,所以需要借助第三方工具,将数据库的数据同步到索引库。此时,Logstash出现了,它可以将不同数据库的数据同步到Elasticsearch中。保证数据库与Elasticsearch的数据保持一致。
目前支持数据库与ES数据同步的插件有很多,个人认为Logstash是众多同步mysql数据到es的插件中,最稳定并且最容易配置的一个。
二、安装Logstash
Logstash的使用方法也很简单,下面讲解一下,Logstash是如何使用的。需要说明的是:这里以windows 环境为例,演示Logstash的安装和配置。
1.下载Logstash
首先,下载对应版本的Logstash包,可以通过上面提供下载elasticsearch的地址进行下载,完成后解压。
上面是Logstash解压后的目录,我们需要关注是bin目录中的执行文件和config中的配置文件。一般生产情况下,会使用Linux服务器,并且会将Logstash配置成自启动的服务。这里测试的话,直接启动。
2.配置Logstash
接下来,配置Logstash。需要我们编写配置文件,根据官网和网上提供的配置文件,将其进行修改。
第一步:在Logstash根目录下创建mysql文件夹,添加mysql.conf配置文件,配置Logstash需要的相应信息,具体配置如下:
input {
stdin {
}
jdbc {
# mysql数据库连接
jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
# mysqly用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动配置
jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
#jdbc_paging_enabled => "true"
#jdbc_page_size => "50000"
jdbc_default_timezone => "Asia/Shanghai"
# 执行指定的sql文件
statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql"
use_column_value => true
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names => false
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => updatetime
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => timestamp
# record_last_run上次数据存放位置;
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
clean_run => false
# 设置监听 各字段含义 分 时 天 月 年 ,默认全部为*代表含义:每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "id"
}
}
output {
elasticsearch {
#es服务器
hosts => ["10.2.1.231:9200"]
#ES索引名称
index => "book"
#自增ID
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
第二步:将mysql-connector-java.jar 拷贝到前面配置的目录下。上面的mysql.conf配置的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar。那么jar包拷贝到此目录下即可:
上面是mysql的驱动,如果是sqlserver数据库,下载SqlServer对应的驱动即可。放置的位置要与mysql.conf 配置文件中的jdbc_driver_library 地址保持一致。
第三步:创建sql目录,创建bookquery.sql文件用于保存需要执行的sql 脚本。示例代码如下:
select * from book where updatetime >= :sql_last_value
order by updatetime desc
这里使用的增量更新,所以使用:sql_last_value 记录上一次记录的最后时间。
3.启动Logstash
进入logstash的bin目录,执行如下命令:
logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf
启动成功之后,Logstash就会自动定时将数据写入到Elasticsearch。如下图所示:
同步完成后,我们使用Postman查询Elasticsearch,验证索引是否都创建成功。在postman中,发送 Get 请求:http://10.2.1.231:9200/book/_search 。返回结果如下图所示:
可以看到,数据库中的数据已经通过Logstash同步至Elasticsearch。说明Logstash配置成功。
三、创建查询服务
数据同步完成后,接下来我们使用Spring Boot 构建Elasticsearch查询服务。首先创建Spring Boot项目并整合Elasticsearch,这个之前都已经介绍过,不清楚的朋友可以看我之前的文章。
接下来演示如何封装完整的数据查询服务。
1.数据实体
@Document( indexName = "book" , replicas = 0)
public class Book {
@Id
private Long id;
@Field(analyzer = "ik_max_word",type = FieldType.Text)
private String bookName;
@Field(analyzer = "ik_max_word",type = FieldType.Text)
private String author;
private float price;
private int page;
@Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
private Date createTime;
@Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
private Date updateTime;
@Field(analyzer = "ik_max_word",type = FieldType.Text)
private String category;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getBookName() {
return bookName;
}
public void setBookName(String bookName) {
this.bookName = bookName;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public int getPage() {
return page;
}
public void setPage(int page) {
this.page = page;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public Book(){
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
2.请求封装类
public class BookQuery {
public String category;
public String bookName;
public String author;
public int priceMin;
public int priceMax;
public int pageMin;
public int pageMax;
public String sort;
public String sortType;
public int page;
public int limit;
}
3.创建Controller控制器
@RestController
public class ElasticSearchController {
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
/**
* 查询信息
* @param
* @return
*/
@PostMapping(value = "/book/query")
public JSONResult query(@RequestBody BookQuery bookQuery){
Query query= getQueryBuilder(bookQuery);
SearchHits<Book> searchHits = elasticsearchRestTemplate.search(query, Book.class);
List<SearchHit<Book>> result = searchHits.getSearchHits();
return JSONResult.ok(result);
}
public Query getQueryBuilder(BookQuery query) {
BoolQueryBuilder builder = boolQuery();
// 匹配器 模糊查询部分,分析器使用ik (ik_max_word)
List<QueryBuilder> must = builder.must();
if (query.getBookName()!=null && !query.getBookName().isEmpty())
must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*"));
if (query.getCategory()!=null && !query.getCategory().isEmpty())
must.add(wildcardQuery("category", "*" +query.getCategory()+ "*"));
if (query.getAuthor()!=null && !query.getAuthor().isEmpty())
must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*"));
// 筛选器 精确查询部分
List<QueryBuilder> filter = builder.filter();
// 范围查询
if (query.getPriceMin()>0 && query.getPriceMax()>0) {
RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax());
filter.add(price);
}
// 范围查询
if (query.getPageMin()>0 && query.getPageMax()>0) {
RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax());
filter.add(page);
}
// 分页
PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit());
// 排序
SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC);
//设置高亮效果
String preTag = "<font color='#dd4b39'>";//google的色值
String postTag = "</font>";
HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag);
Query searchQuery = new NativeSearchQueryBuilder()
.withQuery(builder)
.withHighlightFields(highlightFields)
.withPageable(pageable)
.withSort(sort)
.build();
return searchQuery;
}
}
4.测试验证
启动项目,在Postman中,请求http://localhost:8080/book/query 接口查询书籍信息数据。查看接口返回情况。
我们看到接口成功返回数据。说明数据查询服务创建成功。
最后
以上,我们就把使用Spring Boot + Elasticsearch + Logstash 实现完整的数据查询检索服务介绍完了。
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)