How to dynamically update parameters of an existing Airflow (version 1.9) Connection within code?
I have defined an SSH connection via the Airflow Admin UI. However, I am only defining a service account, host, and port in the UI. I retrieve the password in the first task instance; I need to update the SSH connection with that password in the second task instance and use it in the third task instance.
Currently t1 and t2 work as expected; however, t3 fails because the password is not getting updated and it looks for .ssh key-file-based authentication instead. Can someone please suggest how this can be implemented?
Here is my code snippet:
from airflow import models
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.models import Variable
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils import db
from airflow.utils.db import provide_session
from airflow import DAG
import logging
import os
svcpassword = 'XXXX'
logging.getLogger().setLevel(logging.DEBUG)
ssh01 = SSHHook(ssh_conn_id='ssh_conn1')
ssh02 = SSHHook(ssh_conn_id='ssh_conn2')
default_args = {
    'owner': 'user',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['abcd@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG('dag_POC', default_args=default_args,
          schedule_interval="@once")
path1 = '/home/user/R_samplescript'
t1 = SSHOperator(
    task_id='SSHTask',
    command='Rscript ' + path1 + '.R',
    ssh_hook=ssh01,
    params={},
    retries=1,
    do_xcom_push=True,
    dag=dag
)
def create_new_connection(**kwargs):
    ti = kwargs['ti']
    pwd = ti.xcom_pull(task_ids='SSHTask')
    password = str(pwd).replace("\n","n")
    password = password[password.find(' ')+1 : ]
    password = password.strip()
    svcpassword = password
    db.merge_conn(models.Connection(
        conn_id='ssh_conn2', conn_type='SSH',
        host='server_name', port='XXXX', login='account_name',
        password=svcpassword))
t2 = PythonOperator(
    task_id='Create_Connection',
    python_callable=create_new_connection,
    provide_context=True,
    dag=dag
)
t3 = SSHOperator(
    task_id='RemoteCallTest',
    command="R command",
    ssh_hook=SSHHook().get_conn('ssh_conn2'),
    do_xcom_push=False,
    retries=1,
    dag=dag
)
t1 >> t2 >> t3
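One thing that may be worth checking in the snippet above: the expression SSHHook().get_conn('ssh_conn2') passed to t3 is evaluated when the DAG file is loaded, not when t3 runs, so it cannot see anything t2 writes later. The sketch below is only a toy model of that timing difference. FakeHook, CONNECTIONS, update_password and run_remote_task are hypothetical names invented for illustration, not Airflow API.

```python
# Toy model only: a dict stands in for the connection store and
# FakeHook stands in for a hook class. Neither is Airflow API.
CONNECTIONS = {"ssh_conn2": {"host": "server_name", "password": None}}


class FakeHook(object):
    def __init__(self, conn_id):
        # Store only the id; do not read the connection yet.
        self.conn_id = conn_id

    def get_conn(self):
        # Read the stored connection at call time and return a copy.
        return dict(CONNECTIONS[self.conn_id])


# "Parse time": everything at module level runs when the file is loaded.
eager_conn = FakeHook("ssh_conn2").get_conn()  # resolved now, password is None
lazy_hook = FakeHook("ssh_conn2")              # resolved later, inside the task


def update_password(new_password):
    # Plays the role of t2: the password is stored at run time.
    CONNECTIONS["ssh_conn2"]["password"] = new_password


def run_remote_task(hook):
    # Plays the role of t3: look up the connection only when executing.
    return hook.get_conn()["password"]


update_password("s3cret")
password_seen_eagerly = eager_conn["password"]      # still the parse-time value
password_seen_lazily = run_remote_task(lazy_hook)   # sees the updated value
```

In the DAG itself, the analogous change would be to pass a hook object built from the connection id (ssh02 is already defined that way near the top of the snippet) and let the operator resolve it at execution time. Whether that alone fixes t3 on 1.9 is untested here.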
In the future, please provide code examples, as it would make this easier to answer.
– andscoop
11 hours ago
1 Answer
You need to use the session wrapper (provide_session) to persist changes to the database:
@provide_session()
def set_password(session=None):
    conn = MyHook().get_conn(conn_id)
    conn.set_password(my_password)
    session.add(conn)
    session.commit()
I am getting an error: @provide_session() raises TypeError: provide_session() takes exactly 1 argument (0 given). I have added the updated code snippet above. Also, please note that I am using Python 2.7.5.
– tpatole
5 hours ago
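The TypeError reported in this comment is what Python raises when a decorator that expects the function as its only argument is written with parentheses: provide_session() is then called with no arguments before it ever sees the function. The sketch below reproduces that with a simplified stand-in. The provide_session and FakeSession defined here are hypothetical illustrations, not the Airflow implementation, and the connection is a plain dict.

```python
import functools


class FakeSession(object):
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self):
        self.added = []
        self.committed = False

    def add(self, obj):
        self.added.append(obj)

    def commit(self):
        self.committed = True


def provide_session(func):
    """Simplified stand-in: a decorator that takes the function itself."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Inject a session only when the caller did not pass one.
        if kwargs.get("session") is None:
            kwargs["session"] = FakeSession()
        return func(*args, **kwargs)

    return wrapper


# Correct: no parentheses, so Python passes set_password to provide_session.
@provide_session
def set_password(conn, new_password, session=None):
    conn["password"] = new_password
    session.add(conn)
    session.commit()
    return session


# Incorrect: provide_session() is called with no arguments before decorating.
try:
    @provide_session()
    def broken(session=None):
        return session
    decorator_error = None
except TypeError as exc:
    decorator_error = exc

conn = {"conn_id": "ssh_conn2", "password": None}
session = set_password(conn, "s3cret")
```

Applied to the answer above, the analogous change would be to write @provide_session without parentheses, using the provide_session already imported from airflow.utils.db in the question. It may also be worth confirming that the object handed to session.add is the stored Connection record rather than a client object returned by a hook's get_conn; that part is an assumption and has not been checked against 1.9.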
idownvotedbecau.se/nocode
– tobi6
17 hours ago