Dec 20, 2017

Dynamic SFTP Connection Factory for Spring Integration

I recently had the opportunity to dive into a Spring Integration project that presented an interesting challenge: the creation of a outbound SFTP Connection Factory at runtime based on Spring Batch jobs.

In a simple outbound SFTP integration, configuration for Spring Integration is quite succinct. You simply need to define the MessageChanel that the SFTP adapter will listen to, the SessionFactory containing the SFTP details, and the IntegrationFlow that defines the path of the message to be processed. Below is an example of a simple Integration flow for a single, static SFTP outbound connection:

@Bean
MessageChannel outboundSftpChannel(){
    new DirectChannel()
}	
 
@Bean
SessionFactory sftpSessionFactory(){
    def sessionFactory = new DefaultSftpSessionFactory(false)
    sessionFactory.host = 'host'
    sessionFactory.port = 1234
    sessionFactory.user = 'user'
    // ...
 
    sessionFactory
}
 
@Bean
IntegrationFlow sftpOutboundFlow(SessionFactory sftpSessionFactory){
    IntegrationFlows.from('outboundSftpChannel')
        .handle(Sftp.outboundAdapter(sftpSessionFactory)
        .remoteDirectory('/tmp/' ))
        .get()
}

This works well for a single SFTP connection, but what if we wanted to define multiple connections, each with a unique host/username? Well, Spring Integration provides the DelegatingSessionFactory class to do that.

The DelegatingSessionFactory contains a SessionFactoryLocator that finds the correct SessionFactory based on a ThreadKey that is set when the message is being written to the MessageChanel. This means you can wire up several SFTP connections or read them from your configuration, place them in their own session factories and have the proper SessionFactory create a SFTP connection for you as the message flows through the defined pipeline. To set that up is also quite simple. With just a few adjustments to our previous configuration, we are able to facilitate the DelegatingSessionFactory.

@Bean
MessageChannel outboundSftpChannel(){
    new DirectChannel()
}	
 
@Bean
DelegatingSessionFactory delegatingSessionFactory(){
	def firstSessionFactory = new DefaultSftpSessionFactory(false)
	firstSessionFactory.host = 'host'
	firstSessionFactory.port = 1234
	//...
 
	def secondSessionFactory = new DefaultSftpSessionFactory(false)
	secondSessionFactory.host = 'hosttwo'
	secondSessionFactory.port = 1234
	//...
 
	def defaultSessionFactory = new DefaultSftpSessionFactory(false)
	defaultSessionFactory.host = 'default'
	defaultSessionFactory.port = 1234
	//...
 
	def sessionFactoryMap = [0:firstSessionFactory, 1: secondSessionFactory]
 
    new DelegatingSessionFactory(sessionFactoryMap, defaultSessionFactory)
}
 
@Bean
IntegrationFlow sftpOutboundFlow(DelegatingSessionFactory delegatingSessionFactory){
    IntegrationFlows.from('outboundSftpChannel')
        .handle(Sftp.outboundAdapter(delegatingSessionFactory)
        .remoteDirectory('/tmp/' ))
        .get()
}

With this configuration, I have specified 3 different connection factories for my SFTP endpoints. As long as we specify the appropriate Thread key, the proper SFTP connection will be initiated and our single SFTP adapter can now handle multiple endpoints. This works well for any process that has a long and stable life, but what if the process that feeds the outboundSftpChannel is more dynamic? What if there is a business use case to be able to add/change/remove SFTP connections at runtime? How can we solve for that?

There are many ways the DelegatingSessionFactory and it’s factory locator can be updated at runtime. The default locator implementation even provides public methods to do so. Add a connection, run the process that invokes this outbound SFTP channel, and you’re done.

I didn’t do that.

The methods for the DefaultSessionFactoryLocator weren’t quite dynamic enough. The SFTP process required the registration of the new connection prior to a message sent down the pipeline for that connection. I hated the thought of having to add more configuration to facilitate what is supposed to be a dynamic process. Why ask the users to remember to set up the new connection in the configuration store (database, file, etc) and have to invoke whatever endpoint would need to be designed to call the appropriate methods? We should let the application control it’s own process.

I injected a custom factory locator functionally similar to the default session factory locator with one key difference: instead of requiring a separate, manual process to register a new session factory with the locator, it would instantiate a new session factory if the right one isn’t in factory storage.

@Component
class ExampleRuntimeSessionFactoryLocator implements SessionFactoryLocator {
 
    private final Map sessionFactoryMap = [:]
 
    @Override
    SessionFactory getSessionFactory(Object key) {
        def sessionFactory = sessionFactoryMap[key]
 
        if (!sessionFactory){
            sessionFactory = generateSessionFactory(key as Long)
            sessionFactoryMap[key] = sessionFactory
        }
 
        sessionFactory
    }
 
    private DefaultSftpSessionFactory generateSessionFactory(Long key){
        new DefaultSftpSessionFactory(
                host: 'host',
                port: 1234,
		//...
        )
    }

And to wire it up in the Spring Integration Configuration, a small change is required to the DelegatingSessionFactory.

@Bean
    DelegatingSessionFactory delegatingSessionFactory(ExampleRuntimeSessionFactoryLocator runtimeSessionFactoryLocator){
        new DelegatingSessionFactory(runtimeSessionFactoryLocator)
    }

And with that, anytime a message is sent on the SFTP outbound channel, the application will automatically wire up any relevant session factories for use.

If you want to see an example project with all of these pieces in place with the addition of looking up the SFTP connection information from a database table, I have uploaded the example code to my GitHub.

About the Author

Object Partners profile.

One thought on “Dynamic SFTP Connection Factory for Spring Integration

  1. Andy says:

    Great article. It helped me a lot. Earlier I had no idea how to accomplish this but after seeing its a big sign of relief. Keep up the good work. Also I have a question what is the difference between DefaultSftpSessionFactory and DelegatingSessionFactory. I tried researching this but didnot find much help.

    1. I’m glad this post helped you.

      As to your question, the DefaultSftpSessionFactory is for managing sessions of a single SFTP endpoint. So if your application is only required to pull from or send to a single SFTP source, the DefaultSftpSessionFactory would be the logical choice.

      The DelegatingSftpSessionFactory is for managing sessions for multiple SFTP endpoints and seems to be intended for primarily outbound sftp sources. As in the code above, the DelegatingSftpSessionFactory is populated with multiple DefaultSftpSessionFactory instances so that at run-time the application can pick with which sftp server to communicate.

  2. Dora says:

    What a big relief I fell when I found your article! It is amazing. But I have a question to your provided demo. Could I also change “remoteDirectory” at runtime?

    1. Thanks for the comment!

      Spring Integration has many ways to choose which remote directory an outbound sftp connection can write to. To be able to specify which destination folder to write to at runtime, replace the remoteDirectory builder with the remoteDirectoryExpression builder and use a SPeL expression to determine the directory location. This expression is evaluated for each message in the integration flow.

      For example, a simple usage would have one of the steps of the integration provide a value in a specific message header, then, have the remoteDirectoryExpression read the appropriate header. Combined with the examples above, each message would then control both its sftp session and its destination directory.

      1. Dora says:

        Thanks for your advice! I really appreciate it. I’ll try it.

        1. Dora says:

          Hi Chris, your article and your advice give me a lot of help! Recently I’m trying using thread pool to upload the files to sftp server. But it always throws error while getSession(). Do you know why? Is it because of using DelegatingSessionFactory? I also ask my colleagues. But they have no idea. I’m sorry to bother you again.
          “org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session”

  3. Montaser Qasem says:

    You have no idea how thankful I am for this article.

Leave a Reply

Your email address will not be published.

Related Blog Posts
Natively Compiled Java on Google App Engine
Google App Engine is a platform-as-a-service product that is marketed as a way to get your applications into the cloud without necessarily knowing all of the infrastructure bits and pieces to do so. Google App […]
Building Better Data Visualization Experiences: Part 2 of 2
If you don't have a Ph.D. in data science, the raw data might be difficult to comprehend. This is where data visualization comes in.
Unleashing Feature Flags onto Kafka Consumers
Feature flags are a tool to strategically enable or disable functionality at runtime. They are often used to drive different user experiences but can also be useful in real-time data systems. In this post, we’ll […]
A security model for developers
Software security is more important than ever, but developing secure applications is more confusing than ever. TLS, mTLS, RBAC, SAML, OAUTH, OWASP, GDPR, SASL, RSA, JWT, cookie, attack vector, DDoS, firewall, VPN, security groups, exploit, […]