Pandas索引与数据操作完全指南

目录


索引基础

建立索引 vs 不建立索引的区别

核心区别

特性 索引 普通列
主要目的 标识快速访问数据行,类似字典的 存储数据内容本身
唯一性 鼓励但不强制唯一 通常不要求唯一
性能 查找快 (O(1) ~ O(logN)) 查找慢 (O(N))
数据访问 使用 .loc[] 使用 df['col'] 或布尔索引
数据对齐 自动根据索引对齐 按位置对齐

何时使用索引

  • 设置为索引:主键、时间序列、频繁查询的列
  • 保持为普通列:数据主体、用于计算和筛选的列

示例代码

import pandas as pd

# 创建示例数据
data = {
    'user_id': [f'user_{i}' for i in range(1, 1000001)],
    'name': ['Alice', 'Bob', 'Charlie'] * 333334,
    'score': np.random.randint(0, 100, 1000000)
}
df = pd.DataFrame(data)

# 不建立索引的查询(慢)
df[df['user_id'] == 'user_123456']

# 建立索引后的查询(快)
df_indexed = df.set_index('user_id')
df_indexed.loc['user_123456']

多重索引

创建多重索引

# 方法1:从DataFrame创建
df_multi = df.set_index(['code', 'date'])

# 方法2:使用MultiIndex.from_*方法
arrays = [['Beijing', 'Beijing', 'Shanghai', 'Shanghai'],
          [2020, 2021, 2020, 2021]]
index = pd.MultiIndex.from_tuples(list(zip(*arrays)), names=['City', 'Year'])

# 方法3:从笛卡尔积创建
index = pd.MultiIndex.from_product([['Beijing', 'Shanghai'], [2020, 2021]], 
                                   names=['City', 'Year'])

数据查询与选择

# 选择特定城市的所有数据
df_multi.loc['Beijing']

# 选择特定城市和年份
df_multi.loc[('Beijing', 2021)]

# 使用xs方法进行交叉选择
df_multi.xs(2020, level='Year')

# 切片操作
df_multi.loc['Beijing':'Shanghai']
df_multi.loc[('Beijing', 2020):('Shanghai', 2021)]

查看指定代码在特定日期范围的数据

# 方法1:使用.loc[]进行切片
target_code = '000001'
start_date = '2019-01-01'
end_date = '2019-12-31'
result = data_indexed.loc[(target_code, start_date):(target_code, end_date)]

# 方法2:布尔索引
mask = ((data_indexed.index.get_level_values('code') == target_code) & 
        (data_indexed.index.get_level_values('date') >= start_date) & 
        (data_indexed.index.get_level_values('date') <= end_date))
result = data_indexed[mask]

# 方法3:使用xs()方法
stock_data = data_indexed.xs(target_code, level='code')
result = stock_data.loc[start_date:end_date]

实际应用场景

# 时间序列数据
dates = pd.date_range('2023-01-01', periods=6, freq='M')
multi_index = pd.MultiIndex.from_product([['Store_A', 'Store_B'], dates], 
                                        names=['Store', 'Date'])

# 学生成绩数据
students = ['Alice', 'Bob', 'Charlie']
subjects = ['Math', 'Science', 'English']
semesters = [1, 2]
index = pd.MultiIndex.from_product([students, subjects, semesters], 
                                  names=['Student', 'Subject', 'Semester'])

索引操作

# 重置索引
df_reset = df_multi.reset_index()

# 修改索引名称
df_multi.index.names = ['City_Name', 'Year_Num']

# 交换索引级别
df_swapped = df_multi.swaplevel('Year', 'City')

# 排序
df_sorted = df_multi.sort_index()
df_sorted_level = df_multi.sort_index(level='Year')

数据查询与筛选

基本查询

# 单级索引查询
df.loc['label']                    # 单行
df.loc[['label1', 'label2']]       # 多行

# 列选择
df['column_name']                  # 单列
df[['col1', 'col2']]               # 多列

# 条件筛选
df[df['column'] > 10]              # 布尔索引

多重索引查询

# 查看指定股票代码数据
data_indexed.loc['SZ#000001']

# 排序(注意正确方法)
data_indexed.loc['SZ#000001'].sort_values(by='date', ascending=True)  # 如果date是列
data_indexed.loc['SZ#000001'].sort_index(level='date', ascending=True) # 如果date是索引

统计计算

# 计算收涨天数(正确语法)
sz_data = data_indexed.loc['SZ#000001']
positive_count = (sz_data['daily_change'] > 0).sum()
# 或
positive_count = len(sz_data[sz_data['daily_change'] > 0])

# 详细统计
positive_days = len(sz_data[sz_data['daily_change'] > 0])
negative_days = len(sz_data[sz_data['daily_change'] < 0])
zero_days = len(sz_data[sz_data['daily_change'] == 0])

DataFrame创建与新增数据

新建DataFrame

# 方法1:指定列名创建空DataFrame
df = pd.DataFrame(columns=['name', 'age', 'city', 'salary'])

# 方法2:从字典创建
data = {'name': ['Alice', 'Bob'], 'age': [25, 30]}
df = pd.DataFrame(data)

# 方法3:从列表创建
data_list = [['Alice', 25], ['Bob', 30]]
df = pd.DataFrame(data_list, columns=['name', 'age'])

新增行数据

# 方法1:使用loc[]新增行(最常用)
df.loc[2] = ['Charlie', 28, 'Guangzhou', 55000]
df.loc[3] = {'name': 'David', 'age': 35, 'city': 'Shenzhen', 'salary': 70000}

# 方法2:使用pd.concat()(推荐替代append)
new_data = pd.DataFrame({
    'name': ['Frank'],
    'age': [32],
    'city': ['Chengdu'],
    'salary': [65000]
})
df = pd.concat([df, new_data], ignore_index=True)

# 方法3:批量新增多行
new_rows = [
    {'name': 'Grace', 'age': 29, 'city': 'Xian', 'salary': 52000},
    {'name': 'Henry', 'age': 31, 'city': 'Nanjing', 'salary': 58000}
]
new_rows_df = pd.DataFrame(new_rows)
df = pd.concat([df, new_rows_df], ignore_index=True)

性能考虑

# 逐行添加(较慢)
df_slow = pd.DataFrame(columns=['A', 'B', 'C'])
for i in range(1000):
    df_slow.loc[i] = [i, i*2, i*3]

# 批量添加(较快)
data_list = []
for i in range(1000):
    data_list.append([i, i*2, i*3])
df_fast = pd.DataFrame(data_list, columns=['A', 'B', 'C'])

数据计算与转换

新增计算列

# 在多重索引DataFrame中新增列

# 1. 当日收盘价-开盘价
df_result['daily_change'] = df_result['close'] - df_result['open']

# 2. 后一天开盘价-前一天收盘价(隔夜跳空)
df_result['overnight_gap'] = df_result.groupby(level='code').apply(
    lambda x: x['open'].shift(-1) - x['close']
).values

# 3. 使用transform方法
def calculate_overnight_gap(group):
    return group['open'].shift(-1) - group['close']
df_result['overnight_gap'] = df_result.groupby(level='code').transform(calculate_overnight_gap)

groupby + shift 详解

# 代码分解
df_result.groupby(level='code').apply(
    lambda x: x['open'].shift(-1) - x['close']
)

# 含义:
# 1. groupby(level='code') - 按股票代码分组
# 2. apply(lambda x: ...) - 对每个分组应用函数
# 3. x['open'].shift(-1) - 获取下一日的开盘价
# 4. - x['close'] - 减去当日收盘价

# 结果解释:
# 正值:次日高开
# 负值:次日低开
# NaN:最后一天(没有下一天)

数据处理技巧

# 处理边界情况
df_result['is_last_day'] = df_result.groupby(level='code')['open'].shift(-1).isna()

# 日期类型转换
if not pd.api.types.is_datetime64_any_dtype(data_indexed.index.get_level_values('date')):
    data_temp = data_indexed.reset_index()
    data_temp['date'] = pd.to_datetime(data_temp['date'])
    data_indexed = data_temp.set_index(['code', 'date'])

# 数据对齐
s1 = pd.Series([1, 2, 3], index=['a', 'b', 'c'])
s2 = pd.Series([10, 20, 30], index=['b', 'c', 'd'])
result = s1 + s2  # 根据索引自动对齐

常见错误与解决方案

错误1:语法错误

# 错误代码
len(data_indexed['SZ#000001'][(['daily_change']>0)]])

# 正确代码
# 方法1:
len(data_indexed.loc['SZ#000001'][data_indexed.loc['SZ#000001']['daily_change'] > 0])

# 方法2:
sz_data = data_indexed.loc['SZ#000001']
positive_days = sz_data[sz_data['daily_change'] > 0]
len(positive_days)

# 方法3:
(data_indexed.loc['SZ#000001']['daily_change'] > 0).sum()

错误2:排序方法错误

# 错误代码
data_indexed.loc['SZ#000001'].sort(by='date', ascending=True)

# 正确代码
data_indexed.loc['SZ#000001'].sort_values(by='date', ascending=True)  # 按列排序
data_indexed.loc['SZ#000001'].sort_index(level='date', ascending=True) # 按索引排序

错误3:未分组计算隔夜价差

# 错误代码:不同股票数据会混在一起
wrong_result = df_result['open'].shift(-1) - df_result['close']

# 正确代码:按股票代码分组计算
correct_result = df_result.groupby(level='code').apply(
    lambda x: x['open'].shift(-1) - x['close']
)

注意事项

  1. 日期格式:确保日期列是datetime类型
  2. 索引排序:如果索引没有排序,某些切片操作可能失败
  3. 数据类型:确保计算时数据类型一致
  4. 边界值:每只股票的最后一天,shift(-1)会返回NaN

最佳实践总结

  1. 索引选择:将主键、时间序列、频繁查询的列设置为索引
  2. 数据查询:对于索引列使用.loc[],对于普通列使用布尔索引
  3. 新增数据:批量添加优于逐行添加,使用pd.concat()代替append()
  4. 分组计算:确保不同组的数据不会相互干扰
  5. 错误处理:注意括号匹配、方法名正确性、数据类型一致性
  6. 性能优化:对于大型数据集,合理使用索引可以显著提升性能

实用代码片段

# 快速查看数据结构
print(f"DataFrame形状: {df.shape}")
print(f"索引信息: {df.index}")
print(f"列信息: {list(df.columns)}")
print(f"数据类型:\n{df.dtypes}")

# 多重索引数据概览
print(f"索引级别: {df_multi.index.names}")
print(f"级别数量: {df_multi.index.nlevels}")
print(f"第一级索引唯一值: {df_multi.index.get_level_values(0).unique()}")

文档最后更新日期:2024年1月 适用Pandas版本:1.3.0+