最近被动的接了点BI的活,各个部门经常会有excel的导入数据库操作流入到我这边,所以开始搭建python脚本来处理这些导入需求,简单记录下。
首先是目录结构,目前还是比较简单的,将会按照操作逻辑进行模块拆分。现有逻辑值拆分了两个包,一个是工具包,负责一些通用逻辑和公共函数的编写,一个是Exce2DB表示从excel导入数据到数据库。
在pycharm中创建项目,默认会使用虚拟环境,图示venv,目的是分离环境,避免依赖版本干扰。如果不在python中构建项目,需要执行如下指令。
# 初始化创建虚拟环境
python3 -m venv myenv # 激活虚拟环境 # windows myenv\Scripts\activate # linux source myenv/bin/activate # 虚拟环境下安装运行等 pip install ... ... # 退出虚拟环境 deactivate
接下来,来看本次需求构建的代码。
首先是数据库入库操作【BI数据库操作.py】。这里inser_df将会从dataframe中获取数据,在连接中批量执行写入,注意修改自己的数据库IP、库名、用户名、密码以及连接url。
import pyodbc
from pandas import DataFrame
def db_connection():
server = 'IP地址'
database = '数据库'
username = '用户名'
password = '密码'
# sqlserver 连接
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=%s;DATABASE=%s;UID=%s;PWD=%s' % (
server, database, username, password))
cursor = conn.cursor()
return conn, cursor
def insert_df (data: DataFrame, table_name: str):
"""
从df插入数据到 JSDataWarehouse库下 指定表
:param data:
:param table_name:
:return:
"""
try:
table_header = ','.join('[' + col + ']' for col in data.columns)
data = data.fillna('')
result = list(data.itertuples(index=False))
conn, cursor = db_connection()
sql = ("insert into " + table_name + "(" + table_header + ") values(" + "?," * (data.columns.size - 1) + "?)")
print(sql)
cursor.executemany(sql, result)
conn.commit()
except Exception as err:
print('load_df_into_db error:', err)
finally:
cursor.close()
conn.close()
接着是对 excel 的表头-数据库表字段名称 映射和校验操作【常用工具.py】。这里的check_rename_insert_df2bi是封装在最前端的函数,一般情况的导入用这个函数就足够了。
from pandas import DataFrame
from 工具包.BI数据库操作 import insert_df
def check_df_with_map(df: DataFrame, column_map: dict):
"""
检查df的列名结果是否符合需要
:param df:
:param column_map:
:return:
"""
for k, v in column_map.items():
if not df.columns.__contains__(k):
raise Exception("列<" + k + ">不存在")
def check_rename_insert_df2bi(column_map: dict, df: DataFrame, table_name: str):
"""
一键 检查表头、匹配名称、插入BI数据库
:param column_map:
:param df:
:param table_name:
:return:
"""
check_df_with_map(df, column_map)
df = df.rename(columns= column_map)
insert_df(df, table_name)
最后是调用侧【固定售后率[玉衡].py】,这个excel一共有四个sheet页,为了避免重复读取装载excel,直接设置sheet_name=None一次性装载所有的sheet页,之后对每个sheet页单独做逻辑处理即可,本次是纯导入,所以没有额外的数据处理,直接调用check_rename_insert_df2bi即可实现数据入库。
import pandas as pd
from pandas import DataFrame
from 工具包.常用工具 import check_rename_insert_df2bi
filePath = "C:\\Users\\Administrator\\Desktop\\固定售后占比(分利润中心)-2025年10月.xlsx"
data = pd.read_excel(filePath, sheet_name=None)
spuDf = data.get("SPU固定")
typeDf = data.get("类目固定")
plmDf = data.get("平台固定")
deptDf = data.get("利润中心固定-分销")
def handle_spu (df: DataFrame):
check_rename_insert_df2bi(column_map = {
"利润中心": "profit_center",
"平台": "platform",
"日期": "stat_date",
"SPU型号": "spu",
"spu售后率": "spu_rate"
}, df = df, table_name = "[ods].ods_lcf_after_sale_rate_spu")
def handle_type (df: DataFrame):
check_rename_insert_df2bi(column_map = {
"日期": "stat_date",
"利润中心": "profit_center",
"平台": "platform",
"类目": "leaf_category",
"类目售后率": "leaf_category_rate",
}, df = df, table_name = "[ods].ods_lcf_after_sale_rate_leaf_category")
def handle_plm (df: DataFrame):
check_rename_insert_df2bi(column_map = {
"日期": "stat_date",
"利润中心": "profit_center",
"平台": "platform",
"固定售后占比": "platform_rate"
}, df = df, table_name = "[ods].ods_lcf_after_sale_rate_platform")
def handle_dept (df: DataFrame):
check_rename_insert_df2bi(column_map = {
"日期": "stat_date",
"利润中心": "profit_center",
"固定售后占比": "profit_center_rate"
}, df = df, table_name = "[ods].ods_lcf_after_sale_rate_profit_center")
handle_spu(spuDf)
handle_type(typeDf)
handle_plm(plmDf)
handle_dept(deptDf)