常用 Python 包的安装

常用的 python 包的安装

pip3 install filelock
pip3 install torch torchvision
pip3 install pandas dask numpy sympy

pip3 install nltk spacy
pip3 install sqlalchemy
pip3 install requests_html beautifulsoup4 lxml
pip3 install 
pip3 install 
pip3 install regexp

pip3 install pillow
pip3 install networkx
pip3 install yt-dlp gallery-dl

ubuntu 下可以用 apt 安装,注意可能有点旧。

用 Python 实现数据分析

相关库的用法见之前的贴文。

Python 数据分析是用 Python 生态完成数据读取、清洗、探索、可视化、建模与报告的全流程,核心是 NumPy + Pandas + Matplotlib/Seaborn + Scikit-learn,配合 Jupyter 做交互式分析。

工具

工具

  • 基础计算:Python 语法(列表 / 字典 / 函数 / 循环)+ NumPy 数组操作。
  • 表格处理:Pandas 全流程(读 / 洗 / 聚 / 合 / 时间序列)。
  • 可视化:Matplotlib 基础 + Seaborn 统计图。
  • 统计模型:Scikit-learn 建模 + 特征工程 + 模型评估。Statsmodels。
  • 开发环境 Jupyter Notebook 做交互式分析
  • 实战:Kaggle 项目、业务数据集、自动化报表
  • 参考资料 Kaggle 案例

常见坑与避坑

  • Pandas 链式赋值警告:用 .copy() 或 .loc 避免
  • Matplotlib 中文乱码:配置字体(plt.rcParams)
  • 数据泄露:先划分训练 / 测试集,再做特征工程
  • 循环低效:优先用 Pandas/NumPy 矢量化,少写 for 循环

流程

数据分析流程

  • 数据采集:读 CSV/Excel/ 数据库 / API → pd.read_csv()、pd.read_excel()、pd.read_sql()
  • 数据清洗(最耗时):
    • 缺失值:df.dropna() / df.fillna()
    • 重复值:df.drop_duplicates()
    • 异常值:筛选 / 截断 / 对数变换
    • 类型转换:df.astype()、pd.to_datetime()
  • 探索性分析(EDA):
    • 描述统计:df.describe()、df.info()
    • 单变量:直方图、箱线图、计数图
    • 多变量:散点图、热力图、分组对比
    • 时间序列:趋势、周期、异常点
    • 统计模型:回归分析、假设检验
  • 特征工程:
    • 特征创建:从日期提取年 / 月 / 周、计算比率 / 差值
    • 编码:One-Hot、Label Encoding
    • 缩放:标准化(StandardScaler)、归一化(MinMaxScaler)
  • 建模与评估(可选):
    • 划分训练 / 测试集
    • 训练模型(回归 / 分类 / 聚类)
    • 评估:准确率、召回率、F1、MAE、RMSE、ROC-AUC
    • 调参:网格搜索、随机搜索
  • 可视化与报告:
    • 静态图:Matplotlib/Seaborn
    • 交互图:Plotly
    • 自动化报告:Jupyter + Markdown、导出 PDF/HTML/Excel

典型场景

场景 1:电商销售分析

  • 目标:看销量 / 销售额趋势、爆款产品、用户复购
import pandas as pd
import matplotlib.pyplot as plt
# 读取数据
df = pd.read_csv("sales_data.csv", parse_dates=["order_date"])
# 按月汇总
df["month"] = df["order_date"].dt.to_period("M")
monthly_sales = df.groupby("month")["total"].sum()
# 绘图
monthly_sales.plot(kind="line", figsize=(12,4), title="月度销售额趋势")
plt.show()
# 爆款产品
top10 = df.groupby("product")["quantity"].sum().nlargest(10)
top10.plot(kind="barh", title="销量Top10产品")

场景 2:用户行为分析

  • 目标:用户活跃度、留存、转化漏斗
  • 工具:Pandas 分组 + Seaborn 分类图 + Plotly 漏斗图
import pandas as pd
import seaborn as sns
import plotly.express as px

# 日活
df = pd.read_csv("user_log.csv", parse_dates=["log_date"])
daily_active = df.groupby("log_date")["user_id"].nunique()
sns.lineplot(data=daily_active).set_title("日活跃用户")

# 转化漏斗
funnel = pd.DataFrame({"stage":["浏览","加购","下单","支付"], "count":[1000,500,300,180]})
px.funnel(funnel, x="count", y="stage", title="转化漏斗").show()

场景 3:金融数据分析

  • 目标:股价趋势、收益率、波动率、相关性
  • 工具:Pandas 时间序列 + NumPy 计算 + Matplotlib 蜡烛图
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mplfinance as mpf

# 股价趋势
df = pd.read_csv("stock.csv", parse_dates=["date"], index_col="date")
df["close"].plot(title="股价走势")

# 收益率
df["return"] = df["close"].pct_change()
print("年化波动率:", df["return"].std() * np.sqrt(252))

# 蜡烛图
mpf.plot(df.tail(30), type="candle", title="近30日K线")
plt.show()

文本分析

文本数据分析是从非结构化文本中提取价值信息的核心技能,广泛用于评论分析、舆情分析、关键词提取、文本分类等场景。

工具

  • 基础处理:re正则、string字符串
  • 分词/预处理:jieba 中文必备
  • 结构化分析:pandas(数据承载)
  • 词频/可视化:collections、wordcloud
  • 进阶:sklearn 文本向量化、nltk 英文、spacy

实战案例:商品评论分析

文本分析固定流程:清洗 + 分词 + 统计 + 可视化 + 情感分布

  1. 数据准备:我们用商品评论作为示例数据,可以用 excel/csv 中读取。
  2. 文本预处理(最关键步骤) 非结构化文本必须清洗后才能分析: 3. 去除特殊符号、数字、空格 4. 中文分词 5. 去除停用词(的、了、吗、很…)
    • 文本清洗:去掉无用字符
    • 中文分词:jieba 是中文必备工具
    • 停用词过滤:提升分析准确性
  3. 基础文本分析,通过统计和可视化,输出词频、词云、情感分布。
    • 词频统计:找到核心关键词,文本中出现最多的词。输出示例: ` 高频关键词: 拍照: 2 续航: 2 质量: 2 快: 2 产品: 2 不错: 2`
    • 词云可视化:词云图可以直观展示关键词分布。
    • 规则型情感分析:快速判断文本情绪,简单规则(数关键词)判断正面/负面,然后得到分布。
  4. 进阶分析
    • 机器学习文本分类(用 TF-IDF + 朴素贝叶斯/SVM)
    • LDA 主题模型(自动提取文本主题)自动文本主题挖掘。例如: 主题1:续航、拍照、充电 主题2:质量、客服、售后
    • 深度学习情感分析(BERT 模型):比规则/机器学习准确率高
    • 批量爬取评论 + 自动化文本分析
  5. 扩展
    • 指定网站的评论
    • 输出 Word/Excel 分析报告
    • 做成可视化网页(Streamlit)
    • 适配论文/毕设需求

imdb 数据集
统计分析可以增加分类变量。

Python 文本分析
✅ 基础文本分析(清洗/分词/词频/词云/情感)
进阶功能
TF-IDF + 机器学习分类(朴素贝叶斯/SVM)
LDA 自动文本主题挖掘
BERT 深度学习情感分析
批量评论爬虫 + 自动化分析
你能直接获得的 8 大能力

  1. 基础文本处理 清洗、分词、去停用词, 词频统计、词云图
  2. 规则情感分析 简单高效,适合快速判断正负向
  3. TF-IDF + 机器学习分类(你要的!) 朴素贝叶斯 SVM 自动分类、预测新文本
  4. LDA 主题模型(你要的!) 自动从一堆文本里挖出隐藏主题 例如: 主题1:续航、拍照、充电 主题2:质量、客服、售后
  5. BERT 深度学习情感(你要的!) 目前中文最准、工业级情感分析 比规则/机器学习准确率高很多
  6. 批量评论爬虫(你要的!) 可直接扩展到京东、淘宝、小红书、微博
  7. 自动化分析 pipeline 爬取 → 清洗 → 分词 → 主题 → 情感 → 输出报告
  8. 可直接用于作业/毕设/项目 结构完整、注释清晰、可直接提交
    可以
  9. 核心分析:词频统计 找出文本中出现最多的关键词输出示例高频关键词: 拍照: 2 续航: 2 质量: 2 快: 2 产品: 2 不错: 2
  10. 可视化:词云图 直观展示关键词分布。
  11. 进阶:情感倾向判断 简单规则(数关键词)判断正面/负面评论
  12. 主题模型
    总结
  13. Python 文本分析 = 清洗 + 分词 + 统计 + 可视化 + 情感判断
  14. 中文必备工具:jieba
  15. 最常用输出:词频、词云、情感分布
  16. 全套代码可直接替换你的文本数据使用
    需要我把这套代码改成适配你的Excel/CSV文件,或者教你做自动文本分类吗?

完整代码,可直接替换你的文本数据使用。

# 安装相关库
# !pip3 install pandas jieba wordcloud matplotlib
import pandas as pd
import jieba
import re
import matplotlib.pyplot as plt
from collections import Counter
from wordcloud import WordCloud

# ========== 1. 数据 ==========
# 模拟中文评论数据
data = {
    "comment": [
        "这款手机续航超级好,充电快,拍照清晰,非常推荐!",
        "质量太差了,用了三天就坏了,客服态度还很差",
        "物流很快,包装完好,产品和描述一致,满意",
        "不好用,卡顿严重,发热明显,不建议购买",
        "性价比很高,功能齐全,颜值也高,大爱",
        "差差差!售后无人回应,问题得不到解决",
        "续航一般,但是拍照效果很棒,整体还行",
        "发货速度快,产品质量不错,会回购",
        "系统流畅,手感舒服,值得入手",
        "垃圾产品,完全不值这个价格,踩雷了"
    ]
}
df = pd.DataFrame(data)

# ========== 2. 文本预处理 ==========

# 文本清洗函数
def clean_text(text):
	# 只保留中文
    return re.sub(r'[^\u4e00-\u9fa5]', '', text)
    
# 定义停用词(可扩展)
stop_words = {"的", "了", "是", "很", "也", "都", "就", "不", "还", "超级", "非常", "这个", "我"}

# 批量处理数据
df["clean"] = df["comment"].apply(clean_text) # 清洗
df["cut"] = df["clean"].apply(jieba.lcut) # 分词
df["filtered"] = df["cut"].apply(lambda x: [w for w in x if w not in stop_words and len(w) > 1]) # 去停用词

# 查看结果
print(df[["comment", "filtered"]].head())

# ========== 3. 词频统计 ==========

# 合并所有分词
all_words = [w for words in df["filtered"] for w in words]

# 统计词频 TOP10
top10 = Counter(all_words).most_common(10)
print("=== TOP10 关键词 ===")
for w, c in top10:
    print(w, c)

# ========== 4. 词云 ==========
# 拼接成字符串
text = " ".join(all_words)
# 生成词云
wc = WordCloud(font_path="SimHei.ttf", background_color="white", width=800, height=600).generate(text)
# 显示
plt.figure(figsize=(10,7))
plt.imshow(wc)
plt.axis("off")
plt.show()

# ========== 5. 情感分析 ==========
# 定义正面/负面词库
positive_words = {"好","推荐","满意","不错","值得","快","清晰","流畅","回购"}
negative_words = {"差","坏","垃圾","卡顿","发热","踩雷","不值"}

# 情感判断函数
def sentiment(words):
    p = sum(1 for w in words if w in positive_words)
    n = sum(1 for w in words if w in negative_words)
    return "正面" if p>n else "负面" if n>p else "中性"
    
# 应用
df["sentiment"] = df["filtered"].apply(sentiment)

# 统计结果
print("\n=== 情感分析结果 ===")
print(df[["comment","sentiment"]])
print("\n=== 情感分布 ===")
print(df["sentiment"].value_counts())

进阶:

  • 以分析 imdb 数据为例
  • from sklearn.datasets import load_20newsgroups
  • from datasets import load_dataset imdb = load_dataset(“imdb”) # 直接在 sklearn 流水线中使用
# !pip install pandas jieba wordcloud matplotlib scikit-learn gensim transformers torch requests beautifulsoup4
import pandas as pd
import jieba
import re
import matplotlib.pyplot as plt
from collections import Counter
from wordcloud import WordCloud

# --------------------------
# 1. 基础数据(10条评论)
# --------------------------
data = {
    "comment": [
        "这款手机续航超级好,充电快,拍照清晰,非常推荐!",
        "质量太差了,用了三天就坏了,客服态度还很差",
        "物流很快,包装完好,产品和描述一致,满意",
        "不好用,卡顿严重,发热明显,不建议购买",
        "性价比很高,功能齐全,颜值也高,大爱",
        "差差差!售后无人回应,问题得不到解决",
        "续航一般,但是拍照效果很棒,整体还行",
        "发货速度快,产品质量不错,会回购",
        "系统流畅,手感舒服,值得入手",
        "垃圾产品,完全不值这个价格,踩雷了"
    ]
}
df = pd.DataFrame(data)

# --------------------------
# 2. 文本预处理(清洗+分词+去停用词)
# --------------------------
def clean(text):
    return re.sub(r'[^\u4e00-\u9fa5]', '', text)

df['clean'] = df['comment'].apply(clean)
df['cut'] = df['clean'].apply(lambda x: jieba.lcut(x))

stop_words = {"的","了","是","很","也","都","就","不","还","超级","非常","这个","我","用","得","这"]
df['words'] = df['cut'].apply(lambda x: [w for w in x if w not in stop_words and len(w)>1])

# --------------------------
# 3. 词频统计 + 词云
# --------------------------
all_words = [w for words in df['words'] for w in words]
top10 = Counter(all_words).most_common(10)
print("===== 高频词 =====")
for w,c in top10:
    print(w, c)

# 词云
text = " ".join(all_words)
wc = WordCloud(font_path="SimHei.ttf", background_color="white", width=800, height=600).generate(text)
plt.figure(figsize=(10,6))
plt.imshow(wc)
plt.axis('off')
plt.show()

# --------------------------
# 4. 规则情感分析
# --------------------------
positive = {"好","推荐","满意","不错","值得","快","清晰","流畅","回购"}
negative = {"差","坏","垃圾","卡顿","发热","踩雷","不值"}

def sent(ws):
    p = sum(1 for w in ws if w in positive)
    n = sum(1 for w in ws if w in negative)
    return "正面" if p>n else "负面" if n>p else "中性"

df['rule_sent'] = df['words'].apply(sent)
print("\n===== 规则情感结果 =====")
print(df[['comment','rule_sent']])

# --------------------------
# 【进阶1】TF-IDF + 机器学习分类(朴素贝叶斯 / SVM)
# --------------------------
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# 构造训练数据
df_train = df.copy()
df_train['label'] = df_train['rule_sent'].map({"正面":1,"负面":0,"中性":2})
corpus = [" ".join(ws) for ws in df_train['words']]

# TF-IDF向量化
tfidf = TfidfVectorizer()
X = tfidf.fit_transform(corpus)
y = df_train['label']

# 朴素贝叶斯
nb = MultinomialNB()
nb.fit(X, y)
pred_nb = nb.predict(X)

# SVM
svm = SVC()
svm.fit(X, y)
pred_svm = svm.predict(X)

print("\n===== 机器学习分类准确率 =====")
print("朴素贝叶斯准确率:", accuracy_score(y, pred_nb))
print("SVM 准确率:", accuracy_score(y, pred_svm))

# 测试新句子
def predict_text(text):
    t = clean(text)
    ws = jieba.lcut(t)
    ws = [w for w in ws if w not in stop_words and len(w)>1]
    vec = tfidf.transform([" ".join(ws)])
    return "正面" if nb.predict(vec)[0]==1 else "负面" if nb.predict(vec)[0]==0 else "中性"

print("\n===== 机器学习预测 =====")
print("测试:拍照超清晰,续航很强 →", predict_text("拍照超清晰,续航很强"))
print("测试:经常死机,发热严重 →", predict_text("经常死机,发热严重"))

# --------------------------
# 【进阶2】LDA 主题模型(自动提取文本主题)
# --------------------------
from gensim import corpora, models
import warnings
warnings.filterwarnings('ignore')

# 构建词典
dictionary = corpora.Dictionary(df['words'])
corpus_lda = [dictionary.doc2bow(ws) for ws in df['words']]

# LDA训练(2个主题)
lda = models.LdaModel(corpus_lda, num_topics=2, id2word=dictionary, passes=10)

print("\n===== LDA 自动提取主题 =====")
for i, topic in lda.print_topics(num_words=5):
    print(f"主题 {i+1}: {topic}")

# --------------------------
# 【进阶3】BERT 深度学习情感分析
# --------------------------
from transformers import pipeline
print("\n===== BERT 深度学习情感分析 =====")
bert = pipeline("sentiment-analysis", model="uer/roberta-base-finetuned-chinanews-chinese")

for c in df['comment'][:5]:
    res = bert(c)[0]
    print(f"文本:{c}")
    print(f"BERT结果:{res['label']} | 置信度:{res['score']:.2f}\n")

# --------------------------
# 【进阶4】批量爬取评论 + 自动化分析
# --------------------------
import requests
from bs4 import BeautifulSoup

def crawl_demo():
    print("\n===== 开始批量爬取评论(演示) =====")
    # 模拟爬取(可替换成真实电商/论坛接口)
    comments = [
        "这个商品真的很好用,质量很棒",
        "太差了,用一次就坏了,不推荐",
        "物流很快,包装完好,非常满意",
        "一般般,性价比不高",
        "超级喜欢,下次还会买"
    ]
    # 自动分析
    for i, c in enumerate(comments,1):
        res = bert(c)[0]
        print(f"评论{i}{c}")
        print(f"情感:{res['label']} | 置信度:{res['score']:.2f}\n")

crawl_demo()

print("\n===== 全套文本分析运行完成!=====")

你需要我定制成你的课程作业/毕业设计版本吗?

金融数据分析

会用到一些统计学的包

回归分析可以发现相关性。

import pandas as pd
import statsmodels.api as sm

# 加载数据
data = pd.DataFrame({
    'income': [10000, 15000, 20000, 25000, 30000],
    'consumption': [8000, 11000, 14000, 16000, 18000]
})

# 使用公式语法进行回归分析
model = sm.OLS.from_formula('consumption ~ income', data=data).fit()
print(model.summary())

结果是

                            OLS Regression Results                            
==============================================================================
Dep. Variable:            consumption   R-squared:                       0.989
Model:                            OLS   Adj. R-squared:                  0.985
Method:                 Least Squares   F-statistic:                     267.9
Date:                Thu, 30 Apr 2026   Prob (F-statistic):           0.000496
Time:                        10:47:44   Log-Likelihood:                -36.718
No. Observations:                   5   AIC:                             77.44
Df Residuals:                       3   BIC:                             76.66
Df Model:                           1                                         
Covariance Type:            nonrobust                                         
==============================================================================
                coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
Intercept   3400.0000    648.074      5.246      0.013    1337.539    5462.461
income         0.5000      0.031     16.366      0.000       0.403       0.597
==============================================================================
Omnibus:                          nan   Durbin-Watson:                   1.429
Prob(Omnibus):                    nan   Jarque-Bera (JB):                0.375
Skew:                           0.344   Prob(JB):                        0.829
Kurtosis:                       1.847   Cond. No.                     6.36e+04
==============================================================================

Notes:
[1] Standard Errors assume that the covariance matrix of the errors is correctly specified.
[2] The condition number is large, 6.36e+04. This might indicate that there are strong multicollinearity or other numerical problems.

结果解读

  • R² = 0.989:模型解释了消费变异的98.9%,拟合效果极好。
  • Adj. R² = 0.985:调整后R²仍很高,无过度拟合。
  • F检验:F=267.9,p=0.0005,收入对消费有显著线性影响。
  • 截距 (3400):p=0.013,显著。
  • 收入系数 (0.50):p=0.000,收入每增1单位,消费增0.5单位。
  • Durbin-Watson = 1.429:残差自相关问题不严重。
  • 正态性检验:Omnibus和Jarque-Bera p值均不显著(0.829),残差近似正态。
  • 条件数 = 6.36e+04极大,可能存在严重多重共线性或数值问题。
  • 样本量 = 5:极小,统计推断可靠性有限。

用 R 里的示例数据

import numpy as np
import statsmodels.api as sm
import statsmodels.formula.api as smf

# Load data
dat = sm.datasets.get_rdataset("Guerry", "HistData").data

# Fit regression model (using the natural log of one of the regressors)
results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=dat).fit()

# Inspect the results
print(results.summary())

有时候用因果推断来评估政策效果。比如用 PSM,IV,DID,RDD。

调查问卷分析

调查问卷信度+效度分析

  • 信度:Cronbach’s α 系数
  • 效度:KMO检验、Bartlett球形度检验、探索性因子分析EFA

只需要把问卷数据换成自己的就能跑。

# pip3 install pandas numpy scipy scikit-learn factor-analyzer

import pandas as pd
import numpy as np
from scipy.stats import chi2
from factor_analyzer import FactorAnalyzer
from factor_analyzer.factor_analyzer import calculate_bartlett_sphericity, calculate_kmo

# ===================== 1. 加载数据 =====================
# 替换成你的问卷数据(Excel/CSV)
# 数据格式:每一列是一个题项,每一行是一个受访者
df = pd.read_excel("问卷数据.xlsx")  # 或 pd.read_csv("问卷数据.csv")

# 只保留量表题项(删除姓名、性别等无关列)
# 假设你的题项从第1列到第20列,自行修改
items = df.iloc[:, 0:20]  

# 去除缺失值
items = items.dropna()
print("数据维度(样本数, 题项数):", items.shape)

# ===================== 2. 信度分析:Cronbach α系数 =====================
def cronbach_alpha(df):
    # 计算题项数
    k = df.shape[1]
    # 计算方差
    var = df.var(axis=0, ddof=1).sum()
    # 计算总分方差
    total_var = df.sum(axis=1).var(ddof=1)
    # 计算α系数
    alpha = (k / (k - 1)) * (1 - var / total_var)
    return alpha

alpha = cronbach_alpha(items)
print("\n========== 信度分析 (Cronbach's α) ==========")
print(f"α系数 = {alpha:.3f}")
print("判定标准:>0.9 极好;0.8~0.9 很好;0.7~0.8 可接受;0.6~0.7 一般;<0.6 差")

# ===================== 3. 效度分析 =====================
print("\n========== 效度分析 ==========")

# 3.1 KMO检验(因子分析适用性)
kmo_all, kmo_model = calculate_kmo(items)
print(f"KMO值 = {kmo_model:.3f}")
print("判定标准:>0.8 非常适合;0.7~0.8 适合;0.6~0.7 尚可;<0.6 不适合")

# 3.2 Bartlett球形度检验
chi_square_value, p_value = calculate_bartlett_sphericity(items)
print(f"Bartlett χ² = {chi_square_value:.3f}, p值 = {p_value:.4f}")
print("判定标准:p < 0.05 说明数据适合做因子分析")

# 3.3 探索性因子分析 EFA(检验结构效度)
fa = FactorAnalyzer(n_factors=1, rotation='varimax')
fa.fit(items)

# 因子载荷量
loadings = fa.loadings_
loadings_df = pd.DataFrame(loadings, columns=['因子载荷'], index=items.columns)
print("\n========== 因子载荷(结构效度) ==========")
print(loadings_df.round(3))
print("判定标准:载荷 > 0.5 说明题项有效;>0.7 非常好")

# 特征值与方差解释
ev, v = fa.get_eigenvalues()
print("\n========== 方差解释 ==========")
print(f"特征值:{ev[0]:.3f}")
print(f"方差解释率:{v[0]:.2f}%")
print("判定标准:累计方差解释率 > 60% 效度良好")

三、使用说明(超简单)

  1. 准备数据
    • 每一列 = 一个问卷题项
    • 每一行 = 一个受访者
    • 只保留1-5分的量表题,删除性别、年龄、开放题等
  2. 修改代码里的文件路径和题项范围
    df = pd.read_excel("你的文件名.xlsx")
    items = df.iloc[:, 0:20]  # 改成你的题项列
    
  3. 直接运行,自动输出所有结果

结果判定标准

  1. 信度(Cronbach’s α)
    • α ≥ 0.9:极好
    • 0.8 ≤ α < 0.9:很好
    • 0.7 ≤ α < 0.8:可接受
    • 0.6 ≤ α < 0.7:一般
    • α < 0.6:差,需要修改问卷
  2. 效度
    • KMO ≥ 0.6:适合做因子分析
    • Bartlett p < 0.05:数据有效
    • 因子载荷 ≥ 0.5:题项有效
    • 累计方差解释率 ≥ 60%:结构效度良好

五、我可以帮你定制
如果你把问卷数据(Excel)发给我,我可以:

  1. 帮你直接跑通结果
  2. 按你的问卷维度(比如多个子量表)分别计算信效度
  3. 生成可直接写进论文的结果文本

需要我帮你处理你的真实问卷数据吗?

科学计算

工具

  • NumPy 数值计算,矩阵运算。
  • SciPy 在 NumPy 基础上提供专业科学算法。 常用模块有积分、微分方程 优化、拟合,信号处理、图像处理,统计分布。
  • SymPy 符号计算(解方程、求导、积分公式推导)
  • Statsmodels 统计建模、回归分析、假设检验
  • Scikit-learn 机器学习(分类、回归、聚类)
  • TensorFlow/PyTorch 深度学习、神经网络

最优化,可以带约束。支持很多不同算法,包括差分进化、模拟退火这样的全局启发算法。

from scipy.optimize import minimize

def f(x):
    return x**2 + 2*x + 5

# 初始猜测值
x0 = [0]
res = minimize(f, x0)

print("极小值点 x =", res.x[0])
print("极小值 f(x) =", res.fun)

线性规划。整数线性规划用其他库。

from scipy.optimize import linprog

c = [1, 2]
A = [[1, 1]]
b = [5]
x_bounds = [(0, None), (0, None)]

res = linprog(c, A_ub=A, b_ub=b, bounds=x_bounds)
print(res.x)

回归分析
假设验证
时序分析

大数据分析

当数据量很大时,需要做一些改进。
比如改用 spark + mllib。

用 Python 实现 Web 开发

sqlalchemy

  • 连接 sqlite3 engine = create_engine(f"sqlite:///db.sqlite3")
  • 注意版本 api 变化,会影响兼容性。
from sqlalchemy import create_engine, text

# CREATE USER 'testuser'@'localhost' IDENTIFIED BY 'Test@123456';
# 连接字符串:mysql+pymysql://<用户名>:<密码>@<数据库地址>:<端口>/<数据库名>
# 数据库名为 test_db
engine = create_engine('mysql+pymysql://testuser:Test%40123456@localhost:3306/test_db')


with engine.connect() as conn:
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS user_table (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(50) NOT NULL,
            age INT,
            email VARCHAR(100) UNIQUE NOT NULL,
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    conn.execute(text(create_table_sql))

    insert_sql1 = text("INSERT INTO user_table (name, age, email) VALUES (:name, :age, :email)")
    conn.execute(insert_sql1, {"name": "张三", "age": 25, "email": "zhangsan@test.com"})
    
    conn.commit()

redis

命令行工具 redis-cli

import redis

# 创建 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 设置 Redis 数据
r.set('foo', 'bar')

# 获取 Redis 数据
value = r.get('foo')
print(value.decode('utf-8'))    # 输出 'bar'

通常当 kv 缓存使用。

也可以当队列使用,注意不一定可靠。

消息队列

最简单的任务队列实现是线程池。

用于一些超长的任务(视频转换),跨服务的任务(而用RPC),流量有波动的任务(瞬时大量订单)。以及分布式事务、广播。实现异步调用。
消息通知用消息队列用户可以界面上不用等待,让消息提示完成。
结果存数据库,或者推送。
是否有消息放丢失的机制,持久化,确认。

Celery 基于 rabbitmq 或者 redis。RabbitMQ(pika/amqpstorm)则更底层一些。

sudo apt-get install rabbitmq-server
celery -A tasks worker --loglevel=INFO
from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'

if __name__ == '__main__':
    result = hello.delay()
from tasks import add
add.delay(4, 4)
# 导入 Celery 类
from celery import Celery
import time

# 1. 初始化 Celery 实例
# broker:消息代理(Redis 地址)
# backend:结果存储(Redis 地址)
app = Celery(
    'celery_demo',  # 任务名称(自定义)
    broker='redis://localhost:6379/0',  # Redis 第0个数据库作为消息代理
    backend='redis://localhost:6379/1'  # Redis 第1个数据库存储任务结果
)

# 2. 定义异步任务
@app.task  # 装饰器标记为 Celery 任务
def add(x, y):
    """简单的加法任务,模拟耗时操作"""
    time.sleep(2)  # 模拟任务耗时(比如网络请求、数据处理)
    return x + y

@app.task
def multiply(x, y):
    """乘法任务"""
    time.sleep(1)
    return x * y

# 3. 测试代码(可选,仅用于演示调用方式)
if __name__ == '__main__':
    # 方式1:异步调用任务(非阻塞)
    result1 = add.delay(10, 20)
    result2 = multiply.delay(5, 6)
    
    # 打印任务ID
    print(f"加法任务ID: {result1.id}")
    print(f"乘法任务ID: {result2.id}")
    
    # 方式2:获取任务结果(会阻塞直到任务完成)
    print(f"加法任务结果: {result1.get()}")  # 输出 30
    print(f"乘法任务结果: {result2.get()}")  # 输出 30

web 框架

python 的 web 框架 flask, fastapi,django

flask

只负责 web 页面,数据库使用其他组件。
运行 ...

from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/')
def hello():
    return 'Hello World!'

steamlit

steamlit 实现一个简单的数据分析的界面

import streamlit as st
import numpy as np

# 输出,可以是文本,或者 dataframe。
st.write('Hello World')


# 输入组件
st.button('Click me')
st.checkbox('Check me')
st.radio('Choose a number', [1, 2, 3])


st.text_input('Enter your name', '')


python 爬虫

  • 用 request 或者 requests_html
  • 请求头
  • cookie 会变化,通常用来防止爬虫。属于 http 协议,不需要 js 支持。这个 request 支持,用 session。
  • bs4 用 lxml 能解析一些出错的 html。
  • 注意检查文件长度,和文件头对比。 response.headers.get(“Content-Length”))。有时候会接收不完整。
  • 原子写,防止异常或断电。
  • 并行可以提升速度。
  • 模拟浏览器,用。性能不好。
  • 注意设置超时时间,否则会一直卡住。
  • 注意偶尔会得到异常数据。检查内容。
  • 请求头格式不要传 br 否则 requests 不能自动解压缩。
  • 从浏览器读取 cookies 用 库。
  • 防止硬盘满或者写入失败。

分布式

一致性

  • 分布式锁 redlock-py
  • 分布式事务 两阶段提交 (2PC)

分布式事务

分布式事务的实现,从易到难主要有:

  • 两阶段提交 (2PC):强一致性,但性能差、易阻塞
  • 补偿事务 (TCC):柔性事务,性能好但开发成本高
  • 本地消息表:基于消息队列的最终一致性方案
  • SAGA 模式:长事务的补偿方案

典型应用场景

  • 跨数据库的事务操作(如订单库扣库存 + 支付库扣余额);
  • 数据库与消息队列的一致性保证(如下单成功后发送消息通知);
  • 金融、电商等对数据一致性要求极高的核心业务场景。

分布式数据库

  • OceanBase 靠多副本 + Paxos 协议,保证任何单台机器挂掉,数据不丢、服务不断。
  • MySQL 主从架构,分库分表,中间件。

两阶段提交 2PC

  • 步骤

代码解释
get_db_connection:上下文管理器,确保数据库连接正确关闭
TwoPhaseCommit 类:
prepare 方法:预执行所有操作,不提交,仅验证是否可执行
commit 方法:确认所有操作可执行后,统一提交
rollback 方法:任何环节失败,回滚所有操作
test_distributed_transaction:封装完整的 2PC 流程,处理异常并回滚

import pymysql
from contextlib import contextmanager

# 数据库连接配置
DB_CONFIG1 = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'db1'
}

DB_CONFIG2 = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'db2'
}

@contextmanager
def get_db_connection(config):
    """获取数据库连接(上下文管理器)"""
    conn = None
    try:
        conn = pymysql.connect(**config)
        yield conn
    except Exception as e:
        if conn:
            conn.rollback()
        raise e
    finally:
        if conn:
            conn.close()

class TwoPhaseCommit:
    def __init__(self, conn1, conn2):
        self.conn1 = conn1
        self.conn2 = conn2
        self.prepare_success = False

    def prepare(self, amount):
        """第一阶段:准备阶段(预提交)"""
        try:
            # 1. 操作db1:扣减账户1的余额
            cursor1 = self.conn1.cursor()
            cursor1.execute("UPDATE account SET balance = balance - %s WHERE id = 1", (amount,))
            # 不提交,仅预检查

            # 2. 操作db2:增加账户2的余额
            cursor2 = self.conn2.cursor()
            cursor2.execute("UPDATE account SET balance = balance + %s WHERE id = 2", (amount,))
            # 不提交,仅预检查

            # 检查是否都可以提交
            self.prepare_success = True
            print("准备阶段成功,等待提交")
        except Exception as e:
            self.prepare_success = False
            raise Exception(f"准备阶段失败: {e}")

    def commit(self):
        """第二阶段:提交"""
        if not self.prepare_success:
            raise Exception("准备阶段未成功,无法提交")
        
        try:
            self.conn1.commit()
            self.conn2.commit()
            print("提交阶段成功,分布式事务完成")
        except Exception as e:
            # 提交失败,回滚所有操作
            self.conn1.rollback()
            self.conn2.rollback()
            raise Exception(f"提交阶段失败: {e}")

    def rollback(self):
        """回滚所有操作"""
        try:
            self.conn1.rollback()
            self.conn2.rollback()
            print("回滚成功")
        except Exception as e:
            raise Exception(f"回滚失败: {e}")

# 测试分布式事务
def test_distributed_transaction(amount):
    with get_db_connection(DB_CONFIG1) as conn1, get_db_connection(DB_CONFIG2) as conn2:
        # 关闭自动提交
        conn1.autocommit(False)
        conn2.autocommit(False)
        
        tpc = TwoPhaseCommit(conn1, conn2)
        try:
            # 第一阶段:准备
            tpc.prepare(amount)
            # 第二阶段:提交
            tpc.commit()
        except Exception as e:
            # 任何阶段失败,回滚
            tpc.rollback()
            print(f"分布式事务失败: {e}")

# 执行测试(转账100元)
if __name__ == "__main__":
    test_distributed_transaction(100)

本地消息表 + 消息队列
代码解释
本地消息表:与业务操作在同一本地事务中,确保消息一定会被保存
消息队列:异步发送消息,执行远程操作
补偿机制:定时扫描未处理的消息,重试发送,保证最终一致性

import pymysql
import redis
import time
from contextlib import contextmanager

# 数据库配置(单库,实际可扩展为多库)
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'test'
}

# Redis 配置(消息队列)
REDIS_CONFIG = {
    'host': 'localhost',
    'port': 6379,
    'db': 0
}

# 创建本地消息表
def init_message_table():
    with get_db_connection(DB_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS local_message (
                id INT AUTO_INCREMENT PRIMARY KEY,
                business_id VARCHAR(64) NOT NULL,
                content JSON NOT NULL,
                status TINYINT DEFAULT 0,  -- 0:待发送 1:已发送 2:已确认
                create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
                update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
        """)
        conn.commit()

@contextmanager
def get_db_connection(config):
    conn = None
    try:
        conn = pymysql.connect(**config)
        yield conn
    finally:
        if conn:
            conn.close()

class LocalMessageTransaction:
    def __init__(self):
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        self.queue_name = "distributed_transaction_queue"

    def save_message(self, conn, business_id, content):
        """保存本地消息(与业务操作在同一事务)"""
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO local_message (business_id, content, status) VALUES (%s, %s, 0)",
            (business_id, content)
        )

    def execute_business(self, amount):
        """执行本地业务操作 + 保存消息"""
        business_id = f"transfer_{int(time.time())}"
        content = f'amount: {amount}, target_account: 2'

        with get_db_connection(DB_CONFIG) as conn:
            conn.autocommit(False)
            try:
                # 1. 执行本地业务(扣减余额)
                cursor = conn.cursor()
                cursor.execute("UPDATE account SET balance = balance - %s WHERE id = 1", (amount,))
                
                # 2. 保存本地消息(与业务操作同事务)
                self.save_message(conn, business_id, content)
                
                # 3. 提交本地事务
                conn.commit()
                print("本地业务执行成功,消息已保存")
                
                # 4. 发送消息到队列(即使失败,有补偿机制)
                self.send_message(business_id, content)
            except Exception as e:
                conn.rollback()
                raise Exception(f"本地事务失败: {e}")

    def send_message(self, business_id, content):
        """发送消息到Redis队列"""
        try:
            self.redis_client.lpush(self.queue_name, f"{business_id}:{content}")
            # 更新消息状态为已发送
            with get_db_connection(DB_CONFIG) as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "UPDATE local_message SET status = 1 WHERE business_id = %s",
                    (business_id,)
                )
                conn.commit()
            print(f"消息 {business_id} 发送成功")
        except Exception as e:
            print(f"消息发送失败: {e}")

    def consume_message(self):
        """消费消息,执行远程操作"""
        while True:
            # 阻塞获取消息
            message = self.redis_client.brpop(self.queue_name, timeout=5)
            if not message:
                continue
            
            _, msg = message
            business_id, content = msg.decode().split(":", 1)
            try:
                # 模拟执行远程操作(如更新db2的账户余额)
                print(f"执行远程操作: {content}")
                # 这里可以替换为实际的远程调用逻辑
                
                # 更新消息状态为已确认
                with get_db_connection(DB_CONFIG) as conn:
                    cursor = conn.cursor()
                    cursor.execute(
                        "UPDATE local_message SET status = 2 WHERE business_id = %s",
                        (business_id,)
                    )
                    conn.commit()
                print(f"消息 {business_id} 处理完成")
            except Exception as e:
                # 处理失败,重新入队(或记录异常)
                self.redis_client.lpush(self.queue_name, msg)
                print(f"消息 {business_id} 处理失败,重新入队: {e}")

    def compensate(self):
        """补偿机制:定时重试未发送/处理失败的消息"""
        while True:
            with get_db_connection(DB_CONFIG) as conn:
                cursor = conn.cursor()
                # 查询状态为0(待发送)或1(已发送但未确认)的消息
                cursor.execute("""
                    SELECT business_id, content FROM local_message 
                    WHERE status IN (0, 1) AND update_time < DATE_SUB(NOW(), INTERVAL 5 MINUTE)
                """)
                messages = cursor.fetchall()
                
                for business_id, content in messages:
                    # 重新发送消息
                    self.send_message(business_id, content)
                    print(f"补偿消息 {business_id} 发送成功")
            
            # 每5分钟检查一次
            time.sleep(300)

# 测试代码
if __name__ == "__main__":
    # 初始化消息表
    init_message_table()
    
    # 创建事务实例
    transaction = LocalMessageTransaction()
    
    # 1. 执行本地业务(转账100元)
    transaction.execute_business(100)
    
    # 2. 启动消费者(实际部署为独立进程)
    # import threading
    # consumer_thread = threading.Thread(target=transaction.consume_message)
    # consumer_thread.daemon = True
    # consumer_thread.start()
    
    # 3. 启动补偿机制(实际部署为独立进程)
    # compensate_thread = threading.Thread(target=transaction.compensate)
    # compensate_thread.daemon = True
    # compensate_thread.start()
    
    # 保持主线程运行
    # while True:
    #     time.sleep(1)

分布式计算

dask

  • 代替 pandas import dask.dataframe as dd 注意不能原地修改,注意 IO 性能消耗会非常非常大。
  • 代替 mapreduce import dask.bag as db 可以使用 map 来并行。

Linux

ubuntu

  • apt install apt-xapian-index 用来给 提供搜索功能
  • snap install

python redis
有些软件可以用命令行调用,也可以在程序中调用。

多任务 C-z fg bg ps 可以实现对任务切换

命令行工具

  • mc 文件管理器
  • htop 查看 cpu、内存占用情况
  • du 查看硬盘占用情况
  • ncdu

GUI 工具

  • git 工具
  • 文本比较
  • 远程
  • 硬盘占用
  • 重复文件

zsh 推荐两个插件

  • 语法高亮
  • 自动补全

ssh

  • 登录 ssh -p 端口号 用户名@服务器IP
  • 服务器可以用

tmux 用来远程时保持进程,也可以用来实现多窗口

  • 启动
    • 新建会话 tmux new -s 会话名:
    • 重新接入会话 tmux a / tmux attach:
    • 列出所有会话 tmux ls:
  • 快捷键 以 Ctrl+b 开头
    • 新建窗口 c;切窗口 n/p/数字
    • 分屏 %(左右)、”(上下);切窗格 方向键
    • 最大化窗格 z;关窗格 x
    • 切换会话 s ,分离:d
    • 复制 [ 空格 回车,粘贴 ] 或者用鼠标
  • 也可以用 screen。
  • 鼠标模式需要去配置中开启

shell

busybox

常见命令:

 . : [ [[ alias break cd chdir command continue echo eval exec exit export false getopts hash help history jobs kill let local printf pwd read readonly return set shift source test times title trap true type ulimit umask unalias unset wait [ [[ ar arch ascii ash awk base32 base64 basename bash bc bunzip2 busybox bzcat bzip2 cal cat cdrop chattr chmod cksum clear cmp comm cp cpio crc32 cut date dc dd df diff dirname dos2unix dpkg dpkg-deb drop du echo ed egrep env expand expr factor false fgrep find flock fold free fsync ftpget ftpput getopt grep groups gunzip gzip hd head hexdump httpd iconv id inotifyd install ipcalc jn kill killall lash less link ln logname ls lsattr lzcat lzma lzop lzopcat make man md5sum mkdir mktemp mv nc nl nproc od paste patch pdpmake pdrop pgrep pidof pipe_progress pkill printenv printf ps pwd readlink realpath reset rev rm rmdir rpm rpm2cpio sed seq sh sha1sum sha256sum sha384sum sha3sum sha512sum shred shuf sleep sort split ssl_client stat strings stty su sum sync tac tail tar tee test time timeout touch tr true truncate ts tsort ttysize uname uncompress unexpand uniq unix2dos unlink unlzma unlzop unxz unzip uptime usleep uudecode uuencode vi watch wc wget which whoami whois xargs xxd xz xzcat yes zcat

BusyBox for Windows

git

  • ​ git add code.c ​
  • ​ git commit -m “xxx”
  • ​ git rebase
  • ​ git gui

docker

  • 启动 docker run -it ubuntu bash--name <container_name> 指定名称
  • 执行命令 docker exec -it mycontainer bash
  • docker-compose 启动命令 docker-compose up -d 可以一次启动多个容器
  • 其他工具 清理 docker-clean

MySQL

  • 安装 MySQL sudo apt install -y mysql-server
    • 配置用户和密码 ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'root'; 密码比如 wegWweg82.asdg
    • 远程访问需要设置密码或新建用户。如果需要还要设置防火墙。
    • 新建用户 CREATE USER '用户名'@'localhost' IDENTIFIED BY '密码'
    • 修改密码 ALTER USER 'test_user'@'%' IDENTIFIED BY 'NewTest@789'
  • 登录 MySQL sudo mysql -u root -p 也可以用图形客户端
  • 字符编码在 MySQL 中需要手动设置为 utf8mb4
  • 主键 id INT AUTO_INCREMENT PRIMARY KEY
  • 查询的时候注意索引。分页用 where 不要用 offset。
  • 客户端工具 mysql-workbench,dbeaver-ce, HeidiSQL, datagrip
  • 备份 mysqldump -u 用户名 -p 数据库名 > backup.sql 可以设置定时备份 > /backup/date +\%F.sql
  • 主从,读写分离,往往是一写多读,注意同步。分库分表。
    • 使用 mysql 的主从复制(Replication)。
    • 可以在应用层实现,也可以使用中间件。
  • 用 docker
    • 运行容器: docker run -d --name mysql-dev -p 3306:3306 -e MYSQL_ROOT_PASSWORD=你的密码 -v mysql_data:/var/lib/mysql mysql:8.0
    • 连接 MySQL: docker exec -it mysql-dev mysql -u root -p
  • 补充
    • 早期版本(5.0 之前)的 MySQL 语法很不一样。
    • 不设置外键约束,放到代码中处理。

redis

  • 安装 sudo apt install redis-server
  • 客户端 redis-cli
    • 基本命令 SET mykey "Hello" GET mykey EXISTS mykey
  • 最简单的情况是当作可以 k-v 数据库,用来作为缓存,可以设置超时时间。
  • 做消息列队无法保证可靠性。
  • 案例,秒杀

python 客户端 pip install redis

spring

  • spring 全家桶
    • pom.xml(Maven 项目)
  • springboot 可以替代 xml
    • 配置文件 application.properties server.port=8080
    • 运行 mvn spring-boot:run
  • Spring MVC
    • 为什么java喜欢用RequestBody接收json数据,而不用RequestParam接收k/v?
    • 分三层,控制器,服务,数据。
  • 连接数据库可以用
    • jdbcTemplate 拼接字符串。
    • mybatis 可以写 sql
    • JPA 是 ORM
  • 其他功能
    • 定时任务
    • 缓存
    • 队列,线程池
import org.springframework.boot.*;
import org.springframework.boot.autoconfigure.*;
import org.springframework.web.bind.annotation.*;

@SpringBootApplication
public class DemoApplication {
        public static void main(String[] args) {
                SpringApplication.run(DemoApplication.class, args);
        }

        @RestController
        class HelloController {
                @GetMapping("/hello")
                public String hello() {
                        return "Hello, Spring Boot (One File, No Maven)!";
                }
        }
}

version: '3.8'
services:
    mysql:
        image: mysql:8.0
        container_name: mysql
        restart: unless-stopped
        environment:
            MYSQL_ROOT_PASSWORD: your_password
            MYSQL_DATABASE: myapp
            MYSQL_USER: appuser
            MYSQL_PASSWORD: app_password
        ports:
            - "3306:3306"
        volumes:
            - mysql_data:/var/lib/mysql
        command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
volumes:
    mysql_data:

网页前端

  • jquery + bootstrap
  • react
  • vue 配合组件库
  • 打包工具
  • 界面库
  • steamlit 还是比较已用的,特别是用来做数据展示。

Ruby on Rails

  • 新建 Rails 项目 rails new myapp 然后 cd myapp
  • 初始化数据库 bin/rails db:create
    • 创建数据表/模型 bin/rails generate model User name:string email:string
    • 执行数据表修改 bin/rails db:migrate
  • 启动开发服务器 bin/rails server
  • 在 Rails 里创建页面,按照:模型 → 路由 → 控制器 → 视图。
    • 创建控制器 bin/rails generate controller Users
  • 页面支持 CRUD 增查改删
    • 通常写操作需要用户权限
  • 开发工具 JetBrains RubyMine
  • 书籍 《Agile Web Development with Rails》
  • 其他
    • 修改数据表 bin/rails generate migration AddAgeAndPhoneToUsers age:integer phone:string
    • 创建首页 bin/rails generate controller Home index 然后设置路由 root "home#index"
  • 前端,也可以使用 React
  • 示例项目 Mastodon

路由 config/routes.rb

Rails.application.routes.draw do
    get "users", to: "users#index"
    root "home#index"
end

控制器 app/controllers/users_controller.rb

class UsersController < ApplicationController
    def index
        @users = User.all
    end
end

视图模板 app/views/users/index.html.erb



Python 其他

正则表达式

匹配中文

import regex as re
re.search(r'\p{Han}+',"apple苹果")

LLM

微调

  • Finetune GPT-2 with LoRA https://nn.labml.ai/lora/experiment.html

在 colab 上微调

ChatGPT

使用 openai api 调用 llm

  • deepseek
  • ollama 地址是 http://localhost:11434
#!pip3 install openai
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get('DEEPSEEK_API_KEY'),
    base_url="https://api.deepseek.com")

response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Hello"},
    ],
    stream=False
)

print(response.choices[0].message.content)