Azure Data Factory - Pull files from SFTP

Recently a customer asked me to pull data from their Linux-based SFTP server. ADF could not connect to the SFTP server because of firewall settings, and we discussed the issue with Microsoft. Unfortunately, Microsoft said we would have to wait a couple of months for a solution.

The business was not ready to wait that long, so I came up with another solution. We already had an SSIS license, so I decided to make use of it.

Below is the high-level architecture I proposed.

SSIS pulls the files from the Linux SFTP server and downloads them into a local folder. A separate folder is created for each feed, and files are downloaded based on their last modified date.
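The per-feed download rule can be pictured with the standalone C# sketch below (C# 9+ top-level statements, not SSIS script code). It assumes each remote file is described by a name and a last-modified time, as `RemoteFileInfo.Name` and `RemoteFileInfo.LastWriteTime` from WinSCP's `Session.ListDirectory` would provide, and that a cutoff date per feed is known. The helper names, the `C:\Staging` root and the `SALES` feed are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: names of files modified after the feed's cutoff date,
// oldest first, so they are downloaded in the order they arrived.
List<string> SelectNewFiles(IEnumerable<(string Name, DateTime LastModified)> remoteFiles, DateTime cutoff) =>
    remoteFiles
        .Where(f => f.LastModified > cutoff)
        .OrderBy(f => f.LastModified)
        .Select(f => f.Name)
        .ToList();

// Hypothetical helper: one local folder per feed, e.g. C:\Staging\SALES\file.csv.
// Windows separators are written explicitly because the staging server runs Windows.
string LocalPathFor(string localRoot, string feed, string fileName)
{
    string root = localRoot.TrimEnd('\\');
    return root + "\\" + feed + "\\" + fileName;
}

// Example listing, shaped like the name/last-write-time pairs a directory listing returns.
var remote = new List<(string Name, DateTime LastModified)>
{
    ("sales_20200205.csv", new DateTime(2020, 2, 5)),
    ("sales_20200208.csv", new DateTime(2020, 2, 8)),
    ("sales_20200207.csv", new DateTime(2020, 2, 7, 10, 0, 0)),
};

foreach (string name in SelectNewFiles(remote, new DateTime(2020, 2, 7)))
    Console.WriteLine(LocalPathFor(@"C:\Staging", "SALES", name));
```

Ordering by modification time is a choice made for the sketch so that files of one feed land in the staging folder in arrival order; the package itself may process them differently.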

ADF picks up all the files from the Windows FTP server based on the date, loops through each file and loads it into the RAW layer of Azure Data Lake Store, and later into the analytic layer. The RAW-to-analytic processing is done by a Databricks script that is called from ADF.

The EST_GET_FEEDS task queries the database parameter table, gets all the feeds to be picked up from the SFTP server and passes them to the Foreach Loop container.

The Foreach Loop contains three tasks.

EST_GET_PARAM_VALUES picks up the run-time parameters required to pull the file from the source system to the target system, such as the folder path and file name.

ST_ASSIGN_PARAM_VALUES assigns the retrieved values to the SSIS package variables. All the parameter values are stored in an Object variable and then assigned to the respective variables by a C# script; the code snippet is provided at the end.

ST_SFTP_FILETRANSFER connects to the Linux SFTP server, pulls the files into a local directory and then connects to the Windows FTP server to store the retrieved files. Note that the source SFTP server is accessed using SSH key authentication.
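To make the control flow of the package easier to follow, here is a minimal standalone C# sketch of the same loop, with an in-memory list standing in for the database parameter table. The table layout (feed, parameter name, value), the feed names, the paths and the helper names are all hypothetical; in the real package each step is an SSIS task rather than a method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical contents of the database parameter table: one row per feed and parameter.
// Parameter names omit the "v" prefix used by the package variables (srcFilePath -> vsrcFilePath).
var paramTable = new List<(string Feed, string Name, string Value)>
{
    ("SALES", "srcFilePath", "/data/sales/"),
    ("SALES", "srcFileName", "sales_"),
    ("STOCK", "srcFilePath", "/data/stock/"),
    ("STOCK", "srcFileName", "stock_"),
};

// Role of EST_GET_FEEDS: the distinct feeds to pick up.
List<string> GetFeeds() => paramTable.Select(r => r.Feed).Distinct().ToList();

// Role of EST_GET_PARAM_VALUES: the name/value pairs for one feed.
Dictionary<string, string> GetParams(string feed) =>
    paramTable.Where(r => r.Feed == feed).ToDictionary(r => r.Name, r => r.Value);

// Role of the Foreach Loop: per feed, fetch parameters and hand them to the transfer step.
// Variable assignment and the SFTP transfer are reduced to a console message here.
List<string> RunAllFeeds()
{
    var transferred = new List<string>();
    foreach (string feed in GetFeeds())
    {
        Dictionary<string, string> p = GetParams(feed);
        string source = p["srcFilePath"] + p["srcFileName"];
        Console.WriteLine($"{feed}: transfer files matching {source}*");
        transferred.Add(source);
    }
    return transferred;
}

RunAllFeeds();
```

Keeping the parameters as rows rather than columns is what lets a new feed be added by inserting rows, without changing the package.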

ST_ASSIGN_PARAM_VALUES

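// Requires: using System.Data; using System.Data.OleDb; (in addition to the default Script Task usings)
// User::vParam_Obj must be in ReadOnlyVariables; the variables it fills must be in ReadWriteVariables.
public void Main()
{
    // Load the ADO recordset stored in the Object variable into a DataTable
    DataTable dt = new DataTable();
    OleDbDataAdapter adapter = new OleDbDataAdapter();
    adapter.Fill(dt, Dts.Variables["User::vParam_Obj"].Value);

    foreach (DataRow row in dt.Rows)
    {
        // Column 0 holds the parameter name, column 1 its value;
        // the matching package variable is named "v" + parameter name
        string paramKey = "v" + row[0].ToString();
        string paramValue = row[1].ToString();

        foreach (Variable variable in Dts.Variables)
        {
            if (paramKey.Equals(variable.Name))
            {
                // Convert to the variable's declared type before assigning
                if (variable.DataType == TypeCode.Int32)
                {
                    Dts.Variables["User::" + paramKey].Value = Int32.Parse(paramValue);
                }
                else
                {
                    Dts.Variables["User::" + paramKey].Value = paramValue;
                }
                break;
            }
        }
    }

    Dts.TaskResult = (int)ScriptResults.Success;
}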
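The mapping rule in this script can be exercised outside SSIS. The following standalone C# sketch (C# 9+ top-level statements) uses a dictionary as a stand-in for `Dts.Variables`; the `AssignParams` helper and the `vPortNumber` variable are hypothetical, and only the `v` prefix and the Int32/string handling mirror the script above.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for Dts.Variables: variable name -> (declared type, current value).
// The variable names and types are hypothetical examples.
var variables = new Dictionary<string, (TypeCode Type, object Value)>
{
    ["vsrcFilePath"] = (TypeCode.String, ""),
    ["vsrcFileName"] = (TypeCode.String, ""),
    ["vPortNumber"] = (TypeCode.Int32, 0),
};

// Same rule as ST_ASSIGN_PARAM_VALUES: parameter "X" maps to variable "vX",
// Int32 variables receive a parsed integer, others receive the raw string,
// and parameters without a matching variable are skipped.
void AssignParams(IEnumerable<(string Key, string Value)> rows)
{
    foreach (var (key, value) in rows)
    {
        string name = "v" + key;
        if (!variables.TryGetValue(name, out var current))
            continue;
        object converted = current.Type == TypeCode.Int32 ? (object)int.Parse(value) : value;
        variables[name] = (current.Type, converted);
    }
}

AssignParams(new[] { ("srcFilePath", "/data/sales/"), ("PortNumber", "22"), ("Unknown", "x") });

foreach (var entry in variables)
    Console.WriteLine($"{entry.Key} = {entry.Value.Value} ({entry.Value.Type})");
```

As in the script, a parameter with no matching variable is skipped silently, so a misspelt name in the parameter table will not fail the run; logging skipped keys would be one way to surface that.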

ST_SFTP_FILETRANSFER

Make sure WinSCPnet.dll is referenced in the script project before use. Even so, we still need the code below in the script to force it to load the DLL from the WinSCP installation path when the package runs on the server.

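// Registers the resolver before Main runs; without this the handler is never called
static ScriptMain()
{
    AppDomain.CurrentDomain.AssemblyResolve += new ResolveEventHandler(CurrentDomain_AssemblyResolve);
}

static System.Reflection.Assembly CurrentDomain_AssemblyResolve(object sender, ResolveEventArgs args)
{
    if (args.Name.Contains("WinSCPnet"))
    {
        // Load WinSCPnet.dll from the WinSCP installation folder on the server
        string path = @"C:\Program Files (x86)\WinSCP\";
        return System.Reflection.Assembly.LoadFile(System.IO.Path.Combine(path, "WinSCPnet.dll"));
    }
    return null;
}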


// Requires: using System.IO; using WinSCP; (in addition to the default Script Task usings)
public void Main()
{
    string winscpexe = @"C:\Program Files (x86)\WinSCP\WinSCP.exe";
    string fileSrcPath = Dts.Variables["vsrcFilePath"].Value.ToString();
    string fileDestFTPpath = Dts.Variables["vtgtFilepath"].Value.ToString();
    string fileDeststgPath = Dts.Variables["vLocalpath"].Value.ToString();
    try
    {
        // Source: Linux SFTP server, authenticated with an SSH private key
        SessionOptions sessionOptions = new SessionOptions
        {
            Protocol = Protocol.Sftp,
            HostName = Dts.Variables["vsrcHostname"].Value.ToString(),
            UserName = Dts.Variables["vsrcUserName"].Value.ToString(),
            SshHostKeyFingerprint = Dts.Variables["$Project::P_SRC_SSHKEY"].GetSensitiveValue().ToString(),
            SshPrivateKeyPath = @"\\servername\ApplData\WinSCP\Keys\key.ppk",
        };

        TransferOptions trSFTP = new TransferOptions();
        trSFTP.ResumeSupport.State = TransferResumeSupportState.Off;
        // trSFTP.FileMask = ">2020-02-07"; // if we need to pick the files based on the date

        // File name = base name + file date; the same suffix is reused for staging and target
        string latest = Dts.Variables["vsrcFileName"].Value.ToString() + Dts.Variables["vFileDate"].Value.ToString();
        string finalsourcefilename = fileSrcPath + latest;
        fileDeststgPath = fileDeststgPath + Dts.Variables["vtgtFileName"].Value.ToString() + latest;

        using (Session snSFTP = new Session())
        {
            snSFTP.ExecutablePath = winscpexe;
            snSFTP.Open(sessionOptions); // throws if the connection fails

            // Alternative: pick the most recently modified file (requires using System.Linq)
            // RemoteFileInfo latestFile = snSFTP.ListDirectory(fileSrcPath).Files
            //     .Where(file => !file.IsDirectory)
            //     .OrderByDescending(file => file.LastWriteTime)
            //     .FirstOrDefault();

            // Download into the local staging folder; Check() throws if the transfer failed
            snSFTP.GetFiles(finalsourcefilename, fileDeststgPath, false, trSFTP).Check();
        }

        // Target: the Windows FTP server, reached over SFTP on port 10022 with a password
        SessionOptions sessionOptionsftp2 = new SessionOptions
        {
            Protocol = Protocol.Sftp,
            HostName = Dts.Variables["vtgtHostName"].Value.ToString(),
            PortNumber = 10022,
            UserName = Dts.Variables["vtgtUserName"].Value.ToString(),
            Password = Dts.Variables["$Project::P_TGT_FTP_PWD"].GetSensitiveValue().ToString(),
            SshHostKeyFingerprint = Dts.Variables["$Project::P_TGT_SSHKEY"].GetSensitiveValue().ToString(),
        };

        using (Session session = new Session())
        {
            session.ExecutablePath = winscpexe;
            session.Open(sessionOptionsftp2);
            fileDestFTPpath = fileDestFTPpath + "/" + Dts.Variables["vtgtFileName"].Value.ToString() + latest;
            if (finalsourcefilename != fileSrcPath)
            {
                session.PutFiles(fileDeststgPath, fileDestFTPpath, false, trSFTP).Check();
                // Remove the staged copy once it has been uploaded
                File.Delete(fileDeststgPath);
            }
        }

        Dts.TaskResult = (int)ScriptResults.Success;
    }
    catch (Exception e)
    {
        Dts.Events.FireError(0, null, "Error when using WinSCP to transfer the file: " + e, null, 0);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}
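The naming convention in this script is easy to miss: the file name is the feed's base name followed by the file date, and the same suffix is reused for the local staging copy and for the file on the target server. A minimal standalone C# sketch of that rule follows (C# 9+ top-level statements); the `BuildPaths` helper and all sample paths are hypothetical, and the target folder is joined to the file name with a single `/`.

```csharp
using System;

// Hypothetical helper reproducing the naming rule in ST_SFTP_FILETRANSFER:
// latest = vsrcFileName + vFileDate, reused for the staging copy and the target file.
(string Source, string Staging, string Target) BuildPaths(
    string srcFilePath, string srcFileName, string fileDate,
    string localPath, string tgtFileName, string tgtFilePath)
{
    string latest = srcFileName + fileDate;
    return (srcFilePath + latest,
            localPath + tgtFileName + latest,
            // join the target folder and file name with exactly one '/'
            tgtFilePath.TrimEnd('/') + "/" + tgtFileName + latest);
}

var paths = BuildPaths("/data/sales/", "sales_", "20200207",
                       @"C:\Staging\SALES\", "raw_", "/inbound/sales");
Console.WriteLine(paths.Source);   // /data/sales/sales_20200207
Console.WriteLine(paths.Staging);  // C:\Staging\SALES\raw_sales_20200207
Console.WriteLine(paths.Target);   // /inbound/sales/raw_sales_20200207
```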


Once the files reach the Windows FTP server, it is ADF's job to pull them from there and load them into ADLS.

The Lookup activity queries the database table and gets the parameter values required to run this pipeline. The second activity, GetMetadata, accepts the FTP folder path and gets all the file names and the file date. The third activity is a ForEach loop that iterates through all the files returned by GetMetadata and loads each one into ADLS. Once a file is saved in ADLS, Data Factory calls a stored procedure to update the last file date in the parameter table. This value is used for the next file retrieval run.
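The date-based incremental pattern of this pipeline can be sketched in C# as below, assuming GetMetadata yields a name and a date per file and the stored procedure simply persists the newest date that was loaded. This is a standalone illustration (C# 9+ top-level statements), not ADF pipeline code; the `PlanIncrementalLoad` helper and the sample file names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the incremental pattern: only files newer than the stored
// "last file date" are copied, and the newest date copied becomes the next watermark.
(List<string> ToCopy, DateTime NewWatermark) PlanIncrementalLoad(
    IEnumerable<(string Name, DateTime FileDate)> files, DateTime lastFileDate)
{
    var newer = files.Where(f => f.FileDate > lastFileDate)
                     .OrderBy(f => f.FileDate)
                     .ToList();
    // If nothing new arrived, keep the old watermark so no file is skipped later.
    DateTime watermark = newer.Count == 0 ? lastFileDate : newer.Max(f => f.FileDate);
    return (newer.Select(f => f.Name).ToList(), watermark);
}

var listing = new List<(string Name, DateTime FileDate)>
{
    ("sales_20200206.csv", new DateTime(2020, 2, 6)),
    ("sales_20200207.csv", new DateTime(2020, 2, 7)),
    ("sales_20200208.csv", new DateTime(2020, 2, 8)),
};

var plan = PlanIncrementalLoad(listing, new DateTime(2020, 2, 6));
foreach (string file in plan.ToCopy)
    Console.WriteLine($"copy {file} to ADLS RAW");
Console.WriteLine($"update last file date to {plan.NewWatermark:yyyy-MM-dd}");
```

Updating the watermark only after the copies succeed is what makes a failed run safe to repeat: the next run starts from the same date.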

Properties of the GetMetadata activity

Properties of the ForEach activity

Properties of the Copy activity