原文towardsdatascience.com/simplifying-the-python-code-for-data-engineering-projects-95f0c41dc58a?sourcecollection_archive---------1-----------------------#2024-06-12数据摄取、验证、处理和测试的 Python 技巧与技术实用操作指南https://medium.com/johnleungTJ?sourcepost_page---byline--95f0c41dc58a--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--95f0c41dc58a-------------------------------- John Leung·发布于 Towards Data Science ·阅读时间 10 分钟·2024 年 6 月 12 日–原始数据来自不同的来源和格式。在数据能够用来回答关键的商业问题之前需要大量的努力和时间来进行数据工程。虽然基础的数据基础设施可能根据数据的量、速度和分析需求而有所不同但一些基本的代码设计技巧仍然是相关的可以简化和优化各种任务。本文将探讨一般数据工程项目中的不同关键部分从数据摄取到管道测试。Python 是数据工程中最广泛使用的编程语言我们将学习如何使用 Python 中的内置功能和高效库来处理这些用例。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/128d4c278ec7ce5302584afdc96ee3f1.png图片来源Katerina Pavlyuchkova 在 Unsplash想象一下你拥有一家在线零售店销售独特的各类节庆礼品。这个在线商店非常受欢迎每分钟每秒的交易量都非常大。你希望通过分析当前交易的购买习惯满足更多现有客户的需求并服务更多的新客户这促使你开始深入数据处理准备交易记录。#0 模拟数据我们首先使用JSON LinesJSONL文本格式将一些交易数据模拟到文件中其中每一行都是一个独立的 JSON 对象。这种格式在数据流处理领域非常有吸引力例如网页/应用分析和日志管理。在我们的文件中数据字段属于不同的数据类型。它们包括客户和产品标识符以整数/数组格式支付方式以字符串格式以及交易总金额以浮动数字格式。importjsonimportrandomimportnumpyasnpimportdatetime# Remove existing retail_transactions.jsonl file, if any! rm-f/p/a/t/h retail_transactions.jsonl# Set the no of transactionsno_of_iteration500000# Open a file in write modewithopen(retail_transactions.jsonl,w)asf:fornuminrange(no_of_iteration):if(random.randint(1,10000)!5000):# Create a valid transactionnew_txn{orderID:num,customerID:random.randint(1,100000),productID:np.random.randint(10000,sizerandom.randint(1,5)).tolist(),paymentMthd:random.choice([Credit card,Debit card,Digital wallet,Cash on delivery,Cryptocurrency]),totalAmt:round(random.random()*5000,2),invoiceTime:datetime.datetime.now().isoformat()}else:# Create an invalid transactionnew_txn{orderID:,customerID:,productID:,paymentMthd:,totalAmt:,invoiceTime:}# Write the transaciton as a JSON line to the filef.write(json.dumps(new_txn)\n)你可能会发现文件中有几条单独的交易数据其中一些数据字段为空。这模拟了缺失数据的问题这是现实世界中常见的数据质量问题之一。#1 数据摄取 — Yield读取文件中的交易记录最简单的方法之一是将数据集遍历到一个列表中然后将其转换为 Pandas DataFrame。这个方法对于我们演示数据集中的 500,000 条交易非常有效。但如果现实世界中的数据集有数百万甚至数十亿行数据呢如果不导致内存问题我们可能需要长时间等待整个计算完成。有时候我们不关心整个结果而是希望在加载最后一条记录之前先处理初步结果。在这种情况下我们可以使用yield来控制生成器的流向。生成器不会将整个记录存储在内存中。相反它一次只返回一个值并在请求下一个值之前暂停函数执行。在用户代码和库代码交替执行的过程中还会强制顺序执行这意味着你不能在到达第一条记录之前访问第二条记录。你可以在Pydata 讲座视频中了解更多关于这个概念的详细解释。yield语句有不同的实际用途。例如我们可以遍历文件中的每一行只返回非空记录。下面展示了如何执行实时数据过滤importjsondefread_json_file(file_name):# Read the JSONL filewithopen(file_name)asf:forlineinf:txnjson.loads(line)# Yield valid transactions onlyif(txn[orderID]!):yield(txn)txn_generatorread_json_file(retail_transactions.jsonl)这些代码的输出是一个 Python 生成器一种特殊类型的迭代器。你可以在循环中使用next函数来逐个返回后续项目。除了实时数据过滤另一个思路是设计一个生成器函数预处理数据并以预定义的批量大小进行生成这可以直接解析并供机器学习模型训练使用。而且我们还可以用它来异步处理网页请求和响应进行网页爬取。#2 数据验证 — Pydantic假设你有一个包含交易记录信息的 JSON 数据列表这是数据摄取后的一个示例交易{orderID:10000,customerID:48316,productID:[5620],paymentMthd:Cash on delivery,totalAmt:9301.2,invoiceTime:2024-06-10T23:30:29.608443,price:-1}对于每一条传入的数据我们希望确保它经过验证否则在运行后续的数据处理函数时我们很容易遇到各种错误。这可以通过使用pydantic库来实现。我们首先使用PyDantic 模型定义数据字段的模式然后使用model_validate()函数验证我们的 JSON 数据。fromdatetimeimportdatetimefrompydanticimportBaseModel,ValidationError# Define the data model for a transaction recordclassTxnModel(BaseModel):orderID:intcustomerID:intproductID:list[int]paymentMthd:strtotalAmt:floatinvoiceTime:datetimetry:# Validate the sample case against the schemaTxnModel.model_validate(sample_txn)print(Validated successfully!)exceptValidationErrorasexc:# Print error messages for any validation errorprint(Validation Error:)print(exc.errors())# Output:# Validated successfully有时我们发现需要应用更严格的验证规则。例如Pydantic 基础模型会尝试将字符串数据强制转换为整数。为避免这种情况可以在模型级别或字段级别设置strictTrue。此外我们还可以对数据字段应用自定义验证规则。例如我们可能希望检查支付方式值是否符合我们的预期。为了方便测试我们手动将示例案例的支付方式设置为“Bitcoin”这是在线商店中不存在的选项然后使用AfterValidator嵌入一个函数进行进一步检查。fromtypingimportAnnotatedfrompydantic.functional_validatorsimportAfterValidator# Customize the validation ruledefvalidate_payment_mthd(paymentMthd:str):possible_values[Credit card,Debit card,Digital wallet,Cash on delivery,Cryptocurrency]ifpaymentMthdnotinpossible_values:raiseValueError(fInvalid paymentMthd, payment type must be one of{possible_values})returnstorage# Define the data model for a transaction recordclassTxnModel(BaseModel):orderID:intField(strictTrue)customerID:intproductID:list[int]paymentMthd:Annotated[str,AfterValidator(validate_payment_mthd)]totalAmt:Annotated[float,Field(strictTrue,gt0)]invoiceTime:datetime# Manually define a non-existent payment methodsample_txn[paymentMthd]Bitcointry:# Validate the sample case against the schemaTxnModel.model_validate(sample_txn)print(Validated successfully!)exceptValidationErrorasexc:# Print error messages for any validation errorprint(Validation Error:)print(exc.errors()[0][ctx])# Output# Validation Error:# {error: ValueError(Invalid paymentMthd, payment type must be one of [Credit card, Debit card, Digital wallet, Cash on delivery, Cryptocurrency])}验证器成功识别到支付方式不在可能值的列表中。这是通过应用 Pydantic 的内部验证逻辑并随后使用自定义验证函数完成的。代码会引发一个ValueError并填充ValidationError。当触发错误时我们可以采取后续行动进行纠正。这些功能有助于消除数据错误从而确保数据的准确性和完整性。#3 数据处理(1) Python 装饰器数据验证后我们开始处理数据密集型函数。随着数据管道的复杂化执行时间可能会变长。我们希望找出根本原因并优化函数的时间性能。一种简单的方法是在每个函数的开始和结束时收集两个时间戳然后逐一计算时间差。为了确保数据管道中的代码更简洁我们可以利用Python 装饰器。我们首先设计一个 Python 装饰器来测量执行时间。之后我们为任何需要此功能的函数添加注解。例如您可以测量对所有交易进行分类所需的时间。importtime# Measure the excution time of a given functiondeftime_decorator(func):defwrapper(*args,**kwargs):begin_timetime.time()outputfunc(*args,**kwargs)end_timetime.time()print(fExecution time of function{func.__name__}:{round(end_time-begin_time,2)}seconds.)returnoutputreturnwrapper# Categorize the total amount of each transactiontime_decoratordefgroup_txn_price(data):fortxnindata:pricetxn[totalAmt]if0price1500:txn[totalAmtCat]Lowelif1500price3500:txn[totalAmtCat]Moderateelif3500price:txn[totalAmtCat]Highreturndata txn_listgroup_txn_price(txn_list)# Output# Execution time of function group_txn_price: 0.26 seconds.装饰器方法使得代码在不改变原始函数源代码的情况下可以复用。类似地我们可以应用装饰器的思想用于记录函数完成情况或在任务失败时发送邮件警报。(2) Map、reduce、filter这些是常用的 Python 数组方法许多开发者可能都很熟悉。但我仍然认为它们值得提及原因有几点1不可变性——这些函数不会修改原始列表的值2链式灵活性——可以同时应用多个函数的组合3简洁可读——只需一行代码。假设我们有一个包含两个键的 JSON 对象列表支付方式和总金额。让我们探索这些函数是如何工作的。Map对列表中的所有元素执行相同的操作例如为支付方式的值添加后缀。updated_txn_listlist(map(lambdax:{paymentMthd:f{x[paymentMthd]}_2024,totalAmt:x[totalAmt]},txn_list))print(updated_txn_list)# Output# [{paymentMthd: Cryptocurrency_2024, totalAmt: 3339.85},# {paymentMthd: Cash on delivery_2024, totalAmt: 872.52},# ...]Filter获取符合某个条件的元素子集例如仅记录支付方式为加密货币的记录。updated_txn_listlist(map(lambdax:x,filter(lambday:y[paymentMthd]Cryptocurrency,txn_list)))print(updated_txn_list)# Output# [{paymentMthd: Cryptocurrency, totalAmt: 3339.85},# {paymentMthd: Cryptocurrency, totalAmt: 576.15},# ...]Reduce获取单一值的结果例如求和或将所有元素相乘。fromfunctoolsimportreducetotal_amt_cryptoreduce(lambdaacc,x:accx[totalAmt],updated_txn_list,0)print(total_amt_crypto)# Output# 250353984.67000002我们可以在数据科学项目的转换步骤中利用这些函数。例如使用map()来缩放或标准化数据使用filter()来去除异常值和不相关的数据点使用reduce()来生成汇总统计数据。#4 数据管道测试 — Pytest数据管道通常涉及数据摄取、数据清理和提取-转换-加载ETL操作。潜在错误的范围可以非常广泛且容易被忽视尤其是当模型流和结果难以被用户解读时。这导致开发团队更依赖于测试工作。通常会进行单元测试以确保机器学习系统的每个组件按预期执行。最受欢迎的 Python 测试框架之一是[Pytest](https://docs.pytest.org/en/stable/contents.html)。假设我们希望确保转化后的数据质量技术团队和决策者都可以信任这些数据。我们可以测试我们之前处理的关于分类交易价格的函数。为了实现这一点我们需要准备两个 Python 文件feature_engineering.py包含之前构建的函数的文件# Categorize the total amount of each transactiondefadd_features(sample_cases):fortxninsample_cases:pricetxn[‘totalAmt’]if0price1500:txn[‘totalAmtCat’]‘Low’elif1500price3500:txn[‘totalAmtCat’]‘Moderate’elif3500price:txn[‘totalAmtCat’]‘High’returnsample_casestest_feature_engineering.py带有“test_”前缀的文件Pytest 仅在测试过程中识别此文件。fromfeature_engineeringimportadd_featuresdeftest_add_features():sample_cases[{orderID:1,customerID:36536,productID:[2209,2262,4912,3162,5734],paymentMthd:Cryptocurrency,totalAmt:576.15,invoiceTime:2024–06–10T23:53:25.329928}]# Call the function with the sample casessample_casesadd_features(sample_cases)# Check the assertationsfortxninsample_cases:asserttotalAmtCatinlist(txn.keys())assertlen(txn)7assertlen(txn[totalAmtCat])!0上面的 assert 语句确保新的“totalAmtCat”数据字段已添加且其值非空同时原始数据字段不受影响。通过执行命令Pytest我们可以知道测试已经通过https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/f5d42bc912f608940f41c8519fbb3498.pngPytest 结果 — 测试通过图片由作者提供对于一个更高级的案例假设我们有三个函数顺序如下load_data、clean_data和add_features。我们应该如何设计测试文件来逐个验证这些函数的输出importpytestimportjsonfromfeature_engineeringimportload_data,clean_data,add_features# Set up a temporary JSONL filepytest.fixturedefjsonl_file(tmp_path):sample_cases[{orderID:10000,customerID:48316,productID:[5620],paymentMthd:Cash on delivery,totalAmt:9301.2,invoiceTime:2024-06-10T23:30:29.608443,price:-1}]file_pathtmp_path/test_transactions.jsonlwithopen(file_path,w)asf:fortxninsample_cases:f.write(json.dumps(txn)\n)returnfile_path# Test function to validate the load_data functiondeftest_load_data(jsonl_file):dataload_data(jsonl_file)# assert statements here# Test function to validate the clean_data functiondeftest_clean_data(jsonl_file):dataload_data(jsonl_file)dataclean_data(data)# assert statements here# Test function to validate the add_features functiondeftest_add_features(jsonl_file):dataload_data(jsonl_file)dataclean_data(data)dataadd_features(data)# assert statements here我们应该为初始化定义一个固定的基准例如一个包含样本测试用例的 JSON Lines 文件。在这里我们使用pytest.fixture装饰器它类似于我们在 Python 装饰器部分讨论的time_decorator。这个装饰器有助于避免反复初始化样本文件。对于剩下的代码我们涉及几个测试函数来运行管道函数并使用 assert 语句来检测逻辑错误。总结一下我们遇到了数据工程项目中的几个关键方面并探索了如何简化和优化 Python 代码以提高效率和可读性数据摄取使用yield处理大数据集同时实现高效的内存使用。数据验证利用Pydantic根据模式和自定义值模式验证数据字段。数据处理通过应用 Python 装饰器和内置库来启用额外的功能而无需重复代码。通过使用Pytest进行管道测试以确保工作流中各个环节的函数输出质量。在你继续之前如果你喜欢这篇文章我邀请你关注我的Medium 页面和LinkedIn 页面。通过这种方式你可以随时获取有关数据科学副项目、机器学习操作MLOps演示以及项目管理方法的最新内容。## 使用 LangChain 和 LLMs 进行客户分析探索 LangChain 在客户分析中的潜力与局限性并附带实际的实现过程…towardsdatascience.com ## 使用 PySpark 在 Databricks 上进行时间序列特征工程探索 PySpark 在时间序列数据中的潜力导入、提取和可视化数据并附带实际操作…towardsdatascience.com