ExfSparkGen: ETL to Spark
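The generated script below reads two JSON files from its working directory: DBconfig.json (database connection details) and RulesConfig.json (tables and transformation rules). The exact schema of these files is not shown here; the following is a minimal sketch of DBconfig.json inferred from the keys the script reads, with a hypothetical connection name (SNOWFLAKE_CONN) and placeholder values:

{
  "DB_CONNECTIONS": {
    "SNOWFLAKE_CONN": {
      "HOST": "jdbc:snowflake://<account>.snowflakecomputing.com",
      "DATABASE_NAME": "your_database_name",
      "USERNAME": "your_username",
      "PASSWORD": "your_password",
      "DRIVER": "net.snowflake.client.jdbc.SnowflakeDriver"
    }
  }
}

Each table entry in RulesConfig.json names one of these connections via its DB_CONNECTION_NAME field, as the script below shows.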
from pyspark import SparkContext, SparkConf, SQLContext
import json
import sys
import snowflake.connector

appName = "ExfSparkGen"
master = "<MasterNodeIP>"  # replace with your Spark master URL, e.g. spark://<MasterNodeIP>:7077

conf = SparkConf()
# Paths to the JDBC driver jars used for database connectivity
sparkJar1 = ('file:///home/user/spark-programs/Spark_Jars/mssql-jdbc-8.4.1.jre11.jar,'
             'file:///home/user/spark-programs/Spark_Jars/ojdbc11-21.1.0.0.jar,'
             'file:///home/user/spark-programs/Spark_Jars/snowflake-jdbc-3.2.6.jar')
conf.setAll([
    ('spark.repl.local.jars', sparkJar1),
    ('spark.app.name', appName),
    ('spark.rdd.compress', 'True'),
    ('spark.serializer.objectStreamReset', '100'),
    ('spark.submit.pyFiles', sparkJar1),
    ('spark.submit.deployMode', 'client'),
    ('spark.files', sparkJar1),
    ('spark.master', master),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.driver.extraClassPath', sparkJar1),
    ('spark.jars', sparkJar1),
    ('spark.sql.legacy.timeParserPolicy', 'LEGACY'),
    ('spark.cores.max', '4'),
    ('spark.executor.cores', '4'),
])

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

conn = None

try:
    # Database connection details (hosts, credentials, drivers)
    with open('./DBconfig.json', 'r') as config_file:
        Db_config_json = json.load(config_file)

    WorkFlowsDir = 'ConfigFilesToLoad'  # name of your root workflow directory
    workFlowName = sys.argv[1]          # name of the workflow directory
    MappingName = sys.argv[2]           # '*' to select all config files, or a specific file name

    def create_snowflake_connection():
        # Snowflake connection parameters -- replace with your own
        snowflake_user = 'your_username'
        snowflake_password = 'your_password'
        snowflake_account = 'your_account_name'
        snowflake_warehouse = 'your_warehouse_name'
        snowflake_database = 'your_database_name'
        snowflake_schema = 'your_schema_name'

        # Establish connection
        return snowflake.connector.connect(
            user=snowflake_user,
            password=snowflake_password,
            account=snowflake_account,
            warehouse=snowflake_warehouse,
            database=snowflake_database,
            schema=snowflake_schema,
        )

    def updateTable(query):
        # Run an UPDATE statement against Snowflake over a fresh connection
        global conn
        conn = create_snowflake_connection()
        cur = conn.cursor()
        cur.execute(query)
        print('executed successfully')

    counter = 1

    def routerPipeline(piplineNo):
        # Execute every transformation that belongs to the given pipeline number
        global counter
        for transformation in config_json['MAPPINGS']['TRANSFORMATIONS']:
            if transformation['PIPELINE_NUMBER'] == piplineNo:
                if transformation['INSTANCE'] in tables:
                    if transformation['TYPE'] == 'Target':
                        print(transformation['SQL_TRANSFORMATION'])
                        tables[transformation['INSTANCE']] = sqlContext.sql(transformation['SQL_TRANSFORMATION'][0])
                        tables[transformation['INSTANCE']].show()
                        tables[transformation['INSTANCE']].registerTempTable(transformation['INSTANCE'])
                elif 'Update' in transformation['TYPE']:
                    tempUpdf = sqlContext.sql(transformation['SQL_TRANSFORMATION'][0])
                    for table in config_json['TABLES']:
                        if tempUpdf.count() != 0 and transformation['INSTANCE'] == table['NAME']:
                            dbDetails = table['DB_CONNECTION_NAME']
                            # Stage the updated rows in a temp table, then update the target from it
                            tempUpdf.write.mode("overwrite").format("jdbc") \
                                .option("url", Db_config_json['DB_CONNECTIONS'][dbDetails]['HOST']) \
                                .option("db", Db_config_json['DB_CONNECTIONS'][dbDetails]['DATABASE_NAME']) \
                                .option("schema", table['SCHEMA']) \
                                .option("dbtable", table['NAME'] + '_temp' + str(counter)) \
                                .option("user", Db_config_json['DB_CONNECTIONS'][dbDetails]['USERNAME']) \
                                .option("password", Db_config_json['DB_CONNECTIONS'][dbDetails]['PASSWORD']) \
                                .option("driver", Db_config_json['DB_CONNECTIONS'][dbDetails]['DRIVER']) \
                                .option("warehouse", 'COMPUTE_WH') \
                                .option("role", 'SYSADMIN') \
                                .save()
                            query = "Update {} set {} from {} where {}".format(
                                table['NAME'],
                                ",".join([table['NAME'] + '.' + column + '=' +
                                          table['NAME'] + '_temp' + str(counter) + '.' + column
                                          for column in transformation['UpdateInfo']['Columns']]),
                                table['NAME'] + '_temp' + str(counter),
                                # primary-key equality predicates are ANDed together
                                " and ".join([table['NAME'] + '.' + column + '=' +
                                              table['NAME'] + '_temp' + str(counter) + '.' + column
                                              for column in transformation['UpdateInfo']['Primary_Key']]))
                            updateTable(query)
                            counter += 1
                else:
                    temp_df = sqlContext.sql(transformation['SQL_TRANSFORMATION'][0])
                    temp_df.show()
                    temp_df.registerTempTable(transformation['INSTANCE'])

    # Transformation rules: source/target tables and the SQL for each mapping step
    with open('./RulesConfig.json', 'r') as config:
        config_json = json.load(config)

    # Load every configured table over JDBC and register it as a temp view
    tables = {}
    for table in config_json['TABLES']:
        for dbDetails in Db_config_json['DB_CONNECTIONS']:
            if table['DB_CONNECTION_NAME'] == dbDetails:
                print(Db_config_json['DB_CONNECTIONS'][dbDetails]['HOST'])
                if table['TYPE'] == 'SOURCE':
                    tables[table['NAME'] + '_SRC'] = spark.read.format("jdbc") \
                        .option("url", Db_config_json['DB_CONNECTIONS'][dbDetails]['HOST']) \
                        .option("db", Db_config_json['DB_CONNECTIONS'][dbDetails]['DATABASE_NAME']) \
                        .option("schema", 'ADMIN') \
                        .option("dbtable", table['NAME']) \
                        .option("user", Db_config_json['DB_CONNECTIONS'][dbDetails]['USERNAME']) \
                        .option("password", Db_config_json['DB_CONNECTIONS'][dbDetails]['PASSWORD']) \
                        .option("driver", Db_config_json['DB_CONNECTIONS'][dbDetails]['DRIVER']) \
                        .option("warehouse", 'COMPUTE_WH') \
                        .load()
                    tables[table['NAME'] + '_SRC'].registerTempTable(table['NAME'] + '_SRC')
                else:
                    tables[table['NAME']] = spark.read.format("jdbc") \
                        .option("url", Db_config_json['DB_CONNECTIONS'][dbDetails]['HOST']) \
                        .option("db", Db_config_json['DB_CONNECTIONS'][dbDetails]['DATABASE_NAME']) \
                        .option("database", table['DATABASE_NAME']) \
                        .option("schema", table['SCHEMA']) \
                        .option("dbtable", table['NAME']) \
                        .option("user", Db_config_json['DB_CONNECTIONS'][dbDetails]['USERNAME']) \
                        .option("password", Db_config_json['DB_CONNECTIONS'][dbDetails]['PASSWORD']) \
                        .option("driver", Db_config_json['DB_CONNECTIONS'][dbDetails]['DRIVER']) \
                        .option("warehouse", 'COMPUTE_WH') \
                        .load()
                    tables[table['NAME']].registerTempTable(table['NAME'])

    # Run the transformations; a Router fans the flow out into numbered pipelines
    for transformation in config_json['MAPPINGS']['TRANSFORMATIONS']:
        if transformation['INSTANCE'] in tables:
            tables[transformation['INSTANCE']] = sqlContext.sql(transformation['SQL_TRANSFORMATION'][0])
            tables[transformation['INSTANCE']].show()
            tables[transformation['INSTANCE']].registerTempTable(transformation['INSTANCE'])
        elif transformation['TYPE'] == 'Router':
            for i in range(len(transformation['NEXT_PIPELINES'])):
                temp_df = sqlContext.sql(transformation['SQL_TRANSFORMATION'][i])
                temp_df.show()
                temp_df.registerTempTable(transformation['INSTANCE'][i])
                routerPipeline(transformation['NEXT_PIPELINES'][i])
            break
        else:
            for i in range(len(transformation['SQL_TRANSFORMATION'])):
                temp_df = sqlContext.sql(transformation['SQL_TRANSFORMATION'][i])
                temp_df.show()
                temp_df.registerTempTable(transformation['INSTANCE'])

    # Append the final results to the target tables
    for table in config_json['TABLES']:
        for dbDetails in Db_config_json['DB_CONNECTIONS']:
            if table['DB_CONNECTION_NAME'] == dbDetails:
                if table['TYPE'] == 'TARGET':
                    tables[table['NAME']].write.mode("append").format("jdbc") \
                        .option("url", Db_config_json['DB_CONNECTIONS'][dbDetails]['HOST']) \
                        .option("db", Db_config_json['DB_CONNECTIONS'][dbDetails]['DATABASE_NAME']) \
                        .option("schema", table['SCHEMA']) \
                        .option("dbtable", table['NAME']) \
                        .option("user", Db_config_json['DB_CONNECTIONS'][dbDetails]['USERNAME']) \
                        .option("password", Db_config_json['DB_CONNECTIONS'][dbDetails]['PASSWORD']) \
                        .option("driver", Db_config_json['DB_CONNECTIONS'][dbDetails]['DRIVER']) \
                        .option("warehouse", 'COMPUTE_WH') \
                        .option("role", 'SYSADMIN') \
                        .save()
except Exception as e:
    print(e)
finally:
    if conn is not None:  # a connection is only opened when an Update step ran
        conn.commit()
        conn.close()
    sc.stop()
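Likewise, a minimal sketch of RulesConfig.json, inferred from the keys the script accesses; table names, SQL, and pipeline numbers are illustrative. Router steps additionally carry a NEXT_PIPELINES list, and Update steps carry an UpdateInfo object with Columns and Primary_Key lists:

{
  "TABLES": [
    {"NAME": "CUSTOMER", "TYPE": "SOURCE", "DB_CONNECTION_NAME": "SNOWFLAKE_CONN",
     "DATABASE_NAME": "your_database_name", "SCHEMA": "ADMIN"},
    {"NAME": "CUSTOMER_DIM", "TYPE": "TARGET", "DB_CONNECTION_NAME": "SNOWFLAKE_CONN",
     "DATABASE_NAME": "your_database_name", "SCHEMA": "ADMIN"}
  ],
  "MAPPINGS": {
    "TRANSFORMATIONS": [
      {"INSTANCE": "CUSTOMER_DIM", "TYPE": "Target", "PIPELINE_NUMBER": 1,
       "SQL_TRANSFORMATION": ["select * from CUSTOMER_SRC"]}
    ]
  }
}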
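The script takes two positional arguments, the workflow directory name and a mapping name (or * for all config files). Since it builds its own SparkConf, including spark.master and the driver classpath, a plausible invocation, assuming it is saved as ExfSparkGen.py and pyspark plus snowflake-connector-python are installed on the driver, is:

python3 ExfSparkGen.py MyWorkflow "*"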