@@ -844,93 +844,64 @@ def data_loader_ingest_data_from_query():
844844 }), status_code
845845
846846
847- @tables_bp .route ('/refresh -derived-data' , methods = ['POST' ])
848- def refresh_derived_data ():
849- """Refresh derived data by re-executing Python code on updated base table"""
847+ @tables_bp .route ('/recalculate -derived-data' , methods = ['POST' ])
848+ def recalculate_derived_data ():
849+ """Recalculate derived data by re-executing Python code on updated base table"""
850850 try :
851- from data_formulator .py_sandbox import run_transform_in_sandbox2020
852-
853851 data = request .get_json ()
854852
855- # Get updated base table data and transformation info
856- updated_table = data .get ('updated_table' ) # {name, rows, columns}
857- derived_tables = data .get ('derived_tables' , []) # [{id, code, source_tables: [names]}]
853+ # Get updated base table data
854+ updated_table_id = data .get ('updated_table_id' )
855+ updated_table_rows = data .get ('updated_table_rows' )
858856
859- if not updated_table :
860- return jsonify ({"status" : "error" , "message" : "No updated table provided" }), 400
857+ if not updated_table_id :
858+ return jsonify ({"status" : "error" , "message" : "No table ID provided" }), 400
861859
862- if not derived_tables :
863- return jsonify ({"status" : "error" , "message" : "No derived tables to refresh " }), 400
860+ if not updated_table_rows :
861+ return jsonify ({"status" : "error" , "message" : "No updated data provided " }), 400
864862
865- # Validate updated table has expected structure
866- updated_table_name = updated_table ['name' ]
867- updated_columns = set (updated_table ['columns' ])
863+ # Get list of affected derived tables
864+ affected_derived_tables = data .get ('affected_derived_tables' , [])
868865
869- # Verify columns match by checking against database schema
870- with db_manager .connection (session ['session_id' ]) as db :
871- try :
872- existing_columns = [col [0 ] for col in db .execute (f"DESCRIBE { updated_table_name } " ).fetchall ()]
873- existing_columns_set = set (existing_columns )
874-
875- # Validate that all existing columns are present in updated data
876- if not existing_columns_set .issubset (updated_columns ):
877- missing = existing_columns_set - updated_columns
878- return jsonify ({
879- "status" : "error" ,
880- "message" : f"Updated data is missing required columns: { ', ' .join (missing )} "
881- }), 400
882- except Exception as e :
883- logger .warning (f"Could not validate columns for { updated_table_name } : { str (e )} " )
866+ if not affected_derived_tables :
867+ # No derived tables to refresh, just success
868+ return jsonify ({
869+ "status" : "success" ,
870+ "results" : []
871+ })
884872
885873 results = []
886874
887- # Process each derived table
888- for derived_info in derived_tables :
875+ # Process each affected derived table independently
876+ for derived_info in affected_derived_tables :
889877 try :
890- code = derived_info ['code' ]
891- source_table_names = derived_info ['source_tables' ]
892878 derived_table_id = derived_info ['id' ]
879+ code = derived_info ['code' ]
880+ source_table_ids = derived_info ['source_tables' ]
881+ is_virtual = derived_info .get ('is_virtual' , False )
893882
894- # Prepare input dataframes
895- df_list = []
896-
897- for source_name in source_table_names :
898- if source_name == updated_table_name :
899- # Use the updated data
900- df = pd .DataFrame (updated_table ['rows' ])
901- else :
902- # Fetch from database
903- with db_manager .connection (session ['session_id' ]) as db :
904- result = db .execute (f"SELECT * FROM { source_name } " ).fetchdf ()
905- df = result
906-
907- df_list .append (df )
908-
909- # Execute the transformation code in subprocess for safety
910- exec_result = run_transform_in_sandbox2020 (code , df_list , exec_python_in_subprocess = True )
911-
912- if exec_result ['status' ] == 'ok' :
913- output_df = exec_result ['content' ]
914-
915- # Convert to records format efficiently
916- rows = output_df .to_dict (orient = 'records' )
917- columns = list (output_df .columns )
918-
919- results .append ({
920- 'id' : derived_table_id ,
921- 'status' : 'success' ,
922- 'rows' : rows ,
923- 'columns' : columns
924- })
925- else :
883+ # For now, only support Python (non-virtual) tables
884+ if is_virtual :
926885 results .append ({
927886 'id' : derived_table_id ,
928- 'status' : 'error ' ,
929- 'message' : exec_result [ 'content' ]
887+ 'status' : 'skipped ' ,
888+ 'message' : 'Virtual (DuckDB) table refresh not yet supported'
930889 })
890+ continue
891+
892+ # Recalculate using Python
893+ result = recalc_derived_data_py (
894+ updated_table_id = updated_table_id ,
895+ updated_table_rows = updated_table_rows ,
896+ derived_table_id = derived_table_id ,
897+ code = code ,
898+ source_table_ids = source_table_ids
899+ )
900+
901+ results .append (result )
931902
932903 except Exception as e :
933- logger .error (f"Error refreshing derived table { derived_info .get ('id' )} : { str (e )} " )
904+ logger .error (f"Error recalculating derived table { derived_info .get ('id' )} : { str (e )} " )
934905 results .append ({
935906 'id' : derived_info .get ('id' ),
936907 'status' : 'error' ,
@@ -943,9 +914,82 @@ def refresh_derived_data():
943914 })
944915
945916 except Exception as e :
946- logger .error (f"Error refreshing derived data: { str (e )} " )
917+ logger .error (f"Error recalculating derived data: { str (e )} " )
947918 safe_msg , status_code = sanitize_db_error_message (e )
948919 return jsonify ({
949920 "status" : "error" ,
950921 "message" : safe_msg
951- }), status_code
922+ }), status_code
923+
924+
925+ def recalc_derived_data_py (updated_table_id , updated_table_rows , derived_table_id , code , source_table_ids ):
926+ """
927+ Recalculate a Python-based derived table using updated input data.
928+
929+ Args:
930+ updated_table_id: ID of the table that was updated
931+ updated_table_rows: New rows for the updated table
932+ derived_table_id: ID of the derived table to recalculate
933+ code: Python transformation code
934+ source_table_ids: List of source table IDs this derived table depends on
935+
936+ Returns:
937+ dict with status, rows, and columns
938+ """
939+ from data_formulator .py_sandbox import run_transform_in_sandbox2020
940+
941+ try :
942+ # Prepare input dataframes
943+ df_list = []
944+
945+ for source_id in source_table_ids :
946+ if source_id == updated_table_id :
947+ # Use the updated data
948+ df = pd .DataFrame (updated_table_rows )
949+ else :
950+ # Fetch from database or state
951+ with db_manager .connection (session ['session_id' ]) as db :
952+ try :
953+ result = db .execute (f"SELECT * FROM { source_id } " ).fetchdf ()
954+ df = result
955+ except Exception as e :
956+ logger .warning (f"Could not fetch table { source_id } from database: { str (e )} " )
957+ # Table might not be in database yet (in-memory only)
958+ return {
959+ 'id' : derived_table_id ,
960+ 'status' : 'error' ,
961+ 'message' : f'Could not fetch source table: { source_id } '
962+ }
963+
964+ df_list .append (df )
965+
966+ # Execute the transformation code in subprocess for safety
967+ exec_result = run_transform_in_sandbox2020 (code , df_list , exec_python_in_subprocess = True )
968+
969+ if exec_result ['status' ] == 'ok' :
970+ output_df = exec_result ['content' ]
971+
972+ # Convert to records format efficiently
973+ rows = output_df .to_dict (orient = 'records' )
974+ columns = list (output_df .columns )
975+
976+ return {
977+ 'id' : derived_table_id ,
978+ 'status' : 'success' ,
979+ 'rows' : rows ,
980+ 'columns' : columns
981+ }
982+ else :
983+ return {
984+ 'id' : derived_table_id ,
985+ 'status' : 'error' ,
986+ 'message' : exec_result ['content' ]
987+ }
988+
989+ except Exception as e :
990+ logger .error (f"Error in recalc_derived_data_py for { derived_table_id } : { str (e )} " )
991+ return {
992+ 'id' : derived_table_id ,
993+ 'status' : 'error' ,
994+ 'message' : str (e )
995+ }
0 commit comments