Python: Save Pandas DataFrame to Teradata

Raymond, 2020-05-03

Pandas is commonly used by Python users to perform data operations. In many scenarios, the results need to be saved to a database such as Teradata. This article shows how to do that using either the JayDeBeApi package or the sqlalchemy-teradata package.

Prerequisites

Teradata JDBC & JayDeBeApi

The JayDeBeApi package and the Teradata JDBC driver are required. Both are used in many of my other articles.

For installation and more details, refer to Create, Insert, Delete, Update Operations on Teradata via JDBC in Python.

For this article, I am using TeraJDBC__indep_indep.16.20.00.13 (only one JAR file is required).

sqlalchemy-teradata

Install the packages with the following commands:

pip install sqlalchemy
pip install sqlalchemy-teradata

Also make sure the Teradata Database ODBC Driver 16.10 (or another compatible Teradata ODBC driver) is installed on your computer.

Teradata

If you want to set up a Teradata server on Windows, refer to the following article:

Install Teradata Express 15.0.0.8 by Using VMware Player 6.0 in Windows

Sample data

The sample data comes from a previous example, Python: Load / Read Multiline CSV File. Note that record 3 contains a line break inside a quoted field.

ID,Text1,Text2
1,Record 1,Hello World!
2,Record 2,Hello Hadoop!
3,Record 3,"Hello 
Kontext!"
4,Record 4,Hello!

Approach 1 - JayDeBeApi

To improve performance, we use the cursor's executemany function to insert multiple records in one batch.
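
As a rough illustration of batching, the sketch below splits the rows into fixed-size chunks and sends each chunk with one executemany call. The standard-library sqlite3 module stands in for the Teradata connection here (an assumption, made only so the example runs without a server); both follow the Python DB-API and accept ? placeholders. The helper names chunked and insert_in_batches, the table name csv_demo and the batch size are illustrative and not part of the original code.

```python
import sqlite3
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def insert_in_batches(cursor, sql, rows, batch_size=1000):
    """Send rows to the database with one executemany call per batch."""
    sent = 0
    for batch in chunked(rows, batch_size):
        cursor.executemany(sql, batch)
        sent += len(batch)
    return sent


# sqlite3 is only a stand-in for a JayDeBeApi connection (assumption).
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("create table csv_demo (ID int, Text1 varchar(100), Text2 varchar(100))")

rows = [
    (1, "Record 1", "Hello World!"),
    (2, "Record 2", "Hello Hadoop!"),
    (3, "Record 3", "Hello \nKontext!"),
    (4, "Record 4", "Hello!"),
]
inserted = insert_in_batches(
    cursor,
    "insert into csv_demo (ID, Text1, Text2) values (?, ?, ?)",
    rows,
    batch_size=2,
)
conn.commit()
row_count = cursor.execute("select count(*) from csv_demo").fetchone()[0]
cursor.close()
conn.close()
```

For a file as small as the sample, a single executemany call is enough; chunking mainly helps when the data frame is too large to send comfortably in one batch.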

Code snippet

The following code snippet performs these operations:

  • Establish a JDBC connection using the connect function.
  • Create a cursor object.
  • Use the cursor to create a table in the database.
  • Insert all the records into the table in batch mode.
  • Close the cursor and the connection (always good practice when connecting to a database).

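import jaydebeapi
import pandas as pd

# Load the sample CSV into a pandas data frame.
file_path = 'data.csv'
pdf = pd.read_csv(file_path)

database = "TestDb"
table = "csv_jaydebeapi"
user = "dbc"
password = "dbc"
driver = 'com.teradata.jdbc.TeraDriver'

# Open the JDBC connection; the last argument is the path to the driver JAR.
conn = jaydebeapi.connect(driver,
                          f'jdbc:teradata://192.168.119.128/Database={database}',
                          [user, password],
                          ["../terajdbc4.jar"])
cursor = conn.cursor()
try:
    # Create the target table.
    cursor.execute(f"create multiset table {database}.{table} (ID int, Text1 VARCHAR(100), Text2 VARCHAR(100))")
    # Insert all rows in one batch; each inner list supplies the parameters for one row.
    cursor.executemany(f"""
        insert into {database}.{table} (ID, Text1, Text2)
        values (?, ?, ?)""", pdf.values.tolist())
finally:
    # Release the cursor and the connection even if a statement fails.
    cursor.close()
    conn.close()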
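
The sample file has no empty cells, but real CSV files often do, and pandas reads those as NaN. The sketch below shows one possible way to turn missing values into None before the rows are passed to executemany, so that they can be sent as SQL NULL. The helper name to_param_rows and the in-memory CSV are made up for illustration, and treating every missing value as NULL is an assumption about the desired behaviour.

```python
import io

import pandas as pd


def to_param_rows(df):
    """Return the rows as plain Python lists, with missing values as None."""
    return [
        [None if pd.isna(value) else value for value in row]
        for row in df.values.tolist()
    ]


# Made-up sample with one empty Text2 cell (not the article's data file).
csv_text = "ID,Text1,Text2\n1,Record 1,Hello World!\n2,Record 2,\n"
pdf = pd.read_csv(io.StringIO(csv_text))
param_rows = to_param_rows(pdf)
```

The resulting list can be used in place of pdf.values.tolist() in the executemany call.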

Verify result

The result can be queried directly using Teradata SQL Assistant:

[Image: query result in Teradata SQL Assistant]

Approach 2 - sqlalchemy

Another approach is to use a sqlalchemy connection and then call pandas.DataFrame.to_sql to save the result. With this approach, we don't need to create the table in advance.

Create pandas data frame

A pandas data frame can be created easily with the read_csv API:

import pandas as pd
file_path = 'data.csv'
pdf = pd.read_csv(file_path)

Save to Teradata

We can use the to_sql function of the pandas data frame to save the data to Teradata.

Definition of to_sql

The following parameters are supported in the latest stable release (as of 2020-05-03).

def to_sql(
        self,
        name: str,
        con,
        schema=None,
        if_exists: str = "fail",
        index: bool_t = True,
        index_label=None,
        chunksize=None,
        dtype=None,
        method=None,
    )
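
To make the if_exists and index parameters more concrete, here is a small sketch of how they behave. It writes to an in-memory SQLite database through the standard-library sqlite3 module, which to_sql also accepts as con; this stand-in is an assumption made so the example runs without a Teradata server, and the table name csv_demo is illustrative.

```python
import sqlite3

import pandas as pd

pdf = pd.DataFrame({"ID": [1, 2], "Text1": ["Record 1", "Record 2"]})

# sqlite3 stands in for the Teradata engine (assumption for a runnable demo).
conn = sqlite3.connect(":memory:")

# The first write creates the table; index=False skips the data frame index column.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="fail")

# 'append' adds the rows to the existing table.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="append")
rows_after_append = conn.execute("select count(*) from csv_demo").fetchone()[0]

# 'replace' drops the table and recreates it from the data frame.
pdf.to_sql(name="csv_demo", con=conn, index=False, if_exists="replace")
rows_after_replace = conn.execute("select count(*) from csv_demo").fetchone()[0]

# The default, 'fail', raises ValueError when the table already exists.
try:
    pdf.to_sql(name="csv_demo", con=conn, index=False)
    failed = False
except ValueError:
    failed = True

conn.close()
```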

Code snippet

The following code snippets create a database engine from a connection string, using the native Teradata ODBC driver. Many other Teradata connection string parameters can be added. Alternatively, sqlalchemy.engine.url.URL can be used to build the connection URL, with its query parameter carrying the extra connection parameters.

For example, the following snippet passes extra parameters (with LDAP as the authentication mechanism) when establishing the connection.

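import sqlalchemy
from sqlalchemy import create_engine

# user, password, host and database are defined as in the full example below.
# On SQLAlchemy 1.4 or later, call sqlalchemy.engine.URL.create(...) with the same arguments.
url = sqlalchemy.engine.url.URL(
    drivername='teradata',
    username=user,
    password=password,
    host=host,
    database=database,
    query={'authentication': 'LDAP', 'driver': 'Teradata', 'Session Mode': 'ANSI'},
)
td_engine = create_engine(url)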

Once the connection is established, to_sql is invoked directly to write the data into the database. If the table already exists, it is dropped and recreated, because the if_exists parameter is set to 'replace'.

from sqlalchemy import create_engine
import pandas as pd

# Load the sample CSV into a pandas data frame.
file_path = 'data.csv'
pdf = pd.read_csv(file_path)

database = "TestDb"
table = "csv_sqlalchemy"
user = "dbc"
password = "dbc"
host = '192.168.119.128'

# Build the engine from a connection string that names the installed ODBC driver.
td_engine = create_engine(
    f'teradata://{user}:{password}@{host}/?database={database}&driver=Teradata Database ODBC Driver 16.10')
conn = td_engine.connect()
# Create (or replace) the table and write all rows; the data frame index is not saved.
pdf.to_sql(name=table, con=conn, index=False, if_exists='replace')
conn.close()
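
The connection string above embeds the password and a driver name containing spaces directly in the URL. If the credentials contain reserved characters such as @ or /, they need to be percent-encoded first. The following sketch builds the same kind of URL with the standard-library urllib.parse helpers. The function name build_teradata_url and the sample password are made up for illustration, and whether a given version of the Teradata dialect needs this encoding is an assumption to verify in your environment.

```python
from urllib.parse import quote_plus, urlencode


def build_teradata_url(user, password, host, database, driver):
    """Build a teradata:// connection string with escaped credentials and query."""
    # urlencode escapes the query values (spaces become '+').
    query = urlencode({"database": database, "driver": driver})
    return f"teradata://{quote_plus(user)}:{quote_plus(password)}@{host}/?{query}"


url = build_teradata_url(
    user="dbc",
    password="p@ss/word",  # made-up password containing reserved characters
    host="192.168.119.128",
    database="TestDb",
    driver="Teradata Database ODBC Driver 16.10",
)
```

The resulting string can be passed to create_engine in the same way as the hand-written one.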

Verify result

Run the following query in SQL Assistant to see the result:

select * from TestDb.csv_sqlalchemy

[Image: query result in Teradata SQL Assistant]

Comments
Swapnil Patil (3 years ago)

Hi,

I am facing an issue where I am reading from Excel tabs and loading into Oracle tables.

They are small Excel files, but for some rows it takes a long time and gives this warning:

"WARN jdbc.JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 2"

I am using JDBC to write to Oracle:

jdbc:oracle:thin

Raymond (3 years ago)

It's hard to tell without looking at the code. Can you please provide a code snippet so I can understand the problem you are facing?

Swapnil Patil (3 years ago)

I am reading Excel into a dataframe and saving it as:

s_df_othr_id.dropDuplicates().write.format('jdbc').options(
      url='jdbc:oracle:thin:usr/pwd@//localhost:1521/orcl',
      driver='oracle.jdbc.driver.OracleDriver',
      dbtable='scott.emp',
      user='scott',
      password='pwd').mode('overwrite').save()

For 411 records it took 3 minutes.

Raymond (3 years ago)

Hi Swapnil,

Looking at the code alone, I could not identify any obvious issue.

Isolation level 2 (READ_COMMITTED) is stricter than level 1 (READ_UNCOMMITTED): it stops a transaction from reading rows that contain uncommitted changes, so it typically costs more resources. However, I don't think that is the cause, as your volume is small.

Here are a few things you can explore:

  • Look at the Spark UI (or History UI) to see how much time is taken at each stage, i.e. when reading the data and then when writing to the database via JDBC. You can also try writing the results as Parquet files to HDFS or another supported file system. That way you can tell whether the bottleneck is in the JDBC write or in reading the Excel file.
  • If the issue turns out to be in the JDBC part, look into why your database does not support isolation level 1, e.g. some setting in Oracle.
  • If the issue is in reading Excel, can you convert the Excel file to standard CSV? That is usually faster than the Excel file format as it doesn't involve an Excel ODBC or OLE DB driver.

Reference - JDBC transaction isolation levels

    /**
     * A constant indicating that transactions are not supported.
     */
    int TRANSACTION_NONE             = 0;
    /**
     * A constant indicating that
     * dirty reads, non-repeatable reads and phantom reads can occur.
     * This level allows a row changed by one transaction to be read
     * by another transaction before any changes in that row have been
     * committed (a "dirty read").  If any of the changes are rolled back,
     * the second transaction will have retrieved an invalid row.
     */
    int TRANSACTION_READ_UNCOMMITTED = 1;
    /**
     * A constant indicating that
     * dirty reads are prevented; non-repeatable reads and phantom
     * reads can occur.  This level only prohibits a transaction
     * from reading a row with uncommitted changes in it.
     */
    int TRANSACTION_READ_COMMITTED   = 2;
    /**
     * A constant indicating that
     * dirty reads and non-repeatable reads are prevented; phantom
     * reads can occur.  This level prohibits a transaction from
     * reading a row with uncommitted changes in it, and it also
     * prohibits the situation where one transaction reads a row,
     * a second transaction alters the row, and the first transaction
     * rereads the row, getting different values the second time
     * (a "non-repeatable read").
     */
    int TRANSACTION_REPEATABLE_READ  = 4;
    /**
     * A constant indicating that
     * dirty reads, non-repeatable reads and phantom reads are prevented.
     * This level includes the prohibitions in
     * <code>TRANSACTION_REPEATABLE_READ</code> and further prohibits the
     * situation where one transaction reads all rows that satisfy
     * a <code>WHERE</code> condition, a second transaction inserts a row that
     * satisfies that <code>WHERE</code> condition, and the first transaction
     * rereads for the same condition, retrieving the additional
     * "phantom" row in the second read.
     */
    int TRANSACTION_SERIALIZABLE     = 8;
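
To read the Spark warning in terms of these constants, a small helper can map the numeric levels back to their names. This is only an illustrative sketch: the dictionary copies the values listed above, and the function name explain_isolation_warning is made up.

```python
import re

# Values of the java.sql.Connection constants listed above.
ISOLATION_LEVELS = {
    0: "TRANSACTION_NONE",
    1: "TRANSACTION_READ_UNCOMMITTED",
    2: "TRANSACTION_READ_COMMITTED",
    4: "TRANSACTION_REPEATABLE_READ",
    8: "TRANSACTION_SERIALIZABLE",
}


def explain_isolation_warning(message):
    """Replace the numeric levels in the warning text with their JDBC names."""
    def name(match):
        level = int(match.group(1))
        return "isolation level " + ISOLATION_LEVELS.get(level, f"unknown ({level})")

    return re.sub(r"isolation level (\d+)", name, message)


warning = ("Requested isolation level 1 is not supported; "
           "falling back to default isolation level 2")
explained = explain_isolation_warning(warning)
```

In other words, the warning says that READ_UNCOMMITTED was requested and the connection fell back to READ_COMMITTED.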


Swapnil Patil (3 years ago)

Thanks Raymond for the hints. I see in the web UI that the shuffle after drop duplicates creates some empty partitions, so I just added coalesce(1) and it completed in seconds.

The transaction warning still keeps coming, though.


Best regards,

Swapnil

Raymond (3 years ago)

That is a very good observation. Writing to the database with multiple partitions is usually useful when the target table is itself partitioned in the target database system. Since you are using a local Spark executor, a single partition is usually more efficient. You can also set the `numPartitions` option to cap the number of partitions used to write to the table. The Spark documentation describes it as follows:

The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
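
As a sketch of how that option could be applied to the write discussed in this thread, the function below wraps the same write.format('jdbc') chain and adds numPartitions. The function name write_jdbc_with_partition_cap and its parameters are hypothetical; df is assumed to be a Spark DataFrame, and the connection details are supplied by the caller.

```python
def write_jdbc_with_partition_cap(df, url, table, user, password, driver, max_partitions=1):
    """Write a Spark DataFrame over JDBC using at most `max_partitions` connections."""
    (
        df.write.format("jdbc")
        .options(
            url=url,
            driver=driver,
            dbtable=table,
            user=user,
            password=password,
            # Spark reduces the partition count to this limit before writing.
            numPartitions=str(max_partitions),
        )
        .mode("overwrite")
        .save()
    )
```

With max_partitions=1 this has a similar effect to calling coalesce(1) before the write, without changing the DataFrame used elsewhere in the job.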
