1. 使用天气 API
现在,我们已经掌握了基础,让我们来看看一个实际的场景。假设我们需要每隔 30 分钟收集一次天气数据。为此,我们将使用天气数据获取 API。当然,出于教学目的,使用真实的 API 可能会受限,所以我们假设一下场景。
通过 OpenWeather API 获取当前天气
这个示例展示了如何使用 requests
通过 OpenWeather API 获取特定城市的天气数据。
import schedule
import time
import requests
def fetch_weather(city):
api_key = "YOUR_API_KEY" # 替换为你的 OpenWeather API 密钥
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
temperature = data["main"]["temp"]
weather_description = data["weather"][0]["description"]
print(f"{city}的当前温度: {temperature}°C")
print(f"天气描述: {weather_description}")
except requests.exceptions.RequestException as e:
print(f"获取天气数据时出错: {e}")
def fetch_weather_of_london():
fetch_weather("London")
# 设置每 30 分钟执行一次任务
schedule.every(30).minutes.do(fetch_weather_of_london)
while True:
schedule.run_pending()
time.sleep(1)
在这里,我们发送了一个 GET 请求到 OpenWeather API 来获取指定城市的当前天气。在返回的 JSON 数据中,我们提取了温度和天气描述并打印到屏幕上。记得将 YOUR_API_KEY
替换为你的 API 密钥。
通过 API 获取当前汇率
在这个示例中,我们将使用 requests
通过 API 获取当前的汇率。
import schedule
import time
import requests
def fetch_exchange_rate():
url = "https://api.exchangerate-api.com/v4/latest/USD"
try:
response = requests.get(url)
response.raise_for_status() # 检查请求是否成功
data = response.json()
usd_to_eur = data["rates"]["EUR"]
print(f"当前 USD 转 EUR 汇率: {usd_to_eur}")
except requests.exceptions.RequestException as e:
print(f"获取数据时发生错误: {e}")
# 设置每 10 分钟执行一次任务
schedule.every(10).minutes.do(fetch_exchange_rate)
while True:
schedule.run_pending()
time.sleep(1)
在此,我们发送了一个 GET 请求到汇率 API,获取了 JSON 格式的数据。从 JSON 响应中提取了 USD 到 EUR 的汇率并打印出来。通过修改 data["rates"]
中的键,可以适配为其他货币对收集数据。
通过这个脚本,我们可以持续收集天气和汇率数据。不赖吧,作为个开始。
现实使用场景
数据收集的自动化在很多场景中都很有用:
- 服务器状态监控: 自动检查服务器状态可以在问题发生前就检测并预警。
- 社交媒体数据收集: 持续分析趋势和品牌提及情况。
- 汇率跟踪: 汇率的变化对于商业或个人需求可能会有帮助。
2. 自动化网页数据收集示例
如果我们想从网页收集数据呢?比如,定期检查新闻更新。此时 BeautifulSoup
和 requests
可以派上用场。
从网页收集数据
假设我们有一个网站,希望从中收集新闻标题。可以这么做:
import requests
from bs4 import BeautifulSoup
def fetch_news():
response = requests.get("http://example.com/news")
soup = BeautifulSoup(response.content, 'html.parser')
for headline in soup.find_all('h2', class_='news'):
print(headline.text)
schedule.every().hour.do(fetch_news)
while True:
schedule.run_pending()
time.sleep(1)
在这个示例中,每隔 60 分钟,我们的脚本会检查网页并打印新闻标题。这简化了获取最新信息的过程。
从网站收集新闻标题
在这个例子中,我们将使用 requests
下载 HTML 页面,并用 BeautifulSoup
解析新闻标题。
import requests
from bs4 import BeautifulSoup
def fetch_news_headlines():
url = "https://www.bbc.com/news"
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
headlines = soup.find_all('h3') # 查找所有 <h3> 标签 (通常标题在这里)
print("BBC 最新新闻标题:")
for headline in headlines[:5]: # 取前5个标题
print("-", headline.get_text(strip=True))
except requests.exceptions.RequestException as e:
print(f"获取数据时出错: {e}")
fetch_news_headlines()
在这里,我们加载了 BBC News 页面,使用 BeautifulSoup
查找所有 <h3>
标签的位置(通常新闻标题在这里)。我们打印出前 5 个标题,并用 strip=True
去除多余的空格和符号。
从电商网站收集商品价格数据
这个示例展示了如何从电商网站(例如 Amazon 或其他商店)提取商品价格数据。我们使用 requests
请求页面,使用 BeautifulSoup
解析价格。
import requests
from bs4 import BeautifulSoup
def fetch_product_price(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
product_name = soup.find('span', {'id': 'productTitle'}).get_text(strip=True)
price = soup.find('span', {'class': 'a-price-whole'}).get_text(strip=True)
print(f"商品: {product_name}")
print(f"价格: {price} 元")
except requests.exceptions.RequestException as e:
print(f"获取数据时出错: {e}")
except AttributeError:
print("无法找到商品或价格信息")
# 示例商品链接
fetch_product_price("https://www.amazon.com/dp/B08N5WRWNW")
在这个例子中,我们发送了带有 User-Agent
的 GET 请求,以避免被阻止。然后使用 BeautifulSoup
查找商品名称(通过 id="productTitle"
)和商品价格(通过类 a-price-whole
)。我们使用 strip=True
去除了多余空格。
GO TO FULL VERSION