In this section you can find code snippets commonly used to accomplish different tasks. The Java code can be run from a pre/postscript in any action/trigger or within an IDE, the PL/SQL code from a DB management tool.
Print Current Concurrency and Concurrency Usage
This script prints out the current concurrency throttle settings, and the throttles taken up by execution on a per namespace basis. The code can be run as is from within any IDE.
import flux.Engine; import flux.EngineException; import flux.Factory; import flux.runtimeconfiguration.RuntimeConfigurationNode; import java.io.FileNotFoundException; import java.rmi.NotBoundException; import java.rmi.RemoteException; public class PrintConcurrency { public static void main(String[] args) throws NotBoundException, EngineException, RemoteException, FileNotFoundException { Factory factory = Factory.makeInstance(); //Change the hostname to suit your environment. Engine engine = factory.lookupEngine("localhost"); engine.login("admin", "admin"); RuntimeConfigurationNode root = engine.getRuntimeConfiguration(); if (root.getConcurrencyThrottle() != null) { System.out.println(root.getConcurrencyThrottle() + " - Total Used: " + engine.size("/")); } for (RuntimeConfigurationNode childNode : root.getChildren()) { System.out.println(childNode.getConcurrencyThrottle() + " - Used: " + engine.size(childNode.getFullyQualifiedName() + "/")); } } }
Print Workflow's Scheduled vs Actual Firing Time
This PL/SQL code prints out a list of scheduled workflows, including the full namespace, the scheduled firing time and the actual firing time. This code was tested on SQL Server and can be used to put together a Report.
SELECT a.FLOW_CHART_NAME, b.SCHEDULED_T_DATE, c.START_TIMESTAMP FROM FLUX_READY a, FLUX_TIMER_TRIGGER b, FLUX_FLOW_CHART_RUN c, FLUX_VARIABLE d WHERE a.ACTION IN (SELECT OWNER FROM FLUX_VARIABLE WHERE FK_TYPE_PK = b.PK) AND d.OWNER = b.ABSTRACT_TIMEXP AND c.NAMESPACE = a.FLOW_CHART_NAME;
Remove All Failed Workflows from the Engine
This script looks for workflows that are in the FAILED state and removes them from the engine one at a time.
import flux.*; import java.io.FileNotFoundException; import java.rmi.NotBoundException; import java.rmi.RemoteException; public class RemoveFailedWF { public static void main(String[] args) throws NotBoundException, EngineException, RemoteException, FileNotFoundException { Factory factory = Factory.makeInstance(); Engine engine = factory.lookupEngine("localhost"); engine.login("admin", "admin"); FlowChartIterator fcit = engine.getByState("/",SuperState.ERROR, SubState.FAILED); while (fcit.hasNext()) { FlowChart fc = fcit.next(); engine.remove(fc.getName()); } fcit.close(); } }
Pass Information Across to an Error Break
When an error occurs in Flux and an ERROR flow is followed, the Flow Context and Workflow variables are cleared before entering the flow. This example shows a way to keep (and pass on) any needed information.
import flux.ActionListener; import flux.Configuration; import flux.DatabaseType; import flux.Engine; import flux.EngineHelper; import flux.Factory; import flux.FlowChart; import flux.JavaAction; import flux.KeyFlowContext; import flux.NullAction; import flux.VariableManager; import flux.runtimeconfiguration.RuntimeConfigurationFactory; import flux.runtimeconfiguration.RuntimeConfigurationNode; /** * Simple test case to demonstrate using a subclassed exception to pass information across an error break * without having to rely on doing a transaction break first. * * Normally when an error occurs, workflow and flowcontext variables are cleared before entering the error flow. * This offers a means to pass needed information into the error flow. * * */ public class ErrorHandlerTest { public static void main(String[] args) throws Exception { Factory factory = Factory.makeInstance(); Configuration config = new Configuration(); config.setClusterNetworkingEnabled(true); config.setDatabaseType(DatabaseType.H2); FlowChart errorHandler = EngineHelper.makeFlowChart("Handle Error"); JavaAction errorListener = errorHandler.makeJavaAction("Error Catcher"); errorListener.setListener(ErrorExceptionOutput.class); RuntimeConfigurationFactory runtimeFactory = factory.makeRuntimeConfigurationFactory(); RuntimeConfigurationNode runtimeConfigurationNode = runtimeFactory.makeRuntimeConfiguration(); runtimeConfigurationNode.setDefaultFlowChartErrorHandler(errorHandler); config.setRuntimeConfiguration(runtimeConfigurationNode); Engine engine = factory.makeEngine(config); engine.clear(); for (int i = 1; i <= 1; i++) { FlowChart workflow = EngineHelper.makeFlowChart("recurring job " + i); NullAction na = workflow.makeNullAction("na"); JavaAction ja = workflow.makeJavaAction("java"); ja.setListener(TestListener.class); na.addFlow(ja); String name = engine.put(workflow); System.out.println("Added job = " + name); } engine.start(); System.out.println("Engine started!"); System.in.read(); engine.dispose(); } public static class TestListener implements ActionListener { @Override public Object actionFired(KeyFlowContext flowContext) throws Exception { // do some processing here. Catch any exceptions and populate your subclassed exception and throw it here VariableManager variableManager = flowContext.getFlowChart().getVariableManager(); // If desired - grab data fdrom variable manager and load to MyException MyException myException = new MyException(); myException.setDescription("This is a description out from my custom exception"); throw myException; } } public static class ErrorExceptionOutput implements ActionListener { @Override public Object actionFired(KeyFlowContext flowContext) throws Exception { System.out.println(((MyException)flowContext.getErrorResult().exception).getDescription()); throw new Exception("Bailing out"); // Will mark the workflow as failed. } } public static class MyException extends Exception { public String description = ""; public String getDescription() { return description; } public void setDescription(String aDescription) { description = aDescription; } } }
Get the Email Address from a Mail Trigger
This code example grabs the RESULT.from_addresses field from the RESULT object of a Mail Trigger (in this example it's mapped to a flow context variable named from_addresses) and returns only the mail address so that it can be used in an address field of a mail action later on.
import javax.mail.internet.InternetAddress; //Get the variable 'from_addresses' from the flowContext String mailAddressFromResult = flowContext.get("from_addresses").get(0); String mailAddress = new InternetAddress(mailAddressFromResult).getAddress(); //put the new variable to the flowContext flowContext.put("mail_from", mailAddress);
Basic Listener that picks up the RESULT from a File Trigger
This basic Listener class (to be run in a Java action) shows how to pick up the result from a File Trigger (which should be the previous action in the workflows) and prints out the list of files found. The example helps in demonstrating how to pick up flow context information and process it using the java action.
To use the listener, you'll need to compile it and then package it into a JAR file. Once that's done, place it in Flux's classpath (or point to the jar file using the workflow property Listener Classpath).
import flux.ActionListener; import flux.KeyFlowContext; import flux.file.FileTrigger; import java.util.Iterator; public class ActiveWindowListener implements ActionListener { public Object actionFired(KeyFlowContext fc) throws Exception { FileTrigger.FileTriggerResult ftr = (FileTrigger.FileTriggerResult) fc.getLastResult(); Iterator it = ftr.url_string_matches.iterator(); System.out.println("This example prints out the url_string_matches for the files found."); while (it.hasNext()) { String file = it.next().toString(); System.out.println("File \"" + file + "\" exists"); } System.out.println(""); return null; } }
Create a Workflow that Makes Use of all File Actions
This code demonstrates how to build and use Flux's file actions using the APIs. It also shows how to set up Runtime Data Mapping between actions.
You'll need to set up the following runtime configuration variables in you runtime configuration path (or tweak the code to hardcode the values).
/CreatePath=<folder_path> /NewFile=<filename> /MovePath=<folder_path> /RenameFile=<file_rename>
import flux.*; import flux.file.*; import flux.repository.RepositoryAdministrator; import flux.runtimeconfiguration.RuntimeConfigurationNode; import flux.runtimedatamap.ActionPropertyTarget; import flux.runtimedatamap.FlowContextSource; import java.util.HashMap; import java.util.Map; public class FileActions { public static FlowChart FileActions(String namespace, RuntimeConfigurationNode runtime) throws EngineException { FlowChart flowChart = EngineHelper.makeFlowChart(namespace + "FileActions"); FileFactory fileFactory = flowChart.makeFileFactory(); FileCreateAction fileCreate = fileFactory.makeFileCreateAction("Create File"); IncludesFileCriteria fileCreateIncludes = fileFactory.makeIncludesFileCriteria(); fileCreateIncludes.include("${runtime CreatePath}" + "/" + "${runtime NewFile}"); fileCreate.addSource(fileCreateIncludes); FileExistTrigger fileTrigger = fileFactory.makeFileExistTrigger("Poll for Files"); fileTrigger.addSource(fileCreateIncludes); FileMoveAction fileMove = fileFactory.makeFileMoveAction("Move Files"); TargetFileCriteria moveTarget = fileFactory.makeTargetFileCriteria(); moveTarget.setDirectory("${runtime MovePath}"); fileMove.addTarget(moveTarget); Map runtimeDataMap = new HashMap(); FlowContextSource source = flowChart.makeFlowContextSource("RESULT.url_string_matches"); ActionPropertyTarget target = flowChart.makeActionPropertyTarget("Includes File Criteria"); runtimeDataMap.put(source, target); FileCopyAction fileCopy = fileFactory.makeFileCopyAction("Copy Files"); IncludesFileCriteria copyIncludes = fileFactory.makeIncludesFileCriteria(); copyIncludes.include("${runtime MovePath}" + "/" + "${runtime NewFile}"); fileCopy.addSource(copyIncludes); TargetFileCriteria copyTarget = fileFactory.makeTargetFileCriteria(); copyTarget.setDirectory("${runtime CreatePath}"); fileCopy.addTarget(copyTarget); Renamer renamer = fileFactory.makeGlobRenamer("${runtime NewFile}" ,"${runtime RenameFile}"); fileCopy.setRenamer(renamer); FileDeleteAction fileDelete1 = fileFactory.makeFileDeleteAction("File Delete 1"); IncludesFileCriteria delete1Includes = fileFactory.makeIncludesFileCriteria(); delete1Includes.include("${runtime CreatePath}" + "/" + "${runtime RenameFile}"); fileDelete1.addSource(delete1Includes); FileDeleteAction fileDelete2 = fileFactory.makeFileDeleteAction("File Delete 1"); IncludesFileCriteria delete2Includes = fileFactory.makeIncludesFileCriteria(); delete2Includes.include("${runtime MovePath}" + "/" + "${runtime NewFile}"); fileDelete2.addSource(delete2Includes); FileNotExistTrigger notExistTrigger1 = fileFactory.makeFileNotExistTrigger("File Not Exist1"); notExistTrigger1.addSource(delete1Includes); FileNotExistTrigger notExistTrigger2 = fileFactory.makeFileNotExistTrigger("File Not Exist2"); notExistTrigger2.addSource(delete2Includes); ConsoleAction console1 = flowChart.makeConsoleAction("Console 1"); console1.setMessage("${date yyyy.MM.dd HH:mm:ss} - ${runtime RenameFile} not found in ${runtime CreatePath}!\n"); ConsoleAction console2 = flowChart.makeConsoleAction("Console 2"); console2.setMessage("${date yyyy.MM.dd HH:mm:ss} - ${runtime NewFile} not found in ${runtime MovePath}!\n"); fileCreate.setStartAction(true); fileCreate.addFlow(fileTrigger); fileTrigger.addFlow(fileMove); fileMove.addFlow(fileCopy); fileCopy.addFlow(fileDelete1); fileCopy.addFlow(fileDelete2); fileDelete1.addFlow(notExistTrigger1); fileDelete2.addFlow(notExistTrigger2); notExistTrigger1.addFlow(console1); notExistTrigger2.addFlow(console2); return flowChart; } public static void main(String[] args) throws Exception { Factory factory = Factory.makeInstance(); Engine engine = factory.lookupEngine("localhost"); engine.login("admin", "admin"); RuntimeConfigurationNode runtime = engine.getRuntimeConfiguration(); String namespace = "${runtime namespace}"; //Comment out this two lines if you don't want to upload the wf to the wf repository RepositoryAdministrator repoAdmin = engine.getRepositoryAdministrator(); repoAdmin.put(FileActions(namespace,runtime), true); //Comment this line out if you only want to upload the wf to the wf repository engine.put(FileActions(namespace,runtime)); System.out.println("Exported to the engine"); } }
MFT Workflow Example
This example shows how to put together a Managed File Transfer workflow using Flux's APIs
You'll need to set up a runtime configuration file with the paths and
import flux.*; import flux.file.*; import flux.notification.MailAction; import flux.notification.NotificationFactory; import flux.repository.RepositoryAdministrator; import flux.runtimeconfiguration.RuntimeConfigurationNode; import flux.runtimedatamap.ActionPropertyTarget; import flux.runtimedatamap.FlowContextSource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class MFTExample { public static FlowChart MFTExample(String namespace, RuntimeConfigurationNode runtime) throws EngineException { FlowChart flowChart = EngineHelper.makeFlowChart(namespace + "MFT-Example"); FileFactory fileFactory = flowChart.makeFileFactory(); NotificationFactory notificationFactory = flowChart.makeNotificationFactory(); TimerTrigger timer = flowChart.makeTimerTrigger("Timer"); timer.setTimeExpression("0 0 0 9 * * 2-6 * * * *"); NetworkHost sftpHost = fileFactory.makeSecureFtpHost(); sftpHost.setName("${runtime SFTPserver}"); sftpHost.setPort(22); sftpHost.setUsername("${runtime SFTPuser}"); sftpHost.setPassword("fluxtest"); NetworkHost internalHost = fileFactory.makeSecureFtpHost(); internalHost.setName("${runtime LOCALserver}"); internalHost.setPort(22); internalHost.setUsername("${runtime LOCALuser}"); internalHost.setPassword("fluxtest"); FileExistTrigger fileExistSFTP = fileFactory.makeFileExistTrigger("File Exist SFTP"); IncludesFileCriteria SFTPIncludes = fileFactory.makeIncludesFileCriteria(); SFTPIncludes.setHost(sftpHost); SFTPIncludes.include("${runtime SFTPsource}" + "/" + "${runtime SFTPsourceFile}"); fileExistSFTP.addSource(SFTPIncludes); ForEachCollectionElementAction iterateSFTP = flowChart.makeForEachCollectionElementAction("Iterate SFTP Files"); iterateSFTP.setLoopIndex("SFTPfiles"); Map runtimeDataMapSFTP = new HashMap(); FlowContextSource source = flowChart.makeFlowContextSource("RESULT.filename_matches"); ActionPropertyTarget target = flowChart.makeActionPropertyTarget("Collection"); runtimeDataMapSFTP.put(source, target); RegularExpressionAction regexAction = flowChart.makeRegularExpressionAction("Regular Expression Action"); regexAction.setPattern("^(19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])$"); regexAction.setInput("${SFTPfiles?substring(7,17)}"); FileMoveAction fileMove = fileFactory.makeFileMoveAction("File Move From SFTP"); IncludesFileCriteria fileMoveIncludes = fileFactory.makeIncludesFileCriteria(); fileMoveIncludes.setHost(sftpHost); fileMoveIncludes.include("${runtime SFTPsource}" + "/" + "${SFTPFiles}"); fileMove.addSource(fileMoveIncludes); TargetFileCriteria fileMoveTarget = fileFactory.makeTargetFileCriteria(); fileMoveTarget.setHost(sftpHost); fileMoveTarget.setDirectory("${runtime LOCALdir}"); MailAction moveErrorMail = notificationFactory.makeMailAction("File Move From SFTP Error"); moveErrorMail.setMailServer("${runtime MAILserver}"); moveErrorMail.setPort(465); moveErrorMail.setUsername("${runtime MAILuser}"); moveErrorMail.setPassword("<D94%d5g"); moveErrorMail.setFromAddress("${runtime MAILFromAddr}"); List<String> toAddr = new ArrayList<String>(); toAddr.add("${runtime MAILToAddr}"); moveErrorMail.setToAddresses(toAddr); moveErrorMail.setSubject("[FluxSamples] File Move Failed"); moveErrorMail.setBody("There was an error moving the file ${SFTPfiles} from the remote server. \n" + "\n" + "Please contact your Flux administrator for more information. \n"); FileExistTrigger fileExistLocal = fileFactory.makeFileExistTrigger("File Exist Local"); IncludesFileCriteria fileLocalIncludes = fileFactory.makeIncludesFileCriteria(); fileLocalIncludes.setHost(sftpHost); fileLocalIncludes.include("${runtime LOCALdir}" + "/" + "${runtime LOCALsourceFile}"); fileExistLocal.addSource(fileLocalIncludes); ForEachCollectionElementAction iterateLocal = flowChart.makeForEachCollectionElementAction("Iterate Local Files"); iterateLocal.setLoopIndex("DECRYPTfiles"); Map runtimeDataMapLocal = new HashMap(); FlowContextSource source1 = flowChart.makeFlowContextSource("RESULT.filename_matches"); ActionPropertyTarget target1 = flowChart.makeActionPropertyTarget("Collection"); runtimeDataMapLocal.put(source1, target1); FileDecryptPgpAction pgpDecrypt = new FileDecryptPgpAction(flowChart, "Decrypt SFTP Files"); IncludesFileCriteria pgpDecryptIncludes = fileFactory.makeIncludesFileCriteria(); pgpDecryptIncludes.setHost(internalHost); pgpDecryptIncludes.include("${runtime LOCALdir}" + "/" + "${DECRYPTfiles}"); pgpDecrypt.addSource(pgpDecryptIncludes); TargetFileCriteria pgpDecryptTarget = fileFactory.makeTargetFileCriteria(); pgpDecryptTarget.setHost(internalHost); pgpDecryptTarget.setDirectory("${runtime DECRYPTdir}"); pgpDecrypt.addTarget(pgpDecryptTarget); pgpDecrypt.setDeleteSource(true); pgpDecrypt.setKeyNamespace("/PGP Key Pair/Customer Private Key"); MailAction decryptErrorMail = notificationFactory.makeMailAction("Decrypt Failed Error"); decryptErrorMail.setMailServer("${runtime MAILserver}"); decryptErrorMail.setPort(465); decryptErrorMail.setUsername("${runtime MAILuser}"); decryptErrorMail.setPassword("<D94%d5g"); decryptErrorMail.setFromAddress("${runtime MAILFromAddr}"); List<String> decryptToAddr = new ArrayList<String>(); toAddr.add("${runtime MAILToAddr}"); decryptErrorMail.setToAddresses(decryptToAddr); decryptErrorMail.setSubject("[FluxSamples] Decryption Failed"); decryptErrorMail.setBody("There was an error decrypting the file ${DECRYPTfiles}. \n" + "\n" + "Please contact your Flux administrator for more information.\n"); FileExistTrigger fileExistEncrypt = fileFactory.makeFileExistTrigger("File Exist Re-Encrypt"); IncludesFileCriteria fileReEncryptIncludes = fileFactory.makeIncludesFileCriteria(); fileReEncryptIncludes.setHost(internalHost); fileReEncryptIncludes.include("${runtime DECRYPTdir}" + "/" + "*.txt"); fileExistEncrypt.addSource(fileReEncryptIncludes); ForEachCollectionElementAction iterateEncrypt = flowChart.makeForEachCollectionElementAction("Iterate Files for Re-Encrypt"); iterateLocal.setLoopIndex("ReEncryptFiles"); Map runtimeDataMapEncrypt = new HashMap(); FlowContextSource source2 = flowChart.makeFlowContextSource("RESULT.filename_matches"); ActionPropertyTarget target2 = flowChart.makeActionPropertyTarget("Collection"); runtimeDataMapEncrypt.put(source2, target2); FileEncryptPgpAction pgpEncrypt = new FileEncryptPgpAction(flowChart, "Re-Encrypt Files"); IncludesFileCriteria pgpEncryptIncludes = fileFactory.makeIncludesFileCriteria(); pgpEncryptIncludes.setHost(internalHost); pgpEncryptIncludes.include("${runtime DECRYPTdir}" + "/" + "${ReEncryptFiles}"); pgpEncrypt.addSource(pgpEncryptIncludes); TargetFileCriteria pgpEncryptTarget = fileFactory.makeTargetFileCriteria(); pgpEncryptTarget.setHost(internalHost); pgpEncryptTarget.setDirectory("${runtime ENCRYPTdir}"); pgpEncrypt.addTarget(pgpDecryptTarget); pgpEncrypt.setDeleteSource(true); pgpEncrypt.setKeyNamespace("/PGP Key Pair/Re-Encrypt Key"); MailAction encryptErrorMail = notificationFactory.makeMailAction("Re-Encrypt Error"); encryptErrorMail.setMailServer("${runtime MAILserver}"); encryptErrorMail.setPort(465); encryptErrorMail.setUsername("${runtime MAILuser}"); encryptErrorMail.setPassword("<D94%d5g"); encryptErrorMail.setFromAddress("${runtime MAILFromAddr}"); List<String> encryptToAddr = new ArrayList<String>(); toAddr.add("${runtime MAILToAddr}"); encryptErrorMail.setToAddresses(encryptToAddr); encryptErrorMail.setSubject("[FluxSamples] Re-Encryption Failed"); encryptErrorMail.setBody("There was an error encrypting the file ${ReEncryptFiles}. \n" + "\n" + "Please contact your Flux administrator for more information. \n"); timer.setStartAction(true); timer.addFlow(fileExistSFTP); fileExistSFTP.addFlow(iterateSFTP); iterateSFTP.addFlow(regexAction); regexAction.addFlow(iterateSFTP).setCondition("RESULT.matched = false"); regexAction.addFlow(fileMove).setCondition("RESULT.matched=true"); fileMove.addFlow(iterateSFTP); fileMove.setErrorFlow(moveErrorMail); iterateSFTP.setElseFlow(fileExistLocal); fileExistLocal.addFlow(iterateLocal); iterateLocal.addFlow(pgpDecrypt); pgpDecrypt.addFlow(iterateLocal); pgpDecrypt.setErrorFlow(decryptErrorMail); decryptErrorMail.addFlow(iterateLocal); iterateLocal.setElseFlow(fileExistEncrypt); fileExistEncrypt.addFlow(iterateEncrypt); iterateEncrypt.addFlow(pgpEncrypt); pgpEncrypt.addFlow(iterateEncrypt); pgpEncrypt.setErrorFlow(encryptErrorMail); encryptErrorMail.addFlow(iterateEncrypt); iterateEncrypt.setElseFlow(timer); return flowChart; } public static void main(String[] args) throws Exception { Factory factory = Factory.makeInstance(); Engine engine = factory.lookupEngine("localhost"); engine.login("admin", "admin"); RuntimeConfigurationNode runtime = engine.getRuntimeConfiguration(); String namespace = "${runtime namespace}"; //Comment out this two lines if you don't want to upload the wf to the wf repository RepositoryAdministrator repoAdmin = engine.getRepositoryAdministrator(); repoAdmin.put(MFTExample(namespace, runtime), true); //Comment this line out if you only want to upload the wf to the wf repository engine.put(MFTExample(namespace,runtime)); System.out.println("Exported to the engine"); } }
Dynamic Java Action Listener
This sample code shows a simple Listener class to be used with a Dynamic Java Action.
public class DynamicJavaActionListener { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Thread.sleep(5000); System.out.println("Dynamic Java Action"); } public int printNumber(int arg){ System.out.println("Dynamic Java Action test: The number received is: " + arg); System.out.println("Dynamic Java Action test: Now the Java method multiplies it by 2 and returns it to the workflow"); int i = 2; i = arg*i; return i; } }
Invoke a Secure REST Service
import flux.ActionListener; import flux.Engine; import flux.EngineHelper; import flux.Factory; import flux.FlowChart; import flux.JavaAction; import flux.KeyFlowContext; import flux.RestAction; import jersey.server.api.container.filter.LoggingFilter; import jersey.server.api.core.DefaultResourceConfig; import jersey.server.api.core.ResourceConfig; import jersey.server.spi.resource.Singleton; import jersey.simple.container.SimpleServerFactory; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.net.URL; import java.util.Properties; /** * Shows how to perform an operation on a secured REST Service. */ public class InvokeSecureRestService { public static final String CONTEXT = ""; public static final int port = 9193; private Closeable server; public InvokeSecureRestService() throws Exception { Factory fluxFactory = Factory.makeInstance(); Engine engine = fluxFactory.makeEngine(); // Start the web server startServer(); FlowChart job = EngineHelper.makeFlowChart("Secure Rest Services Example"); RestAction restAction = job.makeRestAction("Rest Action"); // Specify the URL the rest action is to use. restAction.setBaseUrl(new URL("http://localhost:9193/secure")); restAction.setResponseType(String.class); restAction.setUsername("joe"); restAction.setPassword("secret"); Properties properties = new Properties(); properties.setProperty("s","GOOG+YHOO+HPQ+IBM+MSFT+ORCL"); properties.setProperty("f","snl1d1t1ohgdr"); restAction.setQueryParameters(properties); // Create a Java Action to process the result from the REST service JavaAction javaAction = job.makeJavaAction("Process REST response"); javaAction.setListener(JavaActionListener.class); // Make the REST Action flow into the Java Action restAction.addFlow(javaAction); // Schedule the job for immediate execution. String jobId = engine.put(job); // Start the engine. engine.start(); // Give the engine a chance to run the job, up to 10 seconds. engine.join(jobId, "+30s", "+5s"); // Clean up the resources. engine.dispose(); // Stop the web server stopServer(); System.out.println("Secure REST Services example finished."); } // constructor @Path("/secure") @Singleton public static class SecureResource { @GET public String get(@Context HttpHeaders h) { String value = h.getRequestHeaders().getFirst("Authorization"); if (value == null) { throw new WebApplicationException(Response.status(401).header("WWW-Authenticate", "Basic realm=\"WallyWorld\"").build()); } return "AUTHENICATED GET RESPONSE"; } } private void startServer() { ResourceConfig rc = new DefaultResourceConfig(SecureResource.class); rc.getProperties().put(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, LoggingFilter.class.getName()); if (server != null) { stopServer(); } System.out.println("Starting Server port " + port); URI u = UriBuilder.fromUri("http://localhost").port(port).build(); try { server = SimpleServerFactory.create(u, rc); } catch (IOException e) { throw new RuntimeException(e); } System.out.println("Started WebServer"); } public void stopServer() { if (server != null) { try { server.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("Stopped WebServer"); } public static class JavaActionListener implements ActionListener { public Object actionFired(KeyFlowContext flowContext) throws Exception { RestAction.RestActionResult result = (RestAction.RestActionResult) flowContext.get("RESULT"); System.out.println("Response : " + result.result); System.out.println("Response status : " + result.responseStatus); System.out.println("Response code : " + result.responseCode); return null; } } public static void main(String[] args) { try { System.out.println("Running the Secure REST Services example."); System.out.println(""); new InvokeSecureRestService(); } // try catch (Exception e) { e.printStackTrace(); } // catch } // main() }
Create and Upload a Custom Business Interval to the Repository
This sample shows how to put together a Business Interval, and exclude weekends as well as given dates. You can use the printScedule() method to test the Business Interval and see the next firing schedules.
import flux.*; import flux.repository.RepositoryAdministrator; import fluximpl.time_expression.Relative_Exp; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; public class CustomBusinessCalendar { public AllowableBusinessInterval createCalendar() throws Exception { BusinessIntervalFactory bif = EngineHelper.makeBusinessIntervalFactory(); BusinessInterval weekend = bif.makeSaturdaySundayWeekend(); weekend.exclude("Office time", null, "+14H", "0 0 1 17 * * *"); //Exclude off hours from 17:01 until 7:59 //List of holidays set up as time expressions rather than hard-coded Relative rel1 = new Relative_Exp(); rel1.acceptRelativeString(">may@25d<mon"); //Victoria Day weekend.excludeDay("Victoria Day", rel1.next()); Cron cron1 = EngineHelper.makeCron(); cron1.accept("0 0 0 0 ^MO SEP * * * * *"); //Labour Day weekend.excludeDay("Labour Day", cron1.next()); Cron cron2 = EngineHelper.makeCron(); cron2.accept("0 0 0 0 2MO OCT * * * * *"); //Thanksgiving weekend.excludeDay("Thanksgiving", cron2.next()); Relative rel2 = new Relative_Exp(); rel2.acceptRelativeString(">dec@25d>b"); //Boxing Day weekend.excludeDay("Boxing Day", rel2.next()); int thisYear = new GregorianCalendar().get(Calendar.YEAR); Calendar c = CustomBusinessCalendar.findEaster(thisYear); Date easter = c.getTime(); weekend.excludeDay("Easter", easter); Calendar c1 = c; c1.add(Calendar.DATE, 1); Date easterMonday = c1.getTime(); weekend.excludeDay("Easter Monday", easterMonday); Calendar c2 = c; c2.add(Calendar.DATE, -2); Date goodFriday = c2.getTime(); weekend.excludeDay("Good Friday", goodFriday); //List of holidays shown in http://www.statutoryholidays.com/2016.php SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy"); Date d1 = sdf.parse("15/02/2016"); //Feb 15th - Islander Day, Louis Riel Day, Heritage Day, Family Day Date d2 = sdf.parse("25/03/2016"); Date d3 = sdf.parse("28/03/2016"); Date d4 = sdf.parse("23/05/2016"); Date d5 = sdf.parse("21/06/2016"); //Aboriginal Day Date d6 = sdf.parse("24/06/2016"); //St. Jean Baptiste Day Date d7 = sdf.parse("01/07/2016"); //Canada Day Date d8 = sdf.parse("01/08/2016"); //Civic Holiday Date d9 = sdf.parse("05/09/2016"); Date d10 = sdf.parse("10/10/2016"); Date d11 = sdf.parse("11/11/2016"); //Remembrance Day Date d12 = sdf.parse("25/12/2016"); //Christmas Date d13 = sdf.parse("26/12/2016"); //Exclude the holiday dates from the Business Interval weekend.excludeDay("Feb 15", d1); //weekend.excludeDay("Good Friday", d2); //weekend.excludeDay("Easter Monday", d3); //Excluded as a time expression above weekend.excludeDay("Victoria Day", d4); //Excluded as a time expression above weekend.excludeDay("Aboriginal Day", d5); weekend.excludeDay("St. Jean Baptiste Day", d6); weekend.excludeDay("Canada Day", d7); weekend.excludeDay("Civic Holiday", d8); //weekend.excludeDay("Labour Day", d9); //Excluded as a time expression above //weekend.excludeDay("Thanksgiving", d10); //Excluded as a time expression above weekend.excludeDay("Remembrance Day", d11); weekend.excludeDay("Christmas Day", d12); //weekend.excludeDay("Boxing Day", d13); //Excluded as a time expression above return weekend; } public static final Calendar findEaster(int year) { if (year <= 1582) { throw new IllegalArgumentException( "Algorithm invalid before April 1583"); } int golden, century, x, z, d, epact, n; golden = (year % 19) + 1; /* E1: metonic cycle */ century = (year / 100) + 1; /* E2: e.g. 1984 was in 20th C */ x = (3 * century / 4) - 12; /* E3: leap year correction */ z = ((8 * century + 5) / 25) - 5; /* E3: sync with moon's orbit */ d = (5 * year / 4) - x - 10; epact = (11 * golden + 20 + z - x) % 30; /* E5: epact */ if ((epact == 25 && golden > 11) || epact == 24) epact++; n = 44 - epact; n += 30 * (n < 21 ? 1 : 0); /* E6: */ n += 7 - ((d + n) % 7); if (n > 31) /* E7: */ return new GregorianCalendar(year, 4 - 1, n - 31); /* April */ else return new GregorianCalendar(year, 3 - 1, n); /* March */ } public void printSchedule(Cron cron) throws Exception { cron.accept("0 0 0 +1 *b"); cron.setBusinessInterval(this.createCalendar()); // Print out the next X schedules. for (int i = 0; i < 160; ++i) { System.out.println(cron.next()); } } public static void main(String[] args) throws Exception { Factory factory = Factory.makeInstance(); Engine engine = factory.lookupEngine("192.168.0.100"); engine.login("admin", "admin"); CustomBusinessCalendar office = new CustomBusinessCalendar(); Cron cron = EngineHelper.makeCron(); cron.accept("0 0 0 +1 *b *"); cron.setBusinessInterval(office.createCalendar()); //Print the next firings office.printSchedule(cron); //Save the Business Calendar to the BI Repository RepositoryAdministrator repoAdmin = engine.getRepositoryAdministrator(); repoAdmin.put("Business Calendar - Canada Holidays", office.createCalendar(), true); } }
Here is the same business calendar packaged as a workflow, with a single action, where the code is contained in a prescript. Note that in order to accomplish this, simply copy the code inside the main method to the end of prescript leaving everything else unchanged. Running this workflow with then execute this code.
CreateCanadianBusinessCalendar.ffc
Message Queuing with One-to-many Delivery Using the Audit Trail Trigger
In addition to its extensive and powerful logging capabilities, the Flux audit trail can be used as a robust, lightweight messaging platform, allowing delivery of automated notifications and even data transfer between workflows on the Flux engine.
The code below is the API example corresponding to the Message Queuing with One-to-many Delivery Using the Audit Trail Trigger example workflow.
import flux.Engine; import flux.EngineException; import flux.Factory; import flux.FlowChart; import flux.repository.RepositoryAdministrator; public class AddListenersToEngine { public static void main(String[] args) throws EngineException, CloneNotSupportedException { Engine engine = Factory.makeInstance().lookupEngine("localhost"); RepositoryAdministrator repositoryAdministrator = engine.getRepositoryAdministrator(); FlowChart template = repositoryAdministrator.getElement("/Template").getFlowChart(); for (int i = 0; i < 10; i++) { FlowChart listener = (FlowChart) template.clone(); listener.setName("/MessageListener" + i); listener.getVariableManager().put("EventToMonitor", "MyCustomEvent"); engine.put(listener); } System.out.println("Listeners running.\nReturn to \"StartEngineMain\" and follow step 3."); } }
import flux.*; public class PublishAuditTrailMessage { public static void main(String[] args) throws EngineException { Engine engine = Factory.makeInstance().lookupEngine("localhost"); FlowChart flowChart = EngineHelper.makeFlowChart("/AuditTrailMessagePublisher"); NullAction nullAction = flowChart.makeNullAction("Null Action"); nullAction.setPrescript("flowContext.sendToAuditTrail(\"MyCustomEvent\", \"EventMessage\");"); engine.put(flowChart); System.out.println("Message published.\nReturn to \"StartEngineMain\" and watch for the message receipt."); } }
import flux.Configuration; import flux.Engine; import flux.EngineException; import flux.Factory; import flux.logging.Level; import flux.logging.LoggerType; import java.util.HashSet; import java.util.Set; public class StartEngineMain { public static void main(String[] args) throws EngineException { Factory factory = Factory.makeInstance(); Configuration config = new Configuration(); config.setLoggerType(LoggerType.INTERNAL_ASYNCHRONOUS); config.setInternalLoggerLevel(Level.WARNING); Set<String> auditTrailFilter = config.getAuditTrailFilter(); if (auditTrailFilter == null) auditTrailFilter = new HashSet<String>(); auditTrailFilter.add("flux.audittrail.server.StartingRunEvent"); auditTrailFilter.add("MyCustomEvent"); Engine engine = factory.makeEngine(config); engine.start(); System.out.println("Engine Started! Next, perform the following steps:\n\n"); System.out.println("1) Run \"UploadTemplate\" to add the listener template to the repository.\n"); System.out.println("2) Run \"AddListenersToEngine\" to create 10 listener instances and put them on the engine.\n"); System.out.println("3) Run \"PublishAuditTrailMessage\" to deliver a message to the audit trail. One of the waiting listeners\n" + "\twill pick up the message - watch stdout here (the JVM where the engine is running) to see the message receipt confirmation.\n\n"); System.out.println("Repeat step 3 to publish additional messages to the queue on demand!\n\n"); } }
import flux.*; import flux.repository.RepositoryAdministrator; public class UploadTemplate { public static void main(String[] args) throws EngineException { Engine engine = Factory.makeInstance().lookupEngine("localhost"); RepositoryAdministrator repositoryAdministrator = engine.getRepositoryAdministrator(); FlowChart flowChart = EngineHelper.makeFlowChart("/Template"); AuditTrailTrigger auditTrailTrigger = flowChart.makeAuditTrailTrigger("Audit Trail Trigger"); auditTrailTrigger.setPrescript("action.setEventName(\"${EventToMonitor}\");\n" + "if(vm.get(\"MessageToMonitor\") != null) {\n" + " action.setEventMessage(\"${MessageToMonitor}\");\n" + "}"); // Dummy event name that will be overwritten by the prescript. auditTrailTrigger.setEventName("flux.audittrail.client.EnteringCallEvent"); ConsoleAction consoleAction = flowChart.makeConsoleAction("Console"); consoleAction.setMessage("\n" + "====\n" + "Received a message from ${RESULT.namespace} with timestamp ${RESULT.timestamp}.\n" + "\n" + "Message contents: ${RESULT.eventMessage}\n" + "Message sent from action: ${RESULT.actionName}\n" + "====\n" + "\n"); auditTrailTrigger.addFlow(consoleAction); consoleAction.addFlow(auditTrailTrigger); repositoryAdministrator.put(flowChart, true); System.out.println("Template Uploaded.\nReturn to \"StartEngineMain\" and follow step 2."); } }
You can download all four files of this example here: messaging_one_to_many.zip
Read from a JMS queue using the JMS Trigger
The following is unsupported example code to interact with a JMS queue from within Flux. Download the necessary jars here: jmstrigger(1).zip
import flux.*; import flux.j2ee.JmsAction; import flux.j2ee.JmsMessageType; import fluximpl.TextMessageImpl; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import static junit.framework.Assert.assertEquals; /** * JmsTrigger example. * * @author arul@flux.ly */ @Ignore public class JmsTriggerTest extends ActiveMQBroker { private static final String queue = "example.MyQueue"; private static final String jndiName = "MyQueue"; private static final String activeMQCF = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; private static final String connectionFactory = "queueConnectionFactory"; private static final String vmProviderUrl = "vm://localhost"; private static final String tcpProviderUrl = "tcp://localhost:61616"; private Engine engine; Factory factory = Factory.makeInstance(); static Map<String, String> expectedResult = new HashMap<String, String>(); @Before public void setup() throws Exception { startBroker(); Configuration config = factory.makeConfiguration(); config.setDatabaseType(DatabaseType.H2); engine = factory.makeEngine(config); engine.start(); expectedResult.put("symbol", "FB"); expectedResult.put("size", "4000"); expectedResult.put("type", "SELL"); expectedResult.put("user", "bigjoe"); } @Test public void producerConsumerTest() throws Exception { EngineHelper helper = factory.makeEngineHelper(); FlowChart workflow = helper.makeFlowChart("jms trigger"); JmsAction publisher1 = workflow.makeJ2eeFactory().makeJmsAction("publisher1"); publisher1.setConnectionFactory(connectionFactory); publisher1.setMessageType(JmsMessageType.TEXT); publisher1.setBody("This goes in body"); Map<String, String> stringProperties = new HashMap<String, String>(); stringProperties.put("symbol", "FB"); stringProperties.put("size", "4000"); stringProperties.put("type", "SELL"); stringProperties.put("user", "bigjoe"); publisher1.setProperties(stringProperties); publisher1.setType("NASDAQ"); publisher1.setDestinationName(jndiName); publisher1.setCorrelationId("1234"); publisher1.setProviderUrl(vmProviderUrl); publisher1.setPostscript("System.out.println(\"Message1 published!\");"); JmsAction publisher2 = workflow.makeJ2eeFactory().makeJmsAction("publisher2"); publisher2.setConnectionFactory(connectionFactory); publisher2.setMessageType(JmsMessageType.TEXT); publisher2.setBody("This goes in body"); stringProperties = new HashMap<String, String>(); stringProperties.put("symbol", "LNKD"); stringProperties.put("size", "400"); stringProperties.put("type", "BUY"); stringProperties.put("user", "littlejoe"); publisher2.setProperties(stringProperties); publisher2.setType("NYSE"); publisher2.setDestinationName(jndiName); publisher2.setCorrelationId("5678"); publisher2.setProviderUrl(vmProviderUrl); publisher2.setPostscript("System.out.println(\"Message2 published!\");"); JmsAction publisher3 = workflow.makeJ2eeFactory().makeJmsAction("publisher3"); publisher3.setConnectionFactory(connectionFactory); publisher3.setMessageType(JmsMessageType.TEXT); publisher3.setBody("This goes in body"); stringProperties = new HashMap<String, String>(); stringProperties.put("symbol", "FB"); stringProperties.put("size", "400"); stringProperties.put("type", "BUY"); stringProperties.put("user", "littlejoe"); publisher3.setProperties(stringProperties); publisher3.setType("NASDAQ"); publisher3.setDestinationName(jndiName); publisher3.setCorrelationId("7890"); publisher3.setProviderUrl(vmProviderUrl); publisher3.setPostscript("System.out.println(\"Message3 published!\");"); JmsTriggerFactory customActionFactory = (JmsTriggerFactory) workflow.makeFactory("JmsTriggerFactory"); JmsTrigger consumer = customActionFactory.makeJmsTrigger("consumer"); consumer.setListeningQueueName(queue); consumer.setInitialContextFactory(activeMQCF); consumer.setProviderUrl(vmProviderUrl); consumer.setFetchAllMessages(true); consumer.setConnectionFactory(connectionFactory); consumer.setPollingDelay("+5s"); // consumer.setMessageSelector("size > 1000"); // consumer.setMessageSelector("symbol = 'FB'"); consumer.setMessageSelector("JMSType = 'NASDAQ' AND size > 1000"); // consumer.setMessageSelector("JMSCorrelationID = 1234"); publisher1.addFlow(publisher2); publisher2.addFlow(publisher3); publisher3.addFlow(consumer); JavaAction fetcher = workflow.makeJavaAction("message fetcher"); fetcher.setListener(MessageFetcher.class); fetcher.setTransactionBreak(true); consumer.addFlow(fetcher); JavaAction processor = workflow.makeJavaAction("message processor"); processor.setListener(MessageProcessor.class); processor.setTransactionBreak(true); fetcher.addFlow(processor); ConsoleAction console = workflow.makeConsoleAction("console"); console.setMessage("Workflow finished!"); console.println(); processor.addFlow(console); String name = engine.put(workflow); engine.join(name, "+60s", "+5s"); } @After public void teardown() throws Exception { engine.dispose(); stopBroker(); } public static class MessageFetcher implements ActionListener { @Override public Object actionFired(KeyFlowContext flowContext) throws Exception { JmsTrigger.JmsTriggerResult result = (JmsTrigger.JmsTriggerResult) flowContext.getLastResult(); if (result != null) System.out.println("Fetched messages # " + result.getMessagesReceived()); flowContext.put("jms", result); return null; } } public static class MessageProcessor implements ActionListener { @Override public Object actionFired(KeyFlowContext flowContext) throws Exception { JmsTrigger.JmsTriggerResult result = (JmsTrigger.JmsTriggerResult) flowContext.get("jms"); if (result != null) { System.out.println("Matched messages " + result.getMessages().size()); if (!result.getMessages().isEmpty()) { TextMessageImpl message = (TextMessageImpl) result.getMessages().get(0); assertEquals("This goes in body", message.getText()); Iterable<String> itr = Enumerator.make(message.getPropertyNames()); for (String key : itr) { System.out.println("K = " + key + "; V = " + message.getStringProperty(key)); assertEquals(expectedResult.get(key), message.getStringProperty(key)); } } } return null; } } public static class Enumerator<T> implements Iterable<T> { private final Enumeration<T> en; public Enumerator(Enumeration<T> en) { this.en = en; } public Iterator<T> iterator() { return new Iterator<T>() { public boolean hasNext() { return en.hasMoreElements(); } public T next() { return en.nextElement(); } public void remove() { throw new UnsupportedOperationException(); } }; } public static <T> Iterable<T> make(Enumeration<T> en) { return new Enumerator<T>(en); } } }
Monitor Workflows and Notify if a Workflow Fails
This example listens for two Audit Trail events: Exiting Trigger On Error and Exiting Action On Error. If either one is published by an action/trigger to the Audit Trail, a mail is fired to notify a user (or group of users) about it.
import flux.*; import flux.notification.MailAction; import flux.notification.NotificationFactory; import flux.runtimedatamap.FlowContextSource; import flux.runtimedatamap.FlowContextTarget; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class AuditTrailCheckFailedWf { public static FlowChart AuditTrailCheckFailedWf() throws EngineException { FlowChart flowChart = EngineHelper.makeFlowChart(); Factory factory = Factory.makeInstance(); NotificationFactory notificationFactory = flowChart.makeNotificationFactory(); //Listen for failed triggers AuditTrailTrigger failedTrigger = flowChart.makeAuditTrailTrigger("Check failed trigger"); failedTrigger.setEventName("ExitingTriggerOnErrorEvent"); failedTrigger.setStartAction(true); //Listen for failed actions AuditTrailTrigger failedAction = flowChart.makeAuditTrailTrigger("Check failed action"); failedAction.setEventName("ExitingActionOnErrorEvent"); failedAction.setStartAction(true); //Send alert mail MailAction mailAction = notificationFactory.makeMailAction("Send Alert"); mailAction.setFromAddress("${runtime MAILFromAddr}"); List<String> toAddresses = new ArrayList(); toAddresses.add("${runtime MAILToAddr}"); mailAction.setToAddresses(toAddresses); mailAction.setSubject("Workflow ${workflow} has FAILED"); mailAction.setMailServer("smtp.gmail.com"); mailAction.setPort(25); mailAction.setUsername("${runtime MAILuser}"); mailAction.setPassword("<Dw4%d5g"); mailAction.setBody("Workflow ${workflow} has failed. This action/trigger threw an exception: ${action}"); //Runtime data map from failed trigger to mail action Map runtimeDataMap1 = new HashMap(); FlowContextSource workflowName1 = flowChart.makeFlowContextSource("RESULT.namespace"); FlowContextTarget workflowTarget1 = flowChart.makeFlowContextTarget("workflow"); runtimeDataMap1.put(workflowName1, workflowTarget1); FlowContextSource triggerName1 = flowChart.makeFlowContextSource("RESULT.actionName"); FlowContextTarget actionTarget1 = flowChart.makeFlowContextTarget("action"); runtimeDataMap1.put(triggerName1, actionTarget1); //Runtime data map from failed action to mail action Map runtimeDataMap2 = new HashMap(); FlowContextSource workflowName2 = flowChart.makeFlowContextSource("RESULT.namespace"); FlowContextTarget workflowTarget2 = flowChart.makeFlowContextTarget("workflow"); runtimeDataMap2.put(workflowName2, workflowTarget2); FlowContextSource triggerName2 = flowChart.makeFlowContextSource("RESULT.actionName"); FlowContextTarget actionTarget2 = flowChart.makeFlowContextTarget("action"); runtimeDataMap2.put(triggerName2, actionTarget2); failedTrigger.addFlow(mailAction).setRuntimeDataMap(runtimeDataMap1); failedAction.addFlow(mailAction).setRuntimeDataMap(runtimeDataMap2); mailAction.addFlow(failedTrigger); mailAction.addFlow(failedAction); return flowChart; } }