Python调用Web API指南

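目录

  1. 使用requests库(推荐)
  2. 使用urllib(Python标准库)
  3. 异步调用(使用aiohttp)
  4. 实际示例:调用GitHub API
  5. 处理分页
  6. 使用会话保持连接
  7. 最佳实践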


使用requests库(推荐)

安装requests

pip install requests

基本GET请求

import requests

# Fetch a user profile from the GitHub REST API.
response = requests.get(url='https://api.github.com/users/octocat')
print(response.status_code)  # numeric HTTP status of the reply
print(response.json())       # response body decoded from JSON

带参数的GET请求

import requests

# Option 1: write the query string directly into the URL.
response = requests.get('https://api.github.com/users/octocat/repos?page=1&per_page=10')

# Option 2: hand the query values to requests through the params argument.
params = dict(page=1, per_page=10)
response = requests.get('https://api.github.com/users/octocat/repos', params=params)

print(response.url)  # the complete URL that was requested
data = response.json()

POST请求

import requests
import json

# Payload to submit.
data = dict(name='John Doe', email='john@example.com')

# Passing the payload through json= sets Content-Type to application/json
# automatically.
response = requests.post('https://httpbin.org/post', json=data)

print(response.status_code)
print(response.json())

添加请求头

import requests

# Custom headers sent with the request; the token value is a placeholder.
headers = {'Authorization': 'Bearer YOUR_ACCESS_TOKEN'}
headers['User-Agent'] = 'MyApp/1.0'
headers['Content-Type'] = 'application/json'

response = requests.get('https://api.github.com/user', headers=headers)

处理认证

import requests
from requests.auth import HTTPBasicAuth

# HTTP Basic authentication: credentials are supplied through the auth argument.
response = requests.get('https://api.example.com/data', auth=HTTPBasicAuth('username', 'password'))

# Bearer-token authentication: the token travels in the Authorization header.
headers = {'Authorization': 'Bearer YOUR_TOKEN_HERE'}
response = requests.get('https://api.example.com/data', headers=headers)

错误处理

import requests
from requests.exceptions import RequestException

try:
    response = requests.get('https://api.example.com/data', timeout=10)
    # raise_for_status() raises HTTPError for 4xx/5xx responses; successful
    # statuses other than 200 (e.g. 201, 204) do not raise.
    response.raise_for_status()

    data = response.json()
    print(data)

except requests.exceptions.HTTPError as err:
    print(f"HTTP错误: {err}")
# Timeout is handled before ConnectionError on purpose: ConnectTimeout derives
# from both classes, so the opposite order would report a connect timeout as a
# generic connection error.
except requests.exceptions.Timeout as err:
    print(f"请求超时: {err}")
except requests.exceptions.ConnectionError as err:
    print(f"连接错误: {err}")
# Catch-all for any other requests failure (the base class of the ones above).
except requests.exceptions.RequestException as err:
    print(f"请求异常: {err}")

使用urllib(Python标准库)

import urllib.request
import urllib.parse
import json

# GET request: open the URL, then decode and parse the JSON body.
url = 'https://api.github.com/users/octocat'
with urllib.request.urlopen(url) as response:
    raw_body = response.read()
    data = json.loads(raw_body.decode())
    print(data)

# POST request: the payload is JSON-encoded to UTF-8 bytes and attached to a
# Request object together with a matching Content-Type header.
url = 'https://httpbin.org/post'
data = json.dumps({'name': 'John Doe'}).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})

with urllib.request.urlopen(req) as response:
    raw_body = response.read()
    result = json.loads(raw_body.decode())
    print(result)

异步调用(使用aiohttp)

安装aiohttp

pip install aiohttp

异步GET请求

import aiohttp
import asyncio

async def fetch_data():
    """Fetch the octocat profile from the GitHub API, print it, and return it."""
    endpoint = 'https://api.github.com/users/octocat'
    async with aiohttp.ClientSession() as http:
        async with http.get(endpoint) as reply:
            profile = await reply.json()
            print(profile)
            return profile


# Run the coroutine to completion.
asyncio.run(fetch_data())

并发请求

import aiohttp
import asyncio

async def fetch_user(session, username):
    """Request one GitHub user's profile through *session* and return the parsed JSON."""
    endpoint = f'https://api.github.com/users/{username}'
    async with session.get(endpoint) as reply:
        profile = await reply.json()
        return profile

async def main():
    """Fetch several GitHub profiles concurrently and print each login and name."""
    usernames = ['octocat', 'torvalds', 'gvanrossum']

    async with aiohttp.ClientSession() as session:
        # All lookups share one session and are awaited together.
        profiles = await asyncio.gather(
            *(fetch_user(session, name) for name in usernames)
        )

        for profile in profiles:
            print(f"{profile['login']}: {profile['name']}")


asyncio.run(main())

实际示例:调用GitHub API

import requests
import json

class GitHubAPI:
    """Small client for a few GitHub REST API endpoints.

    Every request sends the headers prepared in ``__init__``. Each method calls
    ``raise_for_status()``, so 4xx/5xx responses surface as
    ``requests.exceptions.HTTPError``.
    """

    def __init__(self, token=None, timeout=10):
        """Set up the base URL, default headers and optional credentials.

        Args:
            token: Optional access token. When given, it is sent as
                ``Authorization: token <token>`` and is required by
                ``create_repo``.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.base_url = 'https://api.github.com'
        # Kept on the instance so create_repo() can tell whether the client
        # was constructed with credentials.
        self.token = token
        self.timeout = timeout
        self.headers = {
            'Accept': 'application/vnd.github.v3+json',
            'User-Agent': 'Python-API-Client'
        }
        if token:
            self.headers['Authorization'] = f'token {token}'

    def get_user_info(self, username):
        """Return the parsed JSON profile of *username*.

        Raises:
            requests.exceptions.HTTPError: On a 4xx/5xx response.
        """
        url = f'{self.base_url}/users/{username}'
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def get_user_repos(self, username, page=1, per_page=30):
        """Return one page of *username*'s repositories, sorted by last update.

        Args:
            username: Account whose repositories are listed.
            page: Page number to request.
            per_page: Number of repositories per page.

        Raises:
            requests.exceptions.HTTPError: On a 4xx/5xx response.
        """
        url = f'{self.base_url}/users/{username}/repos'
        params = {
            'page': page,
            'per_page': per_page,
            'sort': 'updated'
        }
        response = requests.get(
            url, headers=self.headers, params=params, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def create_repo(self, name, description="", private=False):
        """Create a repository for the authenticated user and return its JSON.

        Args:
            name: Repository name.
            description: Optional repository description.
            private: Whether the repository should be private.

        Raises:
            RuntimeError: If the client was created without a token.
            requests.exceptions.HTTPError: On a 4xx/5xx response.
        """
        # Check the stored token itself: a hasattr() test would not reflect
        # whether a token was actually supplied to the constructor.
        if not self.token:
            raise RuntimeError("需要认证token来创建仓库")

        url = f'{self.base_url}/user/repos'
        data = {
            'name': name,
            'description': description,
            'private': private,
            'auto_init': True
        }
        response = requests.post(
            url, headers=self.headers, json=data, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

# Usage example
if __name__ == "__main__":
    # Calls made without an access token
    github = GitHubAPI()
    user_info = github.get_user_info('octocat')
    print(f"用户: {user_info['login']}")
    # .get() falls back to 'N/A' only when the 'name' key is absent.
    print(f"姓名: {user_info.get('name', 'N/A')}")
    print(f"仓库数: {user_info['public_repos']}")

    # Fetch the first five repositories and print name and description
    repos = github.get_user_repos('octocat', per_page=5)
    for repo in repos:
        print(f"- {repo['name']}: {repo['description']}")

处理分页

import requests

def get_all_repos(username):
    """Collect every repository of *username* by walking the paginated listing.

    Pages of 100 entries are requested starting at page 1. Collection stops at
    the first empty page or when the response carries no ``next`` link.
    ``raise_for_status()`` is called on every response.
    """
    endpoint = f'https://api.github.com/users/{username}/repos'
    collected = []
    page_number = 1

    while True:
        reply = requests.get(endpoint, params={'page': page_number, 'per_page': 100})
        reply.raise_for_status()

        batch = reply.json()
        if not batch:
            # Empty page: nothing more to collect.
            return collected

        collected.extend(batch)

        # Without a 'next' link this was the last page.
        if 'next' not in reply.links:
            return collected
        page_number += 1

# Usage: collect all repositories of one user and print how many were found
all_repos = get_all_repos('octocat')
print(f"总仓库数: {len(all_repos)}")

使用会话保持连接

import requests

# A Session keeps the connection alive between requests, which improves
# performance.
with requests.Session() as session:
    # Headers set on the session are used by the requests made through it.
    session.headers['User-Agent'] = 'MyApp/1.0'
    session.headers['Accept'] = 'application/json'

    # Both requests go through the same session.
    response1 = session.get('https://api.example.com/endpoint1')
    response2 = session.get('https://api.example.com/endpoint2')

最佳实践

  1. 总是处理异常 - 网络请求可能会失败
  2. 设置超时 - 避免请求永远挂起
  3. 使用会话 - 对于多个请求到同一API
  4. 尊重速率限制 - 检查API的速率限制
  5. 缓存响应 - 对于不经常变化的数据
  6. 使用环境变量存储敏感信息 - 如API密钥(下面的示例使用python-dotenv,需先执行 pip install python-dotenv)
import os
import requests
from dotenv import load_dotenv

# Load variables from a .env file into the environment.
load_dotenv()

# Both values are None when the corresponding variable is not set.
API_KEY = os.environ.get('API_KEY')
BASE_URL = os.environ.get('API_BASE_URL')

def make_authenticated_request(endpoint):
    """GET ``BASE_URL/endpoint`` with a Bearer token and return the parsed JSON.

    The request uses a 30-second timeout, and ``raise_for_status()`` is called
    on the response before the body is decoded.
    """
    auth_headers = {}
    auth_headers['Authorization'] = f'Bearer {API_KEY}'
    auth_headers['Content-Type'] = 'application/json'

    target = f'{BASE_URL}/{endpoint}'
    reply = requests.get(target, headers=auth_headers, timeout=30)
    reply.raise_for_status()
    return reply.json()

文档创建时间: 2025-11-21
最后更新: 2025-11-21
版本: 1.0
适用Python版本: Python 3.7+(异步示例中的 asyncio.run 需要 Python 3.7 及以上版本)