Pandas索引与数据操作完全指南
目录
索引基础
建立索引 vs 不建立索引的区别
核心区别
| 特性 | 索引 | 普通列 |
|---|---|---|
| 主要目的 | 标识和快速访问数据行,类似字典的键 | 存储数据内容本身 |
| 唯一性 | 鼓励但不强制唯一 | 通常不要求唯一 |
| 性能 | 查找快 (O(1) ~ O(logN)) | 查找慢 (O(N)) |
| 数据访问 | 使用 .loc[] |
使用 df['col'] 或布尔索引 |
| 数据对齐 | 自动根据索引对齐 | 按位置对齐 |
何时使用索引
- 设置为索引:主键、时间序列、频繁查询的列
- 保持为普通列:数据主体、用于计算和筛选的列
示例代码
import pandas as pd
# 创建示例数据
data = {
'user_id': [f'user_{i}' for i in range(1, 1000001)],
'name': ['Alice', 'Bob', 'Charlie'] * 333334,
'score': np.random.randint(0, 100, 1000000)
}
df = pd.DataFrame(data)
# 不建立索引的查询(慢)
df[df['user_id'] == 'user_123456']
# 建立索引后的查询(快)
df_indexed = df.set_index('user_id')
df_indexed.loc['user_123456']
多重索引
创建多重索引
# 方法1:从DataFrame创建
df_multi = df.set_index(['code', 'date'])
# 方法2:使用MultiIndex.from_*方法
arrays = [['Beijing', 'Beijing', 'Shanghai', 'Shanghai'],
[2020, 2021, 2020, 2021]]
index = pd.MultiIndex.from_tuples(list(zip(*arrays)), names=['City', 'Year'])
# 方法3:从笛卡尔积创建
index = pd.MultiIndex.from_product([['Beijing', 'Shanghai'], [2020, 2021]],
names=['City', 'Year'])
数据查询与选择
# 选择特定城市的所有数据
df_multi.loc['Beijing']
# 选择特定城市和年份
df_multi.loc[('Beijing', 2021)]
# 使用xs方法进行交叉选择
df_multi.xs(2020, level='Year')
# 切片操作
df_multi.loc['Beijing':'Shanghai']
df_multi.loc[('Beijing', 2020):('Shanghai', 2021)]
查看指定代码在特定日期范围的数据
# 方法1:使用.loc[]进行切片
target_code = '000001'
start_date = '2019-01-01'
end_date = '2019-12-31'
result = data_indexed.loc[(target_code, start_date):(target_code, end_date)]
# 方法2:布尔索引
mask = ((data_indexed.index.get_level_values('code') == target_code) &
(data_indexed.index.get_level_values('date') >= start_date) &
(data_indexed.index.get_level_values('date') <= end_date))
result = data_indexed[mask]
# 方法3:使用xs()方法
stock_data = data_indexed.xs(target_code, level='code')
result = stock_data.loc[start_date:end_date]
实际应用场景
# 时间序列数据
dates = pd.date_range('2023-01-01', periods=6, freq='M')
multi_index = pd.MultiIndex.from_product([['Store_A', 'Store_B'], dates],
names=['Store', 'Date'])
# 学生成绩数据
students = ['Alice', 'Bob', 'Charlie']
subjects = ['Math', 'Science', 'English']
semesters = [1, 2]
index = pd.MultiIndex.from_product([students, subjects, semesters],
names=['Student', 'Subject', 'Semester'])
索引操作
# 重置索引
df_reset = df_multi.reset_index()
# 修改索引名称
df_multi.index.names = ['City_Name', 'Year_Num']
# 交换索引级别
df_swapped = df_multi.swaplevel('Year', 'City')
# 排序
df_sorted = df_multi.sort_index()
df_sorted_level = df_multi.sort_index(level='Year')
数据查询与筛选
基本查询
# 单级索引查询
df.loc['label'] # 单行
df.loc[['label1', 'label2']] # 多行
# 列选择
df['column_name'] # 单列
df[['col1', 'col2']] # 多列
# 条件筛选
df[df['column'] > 10] # 布尔索引
多重索引查询
# 查看指定股票代码数据
data_indexed.loc['SZ#000001']
# 排序(注意正确方法)
data_indexed.loc['SZ#000001'].sort_values(by='date', ascending=True) # 如果date是列
data_indexed.loc['SZ#000001'].sort_index(level='date', ascending=True) # 如果date是索引
统计计算
# 计算收涨天数(正确语法)
sz_data = data_indexed.loc['SZ#000001']
positive_count = (sz_data['daily_change'] > 0).sum()
# 或
positive_count = len(sz_data[sz_data['daily_change'] > 0])
# 详细统计
positive_days = len(sz_data[sz_data['daily_change'] > 0])
negative_days = len(sz_data[sz_data['daily_change'] < 0])
zero_days = len(sz_data[sz_data['daily_change'] == 0])
DataFrame创建与新增数据
新建DataFrame
# 方法1:指定列名创建空DataFrame
df = pd.DataFrame(columns=['name', 'age', 'city', 'salary'])
# 方法2:从字典创建
data = {'name': ['Alice', 'Bob'], 'age': [25, 30]}
df = pd.DataFrame(data)
# 方法3:从列表创建
data_list = [['Alice', 25], ['Bob', 30]]
df = pd.DataFrame(data_list, columns=['name', 'age'])
新增行数据
# 方法1:使用loc[]新增行(最常用)
df.loc[2] = ['Charlie', 28, 'Guangzhou', 55000]
df.loc[3] = {'name': 'David', 'age': 35, 'city': 'Shenzhen', 'salary': 70000}
# 方法2:使用pd.concat()(推荐替代append)
new_data = pd.DataFrame({
'name': ['Frank'],
'age': [32],
'city': ['Chengdu'],
'salary': [65000]
})
df = pd.concat([df, new_data], ignore_index=True)
# 方法3:批量新增多行
new_rows = [
{'name': 'Grace', 'age': 29, 'city': 'Xian', 'salary': 52000},
{'name': 'Henry', 'age': 31, 'city': 'Nanjing', 'salary': 58000}
]
new_rows_df = pd.DataFrame(new_rows)
df = pd.concat([df, new_rows_df], ignore_index=True)
性能考虑
# 逐行添加(较慢)
df_slow = pd.DataFrame(columns=['A', 'B', 'C'])
for i in range(1000):
df_slow.loc[i] = [i, i*2, i*3]
# 批量添加(较快)
data_list = []
for i in range(1000):
data_list.append([i, i*2, i*3])
df_fast = pd.DataFrame(data_list, columns=['A', 'B', 'C'])
数据计算与转换
新增计算列
# 在多重索引DataFrame中新增列
# 1. 当日收盘价-开盘价
df_result['daily_change'] = df_result['close'] - df_result['open']
# 2. 后一天开盘价-前一天收盘价(隔夜跳空)
df_result['overnight_gap'] = df_result.groupby(level='code').apply(
lambda x: x['open'].shift(-1) - x['close']
).values
# 3. 使用transform方法
def calculate_overnight_gap(group):
return group['open'].shift(-1) - group['close']
df_result['overnight_gap'] = df_result.groupby(level='code').transform(calculate_overnight_gap)
groupby + shift 详解
# 代码分解
df_result.groupby(level='code').apply(
lambda x: x['open'].shift(-1) - x['close']
)
# 含义:
# 1. groupby(level='code') - 按股票代码分组
# 2. apply(lambda x: ...) - 对每个分组应用函数
# 3. x['open'].shift(-1) - 获取下一日的开盘价
# 4. - x['close'] - 减去当日收盘价
# 结果解释:
# 正值:次日高开
# 负值:次日低开
# NaN:最后一天(没有下一天)
数据处理技巧
# 处理边界情况
df_result['is_last_day'] = df_result.groupby(level='code')['open'].shift(-1).isna()
# 日期类型转换
if not pd.api.types.is_datetime64_any_dtype(data_indexed.index.get_level_values('date')):
data_temp = data_indexed.reset_index()
data_temp['date'] = pd.to_datetime(data_temp['date'])
data_indexed = data_temp.set_index(['code', 'date'])
# 数据对齐
s1 = pd.Series([1, 2, 3], index=['a', 'b', 'c'])
s2 = pd.Series([10, 20, 30], index=['b', 'c', 'd'])
result = s1 + s2 # 根据索引自动对齐
常见错误与解决方案
错误1:语法错误
# 错误代码
len(data_indexed['SZ#000001'][(['daily_change']>0)]])
# 正确代码
# 方法1:
len(data_indexed.loc['SZ#000001'][data_indexed.loc['SZ#000001']['daily_change'] > 0])
# 方法2:
sz_data = data_indexed.loc['SZ#000001']
positive_days = sz_data[sz_data['daily_change'] > 0]
len(positive_days)
# 方法3:
(data_indexed.loc['SZ#000001']['daily_change'] > 0).sum()
错误2:排序方法错误
# 错误代码
data_indexed.loc['SZ#000001'].sort(by='date', ascending=True)
# 正确代码
data_indexed.loc['SZ#000001'].sort_values(by='date', ascending=True) # 按列排序
data_indexed.loc['SZ#000001'].sort_index(level='date', ascending=True) # 按索引排序
错误3:未分组计算隔夜价差
# 错误代码:不同股票数据会混在一起
wrong_result = df_result['open'].shift(-1) - df_result['close']
# 正确代码:按股票代码分组计算
correct_result = df_result.groupby(level='code').apply(
lambda x: x['open'].shift(-1) - x['close']
)
注意事项
- 日期格式:确保日期列是datetime类型
- 索引排序:如果索引没有排序,某些切片操作可能失败
- 数据类型:确保计算时数据类型一致
- 边界值:每只股票的最后一天,
shift(-1)会返回NaN
最佳实践总结
- 索引选择:将主键、时间序列、频繁查询的列设置为索引
- 数据查询:对于索引列使用
.loc[],对于普通列使用布尔索引 - 新增数据:批量添加优于逐行添加,使用
pd.concat()代替append() - 分组计算:确保不同组的数据不会相互干扰
- 错误处理:注意括号匹配、方法名正确性、数据类型一致性
- 性能优化:对于大型数据集,合理使用索引可以显著提升性能
实用代码片段
# 快速查看数据结构
print(f"DataFrame形状: {df.shape}")
print(f"索引信息: {df.index}")
print(f"列信息: {list(df.columns)}")
print(f"数据类型:\n{df.dtypes}")
# 多重索引数据概览
print(f"索引级别: {df_multi.index.names}")
print(f"级别数量: {df_multi.index.nlevels}")
print(f"第一级索引唯一值: {df_multi.index.get_level_values(0).unique()}")
文档最后更新日期:2024年1月 适用Pandas版本:1.3.0+