Saturday, January 5, 2013

Quick Start CEP with Esper - Stock Trading

Event processing is a method of tracking and analyzing (processing) streams of information (data) about things that happen (events), and deriving a conclusion from them. Complex event processing (CEP) is event processing that combines data from multiple sources to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats) and respond to them as quickly as possible. These events may be sales leads, orders or customer service calls. Or, they may be news items, text messages, social media posts, stock market feeds, traffic reports, weather reports, or other kinds of data. An event may also be defined as a "change of state," when a measurement exceeds a predefined threshold of time, temperature, or other value. You can find more about CEP at the wiki page

I use Esper as the CEP engine in this post to illustrate complex event processing. Esper is open-source software available under the GNU General Public License (GPL). You can find it at 

The financial services industry was an early adopter of CEP technology. It uses complex event processing to structure and contextualize available data so that it can inform trading behavior, specifically algorithmic trading, by identifying opportunities or threats that indicate traders (or automatic trading systems) should buy or sell.

Here is an example. Suppose a day trader, Joe, wants to buy Google stock (symbol GOOG). His target price is $90, and he likes to buy on a dip, when the price falls at least 10% below the moving average. Here is the Esper code to illustrate the stock trade:
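import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class StockTrade {

    private static final Log log = LogFactory.getLog(StockTrade.class);

    // Receives the aggregated row (AvgPrice, MinPrice) whenever the window changes.
    public static class StockTradeListener implements UpdateListener {
        private double avgPrice = -1.0;
        private double minPrice = -1.0;
        private double targetPrice = 0.0;
        private double dropPercetage = 0.0;
        private boolean buyNow = false;

        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            if (newEvents == null || newEvents.length == 0) return;

            EventBean event = newEvents[0];
            // The aggregates are null while the window is empty; treat that as 0.0.
            avgPrice = event.get("AvgPrice") == null ? 0.0 : ((Double) event.get("AvgPrice")).doubleValue();
            minPrice = event.get("MinPrice") == null ? 0.0 : ((Double) event.get("MinPrice")).doubleValue();
            if (avgPrice > 0 && minPrice > 0) {
                log.info("minPrice=" + minPrice + " avgPrice=" + avgPrice);
                // Buy when the minimum is at or below the target price and at least
                // dropPercetage percent below the moving average.
                if (minPrice <= targetPrice && minPrice <= (avgPrice * (100.00 - dropPercetage) / 100.00)) {
                    buyNow = true;
                    log.info("buyNow=" + buyNow);
                }
            }
        }

        public StockTradeListener(double targetPrice, double dropPercetage) {
            this.targetPrice = targetPrice;
            this.dropPercetage = dropPercetage;
        }

        public boolean isBuyNow() { return buyNow; }
    }

    public boolean runStockTrade(String symbal, long interval,
            double[] arrayPrice, double targetPrice, double dropPercetage) throws InterruptedException {
        log.info("symbal=" + symbal);
        log.info("interval=" + interval);
        log.info("targetPrice=" + targetPrice);
        log.info("percetageDrop=" + dropPercetage);
        log.info("arrayPrice=" + Arrays.toString(arrayPrice));

        // Configuration
        Configuration config = new Configuration();
        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

        // Creating a Statement
        // NOTE: the "from" clause was garbled in the original listing. The fully
        // qualified event name and the 30-second time window are assumptions.
        String expression = "select avg(price) as AvgPrice, min(price) as MinPrice from "
                + PriceEvent.class.getName() + ".win:time(30 sec)";
        EPStatement statement = epService.getEPAdministrator().createEPL(expression);

        // Adding a Listener
        StockTradeListener listener = new StockTradeListener(targetPrice, dropPercetage);
        statement.addListener(listener);

        for (int i = 0; i < arrayPrice.length; i++) {
            // Sending events, one every `interval` milliseconds
            PriceEvent event = new PriceEvent(symbal, arrayPrice[i]);
            Thread.sleep(interval);
            epService.getEPRuntime().sendEvent(event);
            if (listener.isBuyNow()) return true;
        }

        return false;
    }
}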
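
The statement asks Esper for a moving average and a minimum of the price over recent events. To show the idea without Esper, here is a minimal plain-Java sketch of a time-based sliding window that tracks both values. The class `SlidingPriceWindow` and its methods are hypothetical and not part of Esper or this project, the 30-second window is an assumed value, and timestamps are passed in explicitly so the example is deterministic. Esper's own window implementation may behave differently at the edges.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; not part of Esper.
final class SlidingPriceWindow {

    private static final class Tick {
        final long timeMillis;
        final double price;

        Tick(long timeMillis, double price) {
            this.timeMillis = timeMillis;
            this.price = price;
        }
    }

    private final long windowMillis;
    private final Deque<Tick> ticks = new ArrayDeque<>();
    private double sum = 0.0;

    SlidingPriceWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds a price observed at timeMillis, then evicts ticks that fell out of the window.
    void add(long timeMillis, double price) {
        ticks.addLast(new Tick(timeMillis, price));
        sum += price;
        while (!ticks.isEmpty() && ticks.peekFirst().timeMillis <= timeMillis - windowMillis) {
            sum -= ticks.removeFirst().price;
        }
    }

    // Average of the prices currently in the window, NaN when the window is empty.
    double avg() {
        return ticks.isEmpty() ? Double.NaN : sum / ticks.size();
    }

    // Minimum of the prices currently in the window, NaN when the window is empty.
    double min() {
        if (ticks.isEmpty()) return Double.NaN;
        double m = Double.POSITIVE_INFINITY;
        for (Tick t : ticks) {
            m = Math.min(m, t.price);
        }
        return m;
    }

    public static void main(String[] args) {
        SlidingPriceWindow window = new SlidingPriceWindow(30_000); // assumed 30-second window
        double[] prices = {100.0, 99.0, 100.5};
        for (int i = 0; i < prices.length; i++) {
            window.add(i * 1000L, prices[i]); // one tick per second
            System.out.println("minPrice=" + window.min() + " avgPrice=" + window.avg());
        }
    }
}
```

The running sum keeps `avg()` cheap, while `min()` simply rescans the window, which is fine for a few dozen ticks. An engine such as Esper maintains these aggregates for you, which is the point of writing the rule as a query.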


Here is the event bean Java class:
public class PriceEvent {

    private String symbol;
    private double price;

    public PriceEvent(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }

    @Override
    public String toString() {
        return "PriceEvent [symbol=" + symbol + ", price=" + price + "]";
    }
}

Here is how you can get the source:

$ git clone 

$ cd CepSample

$ mvn compile test 

Here is the test result:

Running com.lei.cep.StockTradeTest
19:39:25,419 INFO [StockTrade] start testStockTradeNotBuy()
19:39:25,419 INFO [StockTrade] symbal=GOOG
19:39:25,419 INFO [StockTrade] interval=1000
19:39:25,419 INFO [StockTrade] targetPrice=90.0
19:39:25,419 INFO [StockTrade] percetageDrop=10.0
19:39:25,420 INFO [StockTrade] arrayPrice=[100.0, 99.0, 100.5, 101.01, 100.0, 102.01, 101.0, 99.5, 100.0, 101.0, 102.0, 100.0, 99.99, 98.87, 93.99, 95.09, 90.99, 89.8, 93.9]
19:39:26,919 INFO [StockTrade] minPrice=100.0 avgPrice=100.0
19:39:27,921 INFO [StockTrade] minPrice=99.0 avgPrice=99.5
19:39:28,923 INFO [StockTrade] minPrice=99.0 avgPrice=99.83333333333333
19:39:29,924 INFO [StockTrade] minPrice=99.0 avgPrice=100.1275
19:39:30,925 INFO [StockTrade] minPrice=99.0 avgPrice=100.102
19:39:31,927 INFO [StockTrade] minPrice=99.0 avgPrice=100.42
19:39:32,928 INFO [StockTrade] minPrice=99.0 avgPrice=100.50285714285714
19:39:33,930 INFO [StockTrade] minPrice=99.0 avgPrice=100.3775
19:39:34,931 INFO [StockTrade] minPrice=99.0 avgPrice=100.33555555555556
19:39:35,932 INFO [StockTrade] minPrice=99.0 avgPrice=100.402
19:39:36,934 INFO [StockTrade] minPrice=99.0 avgPrice=100.54727272727273
19:39:37,935 INFO [StockTrade] minPrice=99.0 avgPrice=100.50166666666667
19:39:38,937 INFO [StockTrade] minPrice=99.0 avgPrice=100.46230769230769
19:39:39,938 INFO [StockTrade] minPrice=98.87 avgPrice=100.34857142857143
19:39:40,940 INFO [StockTrade] minPrice=93.99 avgPrice=99.92466666666668
19:39:41,941 INFO [StockTrade] minPrice=93.99 avgPrice=99.6225
19:39:42,943 INFO [StockTrade] minPrice=90.99 avgPrice=99.11470588235295
19:39:43,944 INFO [StockTrade] minPrice=89.8 avgPrice=98.59722222222223
19:39:44,946 INFO [StockTrade] minPrice=89.8 avgPrice=98.35000000000001
19:39:44,946 INFO [StockTrade] end testStockTradeNotBuy() buy GOOG? no
19:39:44,947 INFO [StockTrade] start testStockTradeBuy()
19:39:44,947 INFO [StockTrade] symbal=GOOG
19:39:44,947 INFO [StockTrade] interval=1000
19:39:44,947 INFO [StockTrade] targetPrice=90.0
19:39:44,947 INFO [StockTrade] percetageDrop=10.0
19:39:44,947 INFO [StockTrade] arrayPrice=[100.0, 99.0, 100.5, 106.01, 103.0, 107.01, 107.0, 108.5, 109.0, 105.0, 105.01, 102.0, 103.91, 105.0, 107.5, 102.0, 101.0, 104.01, 101.0, 102.91, 101.0, 99.5, 106.0, 101.0, 102.0, 100.0, 98.0, 89.0, 93.0]
19:39:46,265 INFO [StockTrade] minPrice=100.0 avgPrice=100.0
19:39:47,266 INFO [StockTrade] minPrice=99.0 avgPrice=99.5
19:39:48,268 INFO [StockTrade] minPrice=99.0 avgPrice=99.83333333333333
19:39:49,270 INFO [StockTrade] minPrice=99.0 avgPrice=101.3775
19:39:50,271 INFO [StockTrade] minPrice=99.0 avgPrice=101.702
19:39:51,273 INFO [StockTrade] minPrice=99.0 avgPrice=102.58666666666666
19:39:52,275 INFO [StockTrade] minPrice=99.0 avgPrice=103.21714285714286
19:39:53,277 INFO [StockTrade] minPrice=99.0 avgPrice=103.8775
19:39:54,279 INFO [StockTrade] minPrice=99.0 avgPrice=104.44666666666666
19:39:55,281 INFO [StockTrade] minPrice=99.0 avgPrice=104.502
19:39:56,282 INFO [StockTrade] minPrice=99.0 avgPrice=104.54818181818182
19:39:57,284 INFO [StockTrade] minPrice=99.0 avgPrice=104.33583333333333
19:39:58,286 INFO [StockTrade] minPrice=99.0 avgPrice=104.30307692307693
19:39:59,288 INFO [StockTrade] minPrice=99.0 avgPrice=104.35285714285715
19:40:00,290 INFO [StockTrade] minPrice=99.0 avgPrice=104.56266666666667
19:40:01,292 INFO [StockTrade] minPrice=99.0 avgPrice=104.4025
19:40:02,293 INFO [StockTrade] minPrice=99.0 avgPrice=104.20235294117647
19:40:03,294 INFO [StockTrade] minPrice=99.0 avgPrice=104.19166666666666
19:40:04,296 INFO [StockTrade] minPrice=99.0 avgPrice=104.02368421052631
19:40:05,297 INFO [StockTrade] minPrice=99.0 avgPrice=103.968
19:40:06,299 INFO [StockTrade] minPrice=99.0 avgPrice=103.82666666666667
19:40:07,300 INFO [StockTrade] minPrice=99.0 avgPrice=103.63000000000001
19:40:08,302 INFO [StockTrade] minPrice=99.0 avgPrice=103.73304347826088
19:40:09,304 INFO [StockTrade] minPrice=99.0 avgPrice=103.61916666666667
19:40:10,306 INFO [StockTrade] minPrice=99.0 avgPrice=103.5544
19:40:11,308 INFO [StockTrade] minPrice=99.0 avgPrice=103.4176923076923
19:40:12,310 INFO [StockTrade] minPrice=98.0 avgPrice=103.21703703703704
19:40:13,311 INFO [StockTrade] minPrice=89.0 avgPrice=102.70928571428571
19:40:13,311 INFO [StockTrade] buyNow=true
19:40:13,311 INFO [StockTrade] end testStockTradeBuy() buy GOOG? yes
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 47.893 sec
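
The two outcomes can be checked by hand against the values in the log. The sketch below restates the condition from `StockTradeListener.update()` as a standalone function and applies it at the point where each test reaches its lowest price. The names `BuyRule` and `shouldBuy` are hypothetical and exist only for this illustration.

```java
// Hypothetical standalone restatement of the listener's buy condition.
final class BuyRule {

    // True when the window minimum is at or below the target price AND at
    // least dropPercentage percent below the moving average.
    static boolean shouldBuy(double minPrice, double avgPrice,
                             double targetPrice, double dropPercentage) {
        double dipThreshold = avgPrice * (100.0 - dropPercentage) / 100.0;
        return minPrice <= targetPrice && minPrice <= dipThreshold;
    }

    public static void main(String[] args) {
        // testStockTradeNotBuy: 89.8 is below the $90 target, but the average of
        // about 98.60 puts the dip threshold near 88.74, so 89.8 is not a deep enough dip.
        System.out.println(shouldBuy(89.8, 98.59722222222223, 90.0, 10.0)); // prints false

        // testStockTradeBuy: the average of about 102.71 puts the threshold near
        // 92.44, and 89.0 is below both the threshold and the target.
        System.out.println(shouldBuy(89.0, 102.70928571428571, 90.0, 10.0)); // prints true
    }
}
```

Both conditions must hold together: in the first test the price does drop under the target, yet the trade is skipped because the drop relative to the average is less than 10%.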

I hope you like this quick start on CEP with Esper. I certainly enjoy the software journey.

