Rust Web编程:第十八章 使用 Redis 对任务进行排队
mhr18 2024-11-23 19:22 19 浏览 0 评论
接收请求、执行操作,然后向用户返回响应可以解决 Web 编程中的许多问题。 然而,有时这种简单的方法根本无法解决问题。 例如,当我在 MonolithAi 工作时,我们有一个功能,用户可以输入数据和参数,然后单击按钮即可根据该数据训练机器学习模型。 然而,在向用户发送响应之前尝试训练机器学习模型会花费太长时间。 连接可能会超时。 为了解决这个问题,我们有一个 Redis 队列和一个消耗任务的工作线程池。 训练任务将被放入队列中,其中一名工作人员将在有空时开始训练模型。 HTTP 服务器将接受用户的请求,将训练任务发布到队列中,并向用户响应任务已发布。 当模型被训练时,用户会得到更新。 另一个例子是食品订购应用程序,其中食品订单经历一系列步骤,例如确认订单、处理订单、然后交付订单。
考虑到 MonolithAi 的例子,不难看出为什么学习如何在 Web 编程中实现排队不仅有用,而且还为开发人员提供了另一种解决方案,增加了他们可以解决的问题数量。
在本章中,我们将讨论以下主题:
布置排队项目,描述所需的组件和方法
构建 HTTP 服务器
建立投票工作人员
让我们的应用程序与 Redis 一起运行
为工人定义任务
为 Redis 队列定义消息
在 HTTP 服务器中集成路由
在 Docker 中运行所有服务器和工作人员
到本章结束时,您将能够构建一个 Rust 程序,该程序可以是工作程序,也可以是服务器程序,具体取决于传递给它的环境变量。 您还可以以不同结构的形式序列化一系列任务,并将它们插入到 Redis 队列中,从而使这些结构能够在不同的服务器之间排队和传输。 这不仅为您提供了实现队列的技能,还可以利用 Redis 来实现许多其他解决方案,例如多个服务器通过 Redis 发布/订阅通道通过广播接收消息。
技术要求
在本章中,我们将纯粹关注如何在 Redis 队列上使用 Tokio 和 Hyper 构建工作线程。 因此,在构建我们自己的新服务器时,我们将不会依赖任何以前的代码。
分解我们的项目
在我们的系统中,我们有一系列需要执行的任务。 然而,这些任务需要很长时间才能完成。 如果我们只有一个普通的服务器来处理任务,服务器最终将被阻塞,并且多个用户将收到延迟的体验。 如果任务太长,则用户的连接可能会超时。
为了避免在需要长时间任务时降低用户体验,我们使用了排队系统。 这是 HTTP 服务器接收来自用户的请求的地方。 然后,与请求关联的长任务被发送到先进先出队列,由工作池处理。 由于任务在队列中,因此 HTTP 服务器除了响应用户任务已发送且请求已被处理之外,无能为力。 由于流量的起伏,当流量较低时,我们不需要所有的工作人员和 HTTP 服务器。 但是,当流量增加时,我们需要创建并连接额外的 HTTP 服务器和工作线程,如下图所示:
考虑到上图,我们将需要以下基础设施:
Redis数据库:用于存储队列中的任务
HTTP服务器:将任务发送到队列中进行处理
Worker:从队列中拉取/弹出/轮询/处理任务
我们可以为工作人员和 HTTP 服务器构建单独的应用程序。 然而,这会增加复杂性而没有任何好处。 对于两个独立的应用程序,我们必须维护两个独立的 Docker 镜像。 我们还会重复大量代码,因为 HTTP 服务器发送到 Redis 队列的任务必须与工作线程拾取和处理的任务相同。 对于特定任务,从 HTTP 服务器传递到工作人员的字段之间可能会不匹配。 我们可以通过使用具有一系列输入字段的任务结构和一个使用这些字段执行任务的运行函数来防止这种不匹配。 这些任务结构的序列化特征使我们能够通过队列传递字段并接收它们。
在构建 HTTP 服务器和工作线程时,我们可以构建服务器,以便在程序启动时检查环境变量。 如果环境变量声明应用程序是工作线程,则应用程序可以启动轮询队列的参与者。 如果环境变量声明应用程序是 HTTP 服务器,则应用程序可以运行 HTTP 服务器并侦听请求。
对于我们的任务队列项目,我们有以下概要:
├── Cargo.toml
├── docker-compose.yml
└── src
├── main.rs
└── tasks
├── add.rs
├── mod.rs
├── multiply.rs
└── subtract.rs
我们将在 src/main.rs 文件中定义服务器入口点。 然后,我们将在 src/tasks/ 目录中定义任务结构。 就 Cargo.toml 文件中的依赖项而言,我们有以下内容:
[dependencies]
bincode = "1.0"
bytes = "1.2.1"
redis = "0.22.1"
serde_json = "1.0.86"
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14.20", features = ["full"] }
serde = { version = "1.0.136", features = ["derive"] }
除了字节和二进制码包之外,这些依赖项对您来说都不应该是新的。 我们将使用字节将结构转换为 HTTP 响应,并使用 bincode 将结构序列化为二进制以存储在 Redis 中。
通过我们在本节中刚刚提出的方法,我们将能够构建一个简单的任务处理队列,在其中我们可以确保服务器和工作人员之间的任务定义始终保持同步。 定义了我们的方法后,我们可以继续执行任务旅程的第一部分,即 HTTP 服务器。
构建 HTTP 服务器
对于我们的 HTTP 服务器,我们需要执行以下步骤:
定义一个反序列化 HTTP 请求正文的结构体。
定义一个处理传入请求的函数。
根据环境变量定义程序运行的路径。
运行一个监听传入请求的服务器。
我们不会为每个步骤划分单独的部分,因为我们已经在上一章中介绍了所有这些步骤/过程。 在执行所有步骤之前,我们必须将以下内容导入到 src/main.rs 文件中:
use hyper::{Body, Request, Response, Server};
use hyper::body;
use hyper::service::{make_service_fn, service_fn};
use std::net::SocketAddr;
use std::env;
use serde::{Serialize, Deserialize};
use serde_json;
use bytes::{BufMut, BytesMut};
除了 bytes 导入之外,您应该熟悉所有这些导入,我们将在定义 HTTP 句柄函数时介绍这些导入。 首先,我们将使用以下代码定义一个简单的结构来序列化传入的 HTTP 请求主体:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingBody {
pub one: String,
pub two: i32
}
这与我们的 Actix Web 应用程序的方法相同。 我们将能够使用序列化和反序列化特征来注释我们的任务结构。
现在我们已经定义了 IncomingBody 结构体,我们可以使用以下代码定义我们的句柄函数:
async fn handle(req: Request<Body>) ->
Result<Response<Body>, &'static str> {
let bytes = body::to_bytes(req.into_body()).await
.unwrap();
let response_body: IncomingBody =
serde_json::from_slice(&bytes).unwrap();
let mut buf = BytesMut::new().writer();
serde_json::to_writer(&mut buf,
&response_body).unwrap();
Ok(Response::new(Body::from(buf.into_inner().freeze())))
}
必须注意的是,我们在返回主体时调用了 freeze 函数。 此冻结函数将可变字节转换为不可变字节,从而防止任何缓冲区修改。 在这里,我们可以看到我们正在接受带有请求的通用主体。 然后,我们可以使用 serde 序列化主体,并使用 BytesMut 结构(本质上只是一个连续的内存片段)将主体返回给用户,本质上创建了一个回显服务器。
我们现在可以使用以下代码定义 main 函数,它是入口点:
#[tokio::main]
async fn main() {
let app_type = env::var("APP_TYPE").unwrap();
match app_type.as_str() {
"server" => {
. . .
},
"worker" => {
println!("worker not defined yet");
}
_ => {
panic!("{} app type not supported", app_type);
}
}
}
这里我们可以看到环境变量“APP_TYPE”被提取出来。 根据应用程序类型的不同,我们会执行不同的代码块。 现在,如果应用程序类型是“worker”,我们将只打印一条声明,表明未定义worker。 我们还声明,如果应用程序类型既不是“服务器”也不是“工作人员”类型,则程序将会出现恐慌。
在我们的服务器块中,我们使用以下代码定义了 addr 和 server:
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
let server = Server::bind(&addr).serve(make_service_fn( |_conn| {
async {
Ok::<_, hyper::Error>(service_fn( move |req| {
async {handle(req).await}
}))
}
}));
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
这与上一章中的服务器代码非常相似。
然后我们使用以下命令运行服务器:
APP_TYPE=server cargo run
然后我们可以发送以下请求:
在这里,我们可以看到我们的服务器正在工作并回显发送到服务器的相同正文。 我们现在可以继续构建我们的工作应用程序。
构建投票工作人员
我们的工作人员本质上是在 Redis 中循环和轮询队列。 如果队列中有消息,worker就会执行从队列中提取的任务。 为了构建轮询工作器部分,工作器将创建一个结构体,将该结构体插入到 Redis 队列中,然后从队列中提取插入的结构体以进行打印。 这不是我们想要的行为,但这确实意味着我们可以测试以了解队列插入如何快速工作。 到本章结束时,我们的 HTTP 服务器将插入任务,而我们的工作线程将消耗任务。
我们不希望工作线程不间断地不断轮询 Redis 队列。 为了将轮询降低到合理的速率,我们需要让工作线程在每个循环期间休眠。 因此,我们必须在 src/main.rs 文件中导入以下内容,以使我们的工作线程休眠:
use std::{thread, time};
现在,我们可以转到运行工作程序的部分,在主函数的以下部分中定义我们的工作程序代码:
match app_type.as_str() {
"server" => {
. . .
},
"worker" => {
// worker code is going to be inserted here
. . .
}
_ => {
panic!("{} app type not supported", app_type);
}
}
我们的工人代码采用以下总体轮廓:
let client =
redis::Client::open("redis://127.0.0.1/").unwrap();
loop {
. . .
}
在这里,我们可以看到我们定义了 Redis 客户端,然后无限循环地运行工作线程。 在此循环中,我们将与 Redis 建立连接,轮询 Redis 中的队列,然后删除连接。 我们可以在循环中建立和删除连接,因为该任务将花费很长时间。 在整个任务期间保持 Redis 连接是没有意义的。
不幸的是,在撰写本书时,Rust Redis 箱还没有简单的队列实现。 然而,这不应该阻碍我们。 如果我们知道让 Redis 实现我们的队列所需的原始命令,我们就可以实现我们自己的队列。 Redis 的性能类似于 SQL 数据库。 如果您了解这些命令,您就可以像 SQL 一样实现自己的逻辑。
在无限循环中,我们将创建一个实现了序列化和反序列化特征的通用结构,然后使用以下代码将该结构序列化为二进制:
let body = IncomingBody{one: "one".to_owned(), two: 2};
let bytes = bincode::serialize(&body).unwrap();
我们的结构现在是一个字节向量。 然后我们将与Redis建立连接,并使用“LPUSH”命令将“some_queue”推送到队列,这会将值插入到队列的头部,代码如下:
let outcome: Option<Vec<u8>>;
{
let mut con = client.get_connection().unwrap();
let _ : () = redis::cmd("LPUSH").arg("some_queue")
.arg(bytes.clone())
.query(&mut con)
.unwrap();
// pop our task from the queue
outcome = redis::cmd("LPOP").arg("some_queue")
.query(&mut con)
.unwrap();
}
我们有 Option<Vec<u8>> 因为队列中可能没有任何内容。 如果队列中没有任何内容,则结果将为“无”。 现在,我们永远不会得到一个“无”,因为我们在从队列中提取任务之前直接将任务插入队列中。 然而,在流量较低的时期,我们的工作人员将轮询可能会空一段时间的队列。
现在我们有了结果,我们可以使用以下匹配语句来处理它:
match outcome {
Some(data) => {
. . .
},
None => {
. . .
}
}
如果我们有一些数据,我们只需反序列化二进制数据并使用以下代码打印出结构体:
let deserialized_struct: IncomingBody =
bincode::deserialize(&data).unwrap();
println!("{:?}", deserialized_struct);
如果队列中没有任何内容,则结果为 None,我们可以在使用以下代码再次运行循环之前休眠五秒钟:
let five_seconds = time::Duration::from_secs(5);
tokio::time::sleep(five_seconds).await;
这样,我们的工人就可以接受测试了。 在构建这样的异步程序时,您总是可以做更多的事情。 然而,为了避免本章臃肿,我们将坚持我们的基本应用程序。 如果您想进一步了解 Redis,您可以研究构建一个发布/订阅系统,其中一个工作人员不断轮询队列,而其他工作人员则关闭,并由一个参与者侦听通道上的消息。 当主worker收到新任务时,主worker可以向通道发布消息,唤醒其他worker。 如果你真的想推动自己,你可以研究 Kubernetes 控制器,让一个主工作线程启动并根据流量销毁工作节点。 然而,这些项目将超出本书的范围。
为了让我们的应用程序在一章的范围内工作,我们必须继续让我们的应用程序与 Redis 一起运行。
让我们的应用程序与 Redis 一起运行
在本地使用 Redis 运行我们的应用程序需要我们将 Redis 与 Docker 结合使用,将 APP_TYPE 环境变量导出为“worker”,然后使用 Cargo 运行我们的应用程序。 对于我们的 Redis,我们的 docker-compose.yml 文件采用以下形式:
version: "3.7"
services:
redis:
container_name: 'queue-redis'
image: 'redis'
ports:
- '6379:6379'
然后我们可以使用以下命令导出 APP_TYPE 环境变量:
export APP_TYPE=worker
然后我们可以使用以下命令运行我们的应用程序:
cargo run
当我们运行我们的应用程序时,我们将得到以下打印输出:
IncomingBody { one: "one", two: 2 }
IncomingBody { one: "one", two: 2 }
IncomingBody { one: "one", two: 2 }
IncomingBody { one: "one", two: 2 }
. . .
IncomingBody 结构的打印输出将是无限的,因为我们正在运行无限循环。 然而,这表明以下机制正在运行和工作:
尽管我们的工作线程正在使用 Redis 队列,但它只是打印出放入 Redis 队列的结构。 在下一节中,我们将功能构建到插入 Redis 队列的结构中,以便我们的工作人员可以执行任务。
为worker定义任务
当谈到运行我们的任务时,我们需要字段,以便我们可以将它们作为输入传递给正在运行的任务。 我们的任务还需要一个运行函数,这样我们就可以选择何时运行任务,因为运行任务需要很长时间。 我们可以使用以下代码在 src/tasks/add.rs 文件中定义基本的添加任务:
use std::{thread, time};
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddTask {
pub one: i32,
pub two: i32
}
impl AddTask {
pub fn run(self) -> i32 {
let duration = time::Duration::from_secs(20);
thread::sleep(duration);
return self.one + self.two
}
}
这些代码都不应该令人震惊。 我们将实现序列化和反序列化特征,以便我们可以将任务插入到 Redis 队列中。 然后我们可以使用睡眠函数来模拟长时间任务。 最后,我们只需将两个数字相加即可。 对于 src/tasks/multiply.rs 文件中的任务,run 函数采用以下形式:
impl MultiplyTask {
pub fn run(self) -> i32 {
let duration = time::Duration::from_secs(20);
thread::sleep(duration);
return self.one * self.two
}
}
发现 src/tasks/subtract.rs 文件中的 run 函数具有以下结构并不奇怪:
impl SubtractTask {
pub fn run(self) -> i32 {
let duration = time::Duration::from_secs(20);
thread::sleep(duration);
return self.one - self.two
}
}
现在,我们想要实现一个任务,看看是否可以从 Redis 队列中提取任务结构并运行它。 我们使用 src/tasks/mod.rs 文件中的以下代码使任务可以从模块访问:
pub mod add;
pub mod multiply;
pub mod subtract;
在我们的 src/main.rs 文件中,我们最初使用以下代码导入任务:
mod tasks;
use tasks::{
add::AddTask,
subtract::SubtractTask,
multiply::MultiplyTask
};
我们现在可以在工作代码块中实现我们的任务之一。 在此工作代码块的开头,我们将使用以下代码将 IncomingBody 结构与 AddTask 结构交换:
let body = AddTask{one: 1, two: 2};
除了我们对结果匹配语句的 Some 部分所做的操作之外,不需要更改其他任何内容,该部分现在采用以下形式:
let deserialized_struct: AddTask =
bincode::deserialize(&data).unwrap();
println!("{:?}", deserialized_struct.run());
在这里,我们可以看到我们将二进制数据反序列化为 AddTask 结构,运行 run 函数,然后打印结果。 在实际应用程序中,我们会将结果插入数据库或使用 HTTP 将结果发送到另一台服务器。 然而,在本章中,我们只想了解排队任务是如何执行的。 我们在书中多次介绍了数据库插入和 HTTP 请求。
如果我们现在运行我们的工作应用程序,我们将得到 15 秒的延迟,然后得到以下打印输出:
3
如果再等 15 秒,我们将得到另一个相同的打印输出。 这表明我们的任务正在从 Redis 队列中提取、反序列化,然后以与我们期望的完全相同的方式运行,因为一加二等于三。 然而,这里有一个问题。 我们只能发送和接收 AddTask 结构。 这没有用,因为我们还有另外两项任务,并且我们希望支持所有这些任务。 因此,我们必须继续定义可以支持一系列任务的消息。
为 Redis 队列定义消息
为了支持多个任务,我们必须采用两步方法来打包要插入到 Redis 队列中的任务。 这意味着我们将把任务结构序列化为 Vec<u8>,然后将此字节向量添加到另一个结构中,该结构有一个字段表示消息中任务的类型。 我们可以通过首先使用以下代码在 src/tasks/mod.rs 文件中导入序列化和反序列化特征来定义此过程:
use serde::{Serialize, Deserialize};
然后我们可以使用以下代码定义枚举任务类型和消息结构:
#[derive(Debug, Clone, Serialize, Deserialize)]
use add::AddTask;
use multiply::MultiplyTask;
use subtract::SubtractTask;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskType {
ADD(AddTask),
MULTIPLY(MultiplyTask),
SUBTRACT(SubtractTask)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMessage {
pub task: TaskType
}
我们的消息结构现在已准备好打包一系列要插入到 Redis 队列中的任务。 在我们的 src/main.rs 文件中,我们可以使用以下代码导入 TaskType 和 TaskMessage 结构:
mod tasks;
use tasks::{
add::AddTask,
TaskType,
TaskMessage
};
我们现在准备重写工作代码块中的无限循环。 我们首先创建AddTask,序列化AddTask,然后将这个序列化的任务打包到TaskMessage中,代码如下:
let body = AddTask{one: 1, two: 2};
let message = TaskMessage{task: TaskType::ADD(body)};
let serialized_message = bincode::serialize(&message).unwrap();
然后我们将建立一个 Redis 连接,然后使用以下代码将序列化的消息推送到 Redis 队列:
let mut con = client.get_connection().unwrap();
let _ : () = redis::cmd("LPUSH").arg("some_queue")
.arg(serialized_message
.clone())
.query(&mut con).unwrap();
然后,我们将从 Redis 队列中弹出任务并使用以下代码删除连接:
let outcome: Option<Vec<u8>> =
redis::cmd("RPOP").arg("some_queue").query(&mut con)
.unwrap();
std::mem::drop(con);
我们现在将 TaskMessage 结构移入和移出 Redis 队列。 如果有 TaskMessage,我们必须对其进行处理。 在 Results 的 Some 语句的 match 块中,我们必须反序列化从 Redis 队列中获取的字节,然后使用以下代码匹配任务类型:
let deserialized_message: TaskMessage =
bincode::deserialize(&data).unwrap();
match deserialized_message.task {
TaskType::ADD(task) => {
println!("{:?}", task.run());
},
TaskType::MULTIPLY(task) => {
println!("{:?}", task.run());
},
TaskType::SUBTRACT(task) => {
println!("{:?}", task.run());
}
}
现在,这使我们能够处理从 Redis 队列中提取并运行的各个任务。
我们的工作人员现在支持我们的所有三项任务! 然而,我们目前只是创建消息,然后直接在worker中消费这些消息。 我们需要使 HTTP 服务器能够接受一系列不同的请求,以将一系列不同的任务发送到 Redis 队列以供工作人员使用。
在 HTTP 服务器中集成路由
我们现在正处于让 HTTP 服务器接受传入请求以根据 URI 来创建一系列任务的阶段。 为了让我们的 HTTP 支持多个任务,我们本质上必须重写 src/main.rs 文件中的句柄函数。 在我们重写 main 函数之前,我们必须使用以下代码导入我们需要的内容:
use hyper::body;
use hyper::http::StatusCode;
我们导入这些东西是因为如果传递了错误的 URI,我们将返回 NOT_FOUND 状态代码。 我们还将从传入请求的正文中提取数据。 在重构句柄函数之前,我们需要更改 IncomingBody 结构以接受以下形式的两个整数:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingBody {
pub one: i32,
pub two: i32
}
在我们的句柄函数中,我们可以定义我们的 Redis 客户端,通过删除尾部斜杠来清理我们的 URI,并使用以下代码从传入请求中提取数据:
let client =
redis::Client::open("redis://127.0.0.1/").unwrap();
let task_type = req.uri().to_string().replace("/", "")"");
let body_bytes =
body::to_bytes(req.into_body()).await.unwrap();
let body: IncomingBody =
_json::from_slice(&body_bytes).unwrap();
我们可以看到我们可以从 URI 中提取任务类型。 现在,我们将支持加法、减法和乘法。 现在我们已经从传入的请求中获得了所需的一切; 我们可以使用以下代码根据 URI 构造适当的任务:
let message_type: TaskType;
match task_type.as_str() {
"add" => {
let body = AddTask{one: body.one,
two: body.two};
message_type = TaskType::ADD(body);
},
"multiply" => {
let body = MultiplyTask{one: body.one,
two: body.two};
message_type = TaskType::MULTIPLY(body);
},
"subtract" => {
let body = SubtractTask{one: body.one,
two: body.two};
message_type = TaskType::SUBTRACT(body);
},
_ => {
. . .
}
}
我们可以看到,无论任务是什么,我们都需要将任务结构打包到 TaskType 枚举中,该枚举可以序列化为二进制向量,以便将消息发送到 Redis 队列。 对于 match 语句的最后一部分,它捕获所有与“add”、“multiply”或“subtract”不匹配的任务请求,我们仅返回一个 NOT_FOUND HTTP 响应,代码如下:
let response =
Response::builder().status(StatusCode::NOT_FOUND)
.body(Body::from("task not found"));
return Ok(response.unwrap())
现在,我们已拥有创建可插入 Redis 队列的通用任务消息所需的一切。 有了这些信息,我们就可以创建 TaskMessage 结构并在我们刚刚用以下代码覆盖的 match 语句之后序列化 TaskMessage:
let message = TaskMessage{task_type: message_type,
task: bytes};
let serialized_message =
bincode::serialize(&message).unwrap();
然后,我们将建立一个 Redis 连接,将序列化的消息推送到 Redis 队列,然后使用以下代码删除 Redis 连接:
let mut con = client.get_connection().unwrap();
let _ : () = redis::cmd("LPUSH").arg("some_queue")
.arg(serialized_message
.clone())
.query(&mut con).unwrap();
最后,我们返回一个 Ok HTTP 响应,表明任务已发送,代码如下:
Ok(Response::new(Body::from("task sent")))
我们的句柄功能现在已经完成。 我们现在需要做的就是删除工作代码块中将 AddTask 结构插入 Redis 队列的代码。 我们将从工作人员代码块中删除任务插入代码,因为我们不再需要工作人员插入任务。 插入代码的删除采用以下形式:
let client =
redis::Client::open("redis://127.0.0.1/").unwrap();
loop {
let outcome: Option<Vec<u8>> = {
let mut con = client.get_connection()
.unwrap();
redis::cmd("RPOP").arg("some_queue")
.query(&mut con)
.unwrap()
};
match outcome {
. . .
}
}
现在,我们已准备好将这些工作人员和 HTTP 服务器打包到 Docker 中,以便我们可以使用任意数量的工作人员来运行我们的应用程序。
全部在 Docker 中运行
我们现在正处于可以在 Docker 中运行整个应用程序的阶段。 这使我们能够让多个工作人员从同一个 Redis 队列中提取数据。 首先,我们需要定义 Dockerfile 来构建我们的工作/服务器镜像。 我们将使用以下代码为 Docker 构建进行无发行版构建:
FROM rust:1.62.1 as build
ENV PKG_CONFIG_ALLOW_CROSS=1
WORKDIR /app
COPY . .
cargo build --release
FROM gcr.io/distroless/cc-debian10
COPY --from=build /app/target/release/task_queue
/usr/local/bin/task_queue
EXPOSE 3000
ENTRYPOINT ["task_queue"]
在本书的这一点上,这种无发行版本应该不足为奇。 我们只是编译应用程序,然后将静态二进制文件复制到 distroless 映像中。 在以任何方式运行构建之前,我们必须确保不会将过多的文件从目标目录复制到我们的 Docker 构建中,并在 .dockerignore 文件中使用以下代码:
./target
.github
我们的构建现已准备就绪。 我们可以使用以下概要定义 docker-compose.yml:
version: "3.7"
services:
server_1:
. . .
worker_1:
. . .
worker_2:
. . .
worker_3:
. . .
redis:
container_name: 'queue-redis'
image: 'redis'
ports:
- '6379:6379'
在这里,我们可以看到我们有三个工作人员和一台服务器。 我们的服务器采用以下形式:
server_1:
container_name: server_1
image: server_1
build:
context: .
environment:
- 'APP_TYPE=server'
- 'REDIS_URL=redis://redis:6379'
depends_on:
redis:
condition: service_started
restart: on-failure
ports:
- "3000:3000"
expose:
- 3000
在这里,我们可以看到我们可以公开端口,指出构建上下文位于当前目录中,并且我们的容器应该在 Redis 启动后启动。
标准工作人员采用以下形式:
worker_1:
container_name: worker_1
image: worker_1
build:
context: .
environment:
- 'APP_TYPE=worker'
- 'REDIS_URL=redis://redis:'
depends_on:
redis:
condition: service_started
restart: on-failure
我们可以想象其他工人具有与前一个工人相同的结构,这是事实。 如果我们想添加另一个工作人员,我们可以使用与worker_1完全相同的规格,除非我们只是增加附加到图像和容器名称的数字,从而导致新工作人员被称为worker_2。 您可能已经注意到,我们已将 REDIS_URL 添加到环境变量中。 这是因为工作人员和服务器必须访问其容器外部的 Redis 数据库。 将 localhost 传递到 Redis 客户端将导致无法连接到 Redis。 因此,我们必须删除对 Redis 客户端的所有引用,并将这些引用替换为以下代码:
let client =
redis::Client::open(env::var("REDIS_URL").unwrap())
.unwrap();
如果我们现在启动 docker_compose 并向服务器发送一系列不同的 HTTP 请求,我们会得到以下打印输出:
. . .
queue-redis | 1:M 30 Oct 2022 18:42:52.334 *
RDB memory usage when created 0.85 Mb
queue-redis | 1:M 30 Oct 2022 18:42:52.334 *
Done loading RDB, keys loaded: 0, keys expired: 0.
queue-redis | 1:M 30 Oct 2022 18:42:52.334 *
DB loaded from disk: 0.002 seconds
queue-redis | 1:M 30 Oct 2022 18:42:52.334 *
Ready to accept connections
worker_1 | empty queue
worker_3 | empty queue
worker_1 | empty queue
worker_3 | empty queue
worker_2 | multiply: 9
worker_3 | multiply: 25
worker_1 | multiply: 8
worker_3 | empty queue
worker_3 | empty queue
worker_2 | multiply: 4
worker_2 | empty queue
. . .
这是一个很大的打印输出,但我们可以看到 Redis 开始旋转,并且有多个工作线程轮询 Redis 队列。 我们还可以看到多个worker同时处理多个任务。 这里描述了如何向服务器发出请求的示例:
在这里,我们有它! 我们有一个接受请求的服务器。 根据 URI,我们的服务器构造一个任务,将其打包成消息,然后将其发送到 Redis 队列。 然后,我们让多个工作人员轮询 Redis 队列来处理长任务。
概括
在本章中,我们构建了一个可以作为工作线程或服务器运行的应用程序。 然后,我们构建了可以序列化并插入到 Redis 队列中的结构。 这使得我们的工作人员能够消耗这些任务,然后在自己的时间处理它们。 您现在可以构建处理长时间任务的系统,而无需占用 HTTP 服务器。 序列化 Rust 结构并将其插入 Redis 的机制不仅仅止于处理大型任务。 我们可以序列化 Rust 结构,并通过 Redis 中的 pub/sub 通道将它们发送到其他 Rust 服务器,本质上是创建更大规模的参与者模型方法。
通过我们的 distroless 镜像,这些 Rust 服务器的大小大约只有 50 MB,使得这个概念具有可扩展性。 我们还探索了将原始命令应用于 Redis,这让您可以自由且自信地完全接受 Redis 所提供的功能。 进一步阅读部分给出了可以对 Redis 执行的所有命令的高级列表。 您会对自己能做的事情感到震惊,我希望您在查看可用命令时想到使用 Redis 可以实现的所有解决方案时会像我一样兴奋。
我们已经读到了本书的结尾。 我很感激你能走到这一步,当读者伸出援手时我总是很高兴。 Rust 确实是一种革命性的编程语言。 借助 Rust,我们已经能够构建和部署快速的小型服务器。 我们探索了异步编程和参与者模型。 我们已经构建了部署管道。 你的旅程还没有结束; 总是有更多东西需要学习。 然而,我希望我已经让您了解了基本概念,以便您可以继续阅读更多文档、实践,并有朝一日突破 Web 编程的界限。
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)