Yesterday, I set up the premise for using named pipes in SSIS. Today we are going to get into the actual coding required to get this system up and running. In a bit I will go over how you modify your Script objects to enable you to access the libraries needed for named pipes. Both your producer and consumer scripts need to make the changes below. We will start with the producer Script Task in my example.
Named pipes are available as of .NET 3.5. The objects reside in the System.IO.Pipes namespace. When you first create a Script object (task or component) it defaults to .Net 2.0. To fix this you need to get to the script properties and I don't think it's very intuitive. When you edit your Script object, open the properties window and right-click on the unusually name of your script and select properties (shown here):
This will open up a larger pane. From this window you want to select the appropriate .Net library, which is 3.5.
After you change the framework, it will pop up a box telling you the script must be closed and re-opened, answer yes. Now you are ready to incorporate named pipes. Open up the Script Task that will be your source and import two libraries; System.IO and System.IO.Pipes.
We need to import System.IO so we can read from our directory list. This information will be pushed down the pipes for the consumers to work on. Now let's dissect a producer.
Essentially we are taking lines from within a text file and passing them literally 'down the pipe' to the consumers. The pipe is easy to set up, in this case we want a NamedPipeServerStream not aNamedPipeClientStream. Client streams will be used by our consumers. I name the pipe so it can be found by the clients and I make sure the direction is out only. There is no reason for the clients to talk to the source, in this case anyway.
Now that the pipe has been created you need a way to send the information down the pipe and to do this, you create a StreamWriter whose destination is the pipe. Now that you have the plumbing (pardon the pun) all set up, you enter a loop that says 'So long as I have a line in my source file wait for a client to connect to me'. Let's take a closer look at that loop.
The while loop will loop once for each line in the text file, nothing unusual about that. The next thing that happens is the Script object will listen on that pipe for a process to make a connection request. If no connections are ever made, this package will only end if it is killed or an error is thrown. Once a connection is established the next line in the code is executed, which is the WriteLine. This reads a line from the text via, via ReadLine method and pushes that into the StreamWriter which is hooked up to the pipe. We then flush the stream, because we are only sending a single line of information and don't want any of it to get tossed out when we disconnect. Why do we disconnect? I need to release that pipe and connect to the next consumer that is waiting for data. If I were to keep this open, I wouldn't be able to service any other consumers. We aren't done just yet. We have some housekeeping to do either during the event of an error or when we have no more work to do.
Although the producer will wait indefinitely for someone to take work from it, consumers will not wait indefinitely for work to be sent. The reason for this If I had one unit of work left, but two consumers, one of them would get the work, the producer would gracefully exit, but the last consumer would be waiting on a pipe forever. I have a neat technique to avoid this situation.
In all of that code the important bit is theout_of_band variable. It defaults to a 1, meaning there is work to be done. When the producer exits, it sets this to a 0 letting all of the consumers know there is no more work to be had. We will get into how the consumers use this and how we prevent infinite loops in this framework.Tomorrow -- Consumers.