Externalize Flink app configuration in Kubernetes

Apache Flink is a popular distributed processing engine for unbounded and bounded data streams. Flink is designed to run streaming applications at large scale and integrates with cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes.

Flink offers very efficient out-of-the-box memory management compared to other popular streaming engines such as Spark Streaming, Apache Samza, or Apache Storm. Flink applications can run as thousands of concurrent tasks distributed across a cluster. Flink's asynchronous checkpointing, inspired by the Chandy-Lamport algorithm, allows it to draw consistent snapshots of the distributed data stream and is the backbone of Flink's fault-tolerance mechanism. It works by injecting stream barriers into the data stream: the barriers separate the records that belong to the current snapshot from the records that will belong to the next one. The barriers are aligned across the parallel partitions of the stream while data continues to be buffered for processing, which keeps throughput high while maintaining a consistent snapshot of the application and operator state.

One of the challenges we faced while running a Flink application in a containerized environment such as Kubernetes was injecting application configuration dynamically at runtime. We use Typesafe Config to manage application configuration, and the configuration file gets bundled into the application jar. The application jar is in turn packaged into a Docker image built from the base Flink image (a minimal Dockerfile sketch is shown below). The challenge was to mount a different application configuration in each environment while keeping the Docker image unchanged.
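The Dockerfile itself is not part of this post; the following is a minimal sketch of what it could look like. The base image tag and the jar name are assumptions, and the jar is copied into /opt/flink/usrlib, the directory from which standalone-job.sh loads user code:

# Hypothetical Dockerfile; image tag and jar name are assumptions
FROM flink:1.11-scala_2.12
# standalone-job.sh loads user jars placed under /opt/flink/usrlib
COPY target/flink-streaming-app.jar /opt/flink/usrlib/flink-streaming-app.jar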

Kubernetes offers ConfigMaps, an API object for defining application configuration as a set of key-value pairs. A sample Flink application configuration demonstrating the use of a ConfigMap is shown below.

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-streaming-config
  namespace: flink-app
  labels:
    app: flink
data:
  flink-streaming: |+
    kafka {
      broker-servers = "localhost:9092"
      input.topic = flink.streaming.input
      output.success.topic = flink.streaming.output
    }
    task {
      consumer.parallelism = 2
      downstream.operators.parallelism = 2
    }

The following Job manifest shows how to mount the ConfigMap inside the Flink JobManager container.

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-streaming-jobmanager
  namespace: flink-app
spec:
  template:
    metadata:
      labels:
        app: flink
        component: flink-streaming-jobmanager
    spec:
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-streaming-config
          items:
          - key: flink-streaming
            path: flink-streaming.conf
      restartPolicy: OnFailure
      containers:
      - name: flink-streaming-jobmanager
        image: "flink-streaming-app-docker-image"
        imagePullPolicy: Always
        workingDir: /opt/flink
        command: ["/opt/flink/bin/standalone-job.sh"]
        args: ["start-foreground",
               "--job-classname=com.anand.flink.FlinkStreamingApp", 
               "-Djobmanager.rpc.address=flink-streaming-jobmanager",
               "-Djobmanager.rpc.port=6123",
               "-Dblob.server.port=6124", 
               "-Dqueryable-state.server.ports=6125",
               "--config.file.path",
               "/data/flink/conf/flink-streaming.conf"]
        volumeMounts:
        - name: flink-config-volume
          mountPath: /data/flink/conf/flink-streaming.conf
          subPath: flink-streaming.conf

While mounting the ConfigMap is pretty straightforward, getting the config file path into the application is not. Flink provides a utility named ParameterTool which allows us to read command-line parameters passed to the main function of the application. The config.file.path property is passed in from the Kubernetes deployment manifest, and the configuration path is then read using ParameterTool's fromArgs function. This simple trick allows us to inject an external configuration file into the application running inside the Docker container.

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import java.io.File
import java.util.Properties

object FlinkStreamTask {
  def main(args: Array[String]): Unit = {
    // read the config.file.path argument passed in the Kubernetes manifest
    val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
    val config = configFilePath.map {
      // parse the mounted configuration file when a path is supplied
      path => ConfigFactory.parseFile(new File(path)).resolve()
    }.getOrElse(ConfigFactory.load("flink-streaming.conf") // fall back to the bundled config
      .withFallback(ConfigFactory.systemEnvironment()))
    val task = new FlinkStreamTask(config)
    task.process()
  }
}

class FlinkStreamTask(config: com.typesafe.config.Config) {

  def process(): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink.streaming.group")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new FlinkKafkaConsumer[String](config.getString("kafka.input.topic"), 
        new SimpleStringSchema(), properties))

    val kafkaSink = new FlinkKafkaProducer[String](config.getString("kafka.output.success.topic"),
      new SimpleStringSchema(), properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    stream.addSink(kafkaSink)

    env.execute("FlinkStreamingJob")
  }

}

Singleton instance of akka actor per actor system

Akka provides Cluster Singleton to create a singleton instance of an actor in a clustered environment. But there are situations where we need only one instance of an actor per ActorSystem. For example, we had a set of Worker actors executing jobs spawned by multiple services. We used a Router actor based on RoundRobinRoutingLogic: multiple services submit jobs to the router, and the router distributes them to the Worker actors in round-robin fashion. We needed all the services to submit jobs to the same router instance. However, the following line of code, when placed in each actor, creates a new router actor instance per actor. Passing the router's ActorRef through each actor's constructor would avoid that, but it unnecessarily forces a contract onto every service.

implicit lazy val serviceRouter = context.system.actorOf(Props[RouterActor], "router-actor")

Akka Extensions provide a way to add features to Akka. An extension has two basic components: an Extension and an ExtensionId. An Extension is loaded only once per ActorSystem, and the ExtensionId is used to look up the instance of the extension.

We will create a class called SystemScopedImpl which takes an ActorSystem, the Props, and a name for the actor that should be a singleton within that actor system. It creates the actor the usual way, i.e. via system.actorOf.

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, Props}

class SystemScopedImpl(system: ActorSystem, props: Props, name: String) extends Extension {
    // created exactly once per ActorSystem, since extensions are loaded only once
    val instance: ActorRef = system.actorOf(props, name = name)
}

We will then create a SystemScoped trait which extends ExtensionId for the extension we just created. The lookup method is needed by the ExtensionIdProvider to load our extension when the ActorSystem is created, and the createExtension method is used to construct it. We also declare two members, instanceProps and instanceName, which must be overridden by objects that implement the SystemScoped trait; these are needed to instantiate the singleton actor instance.

trait SystemScoped extends ExtensionId[SystemScopedImpl] with ExtensionIdProvider {
    final override def lookup = this
    final override def createExtension(system: ExtendedActorSystem) = new SystemScopedImpl(system, instanceProps, instanceName)

    protected def instanceProps: Props
    protected def instanceName: String
}

We then create a Scala object which implements the SystemScoped trait and overrides the instanceProps and instanceName members. The singleton router is then accessed as shown below in the ServiceActor. The same instance of the router can be grabbed anywhere in the ActorSystem, which let us avoid instantiating the router many times across our application.

object SingletonService extends SystemScoped {
    override lazy val instanceProps = Props[RouterActor]
    override lazy val instanceName = "router-actor"
}

class ServiceActor extends Actor with ActorLogging {
    val actorSystem = context.system
    val router = SingletonService(actorSystem).instance

    def receive = {
        case job: Job => router ! job
    }
}
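As a quick sanity check (not from the original post), looking up the extension twice on the same ActorSystem returns the same ActorRef, since the extension is created only once:

import akka.actor.ActorSystem

object SingletonCheck extends App {
    val system = ActorSystem("demo")
    val r1 = SingletonService(system).instance
    val r2 = SingletonService(system).instance
    assert(r1 == r2) // same ActorRef: the router was created exactly once per system
}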


Mysql/MariaDB – Pull remote table data matching a specific where clause

mysqldump is a really handy tool for pulling data from one MySQL/MariaDB database into another. Here is how to use mysqldump to pull rows matching specific criteria from a remote table into a local table. Note that the remote database name is passed positionally rather than via --databases, because --databases embeds a USE statement in the dump that would override the local database name on the receiving side. A concrete example follows the command.


mysqldump -u<remote-user-name> -p<remote-password> -h<remote-host-name or IP> --databases <remote-database-name> --tables <remote-table-name> --where="<complete where clause>" | mysql <local-database-name> -u<local-db-username> -p<local-db-password>


psi-probe – Tomcat administration tool

Ever wished for a better tool to administer and monitor applications running inside the Tomcat web application container? psi-probe is a wonderful tool which requires absolutely no changes to your existing web applications running under Tomcat. psi-probe is a war (web archive) application which is deployed alongside your other web applications. It provides the following features (as stated on the psi-probe web page):

  • Requests: Monitor traffic in real-time, even on a per-application basis.
  • Sessions: Browse/search attributes, view last IP, expire, estimate size.
  • JSP: Browse, view source, compile.
  • Data Sources: View pool usage, execute queries.
  • Logs: View contents, download, change levels at runtime.
  • Threads: View execution stack, kill.
  • Connectors: Status, usage charts.
  • Cluster: Status, usage charts.
  • JVM: Memory usage charts, advise GC.
  • Java Service Wrapper: Restart JVM.
  • System: CPU usage, memory usage, swap file usage.

Configuration:

1. psi-probe requires the following four roles, in order of increasing privileges:

  • probeuser
  • poweruser
  • poweruserplus
  • manager – This is the same role required by Tomcat Manager and has the highest level of privileges.

Add these to tomcat-users.xml under $TOMCAT_HOME/conf (where $TOMCAT_HOME is the Tomcat installation directory).

<?xml version="1.0" encoding="UTF-8"?>
<tomcat-users>
<role rolename="tomcat"/>
<role rolename="manager"/>
<role rolename="manager-gui"/>
<role rolename="probeuser" />
<role rolename="poweruser" />
<role rolename="poweruserplus" />
<user username="admin" password="admin" roles="manager, manager-gui"/>
</tomcat-users>

2. psi-probe requires remote JMX to be enabled in order to collect and display memory usage, cluster traffic, connection pools, active threads, and so on. Create an executable file called setenv.sh and place it under the $TOMCAT_HOME/bin directory. It is advisable to add your custom JAVA_OPTS or CATALINA_OPTS to this file instead of modifying catalina.sh. Add the following line to setenv.sh:


export CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jmxremote=true"

3. Deploy the probe.war under the $TOMCAT_HOME/webapps directory, as sketched below.
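A hypothetical deployment sequence (paths are assumptions; grab the war from the psi-probe downloads page):

cp probe.war $TOMCAT_HOME/webapps/
$TOMCAT_HOME/bin/shutdown.sh
$TOMCAT_HOME/bin/startup.sh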

Here are some screenshots from the psi-probe application when I used it to monitor one of my web applications:

1. Installed applications

(screenshot: probe-installed-apps)

2. Log file monitoring

(screenshot: probe-log-file-montoring)

3. Memory utilization

(screenshot: probe-memory-utilization)


awk – Using shell variables during execution

awk, as the Unix manual states, is a “pattern-directed scanning and processing language”. The name awk is an acronym formed from the initials of its authors: Aho, Weinberger, and Kernighan. It is a powerful programming language for extracting data from structured input. awk supports regular expressions for search patterns, and it works only on text input. awk treats its input as records and fields.

A common mistaken assumption is that shell variables can be used directly inside an awk program the way we use them elsewhere in a shell script. Instead, we use awk's -v switch to assign a value to an awk variable before the program is executed. In the following example, we use awk to extract rows from a tab-delimited file whose third column matches our shell variable COLUMN_LOOKUP_VALUE ("Value 1"). The -v switch copies the shell variable's value into the awk variable lookup_value, which can then be used inside the program to match the column value. If a match is found, the entire row is written to data_extract.csv.

COLUMN_LOOKUP_VALUE="Value 1"
awk -F"\t" -v lookup_value="${COLUMN_LOOKUP_VALUE}" '
{
    if (toupper($3) == toupper(lookup_value)) print
}' data_file.csv > data_extract.csv
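An alternative not covered in the original post: if the shell variable is exported, awk can read it directly through its built-in ENVIRON array, with no -v assignment needed:

export COLUMN_LOOKUP_VALUE="Value 1"
# ENVIRON exposes exported environment variables inside awk
awk -F"\t" 'toupper($3) == toupper(ENVIRON["COLUMN_LOOKUP_VALUE"])' data_file.csv > data_extract.csv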

Twitter bootstrap – Load last seen tab on page reload

This post shows how to reload the last viewed tab on a page refresh using Twitter Bootstrap v3 and jQuery. The idea is to store the last used tab in a cookie and restore it when the page is reloaded or refreshed. Below is the Maven project structure I created to deploy the application on a servlet container. You need bootstrap.min.css, jquery.min.js, bootstrap.min.js, and jquery.cookie.js. You can download jquery.cookie.js from http://plugins.jquery.com/cookie/; the other CSS and JS downloads are straightforward.

(screenshot: lasttab-project-structure)

lasttab.jsp contents:


<!DOCTYPE html>
<head>
<meta charset='utf-8'>

<!-- import required css and javascript files -->
<link href="./css/bootstrap.min.css" rel="stylesheet" type="text/css" />
<script src="./js/jquery.min.js" type="text/javascript"></script>
<script src="./js/jquery.cookie.js" type="text/javascript"></script>
<script src="./js/bootstrap.min.js" type="text/javascript"></script>
</head>
<body>
<div class="container">

<!-- create reference anchors for tabs -->
<ul class="nav nav-tabs" id="tabs">
<li class="active"><a href="#tab1" data-toggle="tab">Tab 1</a></li>
<li><a href="#tab2" data-toggle="tab">Tab 2</a></li>
</ul>

<!-- tab content div -->
<div class="tab-content">
<div class="tab-pane active" id="tab1">
<h3>Tab1</h3>
</div>
<div class="tab-pane" id="tab2">
<h3>Tab2</h3>
</div>
</div>
</div>

<script type="text/javascript">

$(document).ready(function() {
var lastTab = $.cookie('last_tab');

// if last seen tab was stored in cookie
if(typeof(lastTab) !== "undefined") {
//remove active css class from all the unordered list items
$('ul.nav-tabs').children().removeClass('active');
$('a[href='+ lastTab +']').parents('li:first').addClass('active');
$('div.tab-content').children().removeClass('active');
$(lastTab).addClass('active');
}
});

// event to capture tab switch
$('a[data-toggle="tab"]').on('shown.bs.tab', function (event) {
event.preventDefault();
//save the latest tab using a cookie:
$.cookie('last_tab', $(event.target).attr('href'));
console.log('last_tab ' + $.cookie('last_tab'));
});


</script>
</body>
</html>

Use $.cookie to save the last viewed tab in the cookies. The script section in the HTML above shows how to restore the last viewed tab from the cookie. Without this logic, the first tab would load by default on every page refresh; with it, Tab 2 is restored from the cookie since it was the last viewed tab.

(screenshot: lasttab-from-cookie)
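As an alternative sketch (not in the original post), the last tab could be stored in HTML5 localStorage instead of a cookie, dropping the jquery.cookie.js dependency; Bootstrap 3's tab('show') API takes care of toggling the active classes:

<script type="text/javascript">
// save the last viewed tab on every tab switch
$('a[data-toggle="tab"]').on('shown.bs.tab', function (event) {
  localStorage.setItem('last_tab', $(event.target).attr('href'));
});

// restore it on page load
$(document).ready(function() {
  var lastTab = localStorage.getItem('last_tab');
  if (lastTab) {
    $('a[href="' + lastTab + '"]').tab('show');
  }
});
</script>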


Setting JAVA_HOME on Mac OS X when multiple JDKs present

It is pretty common to have multiple JDKs installed on a development machine. On Mac OS X, the following command lets you switch between JDKs without much fuss. Add this line to your ~/.bash_profile and you can use $JAVA_HOME without hard-coding a JDK version at the application layer.

export JAVA_HOME=`/usr/libexec/java_home -v 1.6`

Changing the version value passed to the -v switch lets you easily switch between JDK versions, as shown below.
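For example (a sketch; the versions installed on your machine will differ), you can list the JDKs that java_home knows about and then point JAVA_HOME at a specific one:

# list all installed JDKs and their paths
/usr/libexec/java_home -V

# switch the current shell to JDK 1.7
export JAVA_HOME=`/usr/libexec/java_home -v 1.7`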


Connect Jenkins to Internal Self-Signed Certificate servers and Configure SSL

Our Jenkins CI server has to connect to internal servers such as LDAP (over ldaps), JIRA, or IRC. These servers often use self-signed certificates, so we will look at how to import those certificates so that Jenkins can connect to them successfully.

We need to import the .cer or .pem file generated by our internal server into the cacerts keystore under jdk/jre/lib/security. To locate the current JDK, follow the symlinks of the java executable. Here is how I located my JDK directory; it turned out to be /usr/java/jdk1.7.0_25/.

-bash-4.1$ whereis java
java: /usr/bin/java /etc/java /usr/lib64/java /usr/share/java /usr/share/man/man1/java.1
-bash-4.1$ cd /usr/bin/
-bash-4.1$ ls -ltr java
lrwxrwxrwx 1 root root 22 Jan 8 07:01 java -> /etc/alternatives/java
-bash-4.1$ cd /etc/alternatives/
-bash-4.1$ ls -ltr java
lrwxrwxrwx 1 root root 30 Jan 8 07:01 java -> /usr/java/jdk1.7.0_25/bin/java
-bash-4.1$

To import the certificate into the Java keystore, we use the keytool command, a key and certificate management utility. Run the following command from a terminal window. You will be prompted for your keystore password; the default is "changeit" if you haven't changed it. The tool will display the certificate's contents and ask whether you want to accept it. Press "y" and the certificate is imported into your Java keystore. A verification command follows the import command below.

-bash-4.1$ sudo keytool -import -trustcacerts -alias alias-for-your-internal-server-certificate -keystore /path/to/your/jdk/jre/lib/security/cacerts -file /path/to/your-certificate.cer
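To confirm the import worked, you can list the entry back out of the keystore using the same alias and keystore path as above:

-bash-4.1$ keytool -list -keystore /path/to/your/jdk/jre/lib/security/cacerts -alias alias-for-your-internal-server-certificate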

Configuring LDAPS:

Once you are logged into the Jenkins CI UI console, click Manage Jenkins –> Configure Global Security in the left tree. Change the LDAP protocol from ldap to ldaps. If your LDAP server's SSL port is different from 636, you need to specify the port in the URL itself, i.e., ldaps://your-ldap-server-host:port. After a restart, Jenkins will access the LDAP server over the ldaps protocol.

(screenshot: jenkins-ldaps-smudged)

Configuring SSL access for Jenkins CI server:

  1. Under the JENKINS_HOME directory, create a hidden directory .ssl (mkdir .ssl).
  2. To access our CI server over https, we need an SSL certificate. We can get an official certificate from a certificate authority (CA), or we can use the keytool utility to generate a self-signed certificate. The following command stores the certificate in the keystore specified by the -keystore switch:

     keytool -genkey -alias jenkins-ssl-cert -keyalg RSA -keystore $JENKINS_HOME/.ssl/.keystore -validity 365

  3. keytool will prompt you for a keystore passphrase (enter it twice to confirm) and then ask some questions about your organization:
    • First and Last Name: your server's hostname or your organization's domain name, e.g. www.domain.com
    • Organizational Unit: the name of your unit within the organization
    • Organization: the name of your organization
    • City Location: your organization's city
    • State: the state
    • Country Code: two-letter country code
  4. Confirm that the information entered is correct. keytool will then prompt you to "Enter passphrase for key (Press Enter if the same as the keystore password)"; press carriage return to complete the certificate generation.
  5. Make the following changes to the /etc/sysconfig/jenkins file (you need sudo access to do so):
    • Change JENKINS_PORT to "-1" to ensure the CI server is no longer accessed through plain http.
    • Change JENKINS_HTTPS_PORT to "443" or any other port which is not taken.
    • Change JENKINS_ARGS to "--httpsKeyStore=$JENKINS_HOME/.ssl/.keystore --httpsKeyStorePassword=same-as-provided-during-cert-generation".

Restarting the Jenkins server will let you access the CI server over the https:// protocol. I have smudged out the details related to my certificate; you should see the details you entered during certificate creation when you access the CI server in your browser.

(screenshot: jenkins-https-smudged)


Remote debugging on tomcat

I am sure a lot of us have been in situations where an application running on a remote server (say, a dev or test server) throws errors and we would like to debug it to figure out why. You might wonder why we would want to debug an application running on a remote server at all. Often the desktops/laptops we develop on do not have the same OS or server configuration as the dev or test servers; in other cases the application is always started from the command line, even on our local development machines. Configuring the application to start from within your IDE doesn't take long, but the remote debugging feature is nevertheless useful. We are going to use the Java Platform Debugger Architecture (JPDA) options to start a server that allows remote debugging.

Create a file called setenv.sh (or setenv.bat on Windows), if it doesn't already exist, under your $TOMCAT_HOME/bin directory (where $TOMCAT_HOME is the Tomcat installation directory). It is advisable not to edit catalina.sh or catalina.bat directly, though the same configuration would work from there too. The debug configuration can be set through either the JPDA_OPTS or CATALINA_OPTS environment variable. Note that the jdwp options must be a single comma-separated string with no spaces.

#In this case, tomcat needs to be started with the jpda switch, i.e., $TOMCAT_HOME/bin/catalina.sh jpda start
export JPDA_OPTS="$JPDA_OPTS -agentlib:jdwp=transport=dt_socket,address=9125,server=y,suspend=n"

or

#This doesn't require the jpda switch and will open the specified port for debugging
export CATALINA_OPTS="$CATALINA_OPTS -agentlib:jdwp=transport=dt_socket,address=9125,server=y,suspend=n"

If the suspend option is set to "y", the JVM stays suspended until a debugger attaches to it. In most cases the option is set to "n", but there are scenarios in which you would like to debug parts of the application start-up itself, such as data being loaded into JVM memory when the application boots. The address option determines which port on the server is open for remote debugging. The transport is the underlying mechanism used to move bits between the debugger and the debuggee (the JVM on which the application is running). The possible transport mechanisms are sockets, serial lines, and shared memory. Shared memory (dt_shmem) can only be used when the debugger and debuggee are on the same Windows machine; if they are on different boxes, the socket transport must be used. A shared-memory sketch follows.
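For completeness, here is a sketch of the shared-memory variant in a setenv.bat on Windows; dt_shmem is Windows-only, the address is a shared-memory name rather than a port, and "tomcatdebug" is an arbitrary name chosen for this example:

rem Windows-only: shared-memory transport; the address is a name, not a port
set CATALINA_OPTS=%CATALINA_OPTS% -agentlib:jdwp=transport=dt_shmem,address=tomcatdebug,server=y,suspend=n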

Once the Tomcat server is started with the application deployed under it, we can check whether the specified port is actually listening for debuggers to attach. Quite a few programs will list the processes bound to different ports; I am going to use the lsof command. Running it with sudo also lists processes owned by other users on the machine.

personalpc:bin anand$ sudo lsof -n -i4TCP:9125
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 30306 tomcat 5u IPv4 0xb1069c72850a5bdb 0t0 TCP *:9125 (LISTEN)

All the modern day IDEs should allow you to attach a debugger to the remote server as they implement the JPDA APIs. I am going to demonstrate this with both Eclipse and IntelliJ IDEs.

Eclipse:

Access the debug configurations under the Run –> Debug Configurations menu. Select Remote Java Application in the menu tree and click the "New" icon (with a small plus sign) at the top left of the tree. Name your debug configuration and select the project to associate with it using the Browse button. Leave the Connection Type as Socket Attach. On the Source tab, select the source code folder to attach to this debug configuration. Enter the host name (or IP address) of the server running the remote application and the port on which the server allows remote debugging (as set in the setenv file above). Start debugging by clicking the Debug button.

(screenshot: eclipse-remote-debug-config)

Eclipse wouldn’t display anything on the console if the connection was successful. However, if you switch to the Debug perspective, you should see a thread that is attached to the remote application on the specified port in the stack frame.

(screenshot: eclipse-debug-perspective)

IntelliJ IDE:

Access the debug configurations under the Run –> Edit Configurations menu. Select Remote in the menu tree and click the "New" icon (with a small plus sign) to add a new debug configuration. Name your debug configuration. Leave the Transport as Socket and the Debugger mode as Attach. Leave the host as localhost if you are debugging on your local machine, or specify the IP address of the remote server, along with the port on which the application can be debugged. Select the project to associate with this configuration from the drop-down, then click OK to finish the configuration.

(screenshot: intellij-debug_config)

Select the debug configuration from the "Select Run/Debug Configuration" drop-down and then click the debug button (the green icon next to the drop-down). You should see either a message in the debugger console, "Connected to the target VM, address: 'localhost:9125', transport: 'socket'", or an error message.

(screenshot: intelli-debug-remote-app)

You can now place breakpoints in your source code and debug the remote application just as you would normally debug applications running from within your IDE. Happy remote debugging!


Block websites on Mac OS X, *nix OS

There are times when you would like to block some websites on your laptop or desktop. If you are running Mac OS X or any *nix-like OS, it is pretty straightforward. Of course, you need root privileges to do so.

Edit the /etc/hosts file and add the websites you need to block, one per line. The /etc/hosts file acts as a local DNS resolver and also assists in addressing nodes in a network.


sudo vi /etc/hosts

To add a new line below the current one in the vim editor, press "o". By default, the file contains the 127.0.0.1 localhost entry and a few others; the IP address 127.0.0.1 represents the hostname localhost (i.e., your own computer). To block a website, add a line pairing the localhost IP with the domain name (the website address), as shown below. To save the file after editing, press ESC and type :wq!.


##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting. Do not change this entry.
##
127.0.0.1    localhost
127.0.0.1    domain1.com
127.0.0.1    domain2.com
127.0.0.1    domain3.com

On most *nix-like OSes, this is enough to block the websites. On Mac OS X, however, you need to run one more command for the changes to take effect. Run the following from a terminal window and you are good to go!


sudo dscacheutil -flushcache
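On more recent macOS releases, the DNS cache is managed by mDNSResponder, so you may also need to restart it for the change to take effect:

sudo killall -HUP mDNSResponder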

(screenshot: etc-hosts)
