Python数据分析实用指南
目录
A股日线数据分析
数据获取与基本分析
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tushare as ts
# 数据获取(使用tushare或模拟数据)
def generate_sample_data():
    """Generate simulated daily OHLCV bars for one calendar year.

    The close price is a seeded random walk that starts at 100 and is
    floored at 1. Each bar's high/low are built around BOTH the open and
    the close so that ``low <= open, close <= high`` always holds.

    Returns:
        pandas.DataFrame with columns ``date``, ``open``, ``high``, ``low``,
        ``close`` and ``volume``; one row per day of 2023.
    """
    dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
    np.random.seed(42)  # fixed seed: every call produces the same data
    price = 100
    rows = []
    for day in dates:
        # Random-walk step for the close, never allowed below 1.
        price = max(price + np.random.normal(0, 2), 1)
        # Keep the open strictly positive even when the walk is near its floor.
        open_price = max(price + np.random.normal(0, 0.5), 0.01)
        # The bar's range has to enclose the open as well as the close.
        high = max(open_price, price) + abs(np.random.normal(0, 1))
        low = max(min(open_price, price) - abs(np.random.normal(0, 1)), 0.01)
        rows.append({
            'date': day,
            'open': round(open_price, 2),
            'high': round(high, 2),
            'low': round(low, 2),
            'close': round(price, 2),
            'volume': np.random.randint(1000000, 10000000),
        })
    return pd.DataFrame(rows)
# 技术指标计算
def calculate_technical_indicators(df):
    """Add common technical-indicator columns to ``df`` in place.

    Columns added, in this order: ``MA5``, ``MA20``, ``RSI``, ``BB_middle``,
    ``BB_upper``, ``BB_lower``, ``daily_return``. All are derived from the
    ``close`` column.

    Parameters:
        df: DataFrame that contains a ``close`` column.
    Returns:
        The same DataFrame object, with the new columns attached.
    """
    close = df['close']

    # Simple moving averages.
    for window in (5, 20):
        df[f'MA{window}'] = close.rolling(window=window).mean()

    # RSI from 14-period simple averages of up-moves and down-moves.
    change = close.diff()
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    avg_gain = ups.rolling(window=14).mean()
    avg_loss = downs.rolling(window=14).mean()
    relative_strength = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + relative_strength))

    # Bollinger bands: 20-period mean plus/minus two rolling standard deviations.
    middle = close.rolling(window=20).mean()
    band_offset = close.rolling(window=20).std() * 2
    df['BB_middle'] = middle
    df['BB_upper'] = middle + band_offset
    df['BB_lower'] = middle - band_offset

    df['daily_return'] = close.pct_change()
    return df
风险收益分析
def risk_analysis(df):
    """Print and return basic risk/return metrics for a price series.

    Parameters:
        df: DataFrame with a ``close`` column. A ``daily_return`` column is
            used when present; otherwise daily returns are derived from
            ``close`` without modifying ``df``.
    Returns:
        dict with keys ``total_return``, ``annual_return``, ``volatility``,
        ``sharpe_ratio`` and ``max_drawdown``.
    """
    close = df['close']
    if 'daily_return' in df.columns:
        returns = df['daily_return']
    else:
        returns = close.pct_change()

    total_return = (close.iloc[-1] / close.iloc[0]) - 1
    annual_return = (1 + total_return) ** (252/len(df)) - 1
    volatility = returns.std() * np.sqrt(252)
    # No risk-free rate is subtracted. Volatility is NaN with fewer than two
    # returns; report 0 instead of propagating NaN.
    if np.isfinite(volatility) and volatility != 0:
        sharpe_ratio = annual_return / volatility
    else:
        sharpe_ratio = 0
    print(f"总收益率: {total_return:.2%}")
    print(f"年化收益率: {annual_return:.2%}")
    print(f"年化波动率: {volatility:.2%}")
    print(f"夏普比率: {sharpe_ratio:.2f}")

    # Drawdown analysis. The first return is NaN; treating it as 0 anchors the
    # equity curve at 1.0 on day one, so a fall from the very first price
    # counts as a drawdown instead of being skipped.
    cumulative = (1 + returns.fillna(0)).cumprod()
    running_max = cumulative.expanding().max()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = drawdown.min()
    print(f"最大回撤: {max_drawdown:.2%}")

    return {
        'total_return': total_return,
        'annual_return': annual_return,
        'volatility': volatility,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
    }
目录文件操作
列出目录文件
import os
from pathlib import Path
import glob
# 1. 基本方法
def list_files_basic(directory):
    """Return the names of every entry (files and folders) directly inside ``directory``."""
    entry_names = os.listdir(directory)
    return entry_names
# 2. 区分文件和文件夹
def list_files_detailed(directory):
    """Split the entries of ``directory`` into regular files and sub-folders.

    Returns:
        (files, folders): two lists of entry names (not full paths).
    """
    files, folders = [], []
    for name in os.listdir(directory):
        full_path = os.path.join(directory, name)
        if os.path.isfile(full_path):
            files.append(name)
        if os.path.isdir(full_path):
            folders.append(name)
    return files, folders
# 3. 递归列出所有文件
def list_files_recursive(directory):
    """Print an indented tree of every folder and file below ``directory``.

    Depth is computed from the path of each folder relative to ``directory``,
    so a trailing separator on ``directory`` or a directory name that repeats
    inside the path does not distort the indentation.

    Parameters:
        directory: root folder to walk.
    """
    for root, _dirs, files in os.walk(directory):
        relative = os.path.relpath(root, directory)
        # The root itself is reported as os.curdir and sits at depth 0.
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = ' ' * 2 * level
        # normpath drops a trailing separator so basename is never empty.
        folder_name = os.path.basename(os.path.normpath(root))
        print(f"{indent}📁 {folder_name}/")
        for file in files:
            print(f"{indent} 📄 {file}")
# 4. 使用pathlib(推荐)
def list_files_pathlib(directory):
    """Split the entries of ``directory`` into files and folders using pathlib.

    Returns:
        (files, folders): two lists of ``Path`` objects.
    """
    entries = list(Path(directory).iterdir())
    files = list(filter(Path.is_file, entries))
    folders = list(filter(Path.is_dir, entries))
    return files, folders
# 5. 使用glob模式匹配
def list_files_with_pattern(directory, pattern="*"):
    """Return the paths inside ``directory`` whose names match a glob pattern.

    Parameters:
        directory: folder to search; taken literally, so characters such as
            ``[``, ``*`` and ``?`` in the folder name are not interpreted as
            glob syntax.
        pattern: glob pattern applied to entry names (default ``"*"``).
    Returns:
        list of matching paths, each prefixed with ``directory``.
    """
    # Escape only the directory part; the pattern must keep its wildcards.
    literal_dir = glob.escape(os.fspath(directory))
    return glob.glob(os.path.join(literal_dir, pattern))
字符串处理技巧
检查字符串非空
def check_string_non_empty(s, strict=False):
    """
    Decide whether a string counts as non-empty.

    Parameters:
        s: the string to check (``None`` is accepted and is always empty)
        strict: when True, a string made only of whitespace counts as empty
    Returns:
        bool: True when the string is considered non-empty
    """
    if s is None:
        return False
    # Strict mode looks at the stripped text; lenient mode accepts any
    # character at all, including whitespace.
    candidate = s.strip() if (strict and s) else s
    return bool(candidate)
# Recommended idioms
my_string = "  hello  "  # sample value so the snippet runs on its own

if my_string and my_string.strip():  # strict check: rejects None, "" and whitespace-only text
    print("字符串有实际内容")
if my_string:  # lenient check: None and "" are both falsy, no separate None test needed
    print("字符串非空")
在字符串数组中查找子字符串
# 1. 使用any()函数(最简洁)
def find_substring_any(strings, substring):
    """Return True when at least one element of ``strings`` contains ``substring``."""
    for candidate in strings:
        if substring in candidate:
            return True
    return False
# 2. 返回所有匹配元素
def find_all_with_substring(strings, substring):
    """Return, in their original order, the elements of ``strings`` that contain ``substring``."""
    return list(filter(lambda text: substring in text, strings))
# 3. 不区分大小写查找
def find_case_insensitive(strings, substring):
    """Return the elements of ``strings`` that contain ``substring``, ignoring case.

    Matching uses ``str.casefold()``, which is intended for caseless
    comparison and also equates forms such as ``"ß"`` and ``"SS"`` that
    ``lower()`` leaves different.

    Parameters:
        strings: iterable of strings to search.
        substring: text to look for.
    Returns:
        list of the matching elements, unchanged and in their original order.
    """
    needle = substring.casefold()
    return [string for string in strings if needle in string.casefold()]
# 4. 综合查找工具
def comprehensive_substring_search(strings, substring, case_sensitive=True, return_type="bool"):
    """Search a collection of strings for a substring and report the result in several forms.

    Parameters:
        strings: iterable of strings to search (any iterable, not only a list).
        substring: text to look for.
        case_sensitive: when False, both sides are compared after ``casefold()``.
        return_type: one of
            "bool"    - True when any element matches
            "all"     - list of the matching elements
            "indices" - list of the positions of the matching elements
            "first"   - the first matching element, or None when nothing matches
    Returns:
        A value whose type depends on ``return_type`` (see above).
    Raises:
        ValueError: ``return_type`` is not one of the four supported values.
    """
    if return_type not in ("bool", "all", "indices", "first"):
        raise ValueError(f"unsupported return_type: {return_type!r}")

    # Materialise once so generators can be both scanned and indexed.
    strings = list(strings)
    if case_sensitive:
        haystacks = strings
    else:
        substring = substring.casefold()
        haystacks = [s.casefold() for s in strings]

    # Lazily yields the positions of matching elements.
    hits = (i for i, s in enumerate(haystacks) if substring in s)

    if return_type == "bool":
        return next(hits, None) is not None
    if return_type == "all":
        return [strings[i] for i in hits]
    if return_type == "indices":
        return list(hits)
    first = next(hits, None)
    return None if first is None else strings[first]
NumPy科学计算
NumPy基础操作
import numpy as np
# Creating arrays: one 1-D array and one 2-D array built from Python lists
arr1 = np.array([1, 2, 3, 4, 5])
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
# Special arrays: constant-filled, random, stepped range and evenly spaced values
zeros = np.zeros((3, 4))
ones = np.ones((2, 3))
random_arr = np.random.random((2, 3))
range_arr = np.arange(0, 10, 2)
linspace_arr = np.linspace(0, 1, 5)
# Array attributes: shape, number of dimensions, element count and dtype
print("形状:", arr2.shape)
print("维度:", arr2.ndim)
print("元素总数:", arr2.size)
print("数据类型:", arr2.dtype)
# Array indexing and slicing on a 3x4 example
arr = np.array([[1, 2, 3, 4],
[5, 6, 7, 8],
[9, 10, 11, 12]])
print(arr[0, 1]) # row 0, column 1
print(arr[2]) # row 2
print(arr[:, 1]) # column 1
print(arr[0:2, 1:3]) # rows 0-1, columns 1-2
读取文本文件
# 1. Read numeric data: comma-separated, skipping the first two lines
data = np.loadtxt('data.txt', delimiter=',', skiprows=2)
# 2. Read mixed data types: dtype=None asks genfromtxt to determine each
#    column's type, names=True takes the field names from the file
mixed_data = np.genfromtxt('data.txt',
delimiter=',',
names=True,
dtype=None,
encoding='utf-8')
# 3. 处理ANSI编码文件
def robust_ansi_reader(filename, delimiter=',', skiprows=0):
    """Load a numeric text file, trying several legacy encodings in turn.

    Parameters:
        filename: path of the file to read.
        delimiter: column separator passed to ``np.loadtxt``.
        skiprows: number of leading lines to skip.
    Returns:
        The array produced by ``np.loadtxt`` for the first encoding that
        works, or None when every attempt fails (including when the file
        cannot be opened).
    """
    # 'ansi' is only registered as a codec on some platforms (it raises
    # LookupError elsewhere and is simply skipped). latin1 can decode any
    # byte sequence, so it has to come last or nothing after it is ever tried.
    encodings_to_try = ('ansi', 'gbk', 'gb2312', 'cp936', 'latin1')
    for encoding in encodings_to_try:
        try:
            return np.loadtxt(filename,
                              delimiter=delimiter,
                              skiprows=skiprows,
                              encoding=encoding)
        except (OSError, LookupError, ValueError):
            # OSError: file cannot be opened; LookupError: unknown codec;
            # ValueError: undecodable bytes (UnicodeError) or unparsable numbers.
            continue
    return None
# 4. 处理包含日期的数据(如股票数据)
def read_stock_data(filename):
    """Read a headerless, comma-separated stock file into a structured array.

    The first column holds dates written as ``YYYY/MM/DD`` or ``YYYY-MM-DD``;
    the remaining columns are open, high, low, close, volume and amount.

    Parameters:
        filename: path of the UTF-8 encoded data file.
    Returns:
        Structured numpy array with fields ``date``, ``open``, ``high``,
        ``low``, ``close``, ``volume`` and ``amount``.
    """
    def parse_date(date_str):
        # genfromtxt hands converters bytes or str depending on the NumPy
        # version and the encoding argument, so accept both.
        if isinstance(date_str, bytes):
            date_str = date_str.decode('utf-8')
        return np.datetime64(date_str.strip().replace('/', '-'))

    data = np.genfromtxt(filename,
                         delimiter=',',
                         dtype=None,
                         converters={0: parse_date},
                         names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'],
                         encoding='utf-8')
    return data
数学和统计运算
# Arithmetic: operators work element by element on equally shaped arrays
a = np.array([1, 2, 3, 4])
b = np.array([5, 6, 7, 8])
print("a + b:", a + b)
print("a * b:", a * b)
# Statistical functions on a 3x3 array; axis=0 reduces down the columns,
# axis=1 reduces across the rows
data = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print("总和:", np.sum(data))
print("每列总和:", np.sum(data, axis=0))
print("每行总和:", np.sum(data, axis=1))
print("平均值:", np.mean(data))
print("标准差:", np.std(data))
# Linear algebra: matrix product and matrix inverse
matrix_a = np.array([[1, 2], [3, 4]])
matrix_b = np.array([[5, 6], [7, 8]])
print("矩阵乘积:", np.matmul(matrix_a, matrix_b))
print("矩阵求逆:", np.linalg.inv(matrix_a))
日期时间处理
Python日期类型
from datetime import datetime, date, time, timedelta
import numpy as np
import pandas as pd
# 1. The datetime module
now = datetime.now()
print("当前时间:", now)
print("年份:", now.year)
print("月份:", now.month)
# Build a specific date and time
dt = datetime(2024, 1, 15, 14, 30, 45)
# Date arithmetic with timedelta
tomorrow = now + timedelta(days=1)
difference = datetime(2024, 1, 15) - datetime(2024, 1, 1)
print("相差天数:", difference.days)
# 2. Formatting dates
print("ISO格式:", now.isoformat())
print("自定义格式:", now.strftime("%Y-%m-%d %H:%M:%S"))
# Parse a string into a datetime (rebinds dt from the example above)
date_str = "2024-01-15"
dt = datetime.strptime(date_str, "%Y-%m-%d")
# 3. NumPy datetimes
dates_np = np.array(['2024-01-01', '2024-01-02', '2024-01-03'], dtype='datetime64')
tomorrow_np = dates_np + np.timedelta64(1, 'D')
# 4. Pandas datetimes (recommended for data analysis)
df = pd.DataFrame({
'date_str': ['2024-01-15', '2024-01-16', '2024-01-17'],
'value': [100, 150, 200]
})
df['date'] = pd.to_datetime(df['date_str'])
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day_name'] = df['date'].dt.day_name()
处理股票日期数据
# 方法1:使用converters参数
def parse_stock_date(date_str):
    """Convert a stock date such as ``2023/01/03`` into a ``numpy.datetime64``.

    Parameters:
        date_str: the date as ``str`` or UTF-8 ``bytes`` (genfromtxt passes
            one or the other to converters depending on NumPy version and
            the ``encoding`` argument); ``/`` and ``-`` separators both work.
    Returns:
        numpy.datetime64 for that day.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode('utf-8')
    return np.datetime64(date_str.strip().replace('/', '-'))
# Read the stock data; column 0 goes through parse_stock_date, the other
# column types are determined by genfromtxt (dtype=None)
stock_data = np.genfromtxt('stock_data.txt',
delimiter=',',
dtype=None,
converters={0: parse_stock_date},
names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'])
# Method 2: use pandas (simpler); parse_dates=[0] parses the first column as dates
df = pd.read_csv('stock_data.txt',
header=None,
names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'],
parse_dates=[0])
实际问题解决方案
问题:读取ANSI编码的逗号分隔文件
import numpy as np
import pandas as pd
def read_ansi_csv_with_dates(filename):
    """
    Read a GBK ("ANSI") encoded CSV file whose first column holds dates.

    Parameters:
        filename: path of the file
    Returns:
        A structured numpy array when numpy can parse the file. If numpy
        rejects the content, the file is re-read with pandas and the plain
        ``DataFrame.values`` array is returned instead.
    """
    def parse_date(date_str):
        # With an explicit encoding genfromtxt passes str to converters;
        # bytes are still accepted for older call paths.
        if isinstance(date_str, bytes):
            date_str = date_str.decode('gbk')
        return np.datetime64(date_str.strip().replace('/', '-'))

    try:
        # Method 1: numpy
        return np.genfromtxt(filename,
                             delimiter=',',
                             dtype=None,
                             converters={0: parse_date},
                             encoding='gbk')
    except (ValueError, TypeError):
        # Method 2: pandas. Only parsing problems land here (bad dates,
        # ragged rows, undecodable bytes); a missing file still raises.
        df = pd.read_csv(filename,
                         header=None,
                         encoding='gbk',
                         parse_dates=[0])
        return df.values
问题:跳过文件前两行读取
# Read the data while skipping the first two lines
data = np.loadtxt('data.txt',
skiprows=2, # skip the first two lines
delimiter=',',
encoding='utf-8')
# Alternatively, use genfromtxt
data = np.genfromtxt('data.txt',
skip_header=2, # skip the first two lines
delimiter=',',
names=True, # take the field names from the first line after the skipped ones
dtype=None)
问题:处理混合数据类型的错误
# Error: ValueError: could not convert string '1996/08/15' to float64
# Solution 1: let genfromtxt detect the column types
data = np.genfromtxt('data.txt',
delimiter=',',
dtype=None, # detect the data type of each column
encoding='utf-8')
# Solution 2: use pandas
df = pd.read_csv('data.txt',
header=None,
parse_dates=[0]) # parse the first column as dates
# Solution 3: declare the data type of every column explicitly
dtypes = [('date', 'datetime64[D]'),
          ('open', 'f8'),
          ('high', 'f8'),
          ('low', 'f8'),
          ('close', 'f8'),
          ('volume', 'f8'),
          ('amount', 'f8')]
data = np.genfromtxt('data.txt',
                     delimiter=',',
                     dtype=dtypes,
                     # datetime64 only parses ISO dates (1996-08-15), so dates
                     # written as 1996/08/15 need their slashes replaced first.
                     converters={0: lambda s: np.datetime64(
                         (s.decode('utf-8') if isinstance(s, bytes) else s).replace('/', '-'))},
                     encoding='utf-8')
完整的股票数据分析示例
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def analyze_stock_data(filename):
    """Load a stock CSV, print summary statistics, plot it and return the data.

    Parameters:
        filename: path of a headerless CSV whose columns are date, open,
            high, low, close, volume and amount.
    Returns:
        The loaded DataFrame with ``MA5``, ``MA20`` and ``daily_return``
        columns added.
    """
    # 1. Load the data
    column_names = ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']
    df = pd.read_csv(filename, header=None, names=column_names, parse_dates=[0])

    # 2. Basic summary
    print("数据基本信息:")
    first_day = df['date'].min()
    last_day = df['date'].max()
    print(f"数据时间段: {first_day} 到 {last_day}")
    print(f"总交易日数: {len(df)}")
    print(f"收盘价均值: {df['close'].mean():.2f}")

    # 3. Technical indicators
    close = df['close']
    df['MA5'] = close.rolling(window=5).mean()
    df['MA20'] = close.rolling(window=20).mean()
    df['daily_return'] = close.pct_change()

    # 4. Charts: price with moving averages on top, volume underneath
    fig, (price_ax, volume_ax) = plt.subplots(2, 1, figsize=(12, 8))

    price_ax.plot(df['date'], df['close'], label='收盘价', linewidth=1)
    for column, label in (('MA5', '5日均线'), ('MA20', '20日均线')):
        price_ax.plot(df['date'], df[column], label=label, alpha=0.7)
    price_ax.set_title('价格走势')
    price_ax.legend()
    price_ax.grid(True, alpha=0.3)

    volume_ax.bar(df['date'], df['volume'], alpha=0.7, color='orange')
    volume_ax.set_title('成交量')
    volume_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
    return df
# Usage example: analyse the file stock_data.txt and keep the returned DataFrame
stock_df = analyze_stock_data('stock_data.txt')
最佳实践总结
- 数据读取:
  - 纯数值数据:使用 `np.loadtxt()`
  - 混合数据类型:使用 `np.genfromtxt()` 或 `pd.read_csv()`
  - ANSI编码文件:指定 `encoding='gbk'` 或使用编码检测
- 日期处理:
  - 简单操作:使用 `datetime` 模块
  - 数据分析:使用 Pandas 的日期时间功能
  - 科学计算:使用 NumPy 的 `datetime64`
- 文件操作:
  - 现代Python:使用 `pathlib` 模块
  - 简单需求:使用 `os.listdir()`
  - 模式匹配:使用 `glob` 模块
- 字符串处理:
  - 检查非空:`if string and string.strip():`
  - 查找子串:`any(substring in s for s in strings)`
  - 编码处理:始终显式指定编码
- 错误处理:
  - 总是包含异常处理
  - 对于未知编码,提供多种编码尝试
  - 对于混合数据类型,使用适当的转换器
保存说明:将此文档保存为 Python数据分析指南.md 文件,方便随时查阅。所有代码示例都经过测试,可以直接使用或根据需求修改。