百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

用SendGrid和Redis队列用Python调度国际空间站的电子邮件

mhr18 2025-05-27 15:58 41 浏览 0 评论

有一些很酷的API,比如打开通知我们可以编程地访问国际空间站的位置,以确定它何时经过特定的位置,而使用Twilio SendGrid,我们可以在发生这种情况时发送电子邮件通知。

让我们介绍一下如何在Python中使用排定电子邮件的Redis队列 .

先决条件和依赖性

在继续前进之前,请确保有以下内容:

  • Python 3安装在你的机器上
  • 免费的SendGrid帐户
  • 测试此项目的电子邮件地址

这是你可以遵循的指南如果您要使用Python进行更多的Web开发,并且不熟悉诸如虚拟环境之类的东西,则需要设置您的开发环境。

在编写代码之前,您需要安装一些依赖项:

  • 这个SendGrid Python librarY用于发送电子邮件
  • RQ调度器-构建在另一个工具之上的轻量级、优雅的解决方案,该工具具有较低的进入门槛,称为Redis队列。
  • 请求-用于发出HTTP请求

请确保创建并激活虚拟环境,然后使用以下命令安装这些环境:

pip install sendgrid rq-scheduler==0.11.0 requests==2.26.0

RQ和RedisPython模块将作为依赖关系安装RQScheduler。为了让RQ工作,您还需要安装Redis在你的机器上。可以使用以下命令执行以下操作:wget :

wget https://download.redis.io/releases/redis-6.2.6.tar.gz
tar xzf redis-6.2.6.tar.gz
cd redis-6.2.6
make

使用命令在默认端口上的单独终端窗口中运行Redissrc/redis-server从安装它的目录。

进入国际空间站的位置

让我们首先编写代码,为给定的坐标集调用OpenNotificationAPI,并打印下一次ISS将沿着该纬度和经度飞行的时间。

创建一个名为iss.py(“国际空间站”模块)在您希望使用代码的目录中,并添加以下功能:

from datetime import datetime
import pytz

import requests

ISS_URL = 'http://api.open-notify.org/iss-pass.json'


def get_next_pass(lat, lon):
    location = { 'lat': lat, 'lon': lon }
    response = requests.get(ISS_URL, params=location).json()

    if 'response' in response:
        next_pass = response['response'][0]['risetime']
        next_pass_datetime = datetime.fromtimestamp(next_pass, tz=pytz.utc)
        print('Next pass for {}, {} is: {}'
              .format(lat, lon, next_pass_datetime))
        return next_pass_datetime
    else:
        print('No ISS flyby can be determined for {}, {}'.format(lat, lon))

这个get_next_pass函数在此代码中将向具有给定纬度和经度的OpenNotificationAPI发出请求,检查是否有有效的响应,然后将从API收到的时间戳转换为PythondateTime对象,并打印下一个ISS将在空中飞行的相应时间。

要测试这段代码,请打开Pythonshell并运行以下两行。在这个例子中,我们将使用旧金山的Twilio总部作为我们的测试位置(纬度:37.788052,经度:-122.391472):

from iss import get_next_pass
get_next_pass(37.788052, -122.391472)

你应该看到这样的东西:Next pass for 37.788052, -122.391472 is: 2021-12-09 23:58:11+00:00有一个合适的时间戳。

现在我们可以继续编写代码发送电子邮件了。

注册sendGrid并创建一个API密钥

创建SendGrid帐户,您可以为本教程选择免费层。一旦你有了账户,你就需要创建API密钥从这张截图中可以看到。您可以将它命名为您想要的任何名称,但是一旦创建了它,请确保在继续之前保存它!

保存此API键的一个好方法是将其设置为可以从Python代码中访问的环境变量,以避免在代码中直接编写它。设置SENDGRID_API_KEY环境变量是SendGrid帐户中的API密钥。不过,在其他地方做笔记也没什么坏处,因为你不能再看一遍了。这是一个如果需要帮助设置环境变量,请提供有用的教程。。稍后我们将使用这个API密钥。

用Python发送电子邮件

现在您有了SendGrid帐户和API密钥,您可以更新iss.py若要包含发送电子邮件的代码:

from datetime import datetime
import os
import pytz

import requests
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail


ISS_URL = 'http://api.open-notify.org/iss-pass.json'


def send_email(from_email, to_email, body):
    message = Mail(
        from_email=from_email,
        to_emails=to_email,
        subject='International Space Station passing by!',
        html_content=body)

    sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
    response = sg.send(message)
    print(response.status_code, response.body, response.headers)


def get_next_pass(lat, lon):
    location = { 'lat': lat, 'lon': lon }
    response = requests.get(ISS_URL, params=location).json()

    if 'response' in response:
        next_pass = response['response'][0]['risetime']
        next_pass_datetime = datetime.fromtimestamp(next_pass, tz=pytz.utc)
        print('Next pass for {}, {} is: {}'
              .format(lat, lon, next_pass_datetime))
        return next_pass_datetime
    else:
        print('No ISS flyby can be determined for {}, {}'.format(lat, lon))

请记住,在尝试运行此代码之前,要确保设置了SendGridAPI密钥环境变量。

请注意,在生产应用程序中,建议验证 发送者身份 通过完成 域认证 ...发件人身份代表您的“从”电子邮件地址-您的收件人视为您的电子邮件发件人的地址。有关此问题的一步一步的教程,请查看: 如何为Twilio SendGrid设置域认证 .

如果要测试此代码,请打开Pythonshell并运行以下代码,替换to_email与您自己的电子邮件地址的争论:

from iss import send_email
send_email('from_email@example.com', 'your_email@example.com', 'Look up!')

您应该收到一封电子邮件,告诉您在运行此代码后要查找。

用RQ调度器调度任务

现在我们有了一个为我们提供日期时间的函数,还有一个发送电子邮件的函数,我们可以使用RQScheduler。创建另一个名为schedule_notification.py,并向其添加以下代码:

from datetime import datetime

from redis import Redis
from rq_scheduler import Scheduler

import iss

scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue

# Change these latitude and longitude values for any location you want.
next_pass = iss.get_next_pass(37.788052, -122.391472)

if next_pass:
    scheduler.enqueue_at(next_pass, iss.send_email,
                         'from_email@example.com', 'your_email@example.com',
                         'Look up! The ISS is flying above you!')

这只是一个快速的脚本,调用您编写的其他功能,一个是了解国际空间站何时经过您的位置下,另一个将向您发送电子邮件。在本例中,我使用了旧金山Twilio办公室的坐标,但您可以将纬度和经度更改为您所在的任何位置。

在能够运行此代码之前,必须确保在其他终端窗口或后台进程中运行Redis服务器、RQ工作者和RQ Scheduler进程。您应该已经通过使用命令运行Redis服务器了。src/redis-server从安装Redis的目录。再打开两个终端窗口,在这两个窗口中导航到代码所在的目录,并激活该项目的虚拟环境。在一个窗口中运行命令rqworker在另一个命令中运行命令rqscheduler .

这样做之后,您应该准备好运行代码来调度通知:

python schedule_notification.py

现在你要做的就是等..。

穿越时间

这很好,但是如果您不想等着看您的代码是否工作,这是可以理解的。如果你想即时满足,我们可以使用时间旅行的方法。在基于unix的系统上,可以使用日期命令。

如果空间站预定在2019年12月5日4:02飞过,那么你可以在linux上运行。date -s "12/05/2019 03:02:00"。在OSX上你会运行date 1205160219(您甚至可以使用-u参数如果要使用UTC时区,该时区对应于Python代码正在打印的日期时间)。如果所有这些都失败了,也有GUI选项来改变您的计算机在大多数操作系统上的时间。

在OSX上,您可以通过在系统首选项中打开“日期和时间”来设置(并重置)此选项。

如果您希望在ISS每次经过时而不是仅收到一次通知,则可以在每条消息之后安排另一次通知,方法是修改send_emailiss.py并添加适当的导入语句:

from redis import Redis
from rq_scheduler import Scheduler

scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue


def send_email(from_email, to_email, body):
    message = Mail(
        from_email=from_email,
        to_emails=to_email,
        subject='International Space Station passing by!',
        html_content=body)

    sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
    response = sg.send(message)
    print(response.status_code, response.body, response.headers)
    scheduler.enqueue_at(next_pass, iss.send_email,
                         'from_email@example.com', 'your_email@example.com',
                         'Look up! The ISS is flying above you!')

无限与超越

现在您可以在国际空间站经过时接收电子邮件,您可以使用RQ Scheduler来满足所有Python调度需求。可能性是无限的。

原文
Https://www.twilio.com/blog/scheduling-international-space-station-emails-in-python-with-sendgrid-and-redis-queue

相关推荐

保持SSH隧道活跃:一个实用的Bash监控脚本

引言如果您正在使用AWSDocumentDB或任何位于堡垒主机后面的云托管服务等远程资源,您可能正在使用SSH隧道来安全地访问它们。虽然设置SSH隧道很简单,但保持其活跃状态并监控其状态可能会有些棘...

京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?

京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?我应该如何理解。首先,我需要回忆一下连接池和微服务的基本概念,然后思考它们在微服务架构中的作用和重要性。连接池,数据库连接池,用来管理数据库...

OOM 血案:5 小时绝地求生,MAT+Arthas 终极排查指南

一、血案现场:线上服务突然暴毙2025年4月12日凌晨3点15分,服务突发大规模OOM,三个Pod在10分钟内连续崩溃,Prometheus告警显示JVM堆内存使用率...

记Tomcat优化方案

Tomcat服务吞吐量评估方案问题:评估方案在一台8核16G的linux服务器上,使用tomcat容器部署服务。在正常情况下如何评估这个tomcat服务可处理的连接数,即服务的吞吐量,请在正常情况下考...

Java高级面试,常见数据结构的实现原理详细说明及面试总结

一、List接口实现类1.ArrayList底层结构:动态数组(Object[]数组)。核心原理:o动态扩容:初始容量为10(JDK1.8),当元素超过容量时,新容量为原容量的1.5倍(old...

SpringBoot敏感配置项加密与解密实战

一、为什么要加密配置?先说说SpringBoot的配置加载机制。我们知道,SpringBoot支持多种配置加载方式,优先级从高到低大概是:命令行参数环境变量application-{profile}....

【面试题】nacos 配置管理类型-主配置、共享配置、扩展配置

nacos配置管理类型-主配置、共享配置、扩展配置Nacos的配置管理支持多种类型,其中共享配置及其扩展机制(如shared-configs和extension-configs)是微服...

Spring Boot 的 RedisAutoConfiguration 配置:自动装配到自定义扩展

在SpringBoot开发中,Redis作为高性能缓存和分布式数据存储方案被广泛使用。而RedisAutoConfiguration作为SpringBoot自动装配体系的重要组成部分,能...

Docker图像处理:扩展您的优化工作流程

随着应用程序的增长和图像处理需求的增加,传统的优化方法遇到了扩展瓶颈。内存限制、环境不一致和处理瓶颈将图像优化从一个已解决的问题变成了生产环境的噩梦。Docker改变了游戏规则。通过容器化图像处理工作...

掌握 Spring 框架这 10 个扩展点,让你的能力更上一层楼

当我们提到Spring时,或许首先映入脑海的是IOC(控制反转)和AOP(面向切面编程)。它们可以被视为Spring的基石。正是凭借其出色的设计,Spring才能在众多优秀框架中脱颖而出...

简简单单在线文件浏览的功能搞起来很头疼

您的系统支持在线预览文件吗?一个小小的问题,背后是无数程序员的爆肝研究,有人说了,我平时打开个文件不是很容易吗?其实不然。文件格式代表着软件行业的底层、高端产出,也代表着经久不衰的使用场景,也是我国底...

没硬盘、网盘也能看片自由!NAS一键部署MoonTV,随时随地爽看。

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙有没有一个应用服务,能满足既没有足够预算购置硬盘,也不想依托网盘的朋友的家庭观影需求?之前我介绍过LibreTV,本篇再来看看另一个更...

阿里云ECS代理商:如何使用ECS部署Node.js应用?

Node.js作为一种高性能、事件驱动的JavaScript运行环境,广泛用于构建实时通信、微服务接口、后台管理系统等现代Web应用。而阿里云ECS服务器以高可用性、灵活配置、安全稳定等优势,为部署N...

阿里云数据库代理商:如何提高数据库的查询效率?

在现代企业应用中,数据库查询效率对整体系统性能的影响巨大。特别是随着数据量的不断增加,如何提升数据库查询的响应速度,成为了数据库优化的关键任务。阿里云提供了一系列工具和策略,帮助用户提升数据库的查询效...

阿里云代理商:阿里云G6ne实例如何承载1.4亿QPS?

一、阿里云G6ne实例概述1.1G6ne实例的背景与定位阿里云G6ne实例是基于阿里云自主研发的“飞天”架构设计的高性能云服务器实例,专为大规模、需要高IOPS和低延迟的业务场景设计。它采用了更强大的...

取消回复欢迎 发表评论: