Python数据分析实用指南

目录

  1. A股日线数据分析
  2. 目录文件操作
  3. 字符串处理技巧
  4. NumPy科学计算
  5. 日期时间处理
  6. 实际问题解决方案

A股日线数据分析

数据获取与基本分析

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tushare as ts

# 数据获取(使用tushare或模拟数据)
def generate_sample_data():
    """Generate simulated A-share daily OHLCV data for calendar year 2023.

    The close price is a random walk that starts at 100, moves by a
    N(0, 2) step each day and is floored at 1. The global NumPy RNG is
    seeded with 42 on every call, so the returned data is reproducible
    (and the global random state is reset as a side effect).

    Returns:
        pandas.DataFrame: one row per calendar day with the columns
        ``date``, ``open``, ``high``, ``low``, ``close`` and ``volume``.
        Every row satisfies ``low <= min(open, close)`` and
        ``high >= max(open, close)``.
    """
    dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
    np.random.seed(42)

    price = 100
    rows = []
    for day in dates:
        price = max(price + np.random.normal(0, 2), 1)

        # The draw order (open, high, low, volume) is kept stable so the
        # close series stays the same for the fixed seed.
        open_price = round(price + np.random.normal(0, 0.5), 2)
        close_price = round(price, 2)
        high_spread = abs(np.random.normal(0, 1))
        low_spread = abs(np.random.normal(0, 1))

        rows.append({
            'date': day,
            'open': open_price,
            # High/low must bracket both open and close to form a valid bar.
            'high': round(max(open_price, close_price) + high_spread, 2),
            'low': round(min(open_price, close_price) - low_spread, 2),
            'close': close_price,
            'volume': np.random.randint(1000000, 10000000),
        })

    return pd.DataFrame(rows)

# 技术指标计算
def calculate_technical_indicators(df):
    """Add common technical-indicator columns to ``df`` and return it.

    The frame is modified in place. Columns are added in this order:
    ``MA5``, ``MA20`` (simple moving averages of ``close``), ``RSI``
    (14-period, simple rolling means of gains and losses), ``BB_middle``,
    ``BB_upper``, ``BB_lower`` (20-period Bollinger bands, 2 standard
    deviations wide) and ``daily_return`` (percentage change of ``close``).

    Args:
        df: DataFrame with a ``close`` column.

    Returns:
        The same DataFrame object that was passed in.
    """
    close = df['close']

    # Simple moving averages
    df['MA5'] = close.rolling(window=5).mean()
    ma20 = close.rolling(window=20).mean()
    df['MA20'] = ma20

    # RSI: ratio of the average gain to the average loss over 14 periods
    change = close.diff()
    avg_gain = change.where(change > 0, 0).rolling(window=14).mean()
    avg_loss = (-change.where(change < 0, 0)).rolling(window=14).mean()
    relative_strength = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + relative_strength))

    # Bollinger bands: the middle band is the 20-period moving average
    band_offset = close.rolling(window=20).std() * 2
    df['BB_middle'] = ma20
    df['BB_upper'] = ma20 + band_offset
    df['BB_lower'] = ma20 - band_offset

    df['daily_return'] = close.pct_change()
    return df

风险收益分析

def risk_analysis(df):
    """Print and return basic risk/return statistics for a price series.

    Args:
        df: DataFrame with a ``close`` column. If it also has a
            ``daily_return`` column that column is used as-is; otherwise
            daily returns are derived from ``close`` without modifying
            ``df``.

    Returns:
        dict: ``total_return``, ``annual_return``, ``volatility``,
        ``sharpe_ratio`` and ``max_drawdown`` as floats. The same values
        are printed.

    Raises:
        IndexError: if ``df`` has no rows.
    """
    close = df['close']
    if 'daily_return' in df.columns:
        daily_return = df['daily_return']
    else:
        daily_return = close.pct_change()

    total_return = (close.iloc[-1] / close.iloc[0]) - 1
    annual_return = (1 + total_return) ** (252/len(df)) - 1
    volatility = daily_return.std() * np.sqrt(252)
    # ``> 0`` is False for both zero and NaN volatility (fewer than two
    # returns), so the ratio never becomes NaN or infinite.
    sharpe_ratio = annual_return / volatility if volatility > 0 else 0

    print(f"总收益率: {total_return:.2%}")
    print(f"年化收益率: {annual_return:.2%}")
    print(f"年化波动率: {volatility:.2%}")
    print(f"夏普比率: {sharpe_ratio:.2f}")

    # Drawdown analysis. The first return is NaN; treating it as 0 anchors
    # the equity curve at 1.0 so the first day counts as a potential peak.
    cumulative = (1 + daily_return.fillna(0)).cumprod()
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = drawdown.min()
    print(f"最大回撤: {max_drawdown:.2%}")

    return {
        'total_return': float(total_return),
        'annual_return': float(annual_return),
        'volatility': float(volatility),
        'sharpe_ratio': float(sharpe_ratio),
        'max_drawdown': float(max_drawdown),
    }

目录文件操作

列出目录文件

import os
from pathlib import Path
import glob

# 1. 基本方法
def list_files_basic(directory):
    """Return the names of all entries directly inside ``directory``."""
    entry_names = os.listdir(directory)
    return entry_names

# 2. 区分文件和文件夹
def list_files_detailed(directory):
    """Split the entries of ``directory`` into files and folders.

    Returns:
        tuple: ``(files, folders)``, two lists of entry names (not full
        paths) in the order ``os.listdir`` reported them.
    """
    file_names, folder_names = [], []
    for name in os.listdir(directory):
        full_path = os.path.join(directory, name)
        if os.path.isfile(full_path):
            file_names.append(name)
        elif os.path.isdir(full_path):
            folder_names.append(name)
    return file_names, folder_names

# 3. 递归列出所有文件
def list_files_recursive(directory):
    """Print an indented tree of every folder and file below ``directory``.

    Each folder is printed as ``📁 name/`` and each file as ``📄 name``,
    indented by two spaces per nesting level relative to ``directory``.

    Args:
        directory: Root folder to walk; a trailing path separator is fine.
    """
    for root, _dirs, files in os.walk(directory):
        # Depth comes from the path relative to the start directory. A plain
        # string replace miscounts when ``directory`` ends with a separator
        # or when its text reappears deeper in the path.
        relative = os.path.relpath(root, directory)
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = ' ' * 2 * level
        # normpath drops a trailing separator so the root keeps its name.
        folder_name = os.path.basename(os.path.normpath(root))
        print(f"{indent}📁 {folder_name}/")
        for file in files:
            print(f"{indent}  📄 {file}")

# 4. 使用pathlib(推荐)
def list_files_pathlib(directory):
    """Split the entries of ``directory`` into files and folders.

    Returns:
        tuple: ``(files, folders)``, two lists of ``Path`` objects.
    """
    file_paths, folder_paths = [], []
    for entry in Path(directory).iterdir():
        if entry.is_file():
            file_paths.append(entry)
        elif entry.is_dir():
            folder_paths.append(entry)
    return file_paths, folder_paths

# 5. 使用glob模式匹配
def list_files_with_pattern(directory, pattern="*"):
    """Return the paths inside ``directory`` that match a glob ``pattern``.

    Only ``pattern`` is interpreted as a glob expression. Characters such
    as ``[``, ``*`` or ``?`` in ``directory`` are escaped so that a folder
    named e.g. ``data[1]`` is matched literally.

    Args:
        directory: Folder to search.
        pattern: Glob pattern applied to the entries (default ``"*"``).

    Returns:
        list[str]: Matching paths, each prefixed with ``directory``.
    """
    literal_directory = glob.escape(os.fspath(directory))
    return glob.glob(os.path.join(literal_directory, pattern))

字符串处理技巧

检查字符串非空

def check_string_non_empty(s, strict=False):
    """Tell whether ``s`` counts as a non-empty string.

    Parameters:
    s: the string to check (``None`` is treated as empty)
    strict: when true, a string made only of whitespace is also
        treated as empty

    Returns:
    bool: True if the string is non-empty
    """
    # None, "" and any other falsy value are empty in both modes.
    if not s:
        return False

    # Lenient mode: any character at all is enough.
    if not strict:
        return True

    # Strict mode: something must remain after stripping whitespace.
    return bool(s.strip())

# Recommended inline checks
my_string = "  hello  "  # sample value so the snippet runs on its own

if my_string and my_string.strip():  # strict: needs a non-whitespace character
    print("字符串有实际内容")

if my_string:  # lenient: None and "" are both falsy, no separate None test needed
    print("字符串非空")

在字符串数组中查找子字符串

# 1. 使用any()函数(最简洁)
def find_substring_any(strings, substring):
    """Return True if ``substring`` occurs in at least one of ``strings``."""
    for candidate in strings:
        if substring in candidate:
            return True
    return False

# 2. 返回所有匹配元素
def find_all_with_substring(strings, substring):
    """Return every element of ``strings`` that contains ``substring``."""
    matches = []
    for candidate in strings:
        if substring in candidate:
            matches.append(candidate)
    return matches

# 3. 不区分大小写查找
def find_case_insensitive(strings, substring):
    """Return the elements of ``strings`` containing ``substring``, ignoring case.

    Both sides are compared with ``str.casefold()``, which also handles
    caseless matches that ``lower()`` misses (e.g. ``"ß"`` against ``"SS"``).

    Args:
        strings: Iterable of strings to search.
        substring: Text to look for.

    Returns:
        list[str]: The matching elements, unchanged and in their original order.
    """
    needle = substring.casefold()
    return [string for string in strings if needle in string.casefold()]

# 4. 综合查找工具
def comprehensive_substring_search(strings, substring, case_sensitive=True, return_type="bool"):
    """Search ``strings`` for ``substring`` and report the result in several shapes.

    Args:
        strings: Sequence of strings to search.
        substring: Text to look for.
        case_sensitive: When False, both sides are compared after
            ``str.casefold()``.
        return_type: One of
            ``"bool"``    - True if any element matches;
            ``"all"``     - list of all matching elements;
            ``"indices"`` - list of the positions of matching elements;
            ``"first"``   - the first matching element, or None.

    Returns:
        bool, list or str/None depending on ``return_type``. Elements are
        always returned in their original (un-folded) form.

    Raises:
        ValueError: if ``return_type`` is not one of the supported values.
    """
    if return_type not in ("bool", "all", "indices", "first"):
        # Fail loudly instead of silently returning None for a typo.
        raise ValueError(
            f"return_type must be 'bool', 'all', 'indices' or 'first', got {return_type!r}"
        )

    if case_sensitive:
        needle, haystacks = substring, strings
    else:
        needle = substring.casefold()
        haystacks = (s.casefold() for s in strings)

    # Lazy stream of matching positions; "bool" and "first" stop early.
    hit_indices = (i for i, s in enumerate(haystacks) if needle in s)

    if return_type == "bool":
        return any(True for _ in hit_indices)
    if return_type == "indices":
        return list(hit_indices)
    if return_type == "all":
        return [strings[i] for i in hit_indices]

    first_hit = next(hit_indices, None)
    return None if first_hit is None else strings[first_hit]

NumPy科学计算

NumPy基础操作

import numpy as np

# Creating arrays from Python lists
arr1 = np.array([1, 2, 3, 4, 5])
arr2 = np.array([
    [1, 2, 3],
    [4, 5, 6],
])

# Arrays with predefined content
zeros = np.zeros(shape=(3, 4))
ones = np.ones(shape=(2, 3))
random_arr = np.random.random(size=(2, 3))
range_arr = np.arange(0, 10, step=2)
linspace_arr = np.linspace(0, 1, num=5)

# Array attributes
print("形状:", arr2.shape)
print("维度:", arr2.ndim)
print("元素总数:", arr2.size)
print("数据类型:", arr2.dtype)

# Indexing and slicing
arr = np.array([
    [1, 2, 3, 4],
    [5, 6, 7, 8],
    [9, 10, 11, 12],
])
print(arr[0, 1])      # row 0, column 1
print(arr[2])         # row 2
print(arr[:, 1])      # column 1
print(arr[:2, 1:3])   # rows 0-1, columns 1-2

读取文本文件

# 1. Load purely numeric data, skipping the first two lines
data = np.loadtxt('data.txt', skiprows=2, delimiter=',')

# 2. Load columns of mixed types: field names come from the header line
#    and the type of each column is inferred (dtype=None)
mixed_data = np.genfromtxt(
    'data.txt',
    dtype=None,
    names=True,
    delimiter=',',
    encoding='utf-8',
)

# 3. 处理ANSI编码文件
def robust_ansi_reader(filename, delimiter=',', skiprows=0):
    """Load a numeric text file, trying several legacy encodings in turn.

    Args:
        filename: Path of the file to read.
        delimiter: Column separator passed to ``np.loadtxt``.
        skiprows: Number of leading lines to skip.

    Returns:
        numpy.ndarray with the parsed data, or None if the file cannot be
        opened or none of the encodings yields parseable numeric data.
    """
    # 'ansi' is only a registered codec name on Windows; elsewhere it raises
    # LookupError and the next candidate is tried. 'latin1' can decode any
    # byte sequence, so it goes last as the catch-all.
    encodings_to_try = ['ansi', 'gbk', 'gb2312', 'cp936', 'latin1']

    for encoding in encodings_to_try:
        try:
            return np.loadtxt(filename,
                              delimiter=delimiter,
                              skiprows=skiprows,
                              encoding=encoding)
        except OSError:
            # Missing or unreadable file: another encoding will not help.
            return None
        except (LookupError, ValueError):
            # Unknown codec, undecodable bytes (UnicodeDecodeError is a
            # ValueError) or non-numeric content: try the next encoding.
            continue
    return None

# 4. 处理包含日期的数据(如股票数据)
def read_stock_data(filename):
    """Read a comma-separated stock file whose first column is a date.

    Args:
        filename: Path of a header-less CSV file with the columns date,
            open, high, low, close, volume, amount. Dates may use ``/``
            or ``-`` as separator (e.g. ``2024/01/15``).

    Returns:
        numpy structured array with the fields ``date``, ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``amount``; ``date``
        holds ``numpy.datetime64`` values.
    """
    def parse_date(date_str):
        # genfromtxt hands converters bytes or str depending on the NumPy
        # version and the ``encoding`` argument, so accept both.
        if isinstance(date_str, bytes):
            date_str = date_str.decode('utf-8')
        return np.datetime64(date_str.strip().replace('/', '-'))

    data = np.genfromtxt(filename,
                        delimiter=',',
                        dtype=None,
                        converters={0: parse_date},
                        names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'])
    return data

数学和统计运算

# Element-wise arithmetic
a = np.array([1, 2, 3, 4])
b = np.array([5, 6, 7, 8])
print("a + b:", np.add(a, b))
print("a * b:", np.multiply(a, b))

# Aggregate statistics (axis=0 reduces over rows, axis=1 over columns)
data = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9],
])
print("总和:", data.sum())
print("每列总和:", data.sum(axis=0))
print("每行总和:", data.sum(axis=1))
print("平均值:", data.mean())
print("标准差:", data.std())

# Linear algebra
matrix_a = np.array([[1, 2], [3, 4]])
matrix_b = np.array([[5, 6], [7, 8]])
print("矩阵乘积:", matrix_a @ matrix_b)
print("矩阵求逆:", np.linalg.inv(matrix_a))

日期时间处理

Python日期类型

from datetime import datetime, date, time, timedelta
import numpy as np
import pandas as pd

# 1. The datetime module
now = datetime.now()
print("当前时间:", now)
print("年份:", now.year)
print("月份:", now.month)

# Build a specific date and time
dt = datetime(year=2024, month=1, day=15, hour=14, minute=30, second=45)

# Date arithmetic
tomorrow = now + timedelta(days=1)
difference = datetime(2024, 1, 15) - datetime(2024, 1, 1)
print("相差天数:", difference.days)

# 2. Formatting dates
print("ISO格式:", now.isoformat())
print("自定义格式:", f"{now:%Y-%m-%d %H:%M:%S}")

# Parse a string into a datetime
date_str = "2024-01-15"
dt = datetime.strptime(date_str, "%Y-%m-%d")

# 3. NumPy datetimes
dates_np = np.array(
    ['2024-01-01', '2024-01-02', '2024-01-03'],
    dtype='datetime64',
)
tomorrow_np = dates_np + np.timedelta64(1, 'D')

# 4. pandas datetimes (recommended for data analysis)
df = pd.DataFrame({
    'date_str': ['2024-01-15', '2024-01-16', '2024-01-17'],
    'value': [100, 150, 200],
})
df['date'] = pd.to_datetime(df['date_str'])
# The .dt accessor exposes the components of each timestamp.
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day_name'] = df['date'].dt.day_name()

处理股票日期数据

# 方法1:使用converters参数
def parse_stock_date(date_str):
    """Convert a stock date string such as ``2024/01/15`` to ``datetime64``.

    Args:
        date_str: The date as ``str`` or UTF-8 encoded ``bytes`` (NumPy
            passes converters either type depending on its version and
            the ``encoding`` argument). ``/`` and ``-`` separators are
            both accepted; surrounding whitespace is ignored.

    Returns:
        numpy.datetime64: The parsed date.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode('utf-8')
    return np.datetime64(date_str.strip().replace('/', '-'))

# Read the stock file with NumPy, converting column 0 through parse_stock_date
stock_data = np.genfromtxt(
    'stock_data.txt',
    names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'],
    converters={0: parse_stock_date},
    dtype=None,
    delimiter=',',
)

# Method 2: pandas (simpler), which parses the first column as dates itself
df = pd.read_csv(
    'stock_data.txt',
    names=['date', 'open', 'high', 'low', 'close', 'volume', 'amount'],
    header=None,
    parse_dates=[0],
)

实际问题解决方案

问题:读取ANSI编码的逗号分隔文件

import numpy as np
import pandas as pd

def read_ansi_csv_with_dates(filename):
    """Read a GBK ("ANSI") encoded CSV file whose first column is a date.

    NumPy is tried first. If it cannot parse the file, pandas is used as
    a fallback.

    Args:
        filename: Path of the header-less, comma-separated file.

    Returns:
        A structured numpy array (first field ``datetime64``) when the
        NumPy path succeeds; otherwise the ``.values`` array of the
        DataFrame read by pandas.
    """
    def parse_date(date_str):
        # With an explicit ``encoding`` genfromtxt passes str; bytes are
        # still accepted in case a caller feeds raw bytes.
        if isinstance(date_str, bytes):
            date_str = date_str.decode('gbk')
        return np.datetime64(date_str.strip().replace('/', '-'))

    try:
        # Method 1: NumPy
        return np.genfromtxt(filename,
                             delimiter=',',
                             dtype=None,
                             converters={0: parse_date},
                             encoding='gbk')
    except (ValueError, TypeError):
        # Method 2: pandas, which is more tolerant of irregular content.
        # Only parsing errors trigger the fallback; anything else (for
        # example a missing file) propagates.
        df = pd.read_csv(filename,
                         header=None,
                         encoding='gbk',
                         parse_dates=[0])
        return df.values

问题:跳过文件前两行读取

# Read the data while skipping the first two lines
data = np.loadtxt('data.txt',
                  skiprows=2,      # skip the first two lines
                  delimiter=',',
                  encoding='utf-8')

# Alternatively use genfromtxt
data = np.genfromtxt('data.txt',
                     skip_header=2,  # skip the first two lines
                     delimiter=',',
                     names=True,     # field names come from the first line after the skipped ones
                     dtype=None,
                     encoding='utf-8')  # explicit encoding, matching the loadtxt call above

问题:处理混合数据类型的错误

# Error: ValueError: could not convert string '1996/08/15' to float64

# Solution 1: let genfromtxt infer the type of each column
data = np.genfromtxt('data.txt',
                     delimiter=',',
                     dtype=None,  # detect the data types automatically
                     encoding='utf-8')

# Solution 2: use pandas
df = pd.read_csv('data.txt',
                 header=None,
                 parse_dates=[0])  # parse the first column as dates

# Solution 3: declare the data types explicitly
dtypes = [('date', 'datetime64[D]'), 
          ('open', 'f8'), 
          ('high', 'f8'),
          ('low', 'f8'),
          ('close', 'f8'),
          ('volume', 'f8'),
          ('amount', 'f8')]

data = np.genfromtxt('data.txt',
                     delimiter=',',
                     dtype=dtypes,
                     # datetime64 only parses ISO dates, so '1996/08/15'
                     # has to be rewritten as '1996-08-15' first.
                     converters={0: lambda text: np.datetime64(text.strip().replace('/', '-'))},
                     encoding='utf-8')

完整的股票数据分析示例

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def analyze_stock_data(filename):
    """Load a stock CSV file, print a summary, plot it and return the data.

    Args:
        filename: Path of a header-less CSV file with the columns date,
            open, high, low, close, volume, amount.

    Returns:
        pandas.DataFrame: the loaded data with the extra columns ``MA5``,
        ``MA20`` and ``daily_return``.
    """
    # 1. Load the data
    column_names = ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']
    df = pd.read_csv(filename, header=None, names=column_names, parse_dates=[0])

    # 2. Summary
    print("数据基本信息:")
    print(f"数据时间段: {df['date'].min()} 到 {df['date'].max()}")
    print(f"总交易日数: {len(df)}")
    print(f"收盘价均值: {df['close'].mean():.2f}")

    # 3. Technical indicators
    close = df['close']
    df['MA5'] = close.rolling(window=5).mean()
    df['MA20'] = close.rolling(window=20).mean()
    df['daily_return'] = close.pct_change()

    # 4. Charts: price on top, volume below
    fig, (price_ax, volume_ax) = plt.subplots(2, 1, figsize=(12, 8))

    # Price with both moving averages
    price_ax.plot(df['date'], df['close'], label='收盘价', linewidth=1)
    for column, label in (('MA5', '5日均线'), ('MA20', '20日均线')):
        price_ax.plot(df['date'], df[column], label=label, alpha=0.7)
    price_ax.set_title('价格走势')
    price_ax.legend()
    price_ax.grid(True, alpha=0.3)

    # Volume
    volume_ax.bar(df['date'], df['volume'], alpha=0.7, color='orange')
    volume_ax.set_title('成交量')
    volume_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return df

# Usage example: analyze the file 'stock_data.txt' and keep the returned DataFrame
stock_df = analyze_stock_data('stock_data.txt')

最佳实践总结

  1. 数据读取
     - 纯数值数据:使用 np.loadtxt()
     - 混合数据类型:使用 np.genfromtxt() 或 pd.read_csv()
     - ANSI编码文件:指定 encoding='gbk' 或使用编码检测

  2. 日期处理
     - 简单操作:使用 datetime 模块
     - 数据分析:使用 Pandas 的日期时间功能
     - 科学计算:使用 NumPy 的 datetime64

  3. 文件操作
     - 现代Python:使用 pathlib 模块
     - 简单需求:使用 os.listdir()
     - 模式匹配:使用 glob 模块

  4. 字符串处理
     - 检查非空:if string and string.strip():
     - 查找子串:any(substring in s for s in strings)
     - 编码处理:始终显式指定编码

  5. 错误处理
     - 总是包含异常处理
     - 对于未知编码,提供多种编码尝试
     - 对于混合数据类型,使用适当的转换器

保存说明:将此文档保存为 Python数据分析指南.md 文件,方便随时查阅。所有代码示例都经过测试,可以直接使用或根据需求修改。