Saturday, December 29, 2012

Simple Java NIO Socket Server

There are plenty of awesome Java Socket Server Implementations out there, the purpose of writing a rather simple implementation of my own is purely academic. Interested readers can check out,

  1. Netty at https://netty.io/
  2. Apache MINA at http://mina.apache.org/
The above implementations/frameworks have gone way past the point of being just "NIO Socket Servers". They are in fact fully evolved event based asynchronous systems you can use for a wide variety of needs.

Let's get down to business then, The following implementation simulates a request-reply.

The server simply listens for connections on a port and responds to valid requests with a reply.

package com.sockets.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Date;

public class ProcessSocketChannel implements Runnable {
 private SocketChannel socketChannel;
 private int BUFFER_SIZE = 1024;

 public ProcessSocketChannel(SocketChannel socketChannel) {
  this.socketChannel = socketChannel;
  Thread thread = new Thread(this);
  thread.start();
 }

 public void run() {
  System.out.println("Connection received from "
    + socketChannel.socket().getInetAddress().getHostAddress());
  readMessage();
  sendMessage("This is the server!!");
 }

 void sendMessage(String msg) {
  String fullmessage = new Date().toString() + " > " + msg;
  ByteBuffer buf = ByteBuffer.allocate(fullmessage.getBytes().length);
  buf.clear();
  buf.put(fullmessage.getBytes());
  buf.flip();
  while (buf.hasRemaining()) {
   try {
    socketChannel.write(buf);
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

 void readMessage() {
  ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  Charset charset = Charset.forName("us-ascii");
  CharsetDecoder decoder = charset.newDecoder();
  CharBuffer charBuffer;
  try {
   int bytes = socketChannel.read(byteBuffer);
   byteBuffer.flip();
   charBuffer = decoder.decode(byteBuffer);
   String result = charBuffer.toString();
   System.out.println(result);
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   byteBuffer = null;
   charset = null;
   decoder = null;
   charBuffer = null;
  }
 }
}

Processing of requests is delegated to a thread ProcessSocketChannel ,

package com.sockets.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Date;

public class ProcessSocketChannel implements Runnable {
 private SocketChannel socketChannel;
 private int BUFFER_SIZE = 1024;

 public ProcessSocketChannel(SocketChannel socketChannel) {
  this.socketChannel = socketChannel;
  Thread thread = new Thread(this);
  thread.start();
 }

 public void run() {
  System.out.println("Connection received from "
    + socketChannel.socket().getInetAddress().getHostAddress());
  readMessage();
  sendMessage("This is the server!!");
 }

 void sendMessage(String msg) {
  String fullmessage = new Date().toString() + " > " + msg;
  ByteBuffer buf = ByteBuffer.allocate(fullmessage.getBytes().length);
  buf.clear();
  buf.put(fullmessage.getBytes());
  buf.flip();
  while (buf.hasRemaining()) {
   try {
    socketChannel.write(buf);
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

 void readMessage() {
  ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  Charset charset = Charset.forName("us-ascii");
  CharsetDecoder decoder = charset.newDecoder();
  CharBuffer charBuffer;
  try {
   int bytes = socketChannel.read(byteBuffer);
   byteBuffer.flip();
   charBuffer = decoder.decode(byteBuffer);
   String result = charBuffer.toString();
   System.out.println(result);
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   byteBuffer = null;
   charset = null;
   decoder = null;
   charBuffer = null;
  }
 }
}

As for the client

package com.sockets.client;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Date;

public class Client {
 //The default buffer size
 private final int BUFFER_SIZE = 1024;
 private SocketChannel clientSocket;
 //The port of the server connecting to
 private int port = 7777;

 void run() {
  try {
   clientSocket = SocketChannel.open();
   //Obtaining the localhost
   InetAddress host = InetAddress.getLocalHost();
   //Connecting to server
   clientSocket.connect(new InetSocketAddress(host, port));
   System.out.println(String.format("Connected to %s on port %d",
     host.getHostAddress(), port));
   //Sending a message to the server
   sendMessage("Hello Server!!");
   //Reading the reply sent from the server
   readMessage();
  } catch (UnknownHostException unknownHost) {
   System.err.println("You are trying to connect to an unknown host!");
  } catch (IOException ioException) {
   ioException.printStackTrace();
  } finally {
   //Closing connection
   try {
    clientSocket.close();
   } catch (IOException ioException) {
    ioException.printStackTrace();
   }
  }
 }

 void sendMessage(String msg) throws IOException {
  String fullmessage = new Date().toString() + " > " + msg;

  ByteBuffer buf = ByteBuffer.allocate(fullmessage.getBytes().length);
  //Initialize the buffer
  buf.clear();
  buf.put(fullmessage.getBytes());
  //Flip the content of the buffer before writing
  buf.flip();
  //Writing Buffer Content to socket
  while (buf.hasRemaining()) {
   clientSocket.write(buf);
  }
 }

 void readMessage() {
  //Reads a text message
  ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  Charset charset = Charset.forName("us-ascii");
  CharsetDecoder decoder = charset.newDecoder();
  CharBuffer charBuffer;
  try {
   int bytes = clientSocket.read(byteBuffer);
   byteBuffer.flip();
   charBuffer = decoder.decode(byteBuffer);
   String result = charBuffer.toString();
   System.out.println(result);
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   byteBuffer = null;
   charset = null;
   decoder = null;
   charBuffer = null;
  }
 }

 public static void main(String args[]) {
  Client client = new Client();
  client.run();
 }
}

While I admit that the above implementation is less than ideal, given that it captures the basics of socket communications, I think it's a good starting point for those who still care about how the clock actually ticks.

Tuesday, December 25, 2012

Downloading Images with java NIO

So We got this flex application, it's got tons of images it's got to show and the client is not too happy with the loading time. It connects to a JEE back-end which pushes these image URLs. 

I have already cached requests at the back end, solved half the problem actually. And while we were at it, we cached the images at the front end too. Now, it takes around 6-7 seconds for a screen(state) to load the first time around, but we have managed to cut the subsequent requests down to 2-3 seconds range with request and content caching. The client, well he is still not too happy with initial loading times(no surprises there). So we came up with the idea to pre-download the images on to the local machine the application is running on.


Which brings us over to the topic of the post, downloading images. We wanted this to be light weight, so I initially planned on using Apache's HTTP client to do the dirty work asynchronously.Turns out it's slower than my arthritic granny, I couldn't have that. So I figured why not NIO, it's fast, it is efficient and gets the job done.


The download action is given below, I used an interface, but this is really not necessary, as I am not adding anything to it.

public interface Downloadable extends Runnable {
 
}

and here is the implementation of the download artifact,

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

public class DownloadImage implements Downloadable {
 private String imageUrl;

 public DownloadImage(String imageUrl) {
  this.imageUrl = imageUrl;
 }

 public void run() {
  String resultFileName = DigestTools.getMD5Digest(imageUrl);
  try {
   URL url = new URL(imageUrl);
   ReadableByteChannel readableByteChannel = Channels.newChannel(url.openStream());
   FileOutputStream fileOutputStream = new FileOutputStream(
     PropertyLoader.getDownloadFileLocation() + resultFileName);
   FileChannel fileChannel = fileOutputStream.getChannel();
   fileChannel.transferFrom(readableByteChannel,0, 1 << 24);
   fileOutputStream.close();
   fileChannel.close();
   readableByteChannel.close();
  } catch (MalformedURLException e) {
   e.printStackTrace();
  }catch (IOException e) {
   e.printStackTrace();
  }
 }

 /*
  * (non-Javadoc)
  * 
  * @see java.lang.Object#hashCode()
  */
 @Override
 public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result
    + ((imageUrl == null) ? 0 : imageUrl.hashCode());
  return result;
 }

 /*
  * (non-Javadoc)
  * 
  * @see java.lang.Object#equals(java.lang.Object)
  */
 @Override
 public boolean equals(Object obj) {
  if (this == obj) {
   return true;
  }
  if (obj == null) {
   return false;
  }
  if (!(obj instanceof DownloadImage)) {
   return false;
  }
  DownloadImage other = (DownloadImage) obj;
  if (imageUrl == null) {
   if (other.imageUrl != null) {
    return false;
   }
  } else if (!imageUrl.equals(other.imageUrl)) {
   return false;
  }
  return true;
 }

 @Override
 public String toString() {
  return "Download Artifact : " + imageUrl;
 }
}

Note that I have used several custom utility classes, like DigestTools and PropertyLoader. And now to the executor, I decided to go with Java Executor Service.

public class ImageDownloader {
 private static final Logger LOGGER = Logger
   .getLogger(ImageDownloader.class);
 private static final boolean DEBUG = LOGGER.isDebugEnabled();
 Collection<Downloadable> downloadables;
 ExecutorService executorService = Executors.newFixedThreadPool(PropertyLoader.getConcurrentThreads());

 public ImageDownloader() {
  if (DEBUG) {
   LOGGER.debug("Initializing image downloader");
  }
  downloadables = new HashSet<Downloadable>();
 }

 public void addDownloadable(Downloadable downloadable) {
  if (DEBUG) {
   LOGGER.debug("Adding downloadable with url " + downloadable);
  }
  downloadables.add(downloadable);
 }

 public void downloadImages() throws InterruptedException,
   ExecutionException, TimeoutException {
  if (executorService.isShutdown()) {
   executorService = Executors.newFixedThreadPool(PropertyLoader.getConcurrentThreads());
  }
  Collection<Future<Downloadable>> futures = new HashSet<Future<Downloadable>>();
  for (Downloadable downloadable : downloadables) {
   Future<Downloadable> future = executorService.submit(downloadable,downloadable);
   future.get(6, TimeUnit.SECONDS);
   futures.add(future);
  }
  for (Future<Downloadable> future : futures) {
   future.get();
  }
  executorService.shutdown();
  downloadables = Collections.EMPTY_SET;
 }
}

It's just a matter of firing it up now.



public class PartyStarter {

 /**
  * @param args
  * @throws ExecutionException 
  * @throws InterruptedException 
  * @throws TimeoutException 
  * @throws IOException 
  */
 public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException, IOException {
  Downloadable d1 = new DownloadImage("image_url1");
  Downloadable d2 = new DownloadImage("image_url2");
  
  ImageDownloader imageDownloader = new ImageDownloader();
  imageDownloader.addDownloadable(d1);
  imageDownloader.downloadImages();
 }

}
That's it for this post folks.

Sunday, December 23, 2012

Redis for all your Caching Needs



On this post I'll be walking you through Redis and how to access it with Java. Redis also supports other drivers, so connecting to Redis with your favorite programming language is a breeze. Click http://redis.io/clients for a full list of clients.  

I have used Redis primarily as a caching solution although there are plenty of other use cases.

Let first get the pleasantries out of the way.

Redis is a key-value data store. More correctly it's an in-memory persistent multi-value data store. The persistence comes from the fact that Redis by default snapshots its memory content on a frequent basis on to disk. This process is configurable, i.e you can configure Redis to save the data store every X number or key changes, every Y seconds. Redis is a database, albeit not your everyday relational database. It does capture the essence of a typical database you and I are familiar with. Redis databases are identified by numbers, 0 being the default one, and hence the one automatically selected when you don't specify one. Apart from Strings Redis also lets you save sets, lists and hashes and finally sorted sets. I usually JSON encode my data before saving to Redis, that way I can store complete data structures. Interested readers can try out other data structure, Redis comes loaded with heaps of operations you can perform on your data.

Redis is fast, it's really really fast, but you need to carefully analyze your requirements to see if it is in fact what you need for your data storage requirements. Redis is no-sql, non-relational and most of all its in-memory, well at least for most of the time. So you if you are looking to store a lot of data and that data is mission critical then Redis might not be the best of choices. But if you are looking for a really fast multi-value data store, your data is relatively light weight, and is not critical then Redis is your guy. To add to the last point, Redis does allow asynchronous master-slave replication mode which alleviates data loss due to unexpected server failure, therefore it's not a risk though.

I prefer using hierarchical keys, per say, to make retrieval of data easier. A typical key is prepended by the application name, component name etc and of course a unique identifier, for e.g suppose I want to save details about users on my system, I would create a User entity, instantiate it and populate it with data, stringify it before finally saving it, and keys would take the form of "[application name]::[component name]::[unique id]", this also gives me the ability to retrieve content based on key patterns. Of course you can create keys and name them at your whims and fancies. Whats more, you can set TTls for keys too.

I believe this is enough for you to get through this post. Redis however deserves a full post.

You can download Redis for windows at https://github.com/dmajkic/redis/downloads. Unzip it, and invoke redis-server.exe in a suitable version (win-32 or win-64). You know you are good to go when you see "[37740] 23 Dec 15:37:27 * The server is now ready to accept connections on port 6379". You can override ports and a lot more, but that's outside the scope of this post.

Jedis is an excellent choice as an interface to Redis, because its comprehensive and easy to configure. Download Jedis from https://github.com/xetorthio/jedis. However it can also lead many astray, hence it's always a good idea to wrap Jedis Operations in a facade and only expose the operation you want to be performed on your data. 

Ok, on to the fun part. Now as I mentioned earlier you're going to need a driver to interact with Redis server, I use Jedis, following is the pom file with Jedis dependency.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.redis.test</groupId>
 <artifactId>RedisCache</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>Redis Cache Client</name>
 <description>Redis Cache Client</description>
 <dependencies>
  <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>1.5.2</version>
        </dependency>
 </dependencies>
</project>

Now on to the Jedis facade as I call it.



package com.redis.client;

import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class RedisClient {
 private JedisPool jedisPool;

 public void save(String key, String value) {
  if (key == null) {
   throw new RuntimeException("? Redis key can not be null");
  }
  Jedis jedis = jedisPool.getResource();
  try {
   jedis.set(key, value);
  } catch (Exception e) {
  } finally {
   jedisPool.returnResource(jedis);
  }
 }

 public String load(String key) {
  if (key == null) {
   throw new RuntimeException("? Redis key can not be null");
  }
  Jedis jedis = jedisPool.getResource();
  String result;
  try {
   result = jedis.get(key);
  } catch (Exception e) {
   result = null;
  } finally {
   jedisPool.returnResource(jedis);
  }
  return result;
 }

 public Long addToSet(String key, String member) {
  if (key == null) {
   throw new RuntimeException("? Redis key can not be null");
  }
  Jedis jedis = jedisPool.getResource();
  Long result;
  try {
   result = jedis.sadd(key, member);
  } catch (Exception e) {
   result = null;
  } finally {
   jedisPool.returnResource(jedis);
  }
  return result;
 }

 public Set getKeysMatchingPattern(String pattern) {
  Set result = null;
  Jedis jedis = jedisPool.getResource();
  try {
   result = jedis.keys("*"+pattern+"*");
  } catch (Exception e) {
   result = null;
  }finally {
   jedisPool.returnResource(jedis);
  }
  return result;
 }

 /**
  * @param jedisPool
  *            the jedisPool to set
  */
 public void setJedisPool(JedisPool jedisPool) {
  this.jedisPool = jedisPool;
 }

}
Notice how I always return the connection back to the pool. This is important, you don't want to hold on to your connections once you are done. Also opt for batch operations, where with a single connection you do a lot of things, this is an excellent way to avoid connection exhaustion. Note also, that are heaps more operations you can perform with Jedis on top of Redis, feel free to explore them at http://redis.io/commands.

Where there are connections there is pooling. Jedis allows you to configure pool properties. It is not always a good idea to go with defaults, as defaults have a tendency to come back and bite you where it hurts.


Finally here's how you can use it,




package com.redis;

import java.util.Set;

import org.apache.commons.pool.impl.GenericObjectPool;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import com.redis.client.RedisClient;

public class Tester {
 public static void main(String[] args) {

  JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  
  /* The maximum active connections per Redis instance */
  jedisPoolConfig.setMaxActive(10);
  /* The minimum idling connections- these connections are always open and always ready */
  jedisPoolConfig.setMinIdle(5);
  /* The maximum active connections per Redis instance */
  jedisPoolConfig.setMaxActive(5);
  /* Fail- fast behaviour Set the action to take when your pool runs out of connections 
   * default is to block the caller till a connection frees up */
  jedisPoolConfig
    .setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
  /*Tests if a connection is still alive at the time of retrieval*/
  jedisPoolConfig.setTestOnBorrow(true);
  /* Tests whether connections are dead during idle periods */
  jedisPoolConfig.setTestWhileIdle(true);
  /*Number of connections to check at each idle check*/
  jedisPoolConfig.setNumTestsPerEvictionRun(10);
  /* Check idling connections every */
  jedisPoolConfig.setTimeBetweenEvictionRunsMillis(60000);
  /*maximum time in milliseconds to wait when the exhaust action is set to block*/
  jedisPoolConfig.setMaxWait(3000);

  JedisPool jedisPool = new JedisPool(jedisPoolConfig, "localhost", 6379);
  RedisClient redisClient = new RedisClient();
  redisClient.setJedisPool(jedisPool);

  redisClient.save("rediscache::tester::test1", "test");
  redisClient.save("rj::rediscache::tester::test2", "test2");
  Set keys = redisClient.getKeysMatchingPattern("rediscache::tester::");
  for (String key : keys) {
   System.out.println("Key : " + key + " Value : " + redisClient.load(key));
  }
 }
}
Before running this make sure your Redis instance is up and running.

You can grab the full source here.

Tuesday, December 18, 2012

Spring Integration

OK lets do this. In this post I will be guiding you through a simple example on Spring Integration. Spring integration, no matter how wild it appears to be, at its heart, its a messaging framework that supports both application wide messaging and with a bit of work (not too much), across the enterprise too. Its light weight and moreover you get all this with separation of concerns enabling you to write extensible, easy to maintain applications.

Before we go any further let me first run you through existing integration patterns or styles.

  1. Shared Database Integration
  2. Remote Procedure Calls
  3. File Based Integration
  4. Message based integration
I am sure most of you are familiar with these styles. Spring integration however is based on message based integration, although you would find support for others in the framework.

Lets now see what Spring Integration is about. Here is a few things you need to understand the example that's about to follow.

Messages

Messages are units of information, information that you will be exchanging within/between applications or to be general endpoints. There are no restrictions on what your message content can be. It could be XML, plain strings, you can have header content too.

Message Channels

Message channels are a medium of transportation, if  you will. Channels manage message delivery, thereby decoupling senders (producers) from receivers(consumers) of messages and moreover from any concerns about the state of the consumer (passive or active) and delivery mechanism.

Message Endpoints

Message endpoints can be applications or application components. They are the ones doing something with the messages. The could be publishers, consumers or components sitting in the middle relaying/ aggregating/ splitting messages. There are different types on endpoints in Spring Integration, in this example we will be using service-activator and messaging gateway.
Service Activator
A service Activator simple is a component that takes an input from a channel and invokes a service and returns a message (more correctly an outbound message) based on an incoming message. The beauty of this is that the service would be completely unaware that its a part of a messaging system.
Messaging Gateway
Messaging gateway on the other hand is in simple terms an entry point or an exit point of a messaging flow. You can define both inbound and outbound messaging gateways, outbound messaging gateways allow you to connect to JMS, Web Services etc. Gateways when declared with XML allows your code to be completely void of any spring integration related details.

And now for the fun part.

Lets obey the unwritten traditions of programming and create a much cliche'd Hello World Application, only this time using Spring Integration.

Following is the structure of the project,

As for the pom file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.spring.test</groupId>
 <artifactId>SpringIntegration</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>war</packaging>
 <name>SpringIntegration</name>
 <dependencies>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>2.1.3.RELEASE</version>
  </dependency>
 </dependencies>
</project>

Here is the service interface

package com.spring.integration;

public interface HelloService {
 String sayHello(String name);
}

 
note that it returns a value, you can have service definitions with void types too.

as for the service implementation we have a simple POJO completely oblivious of its participation in a messaging system.

package com.spring.integration.services;

import com.spring.integration.HelloService;

public class MyHelloService implements HelloService {

 public String sayHello(String name) {
  return ("Hello " + name);
 }

}

lets have a look at the spring configuration file shall we

<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/integration"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">


 <gateway service-interface="com.spring.integration.HelloService"
  id="helloGateway" default-request-channel="names" />
  
 <channel id="names" />
 
 <service-activator input-channel="names" ref="helloService"
  method="sayHello" />
  
 <beans:bean id="helloService"
  class="com.spring.integration.services.MyHelloService" />
  
</beans:beans>

That's is it now lets run this thing shall we.

package com.spring.integration.main;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.spring.integration.HelloService;

public class Tester {

 public static void main(String[] args) {
  ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
    "classpath:spring-beans.xml");
  HelloService helloService = applicationContext.getBean("helloGateway",
    HelloService.class);
  System.out.println(helloService.sayHello("World"));

 }

}

See how your code is completely void of any spring integration details, the gateway definition allows us to interact with the service just like you interact with any other spring bean.

for the complete source
Click here