Python: Save Pandas DataFrame to Teradata
Pandas is commonly used by Python users to perform data operations. In many scenarios, the results need to be saved to a database such as Teradata. This article shows how to do that using either the JayDeBeApi package or the sqlalchemy-teradata package.
Prerequisites
Teradata JDBC & JayDeBeApi
The JayDeBeApi package and the Teradata JDBC driver are required. Both are used in many of my other articles.
For installation and more details, refer to Create, Insert, Delete, Update Operations on Teradata via JDBC in Python.
For this article, I am using TeraJDBC__indep_indep.16.20.00.13 (only one JAR file is required).
sqlalchemy-teradata
Install this package via the following command line:
pip install sqlalchemy
pip install sqlalchemy-teradata
Also make sure the Teradata Database ODBC Driver 16.10 (or another compatible Teradata driver) is installed on your computer.
Teradata
If you want to set up a Teradata server on Windows, refer to the following article:
Install Teradata Express 15.0.0.8 by Using VMware Player 6.0 in Windows
Sample data
The sample data comes from one of the previous examples: Python: Load / Read Multiline CSV File.
ID,Text1,Text2
1,Record 1,Hello World!
2,Record 2,Hello Hadoop!
3,Record 3,"Hello Kontext!"
4,Record 4,Hello!
Approach 1 - JayDeBeApi
To improve performance, we use the executemany function to insert multiple records in a single call.
Code snippet
The following code snippet performs these operations:
- Establish a JDBC connection using the connect function.
- Create a cursor object.
- Use the cursor object to create a table in the database.
- Insert all the records into the table in batch mode.
- Close the cursor and the connection (always good practice when working with a database).
import jaydebeapi
import pandas as pd

file_path = 'data.csv'
pdf = pd.read_csv(file_path)

database = "TestDb"
table = "csv_jaydebeapi"
user = "dbc"
password = "dbc"
driver = 'com.teradata.jdbc.TeraDriver'

# Open the JDBC connection; the last argument is the path to the driver JAR.
conn = jaydebeapi.connect(driver,
                          f'jdbc:teradata://192.168.119.128/Database={database}',
                          [user, password],
                          ["../terajdbc4.jar"])
cursor = conn.cursor()

# Create the target table.
cursor.execute(f"create multiset table {database}.{table} (ID int, Text1 VARCHAR(100), Text2 VARCHAR(100))")

# Insert all rows of the data frame in one batch.
cursor.executemany(f"""
insert into {database}.{table} (ID, Text1, Text2)
values (?, ?, ?)""", pdf.values.tolist())

cursor.close()
conn.close()
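For larger data frames, it may be worth sending the rows to executemany in fixed-size batches rather than all at once. The following is a minimal sketch of that idea, not part of the original example: insert_in_batches is a hypothetical helper, the batch size is arbitrary, and an in-memory sqlite3 database stands in for the Teradata connection only so that the snippet runs anywhere (both expose a DB-API cursor with ? placeholders).

```python
import sqlite3
from contextlib import closing


def insert_in_batches(cursor, sql, rows, batch_size=1000):
    """Send rows to cursor.executemany in slices of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    sent = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        cursor.executemany(sql, batch)
        sent += len(batch)
    return sent


# Rows in the same shape as pdf.values.tolist() for the sample data.
rows = [
    [1, "Record 1", "Hello World!"],
    [2, "Record 2", "Hello Hadoop!"],
    [3, "Record 3", "Hello Kontext!"],
    [4, "Record 4", "Hello!"],
]

# sqlite3 is only a stand-in here; closing() guarantees that the cursor
# and the connection are closed even if an insert fails.
with closing(sqlite3.connect(":memory:")) as conn:
    with closing(conn.cursor()) as cursor:
        cursor.execute(
            "create table csv_demo (ID int, Text1 varchar(100), Text2 varchar(100))"
        )
        inserted = insert_in_batches(
            cursor,
            "insert into csv_demo (ID, Text1, Text2) values (?, ?, ?)",
            rows,
            batch_size=3,
        )
        cursor.execute("select count(*) from csv_demo")
        row_count = cursor.fetchone()[0]

print(inserted, row_count)
```

With a JayDeBeApi connection, the same helper would be called with the cursor and the insert statement from the snippet above; a suitable batch size depends on the row width and the driver, so treat 1000 as a starting point only.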
Verify result
The result can be verified by querying the table TestDb.csv_jaydebeapi in Teradata SQL Assistant.
Approach 2 - sqlalchemy
Another approach is to create a sqlalchemy connection and then use the pandas.DataFrame.to_sql function to save the result. With this approach, we don't need to create the table in advance.
Create pandas data frame
A pandas data frame can be created easily using the read_csv API:
import pandas as pd

file_path = 'data.csv'
pdf = pd.read_csv(file_path)
Save to Teradata
We can use the to_sql function of the pandas data frame to save the data to Teradata.
Definition of to_sql
The following parameters are supported in the latest stable release (as of 2020-05-03).
def to_sql(
    self,
    name: str,
    con,
    schema=None,
    if_exists: str = "fail",
    index: bool_t = True,
    index_label=None,
    chunksize=None,
    dtype=None,
    method=None,
)
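To illustrate how the if_exists and chunksize parameters behave, here is a small sketch that writes the sample rows three times. It assumes an in-memory sqlite3 connection as a stand-in for Teradata (pandas accepts a sqlite3 connection directly), and the table name csv_demo is made up for the illustration.

```python
import sqlite3

import pandas as pd

pdf = pd.DataFrame(
    {
        "ID": [1, 2, 3, 4],
        "Text1": ["Record 1", "Record 2", "Record 3", "Record 4"],
        "Text2": ["Hello World!", "Hello Hadoop!", "Hello Kontext!", "Hello!"],
    }
)

# Stand-in database; with Teradata this would be the sqlalchemy connection.
conn = sqlite3.connect(":memory:")

# The first write creates the table; index=False keeps the index out of it.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="fail")

# 'append' adds rows to the existing table; chunksize caps rows per batch.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="append", chunksize=2)
rows_after_append = conn.execute("select count(*) from csv_demo").fetchone()[0]

# 'replace' drops and recreates the table before inserting.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="replace")
rows_after_replace = conn.execute("select count(*) from csv_demo").fetchone()[0]

# The default 'fail' raises ValueError when the table already exists.
try:
    pdf.to_sql(name="csv_demo", con=conn, index=False)
    failed_on_existing = False
except ValueError:
    failed_on_existing = True

conn.close()
print(rows_after_append, rows_after_replace, failed_on_existing)
```

Because 'replace' drops the existing table, 'append' is usually the safer choice when the target table already holds data that must be kept.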
Code snippet
The following code snippets create a database engine from a connection string, using the native Teradata ODBC driver. Many other Teradata connection string parameters can be added in the same way. A connection can also be established with sqlalchemy.engine.url.URL, whose query parameter is used to pass extra connection parameters.
For example, the following snippet uses extra parameters (with LDAP as the authentication mechanism) when establishing the connection.
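import sqlalchemy
from sqlalchemy import create_engine

# user, password, host and database are defined as in the full example below.
url = sqlalchemy.engine.url.URL(drivername='teradata',
                                username=user,
                                password=password,
                                host=host,
                                database=database,
                                query={'authentication': 'LDAP',
                                       'driver': 'Teradata',
                                       'Session Mode': 'ANSI'})
td_engine = create_engine(url)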
Once the connection is established, the to_sql function is invoked directly to write the data into the database. If the table already exists, it will be overwritten because the if_exists parameter is set to 'replace'.
from sqlalchemy import create_engine
import pandas as pd

file_path = 'data.csv'
pdf = pd.read_csv(file_path)

database = "TestDb"
table = "csv_sqlalchemy"
user = "dbc"
password = "dbc"
host = '192.168.119.128'

td_engine = create_engine(
    f'teradata://{user}:{password}@{host}/?database={database}&driver=Teradata Database ODBC Driver 16.10')
conn = td_engine.connect()
pdf.to_sql(name=table, con=conn, index=False, if_exists='replace')
conn.close()
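The connection string above embeds the password and a driver name containing spaces as-is. If the password contains reserved characters such as @ or /, the URL may be parsed incorrectly, so it can be safer to percent-encode each part first. The sketch below shows one way to do this with the standard library; build_teradata_url is a hypothetical helper, the password is made up, and it assumes the dialect decodes the URL the way SQLAlchemy normally does.

```python
from urllib.parse import quote, urlencode


def build_teradata_url(user, password, host, **params):
    """Build a teradata:// connection string with percent-encoded parts."""
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    # quote_via=quote encodes spaces as %20 rather than '+'.
    query = urlencode(params, quote_via=quote)
    url = f"teradata://{credentials}@{host}/"
    return f"{url}?{query}" if query else url


url = build_teradata_url(
    "dbc",
    "p@ss/word",  # made-up password containing reserved characters
    "192.168.119.128",
    database="TestDb",
    driver="Teradata Database ODBC Driver 16.10",
)
print(url)
```

The resulting string can then be passed to create_engine in place of the f-string used above.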
Verify result
Run the following query in SQL Assistant to check the result:
select * from TestDb.csv_sqlalchemy
References
- Connect to Teradata database through Python
- Create, Insert, Delete, Update Operations on Teradata via JDBC in Python
It's hard to tell without looking into the code. Can you please provide some code snippet for understanding the problem you are facing?
I am reading Excel into a dataframe and saving it as follows:
s_df_othr_id.dropDuplicates().write.format('jdbc').options(
    url='jdbc:oracle:thin:usr/pwd@//localhost:1521/orcl',
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable='scott.emp',
    user='scott',
    password='pwd').mode('overwrite').save()
For 411 records it took 3 minutes.
Hi Swapnil,
Looking at the code itself, I could not identify any apparent issue.
Typically isolation level 2 takes more resources than level 1, as it blocks other transactions from modifying the data before the current transaction finishes. However, I don't think that is the cause, as your data volume is small.
The following are a few things you can explore to see if they help:
- Look into the Spark UI (or History UI) to see how much time is taken at each stage, i.e. when reading the file and then when writing into the database via JDBC. You can also try writing the results as Parquet files to HDFS or another supported file system. In this way, you can tell whether the bottleneck is in the JDBC write or in reading the Excel file.
- If the issue is confirmed to be in the JDBC part, look into why your database does not support transaction isolation level 1, e.g. some setting in Oracle.
- If the issue is in the Excel reading part, can you convert the Excel file to a standard CSV file? That is usually faster than the Excel file format, as it doesn't involve an Excel ODBC or OLE DB driver.
Reference - JDBC transaction isolation levels
/**
* A constant indicating that transactions are not supported.
*/
int TRANSACTION_NONE = 0;
/**
* A constant indicating that
* dirty reads, non-repeatable reads and phantom reads can occur.
* This level allows a row changed by one transaction to be read
* by another transaction before any changes in that row have been
* committed (a "dirty read"). If any of the changes are rolled back,
* the second transaction will have retrieved an invalid row.
*/
int TRANSACTION_READ_UNCOMMITTED = 1;
/**
* A constant indicating that
* dirty reads are prevented; non-repeatable reads and phantom
* reads can occur. This level only prohibits a transaction
* from reading a row with uncommitted changes in it.
*/
int TRANSACTION_READ_COMMITTED = 2;
/**
* A constant indicating that
* dirty reads and non-repeatable reads are prevented; phantom
* reads can occur. This level prohibits a transaction from
* reading a row with uncommitted changes in it, and it also
* prohibits the situation where one transaction reads a row,
* a second transaction alters the row, and the first transaction
* rereads the row, getting different values the second time
* (a "non-repeatable read").
*/
int TRANSACTION_REPEATABLE_READ = 4;
/**
* A constant indicating that
* dirty reads, non-repeatable reads and phantom reads are prevented.
* This level includes the prohibitions in
* <code>TRANSACTION_REPEATABLE_READ</code> and further prohibits the
* situation where one transaction reads all rows that satisfy
* a <code>WHERE</code> condition, a second transaction inserts a row that
* satisfies that <code>WHERE</code> condition, and the first transaction
* rereads for the same condition, retrieving the additional
* "phantom" row in the second read.
*/
int TRANSACTION_SERIALIZABLE = 8;
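To read the warning quoted in this thread against these constants, the numeric levels can be mapped back to their names. The following is a small illustrative sketch in Python; IsolationLevel and explain_isolation_warning are hypothetical names, and only the constant values come from the listing above.

```python
import re
from enum import IntEnum


class IsolationLevel(IntEnum):
    """Values of the java.sql.Connection isolation constants listed above."""
    TRANSACTION_NONE = 0
    TRANSACTION_READ_UNCOMMITTED = 1
    TRANSACTION_READ_COMMITTED = 2
    TRANSACTION_REPEATABLE_READ = 4
    TRANSACTION_SERIALIZABLE = 8


def explain_isolation_warning(message):
    """Replace the numeric levels in the warning text with constant names."""
    return re.sub(
        r"isolation level (\d+)",
        lambda m: f"isolation level {IsolationLevel(int(m.group(1))).name}",
        message,
    )


warning = (
    "Requested isolation level 1 is not supported; "
    "falling back to default isolation level 2"
)
explained = explain_isolation_warning(warning)
print(explained)
```

In other words, the warning says that READ_UNCOMMITTED was requested and the connection fell back to READ_COMMITTED.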
Thanks Raymond for the hints. I can see in the web UI that the shuffle after dropDuplicates creates some empty partitions, so I just added coalesce(1) and it completed in seconds.
The transaction warning still keeps coming, though.
Best regards,
Swapnil
That is a very good observation. Using multiple partitions to write into the database is usually useful when you have partitions defined in the target database system. Since you are using a local Spark executor, it is usually more efficient to use one partition. You can also directly specify `numPartitions` to set the maximum number of partitions used to write into the table.
The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
Hi,
I am facing an issue where I am reading from Excel tabs and loading into Oracle tables.
They are small Excel files, but for some rows it is taking a long time and giving this warning:
" WARN jdbc.JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 2"
I am using JDBC to write to Oracle:
jdbc:oracle:thin