Northwind Esper: Alerts
This is Step 4 in the Northwind Application Tutorial<br> Previous Step: Step 3: Esper: Track Packages<br> Next Step: Step 5: Send Notifications Over JMS
What You Will Learn
- More about how to use Esper statements
Alert: Package Moved Backwards!
At this point, we can track the items seen at the various read zones in our scenario. Now we need to fire an alert when an item moves backwards (that is, from the weigh station back to the dock door).
The Code
<pre>
/**
 * A method that sets up business alerts
 */
private void setUpAlerts(){
    // 1 Create a window for alert messages
    statements.add(esperService.getProvider().getEPAdministrator().createEPL(
        "create window alerts.win:length(20) (alert_type int, tag_ID String)"));

    // 2 Create a window for items leaving the weighstation
    statements.add(esperService.getProvider().getEPAdministrator().createEPL(
        "create window weighstation_recent.std:unique(tag_ID) (tag_ID String)"));

    // 3 Insert items into weighstation_recent once they leave weighstation
    statements.add(esperService.getProvider().getEPAdministrator().createEPL(
        "insert rstream into weighstation_recent select tag_ID from weighstation"));

    // 4 Whenever an item is seen at the weighstation and then seen at the
    //   dockdoor, insert a new item into the alerts window
    statements.add(esperService.getProvider().getEPAdministrator().createEPL(
        "on pattern[every-distinct(tag.tag_ID) tag=weighstation_recent -> dockdoor(tag_ID = tag.tag_ID)] "
        + "insert into alerts "
        + "select 1 as alert_type, tag_ID as tag_ID "
        + "from weighstation_recent where tag_ID = tag.tag_ID"));
}
</pre>
The following code is an addition to the setupListeners() method that was previously defined:
<pre>
// Create a listener to handle Alerts
StatementAwareUpdateListener alertListener = new StatementAwareUpdateListener() {
    @Override
    public void update(EventBean[] arrivals, EventBean[] departures,
            EPStatement arg2, EPServiceProvider arg3) {
        if (arrivals != null) {
            for (EventBean bean : arrivals) {
                int alertType = (Integer) bean.get("alert_type");
                String id = (String) bean.get("tag_ID");
                switch (alertType) {
                case 1:
                    System.out.println("Package moved backwards: " + id);
                    break;
                }
            }
        }
    }
};

// Create a query on the alerts window and attach the alert listener to it
EPStatement queryAlert = esperService.getProvider().getEPAdministrator().createEPL(
    "select * from alerts");
queryAlert.addListener(alertListener);
statements.add(queryAlert);
</pre>
Explanation of the Code
-  First we need to create a window that will hold alerts.
- Using the <tt>win:length(20)</tt> view, the window keeps only the last 20 alerts.
- This window is defined to hold two pieces of information per entry: an int that is the alert type, and a string that is the tag ID
 
- Next we create a window that will hold tags that have departed from the weigh station. The <tt>std:unique(tag_ID)</tt> view keeps only the most recent entry per tag ID.
- The third statement inserts items that have left the <tt>weighstation</tt> window into the <tt>weighstation_recent</tt> window. The <tt>rstream</tt> keyword selects the events leaving the window rather than those entering it.
-  Finally we get to define the rule! It looks for an event entering the <tt>weighstation_recent</tt> window, followed by an event with the same tag ID entering the <tt>dockdoor</tt> window. Once this occurs, it makes a new entry in the <tt>alerts</tt> window.
- We use the <tt>every-distinct</tt> operator to ensure that the pattern matches a given tag ID only once.
- We also insert '1' as the alert type. We will assume that this alert type corresponds to a 'tag moved backwards' alert.
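To make the logic of statements 2 to 4 concrete, here is a minimal plain-Java sketch of the rule. It is not Esper code and does not reproduce the engine's exact semantics. The class and method names (<tt>BackwardsAlertSketch</tt>, <tt>leftWeighStation</tt>, <tt>seenAtDockDoor</tt>) are hypothetical and do not appear in the tutorial project. The sketch assumes the behavior described above: remember every tag that has left the weigh station, and record an alert of type 1 the first time such a tag is seen at the dock door.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of the "package moved backwards" rule (hypothetical, not Esper code). */
class BackwardsAlertSketch {
    // Tags that have left the weigh station (loosely models weighstation_recent).
    private final Set<String> departedFromWeighStation = new HashSet<>();
    // Tags that already produced an alert (loosely models every-distinct).
    private final Set<String> alreadyAlerted = new HashSet<>();
    // Collected alerts, formatted as "<alert_type>:<tag_ID>".
    private final List<String> alerts = new ArrayList<>();

    /** Call when a tag leaves the weigh station read zone. */
    void leftWeighStation(String tagId) {
        departedFromWeighStation.add(tagId);
    }

    /** Call when a tag is seen at the dock door. */
    void seenAtDockDoor(String tagId) {
        // Alert only if the tag left the weigh station earlier
        // and has not been reported before.
        if (departedFromWeighStation.contains(tagId) && alreadyAlerted.add(tagId)) {
            alerts.add("1:" + tagId);
        }
    }

    /** Returns the alerts recorded so far, oldest first. */
    List<String> alerts() {
        return alerts;
    }
}
```

A dock door sighting that happens before the tag has left the weigh station produces no alert, which is the normal forward direction of travel.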
 
Alert: Package Skipped the Dock Door!
The next requirement for the Northwind application is to detect when a package appears at the weigh station but was never seen at the dock door. This is a bit different from detecting packages going from the weigh station back to the dock door, because we cannot use the followed-by operator on events coming out of the dock door. It is precisely the packages that were never seen at the dock door that we are looking for!
The Code
Add the following statements to the <tt>setUpAlerts</tt> method:
<pre>
// 1 Create a window for items leaving the dock door
statements.add(esperService.getProvider().getEPAdministrator().createEPL(
    "create window dockdoor_recent.std:unique(tag_ID) (tag_ID String)"));

// 2 Insert items into dockdoor_recent once they leave dockdoor
statements.add(esperService.getProvider().getEPAdministrator().createEPL(
    "insert rstream into dockdoor_recent select tag_ID from dockdoor"));

// 3 Any time we see a new weighstation event, check whether its tag is
//   already in dockdoor_recent. If not, make a new alert.
statements.add(esperService.getProvider().getEPAdministrator().createEPL(
    "insert into alerts "
    + "select 2 as alert_type, tag_ID as tag_ID "
    + "from weighstation as w "
    + "where not exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)"));
</pre>
In addition, you need to modify the switch statement of the alerts handler:
<pre>
switch (alertType) {
case 1:
    System.out.println("Package moved backwards: " + id);
    break;
case 2:
    System.out.println("Package skipped the dock door: " + id);
    break;
}
</pre>
Explanation of the Code
- First we create a named window that will hold tags that have recently departed from the dock door.
- Next we insert the tags that have left the <tt>dockdoor</tt> window into the <tt>dockdoor_recent</tt> window.
-  The third statement inserts a new alert into the <tt>alerts</tt> window whenever a tag that has not been seen at the dock door arrives in the <tt>weighstation</tt> window.
- We will assume that 2 is the alert type of the 'package skipped dock door' alert.
- The statement <tt>select * from dockdoor_recent as d where d.tag_ID = w.tag_ID</tt> is a <tt>subquery</tt> that returns results just like a normal query. By using it with the <tt>not exists</tt> condition, we test whether the subquery returned no results. If it returned none, we need to fire an alert.
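The <tt>not exists</tt> check can be pictured in plain Java as a membership test. The sketch below is only an illustration under that assumption. <tt>SkippedDockDoorSketch</tt> and its methods are hypothetical names, and the set stands in loosely for the <tt>dockdoor_recent</tt> window. The stream call mirrors the shape of the subquery: look for any row whose tag ID equals the arriving tag, and alert when none matches.

```java
import java.util.HashSet;
import java.util.Set;

/** Plain-Java model of the "skipped the dock door" check (hypothetical, not Esper code). */
class SkippedDockDoorSketch {
    // Tags that have left the dock door (loosely models dockdoor_recent).
    private final Set<String> dockDoorRecent = new HashSet<>();

    /** Call when a tag leaves the dock door read zone. */
    void leftDockDoor(String tagId) {
        dockDoorRecent.add(tagId);
    }

    /**
     * Call when a tag arrives at the weigh station.
     * Returns true when an alert of type 2 should be raised, that is,
     * when no entry in dockDoorRecent has the same tag ID.
     */
    boolean arrivedAtWeighStation(String tagId) {
        // Equivalent in spirit to:
        // not exists (select * from dockdoor_recent as d where d.tag_ID = w.tag_ID)
        return dockDoorRecent.stream().noneMatch(d -> d.equals(tagId));
    }
}
```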
 
Alert: Package is Lost!
The last alert fires when a package departs from the dock door and is not seen at the weigh station within a given time period.
The Code
Add the following to the <tt>setUpAlerts</tt> method:
<pre>
// 1 Create a new alert whenever a package departs from the dock door and
//   is not seen at the weighstation within a given time period
statements.add(esperService.getProvider().getEPAdministrator().createEPL(
    "on pattern[every-distinct(tag.tag_ID) tag=dockdoor_recent -> "
    + "(timer:interval(5 min) and not weighstation(tag_ID = tag.tag_ID))] "
    + "insert into alerts "
    + "select 3 as alert_type, tag_ID as tag_ID "
    + "from dockdoor_recent where (tag_ID = tag.tag_ID)"));
</pre>
In addition, you need to modify the switch statement of the alerts handler:
<pre>
switch (alertType) {
case 1:
    System.out.println("Package moved backwards: " + id);
    break;
case 2:
    System.out.println("Package skipped the dock door: " + id);
    break;
case 3:
    System.out.println("Package is lost: " + id);
    break;
}
</pre>
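As the number of alert types grows, the integer codes in the switch statement become harder to follow. One possible alternative, shown here only as a sketch, is an enum that maps each code to its message. The <tt>AlertType</tt> enum and its methods are hypothetical and are not part of the tutorial project. The codes and messages are the ones used in the switch statement above.

```java
/** Hypothetical enum that gives names to the integer alert codes 1, 2 and 3. */
enum AlertType {
    MOVED_BACKWARDS(1, "Package moved backwards"),
    SKIPPED_DOCK_DOOR(2, "Package skipped the dock door"),
    LOST(3, "Package is lost");

    private final int code;
    private final String message;

    AlertType(int code, String message) {
        this.code = code;
        this.message = message;
    }

    /** The integer stored in the alert_type column. */
    int code() {
        return code;
    }

    /** Looks up the enum constant for an alert_type value. */
    static AlertType fromCode(int code) {
        for (AlertType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown alert_type: " + code);
    }

    /** Builds the same text the switch statement prints. */
    String describe(String tagId) {
        return message + ": " + tagId;
    }
}
```

With such an enum, the body of the listener loop could shrink to a single call such as <tt>System.out.println(AlertType.fromCode(alertType).describe(id))</tt>.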
Explanation of the Code
-  Only one statement is needed for this alert. Like the 'Package moved backwards' alert, it has two parts. The first part (<tt>on pattern</tt>) waits for a sequence of events. The second part (<tt>insert into</tt>) acts whenever that sequence occurs.
- We use the followed-by operator (<tt>-></tt>) to look for an event leaving the dock door, followed by a time period (5 minutes in the code above) during which no package with the corresponding ID is seen at the weigh station.
- When this pattern triggers, we need to insert a new event into the alerts window.
- For this kind of alert, we will assume the type will be 3.
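The timeout logic of this pattern can be modeled in plain Java with explicit timestamps. The sketch below is an illustration, not Esper code, and it simplifies the engine's semantics. <tt>LostPackageSketch</tt> and its methods are hypothetical names, and the caller passes in the current time instead of relying on a real timer. Each tag leaving the dock door gets a deadline, a weigh station sighting cancels it, and any deadline that passes produces an alert of type 3.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the "package is lost" timeout (hypothetical, not Esper code). */
class LostPackageSketch {
    private final long timeoutMillis;
    // Tag ID -> time at which the tag counts as lost. Insertion order is kept.
    private final Map<String, Long> pendingDeadlines = new LinkedHashMap<>();

    LostPackageSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Call when a tag leaves the dock door; starts the timer for that tag. */
    void leftDockDoor(String tagId, long nowMillis) {
        // Keep the first deadline if the tag is already being watched.
        pendingDeadlines.putIfAbsent(tagId, nowMillis + timeoutMillis);
    }

    /** Call when a tag is seen at the weigh station; cancels its timer. */
    void seenAtWeighStation(String tagId) {
        pendingDeadlines.remove(tagId);
    }

    /** Returns the tags whose deadline has passed and stops watching them. */
    List<String> advanceTo(long nowMillis) {
        List<String> lost = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pendingDeadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                lost.add(entry.getKey());
                it.remove();
            }
        }
        return lost;
    }
}
```

Passing the clock value as a parameter keeps the sketch deterministic, so the timeout can be exercised without waiting for real time to pass.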
 

