类似Kettle数据ETL工具,同时比Kettle更加易用和轻量,底层基于duckdb,速度超快,是一款可以让普通用户快速使用的数据处理工具,基于插件机制,
可以快速配置各种数据处理工作流,让数据处理工作流就像搭积木一样,简单易用。
- 基于python的插件机制,目前提供70+插件,同时支持自定义插件
- 基于json的flow任务,支持自定义任务配置
- 底层基于duckdb的内存数据库,支持sql脚本和json配置,支持亿级别的数据进行join查询,并且毫秒出结果


- plugin_test.py 测试插件功能
- flow_test.py 测试flow功能
NiceFlow exec --path csv_input_ck_output.json
# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python'
# sql语句
copy (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to 'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';
# python文件中定义函数
def f_print(x:str)->str:
return x+"___";
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows": 10000,
"columns": [
"name",
"address",
"city",
"street_address",
"date_of_birth",
"phone_number"
],
"randoms": [
{
"key": "sex",
"values": [
"男",
"女",
"未知"
]
}
]
}
},
{
"id": "Console",
"name": "write1",
"type": "output",
"properties": {
"row": 100
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager
def getProjectPath() -> str:
# 获取当前文件的绝对路径
current_file = os.path.abspath(__file__)
# 获取当前文件所在目录的绝对路径
current_directory = os.path.dirname(current_file)
# 获取当前项目的根目录
project_root = os.path.dirname(os.path.dirname(current_directory))
return project_root
def test_base():
path = getProjectPath() + "/doc/faker_input_console.json"
myFlow: Flow = FlowManager.read(path)
myFlow.run()
if __name__ == '__main__':
test_base()
| 插件 |
功能 |
完成情况 |
文档 |
| Agg |
聚合组件 |
完成 |
聚合组件 |
| Filter |
过滤器 |
完成 |
过滤器 |
| Mapping |
映射器 |
完成 |
映射器 |
| For |
遍历器 |
完成 |
遍历器 |
| IF |
条件判断器 |
完成 |
条件判断器 |
| Join |
连接器 |
完成 |
连接器 |
| Mask |
脱敏器 |
完成 |
脱敏器 |
| Pivot |
透视表 |
完成 |
透视表 |
| Printer |
打印器 |
完成 |
打印器 |
| RegularExtract |
正则提取器 |
|
正则提取器 |
| Rename |
重命名器 |
完成 |
重命名器 |
| Samples |
采样器 |
完成 |
采样器 |
| Sort |
排序器 |
完成 |
排序器 |
| SQL |
SQL转换器 |
完成 |
SQL转换器 |
| Switch |
条件转换器 |
|
条件转换器 |
| Unpivot |
取消透视表 |
完成 |
取消透视表 |
| Variable |
变量转换器 |
完成 |
变量转换器 |
| While |
循环转换器 |
完成 |
循环转换器 |
| Duplicate |
去重器 |
完成 |
去重器 |
| Console |
控制台打印 |
完成 |
控制台输出 |
| SplitToRows |
列拆分为多行 |
完成 |
列转行 |
| Function |
动态函数 |
完成 |
动态函数 |
| 插件 |
功能 |
完成情况 |
文档 |
| FileOutput |
文件输出 |
完成 |
文件输出 |
| KafkaOutput |
Kafka输出 |
完成 |
Kafka输出 |
| SqlServerOutput |
SQLServer输出 |
完成 |
SQLServer输出 |
| S3Output |
S3输出 |
完成 |
S3输出 |
| PulsarOutput |
Pulsar输出 |
完成 |
Pulsar输出 |
| PostgresOutput |
Postgres输出 |
完成 |
Postgres输出 |
| ParquetOutput |
Parquet输出 |
完成 |
Parquet输出 |
| PaimonOutput |
Paimon输出 |
完成 |
Paimon输出 |
| OracleOutput |
Oracle输出 |
完成 |
Oracle输出 |
| OdpsOutput |
MaxCompute输出 |
完成 |
MaxCompute输出 |
| MySQLOutput |
MySQL输出 |
完成 |
MySQL输出 |
| MqttOutput |
MQTT输出 |
|
|
| MongoDBOutput |
MongoDB输出 |
完成 |
MongoDB输出 |
| MarkdownOutput |
Markdown输出 |
完成 |
Markdown输出 |
| HttpOutput |
Http输出 |
|
|
| HiveOutput |
Hive输出 |
|
|
| HdfsOutput |
HDFS输出 |
|
|
| FtpOutput |
FTP输出 |
|
|
| DB2Output |
写入数据到DB2数据库 |
|
|
| DorisOutput |
写入数据到Doris数据库 |
|
|
| DuckDBOutput |
写入数据到DuckDB数据库 |
|
|
| ExcelOutput |
Excel输出 |
完成 |
Excel输出 |
| ESOutput |
Elasticsearch输出 |
|
|
| DuckOutput |
DuckDB输出 |
|
|
| CsvOutput |
CSV输出 |
完成 |
CSV输出 |
| CosOutput |
COS输出 |
|
|
| ClickHouseOutput |
ClickHouse输出 |
|
ClickHouse输出 |
| JsonOutput |
Json输出 |
|
Json输出 |
| HtmlOutput |
html输出 |
|
Html输出 |
- 系统内置PyScript插件,该插件没有固定内容,可以自定义脚本,如下所示
{
"flow": {
"name": "",
"uid": "",
"param": {
} },
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows":10000,
"columns": ["name","address","city","street_address","date_of_birth","phone_number"],
"randoms":[
{"key":"sex","values":["男","女","未知"]}
]
}
},
{
"id": "PyScript",
"name": "write1",
"type": "output",
"properties": {
"content": "import json\n\nfrom NiceFlow.core.flow import Flow\nfrom NiceFlow.core.plugin import IPlugin\n\n\nclass PyScript(IPlugin):\n\n def init(self, param: json, flow: Flow):\n super(PyScript, self).init(param, flow)\n\n def execute(self):\n super(PyScript, self).execute()\n row = int(self.param.get(\"row\",10))\n\n # 获取上一步结果\n pre_node = self.pre_nodes[0]\n PyScript_df = self._pre_result_dict[pre_node.name]\n PyScript_df.limit(row).show()\n self.set_result(PyScript_df)\n\n\n def to_json(self):\n super(PyScript, self).to_json()\n\n def close(self):\n super(PyScript, self).close()"
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
- 使用
PluginManager.register_user_plugin("C://Users//xiaow//Downloads//plugins")指定自定义插件路径
- 该目录下插件示例参考项目
src/plugins目录
import unittest
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager, PluginManager
class TestLoadUserPlugin(unittest.TestCase):
def test_load_user_plugin(self):
path = "faker_input_to_hello.json"
PluginManager.register_user_plugin("C://Users//xw//Downloads//plugins")
myFlow: Flow = FlowManager.read(path)
flow_param = {
}
myFlow.set_param(flow_param)
myFlow.run()
if __name__ == '__main__':
unittest.main()