Cassandra instantaneous in-place node replacement

At some point everyone using Cassandra faces the situation of having to replace nodes, either because the cluster needs to scale and some nodes are too small, because a node has failed, or because your virtualisation provider is going to remove it.

Whatever the reason, the situation is the same: a new node has to be streamed in to hold the exact same data the old one had. Why wait for a whole streaming process, with the network and CPU overhead it requires, when we could just copy the data onto the new node and have it join the ring in place of the old one?

That’s what we at MyDrive have been doing for a while, and we want to share the exact process we follow with the community in case it helps someone.

Summary

The main idea behind this process is to have the replacement node up and running as quickly as possible by cutting out the part of the process that takes longest: streaming data.

The key points of the process are:

  1. Data will be copied from the old node to the new one using an external volume instead of transmitting it through the network.
  2. The new node will be given the exact same configuration as the replaced one. Therefore, the replacement node will be responsible for the same tokens as the replaced one, and will also have the same Host-ID, so, when it joins the ring, the other nodes won’t even notice the difference!

All our infrastructure is in AWS, so we used EBS volumes to back up and restore the Cassandra data. You may use a different data transfer method that better suits your infrastructure.

Steps

  1. Setup the new node, paying special attention to the following configuration parameters:
    1. listen_address
    2. rpc_address
    3. seeds
  2. Create the external volume you’re going to use to transfer the data from the old node to the new one.
  3. Rsync data and commitlog directories to the external volume
    1. Mount the external volume into the old node in /mnt/backup
    2. Copy the Cassandra data directory into the volume: rsync -av --progress --delete /var/lib/cassandra/data /mnt/backup/data
    3. Copy the Cassandra commitlog directory into the volume: rsync -av --progress --delete /var/lib/cassandra/commitlog /mnt/backup/commitlog
    4. Unmount and disconnect the volume. Connect and mount it into the replacement node.
    5. Copy the Cassandra data directory: rsync -av --progress --delete /mnt/backup/data /var/lib/cassandra/data
    6. Copy the Cassandra commitlog: rsync -av --progress --delete /mnt/backup/commitlog /var/lib/cassandra/commitlog
  4. Drain the old node: nodetool drain
  5. Stop Cassandra in the old node: sudo service cassandra stop
    1. And make sure it doesn’t accidentally come back (e.g. if you’re running Chef, supervisor or any other tool that may restart it automatically). This is EXTREMELY important, because if the replacement node tries to join the ring while the old one is alive, the new host will be assigned a new Host ID and the ring will be rebalanced as if we were adding a new node instead of replacing one.
  6. Do a final rsync to catch any last changes (repeat all of step 3).
  7. Ensure the Cassandra data and commitlog folders are owned by the cassandra user (rsync copies the owner’s UID along with the data, and that UID may not be the appropriate one on the new machine).
  8. Start the new node. sudo service cassandra start
  9. Check that everything is working properly:
    1. In the replacement’s logs you should see a message like this: WARN <time> Not updating host ID <host ID> for /<replaced node IP address> because it's mine, indicating that the new node is replacing the old one.
    2. In the replacement’s logs you should also see one message like the following per token: INFO <time> Nodes /<old IP address> and /<new IP address> have the same token <a token>. Ignoring /<old IP address>, indicating that the new node is becoming the primary owner of the replaced node’s tokens.
    3. In the other nodes’ logs you should see a message like: Host ID collision for <Host ID> between /<replaced IP address> and /<replacement IP address>; /<replacement IP address> is the new owner, indicating that the other nodes acknowledge the change.
    4. nodetool status should show the new node’s IP owning the replaced Host ID, and the old node shouldn’t appear anymore.
    5. Everything should look normal.
  10. Update the other nodes’ seeds list if the replaced node was a seed node.
  11. You can now safely destroy your old machine.

And voilà! By following these steps carefully you will be able to replace nodes and have them running quickly, avoiding any token movement or streaming.
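
For reference, the copy phase (steps 3, 6 and 7 above) is easy to script. Below is a minimal sketch in Python; the paths and mount point are the ones used above, it assumes the script runs with enough privileges to read and write Cassandra’s directories, and it is an illustration of the process rather than a drop-in tool.

#!/usr/bin/env python
"""Minimal sketch of the copy phase of an in-place node replacement.

Assumes the backup volume is already mounted on /mnt/backup and that the
script runs with enough privileges over Cassandra's directories.
"""
import subprocess

CASSANDRA_DIRS = ['/var/lib/cassandra/data', '/var/lib/cassandra/commitlog']
BACKUP_ROOT = '/mnt/backup'


def rsync(src, dest):
    # --delete keeps the destination an exact mirror of the source
    subprocess.check_call(['rsync', '-a', '--delete', src, dest])


def copy_to_backup():
    """Run on the OLD node (first pass with Cassandra up, final pass after drain/stop)."""
    for directory in CASSANDRA_DIRS:
        rsync(directory, BACKUP_ROOT)   # ends up in /mnt/backup/data, /mnt/backup/commitlog


def restore_from_backup():
    """Run on the NEW node after attaching and mounting the volume."""
    for directory in CASSANDRA_DIRS:
        name = directory.rstrip('/').split('/')[-1]   # 'data' or 'commitlog'
        rsync(BACKUP_ROOT + '/' + name, '/var/lib/cassandra')
        # rsync preserves the source UID, which may not map to the local
        # cassandra user, so fix ownership explicitly (step 7 above).
        subprocess.check_call(['chown', '-R', 'cassandra:cassandra', directory])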


Notes on CodeMesh.io 2015

This was my first CodeMesh edition. I had been told to attend several times by my former colleague Félix López, and it was really good. It was probably the conference with the highest technical level I’ve attended so far, and as such I made lots of notes, which you can read right now.

Concurrency + Distribution = Availability + Scalability

by Francesco Cesarini

This talk was a really useful step-by-step recipe for building distributed systems, with a final summary in the form of a 10-point list:

  1. Split up your system’s functionality into manageable, stand-alone nodes.
  2. Decide what distributed architectural pattern you are going to use.
  3. Decide what network protocols your nodes, node families and clusters will use when communicating with each other.
  4. Define your node interfaces, state and data model.
  5. For every interface function in your nodes, you need to pick a retry strategy.
  6. For all your data and state, pick your sharing strategy across node families, clusters and types, taking into consideration the needs of your retry strategy.
  7. Reiterate through steps 1 to 6 until you have the trade-offs which suit your specification.
  8. Design your cluster blueprint, looking at node ratios for scaling up and down.
  9. Identify where to apply back-pressure and load regulation.
  10. Define your Ops & Management approach, defining system and business alarms, logs and metrics.

You definitely want to have this list close when designing distributed systems, don’t you?

From irrational configuration system to functional data store

by Rob Martin

This talk was an interesting story about how they migrated a highly complicated and inflexible Oracle data store to a functional one based on tries. With the migration they achieved, apart from a much simpler and more manageable system, an impressive read latency speedup (from more than 15 seconds to less than 10 ms). Two takeaways from this talk:

Target your response times below 10 ms.

It is always good to have reference values, and this is a good one.

Build tools so that your juniors can work on them.

That means:

  • Simplicity
  • You can hire juniors
  • You can develop seniors (not only hire them).

Contemporary approach to data at scale

by Ben Stopford

This talk was a really nice journey through the concepts, problems and available solutions for handling data at scale.

Storage engine:

  • Have your indexes in RAM
  • Use indexed batches (Cassandra SSTables)
  • Brute force approach: Columnar. Save one file per column and compress.

Data access:

  • Consistent hashing: Agents know where the data is stored. Very efficient and scalable.
  • Broadcast: Simpler, but may cause network overhead as the cluster gets bigger.

Architectures: see the architecture diagrams in the slides.

Find the slides at benstopford.com

Exercise analysis

by Jan Machacek

This talk was especially interesting, as using mobile sensors to classify user activity is something we’re working on at MyDrive as well, so there were some very relevant takeaways for us here:

  • Use OpenCV library for classification.
  • Classify data on the phone.

And two ideas to improve the product that are potentially applicable for us at MyDrive as well:

  • Have a ML model per user.
  • Implement live sessions instead of submitting all the data to the backend at the end.

Beyond Lists: High Performance Data Structures

by Phil Trelford

This talk was a very good and dense one, so I’m really looking forward to the video being released, but until then…

Immutability + memory affinity for high speed.

And three new data structures to look at:

Boot my (secure)->(portable) clouds

by Nicki Watt

This talk was really interesting as well because we at MyDrive, like pretty much everyone out there I think, rely on cloud providers such as Amazon AWS for our deployments.

The takeaways from this talk are the Cloud computing ASAP principles and three lessons:

  • Automate everything
  • Separate config from code
  • API driven clouds & tools
  • Prefer modular, open source tools

Lesson 1: There’s NO single common cloud API, therefore use Terraform

This is something we are already doing, but I also made a note to check out Packer, a tool to build base OS images (we currently rely on Ubuntu’s AMIs).

Lesson 2: Decouple infrastructure provisioning from configuration management, using automated configuration management tools such as Chef, Ansible, …

Again, we’re doing well here at MyDrive using Chef, but I also made a note to check out FreeIPA for managing user accounts.

Lesson 3: Don’t roll your own security tools, use Vault to manage secrets and sensitive information.

This is again a tool to check out, but we at MyDrive are already following the rule using Citadel, a tool to store secrets in Amazon S3.

The folks at OpenCredo have already written this whole talk up in their Boot my secure government cloud blog post. Recommended reading!!

Reactive Design Patterns

by Roland Kuhn

This talk was a good review of concepts and patterns from The Reactive Manifesto

  • Responsiveness: The system responds in a timely manner.
  • Elasticity: Capability of scaling both up and down, both to meet requirements and keep costs controlled.
  • Resilience: The system remains responsive in the presence of failures.
  • Message driven: The system is decoupled, with a share-nothing architecture.

And also some architecture patterns for it:

  • Single Responsibility Principle:
    • A component must do only one thing and do it completely.
    • In order to achieve it, keep splitting responsibilities into nodes until you have single components, but not any further. Avoid trivial components.
  • Circuit breaker Pattern:
    • Protect services by breaking connection to failing components
  • Request – Response Pattern:
    • Include a return address in the message in order to receive the response.
  • Saga Pattern:
    • Divide long-lived and distributed transactions into quick local ones with compensating transactions for failure cases.
    • Partition your data so that every operation remains local.
    • Example: Bank transfer avoiding lock of both accounts:
    • T1: Transfer money from X to a local working account.
    • C1: Compensate failure by transferring money back to X.
    • T2: Transfer money from local working account to Y.

Panel Discussion

Well, this was a very interesting closing event; having the chance to hear people with so many years of experience and knowledge as the panelists was a pleasure. I was especially excited to listen to Tony Hoare talking about his story and a little bit about the future.

The panelists were:

As you can imagine I can only advise you to watch the video, but there were two especially interesting ideas:

  • A very big improvement for computer science would be to fix the ever-problematic mutable shared state
  • Tools are where the action is.


Fitting IPython Notebooks, Spark and Cassandra all together


Yesterday, after more than a whole week working on this, I finally managed to set this whole stack up.

This stack is one of the hottest and most trending topics nowadays, as it is very useful for Big Data, specifically for data exploration purposes.

My starting point was a three-node Cassandra cluster, intended for analytics, data exploration and ad hoc reporting. This cluster was running DSE (DataStax Enterprise) 4.6, which ships with Cassandra 2.0. It’s worth mentioning that the cluster was running in Cassandra-only mode.

After attending the latest Cassandra Summit ’15, it was made clear not to use Spark with any Cassandra version earlier than 2.1.8, because the integration was buggy. Therefore:

  1. Upgrade DSE to the latest (4.8) version, which includes Cassandra 2.1.10.
  2. The next step is to enable Spark on the cluster. This is one of the points where relying on something like DSE comes in handy, as the DSE distribution comes with a Spark installation and the Cassandra-Spark connector already configured and optimised for maximum compatibility and throughput. It is also really easy to enable Spark on the nodes.

  3. Once I had Cassandra and Spark running on all nodes and one of them designated as the Spark Master (the first of the Cassandra seeds by default, but you can check it with dse client-tool spark master-address), it was time to install IPython and all its dependencies:
    1. First step is to install all system required packages (use apt-get, yum, etc depending on your OS): sudo apt-get install build-essential libcurl4-openssl-dev libssl-dev zlib1g-dev libpcre3-dev gfortran libblas-dev libblas3gf liblapack3gf liblapack-dev libncurses5-dev libatlas-dev libatlas-base-dev libscalapack-mpi1 libscalapack-pvm1 liblcms-utils python-imaging-doc python-imaging-dbg libamd2.3.1 libjpeg-turbo8 libjpeg8 liblcms1 libumfpack5.6.2 python-imaging libpng12-0 libpng12-dev libfreetype6 libfreetype6-dev libcurl4-gnutls-dev python-pycurl-dbg git-core cython libhdf5-7 libhdf5-serial-dev python-egenix-mxdatetime vim python-numpy python-scipy pandoc openjdk-7-jdk
    2. Next step is to install Python Virtualenv, as IPython depends on Python and changing the system’s Python installation can be dangerous: sudo pip install virtualenv
    3. Then, create a folder for your installation. This folder will contain the virtual environment for the notebooks installation (ipynb in this example). mkdir ipynb and cd ipynb
    4. Create a virtual environment: virtualenv ipython. Where ipython is the name of the virtual environment we’re creating.
    5. To begin using the virtual environment we need to activate it: source ipython/bin/activate. At this point our prompt will indicate that we’re inside the ipython virtual env.
    6. Install all IPython’s dependencies (using pip): pip install uwsgi numpy freetype-py pillow scipy python-dateutil pytz six scikit-learn pandas matplotlib pygments readline nose pexpect cython networkx numexpr tables patsy statsmodels sympy scikit-image theano xlrd xlwt ipython[notebook]
  4. Let’s create an IPython default profile (we’ll not use it, but it’s safe to create it to avoid bugs and strange issues)
    • ./ipython/bin/ipython profile create --profile=default --ipython-dir .ipython
  5. Then we create the pyspark ipython profile we’ll use.
    • ./ipython/bin/ipython profile create --profile=pyspark --ipython-dir .ipython
  6. Now install MathJax extension:
    • python -c "from IPython.external.mathjax import install_mathjax; install_mathjax(replace=True, dest='~/ipynb/ipython/lib/python2.7/site-packages/IPython/html/static/mathjax')"
  7. Now paste the following contents into ~/ipynb/.ipython/profile_pyspark/ipython_notebook_config.py

      c = get_config()
    
      # The IP address the notebook server will listen on.
      # If set to '*', will listen on all interfaces.
      c.NotebookApp.ip= '*'
    
      # Port to host on (e.g. 8888, the default)
      c.NotebookApp.port = 8888
    
      # Open browser (probably want False)
      c.NotebookApp.open_browser = False
    
  8. Create now the following file under ~/ipynb/.ipython/profile_pyspark/startup/00-pyspark-setup.py

      import os
      import sys
    
      spark_home = os.environ.get('SPARK_HOME', None)
      if not spark_home:
        raise ValueError('SPARK_HOME environment variable is not set')
      sys.path.insert(0, os.path.join(spark_home, 'python'))
      sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
      execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
    
  9. Prepare the environment:

    1. export SPARK_HOME=/usr/share/dse/spark
    2. export PYSPARK_SUBMIT_ARGS='--master spark://<spark_master_host>:<spark_master_port> pyspark-shell'
  10. Start your Notebooks server!!:

    • ipython/bin/ipython notebook --profile=pyspark

Now you should be able to navigate to <host_running_notebooks_server>:8888 and see the WebUI!

Finally, check that everything is working by creating a new notebook and typing and running sc in it. You should see something like <pyspark.context.SparkContext at 0x7fc70ac8af10>
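
If you also want to check the Cassandra integration itself from the notebook, a minimal sketch looks like the following. The keyspace and table names are made up, and the exact reader API depends on your Spark and connector versions; the DataFrame reader below should work with the connector shipped in recent DSE versions, but check yours.

# Run inside a notebook cell. 'sc' and 'sqlContext' are created by
# pyspark/shell.py, which the 00-pyspark-setup.py startup script executes.
df = (sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(keyspace="my_keyspace", table="my_table")  # hypothetical names
      .load())

print(df.count())  # forces a read through the Spark executors
df.show(10)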

Troubleshooting:

  1. I’ve followed the steps but when I type and run sc, an empty string is returned.

    • Make sure environment variables defined at step 9 are properly set.
    • Make sure the py4j-0.8.2.1-src.zip actually exists under SPARK_HOME/python/lib (Update your version of it on the startup script accordingly).
    • That’s because the startup script (saved at step 8 under …startup/00-pyspark-setup.py) hasn’t run properly. Try running its contents as a notebook to debug what’s happening.
  2. When running the startup script as a notebook I get: Java gateway process exited before sending the driver its port number

    • You are running a JRE rather than the Java JDK.

Notes on Cassandra Summit 2015


DataStax Cassandra Summit is becoming a GREAT conference!! With each new edition more and more people, talks and activities are added to the event, making it really awesome.

This year has been very special and intense for me, for several reasons.

  • First, because of the location. This year the Summit was held in Santa Clara, CA, which meant an opportunity for me to get to know the area, including San Francisco and the Bay. I really enjoyed walking around and visiting the city’s tourist attractions. San Francisco is a lovely city and the weather was just amazing, but I wouldn’t like to close this note without mentioning the reality of the streets. The number of homeless people and people suffering from mental illness you can see just by walking around the city is simply incredible, and it seems the situation is getting worse as housing prices increase. This article in the news particularly caught my attention: Tech bus drivers forced to live in cars to make ends meet.

  • The second reason is that I spoke there!! This has been, by far, the biggest conference I’ve had the pleasure to speak at, and I really enjoyed it!! The talk was actually a live coding demo (the best kind of talk for sure, aren’t they?) showing how I, being a Cassandra developer, not an admin, tackled a production issue a couple of months ago. You can see the exact steps in this previous blog post and also in this video of the same talk at the London Cassandra Meetup.

  • The third reason is that I’ve been officially certified as an Apache Cassandra Developer by DataStax and O’Reilly Media: https://twitter.com/calonso/status/646480545514258432

  • And finally, because I’ve been named one of the DataStax MVPs of the year 2015!! And, to be honest, I couldn’t be more excited. I love Cassandra, I love contributing to open source projects, and getting some recognition for it is always welcome. I’d like to congratulate every other MVP of this year (full roster to be announced soon) and the whole of DataStax for such a great and well organised event.

Tech Notes. Day 1

And after all these personal notes, let’s get into the technical notes I made from the talks I attended. Hope you find them interesting!!

http://esri.github.io/ project and its GIS Tools for Hadoop subproject

This first one is a project to check out and bear in mind always when working with geospatial data and corresponding visualisations.

Time series writing performance can be improved by buffering

This is pretty simple: if you do some buffering in your application and write bigger chunks of data, rather than doing one write per observation, your overall performance is likely to go up. One way to do this is sketched below.
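
As an illustration, here is a sketch with the DataStax Python driver (the keyspace, table and buffer size are made up): observations are buffered per partition and flushed as a single-partition unlogged batch instead of one write each.

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, BatchType

cluster = Cluster(['127.0.0.1'])          # assumption: a local test node
session = cluster.connect('metrics')      # hypothetical keyspace

insert = session.prepare(
    "INSERT INTO observations (sensor_id, ts, value) VALUES (?, ?, ?)")

BUFFER_SIZE = 100
buffers = {}  # sensor_id -> list of (ts, value) tuples


def write(sensor_id, ts, value):
    buffers.setdefault(sensor_id, []).append((ts, value))
    if len(buffers[sensor_id]) >= BUFFER_SIZE:
        flush(sensor_id)


def flush(sensor_id):
    # All statements target the same partition (sensor_id is assumed to be the
    # partition key), so the unlogged batch is one mutation for the coordinator
    # instead of BUFFER_SIZE separate writes.
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    for ts, value in buffers.pop(sensor_id):
        batch.add(insert, (sensor_id, ts, value))
    session.execute(batch)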

PagerDuty’s ‘One year of Cassandra failures’

PagerDuty’s talk about their infrastructure was really interesting to me, as it somehow resembles ours.

Watch dropped message counts; they anticipate bigger issues.

Upgrade Cassandra, at least to the version shipped with the latest DSE.

The Cassandra Danger Metrics page.

Having a Dashboard of danger metrics to look at can be very useful. This should include pending tasks and latencies at least.

The Weather Company

Robbie Strickland’s talk was one of my favourites. I found it really dense and intense. Lots of notes here.

The Weather Company Lambda Architecture

First of all, check out Robbie’s book Cassandra High Availability.

Be careful when using DTCS. It is dangerous.

Compactions in Cassandra have to be deeply understood, otherwise they will simply bite you sooner or later.

Process and filter your events at ingestion time.

That makes sense. Instead of storing unstructured data, parse and process your data before ingesting it; that way you’ll be able to filter out invalid data if necessary. If you don’t, you’ll have to parse and process it every time you read, which is definitely worse. A tiny sketch of the idea follows.
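
For illustration (field names are made up): validate and structure each raw event once, at ingestion, and drop the invalid ones, instead of re-parsing on every read.

import json
from datetime import datetime


def parse_event(raw):
    """Turn a raw JSON payload into structured columns, or None if invalid."""
    try:
        event = json.loads(raw)
        return {
            'device_id': str(event['device_id']),
            'ts': datetime.utcfromtimestamp(event['ts']),
            'speed': float(event['speed']),
        }
    except (ValueError, KeyError, TypeError):
        return None  # filtered out at ingestion time, never written

# row = parse_event(raw_payload)
# if row is not None:
#     ...  # write the structured columns to Cassandra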

And speaking particularly about his lambda architecture (in the picture), a couple of ideas:

  • Back up data daily from C* to S3 + Parquet. C* writes quickly but is more expensive than S3 for long-term storage. Compute analytics in C* and dump the data when it is not likely to be required anymore.
  • Beware versions: Spark + Cassandra = 2.1.8
  • Using secondary indexes to help Spark read data is a good practice, but keep index cardinality low (<= 50k).
  • Beware wide rows; it only takes one to get you in trouble. Use nodetool toppartitions or nodetool cfstats (max row bytes).
  • Make your data visualisable ASAP: Zeppelin project

Tech Notes. Day 2

Netflix

Christos Kalantzis and his colleagues’ talk about how they survived the AWS re:boot also yielded lots of notes:

  • Run periodic checks on each node’s health, using, for example, Jenkins.
  • They use internal products Atlas and Priam for managing and monitoring the cluster.
  • Have idempotent processes.
  • Retry with exponential back-off.
  • Collect data/stats on your clusters to predict failures.
  • No ops team. Everyone acts as devops and gets on-call.

Troubleshooting with Cassandra

This talk from a DataStax support engineer left several notes as well:

  • If cache is full but the hit rate is low, increase cache capacity.
  • If the MemtableFlushWriter ‘All time blocked’ count is significant, it means disk pressure on writes.
  • proxyhistograms command shows the full time to process the request, including the network round-trip from the coordinator to the required replicas. By contrast, the cfhistograms command shows the local latency for a single replica.
  • Use describecluster for schema disagreements.
  • The log warns with a ‘Compacting large partition’ message when it encounters a big partition that you should take care of.

Lessons learnt with Cassandra and Spark

This talk by Christopher Bradford from OpenSourceConnections gave me some architectural notes.

  • Their architecture is composed of Cassandra to store data, Spark for ETL and Solr for search. An example of this setup is in their GitHub account: https://github.com/o19s/Spark-Cassandra-Demo
  • Build balanced models: spread data/load evenly across all nodes.
  • Use vnodes in small clusters. Single token nodes if you’re big (Apple, Netflix, …)
  • Have a look at Metrics, Dropwizard‘s Java metrics library.

Extreme Cassandra Optimization.

I must admit this talk by Al Tobey was a bit out of reach for me. There were so many advanced optimisation tips that I can only link to his guide and come back to it when I reach the appropriate level: Al’s Cassandra 2.1 tuning guide

Repeatable, Scalable, Reliable, Observable: Cassandra

This talk by Aaron Morton had sooooo many notes on how to monitor Cassandra that at a given point I decided to give up and simply grab the slides later to review them. REALLY GOOD!!! http://www.slideshare.net/aaronmorton/cassandra-sf-2015-repeatable-scalable-reliable-observable-cassandra

Case Study: Troubleshooting production issues as a developer.

And that was my talk!! A live coding demo!! I have to massively thank everyone who attended, as it was the last slot and I acknowledge everyone was already kind of burnt out on Cassandra; I hope they enjoyed it and found it valuable.

Here you can see a video of the same talk during the Cassandra London Meetup and the slides as well:

And that’s about it!! I hope these notes are as useful for you as they are for me, and a HUGE THANKS to everyone who spoke at the Summit for sharing such valuable knowledge.

Benchmarking Cassandra Models

During the last couple of weeks we’ve been very focused on deeply understanding Cassandra’s write and read paths to build our models the best possible way.

After so much investigation, I can summarise the steps we follow to assess our models’ performance.

1. Have good practices and anti-patterns always in mind.

Cassandra is known to be a very resilient and highly performant platform, but only so long as you follow the rules and work with the underlying data model.

There are quite a few of these rules, so my recommendation is to read through them quickly before thinking about your model.

There are lots of places online where you can read about the good practices to follow and the anti-patterns to avoid when data modelling for Cassandra, but I have my own repository; here are the links:

Once you have all the ideas fresh in your mind, it’s time to start thinking about your data model.

2. Design your data model and alternatives

Following all the ideas and principles learnt in the previous step, design your data model and try to think of alternatives. These alternatives usually come as minor adjustments that can be applied to the model, where following the best practices alone doesn’t tell you which one is the better choice.

Here I’ll provide two examples we’ve recently had:

  1. Given a bucketed time series with a maximum of 86,400 rows per partition, what is the better way to read an entire partition?
    a) By reading the whole partition at once
    b) By reading the whole partition in chunks (2 halves, 4 quarters, …)
  2. Given a model that stores a discretised distribution on each record, what is the better way to save the bins?
    a) By using a List element that will contain the 100 bins
    b) By having 100 columns, one for each bin

The resulting models will meet all the good practices and avoid all the anti-patterns regardless of the final decision, so how do you decide which way to go?

3. Benchmark your alternatives

For this purpose I’ve created a script (Ruby in this case) that:

  1. Creates the table proposed by the model under test
  2. Generates PRODUCTION DATA and saves it (memory or disk, depending on the size)
  3. Benchmarks the applicable access patterns, in our case:
    3.1. Insertion of all the data generated in step 2
    3.2. Reading of all the data
    3.3. Updating the data

It is important that the access patterns are exactly the same as they’ll be in production, otherwise the result of the benchmark is completely useless.

This script should be adapted and run for every single alternative.

Here you can see the scripts used for both alternatives proposed for example No. 2 described above:
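
The Ruby scripts themselves are not reproduced here; to give an idea of the shape such a benchmark takes, below is a minimal sketch in Python with the DataStax driver for the list-based alternative. The keyspace, table and sizes are made up, and the real scripts must reproduce your production access patterns exactly.

import random
import time
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])  # assumption: a test cluster you can write to
session = cluster.connect()
session.execute("""CREATE KEYSPACE IF NOT EXISTS bench WITH replication =
                   {'class': 'SimpleStrategy', 'replication_factor': 1}""")
session.set_keyspace('bench')
session.execute("""CREATE TABLE IF NOT EXISTS distributions_list (
                     id int PRIMARY KEY,
                     bins list<double>)""")

insert = session.prepare("INSERT INTO distributions_list (id, bins) VALUES (?, ?)")
select = session.prepare("SELECT bins FROM distributions_list WHERE id = ?")

ROWS = 10000  # generate production-like data up front, before timing anything
data = [(i, [random.random() for _ in range(100)]) for i in range(ROWS)]


def timed(label, fn):
    start = time.time()
    fn()
    print('%s: %.3fs' % (label, time.time() - start))

timed('Writing', lambda: [session.execute(insert, row) for row in data])
timed('Reading', lambda: [session.execute(select, (i,)) for i in range(ROWS)])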

4. Let data decide

We’re almost there!!

Now we have to collect the data for each execution and compare them to work out which of the candidates is our final model.

There are two sources of data you should look at:

  1. The output of the script’s execution.
    1.1 The script will print an output for every workload benchmarked, (Insert, Read and Update in our example)
  2. The DataStax OpsCenter’s Graphs.
    2.1 DataStax OpsCenter is probably the most advanced and easy to use Cassandra monitoring tool.

In our previous example we could see this output from the scripts:

calonso@XXX-XXX-XXX-XXX: bundle exec ruby lists_benchmark.rb
                user     system        total          real
Writing:  133.840000   5.180000   139.020000   (171.499862)
Reading:   24.000000   0.350000    24.350000   ( 47.897192)
Updating:   2.560000   0.210000     2.770000   (  4.135555)

calonso@XXX-XXX-XXX-XXX: bundle exec ruby cols_benchmark.rb
                user     system        total          real
Writing:  133.730000   2.710000   136.440000   (144.749167)
Reading:   30.340000   0.410000    30.750000   ( 41.759687)
Updating:   1.950000   0.090000     2.040000   (  3.020997)

So, we could say that the columns model performs better than the lists based one, but let’s confirm our initial assessment by looking at OpsCenter’s graphs:

In all the graphs we can see two peaks: the first one was generated during the benchmarking of the list-based model and the second one during the columns-based one.

Absolutely every graph comparison points towards the columns-based model as the one with the better performance:

reads

  • These graphs show the total reads per second received by coordinator nodes across the whole cluster and the average time taken to respond to them.

writes

  • These graphs show the total writes per second received by coordinator nodes across the whole cluster and the average time taken to respond to them.

os_load

  • Average measure of the amount of work a computer performs. A load of 0 means no work at all, and a load of 1 means 100% of work for a single core; therefore this value depends on how many cores are available (in our deployment, 2).

gcs

  • Number of times each of the JVM GCs runs per second and the time elapsed in each run.

local_reads

  • Total reads per second received on the specific column families being used and the average time taken to respond to them.

local_writes

  • Total writes per second received on the specific column families being used and the average time taken to respond to them.

And that’s all for us for now!

Troubleshooting Cassandra performance issues

A couple of weeks ago we released a feature whose performance was unexpectedly poor, and here I want to share the steps and tools we used to get to the root cause of the problem.

To give a little bit of background, I’ll tell you that the feature was something really common nowadays: saving a bunch of time series in Cassandra.

Step 1: Look for evidence in metrics

The natural first step, which I think everyone takes, is to look at the graphs (here we use Graphite) and try to find evidence of what’s going on.

In our case we had very clear evidence that something was wrong, as the time consumed in the process had increased by around 600% after the deploy!!

events_latency_increase

But that means that not only is something wrong in the feature but also, and even more scary, in our whole process!! How can such a poorly performing piece of code have reached production without anyone noticing it before? OK, we don’t have performance tests as part of our CI process, but we test every feature in our pre-production environments before going to production, and it should have shown up there!! That would simply have been unacceptable, and processes are easy and strong here @_MyDrive, so after scrolling a little bit along the graphs we found an explanation: the tests run in the pre-production environment were OK!

events_latency_increase_with_labs

OK, so we have somewhere to look: something in our production environment is performing poorly and, at first glance, our stats are not showing it at all!

Step 2: Profile suspicious code

At this step we use the fantastic RubyProf to wrap the suspicious code in a RubyProf.profile block and save the results to analyse later.

require 'ruby-prof'

results = RubyProf.profile do
  # [Code to profile]
end

printer = RubyProf::GraphPrinter.new(results)
printer.print(File.new('/tmp/profiled-events-insert.txt', 'w'), min_percent: 2)

Reading the saved files I could clearly see that the time was going into the Cassandra-related code, and I made the assumption that the problem would be somewhere in the model/queries.

I could have read a little more of the profiling result and would probably have saved some steps here, but as we were issuing several thousands of inserts asynchronously and the first lines of the profiling report were pointing to Cassandra, everything looked crystal clear.

Step 3: Trace queries

There’s only one thing we’re doing here: INSERT, so…

cqlsh:carlos_test> TRACING ON
cqlsh:carlos_test> INSERT INTO ...
activity                          | timestamp    | source       | source_elapsed 
----------------------------------+--------------+--------------+----------------
execute_cql3_query                | 10:26:35,809 | 10.36.136.42 | 0 
Parsing INSERT INTO ...           | 10:26:35,836 | 10.36.136.42 | 26221 
Preparing statement               | 10:26:35,847 | 10.36.136.42 | 37556 
Determining replicas for mutation | 10:26:35,847 | 10.36.136.42 | 37867 
Acquiring switchLock read lock    | 10:26:35,848 | 10.36.136.42 | 38492 
Appending to commitlog            | 10:26:35,848 | 10.36.136.42 | 38558 
Adding to events memtable         | 10:26:35,848 | 10.36.136.42 | 38600 
Request complete                  | 10:26:35,847 | 10.36.136.42 | 38926 

Looking at these results, something looks broken in parsing the query, and running the same thing on our pre-production environment clearly confirms that something is broken in production:

cqlsh:benchmark> TRACING ON
cqlsh:benchmark> INSERT INTO ...
activity                          |  timestamp   |    source     | source_elapsed 
----------------------------------+--------------+---------------+---------------- 
execute_cql3_query                | 10:27:40,390 | 10.36.168.248 | 0 
Parsing INSERT INTO ...           | 10:27:40,390 | 10.36.168.248 | 75 
Preparing statement               | 10:27:40,390 | 10.36.168.248 | 233 
Determining replicas for mutation | 10:27:40,390 | 10.36.168.248 | 615 
Acquiring switchLock read lock    | 10:27:40,390 | 10.36.168.248 | 793 
Appending to commitlog            | 10:27:40,390 | 10.36.168.248 | 827 
Adding to events memtable         | 10:27:40,390 | 10.36.168.248 | 879 
Request complete                  | 10:27:40,391 | 10.36.168.248 | 1099

But, what could be so wrong with parsing a query?

Step 4: Simplify the problem

At this point I decided to write a small Ruby program that:

  1. Connects to the Cassandra cluster
  2. Creates a test keyspace
  3. Creates a column family within the test keyspace
  4. Runs an insert like the one profiled above
  5. Drops the test keyspace

and profile it all using RubyProf to try to spot something obvious.
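
The original script was Ruby, profiled with RubyProf; here is an equivalent sketch in Python with cProfile, just to show the shape of the experiment. The hosts and names are made up.

import cProfile
from cassandra.cluster import Cluster


def experiment():
    cluster = Cluster(['10.0.0.1', '10.0.0.2', '10.0.0.3'])  # your ring's nodes
    session = cluster.connect()
    session.execute("""CREATE KEYSPACE IF NOT EXISTS conn_test WITH replication =
                       {'class': 'SimpleStrategy', 'replication_factor': 1}""")
    session.execute("""CREATE TABLE IF NOT EXISTS conn_test.events (
                         id int PRIMARY KEY, payload text)""")
    session.execute("INSERT INTO conn_test.events (id, payload) VALUES (1, 'x')")
    session.execute("DROP KEYSPACE conn_test")
    cluster.shutdown()

# Sorting by cumulative time makes it obvious when connection setup dominates,
# e.g. when the driver is stuck waiting on an unreachable node.
cProfile.run('experiment()', sort='cumulative')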

Running this script in production showed something useful: more than 98% of the time was spent in the Cluster#connect method! Furthermore, almost 100% of the time inside that method was going into IO.select, which means the time was being spent outside Ruby itself.

Actually I could have saved some time, because this exact same thing was also clear in the first profiling I did in step 2, but my premature assumption made me walk some extra, unnecessary steps.

OK, we have a new symptom but no idea where to look, so after some desperate and useless attempts, like tcpdumping the communications between the client and the cluster, I decided to go back to the script I wrote and…

Step 5: Enable Logging

Maybe this should have been the first step, right?

But this was really revealing! For some reason, the driver was trying to connect to four nodes when our ring only has three!! Of course, the connection to this extra node was failing on timeout (10 seconds), and that was the source of our poor performance!

A quick Google search with the symptoms as the query and… ta-dah!! We were facing a Cassandra bug!!

We quickly applied the fix and were saving our 10 seconds per execution again.

Conclusions

  1. Measure everything
    • Metrics and monitoring let us know we were facing an unexpected performance issue
  2. Keep calm, read to the end
    • In the first profiling report I could have spotted that the issue was in Cluster#connect, but due to my eagerness to find a fix I made a wrong assumption that cost more time.
  3. Thank the community
  4. Take advantage of learning opportunities!
    • These kinds of unexpected situations normally push you out of your comfort zone and really test your limits, which is really good for your personal development.

Cassandra Day London 2015

DataStax has done it again!!

So far I’ve attended Cassandra Day London 2014, Cassandra Summit 2014, today’s Cassandra Day London 2015 and several Cassandra meetups, all organised by DataStax, and I can only admire them, both for the organisation itself (food, merchandising, sponsors, etc.) and, most importantly, for the quality of the content they deliver. I can arguably say that Cassandra would not be the same without DataStax.

But now let’s focus on what’s really important to us: the content. I usually make notes on important things I hear at conferences and then just transcribe them here for further reading and sharing.

Cassandra resilience through a catastrophe’s post-mortem.

by @cjrolo

They lost slightly more than 50% of their data center and their experience was that, after some tweaks and nights awake, Cassandra could still ingest all the data.

Their setup:

  • 1Tb writes per day
  • Three-node cluster
  • Write consistency: ONE
  • Read consistency: QUORUM

Their recommendations:

  • Five-node cluster (RF=3)
  • >1Gb links
  • SSDs
  • Avoid Batches and Counters.

They claim to have been using Cassandra since pre-release versions, and that particular catastrophe happened to them before DataStax had released any OpsCenter whatsoever, so I was curious to know how they were monitoring their cluster. They were using the bundled Graphite Reporter along with StatsD.

Using Cassandra in a microservices environment.

by @mck_sw

Mick’s talk was mostly about tools, particularly highlighting two:

  • Zipkin: A distributed tracing system developed by Twitter.
    • Useful for debugging, tracing and profiling on distributed services.
  • Grafana: OpenSource Graphs dashboard.
    • Very useful because it can easily integrate with tools like Graphite or Cyanite.

One of the most interesting parts was, once again, the emphasis on monitoring.

Lessons learnt building a data platform.

by @jcasals & @jimanning, from British Gas Connected Homes

They are building the Connected Homes product at British Gas, which is basically an IoT system that monitors temperature and boilers, with several benefits for the customers.

They receive data back from users every two minutes.

And the lessons are:

  • Spark has overhead, so make sure it’s worth using.
    • Basically, Spark takes advantage of parallelism and distribution across nodes, so if all computations are to be done in a single node then maybe you don’t need Spark.
  • Upsert data from different sources

Given this structure:

CREATE TABLE users(
id integer,
name text,
surname text,
birthdate timestamp,
PRIMARY KEY (id));

We can UPSERT like this:

INSERT INTO users (id, name, surname) VALUES (1, 'Carlos', 'Alonso');
INSERT INTO users (id, birthdate) VALUES (1, 1368438171000);

Resulting in a complete record.

  • Tweak Spark jobs to avoid them killing Cassandra. Bear in mind that Spark is much more powerful than Cassandra and can easily exhaust its memory. Check the little comic below for more info 😉

Spark vs Cassandra Comic

  • Gain velocity by breaking the barrier between Data Scientists and Developers in your team.

Amaze yourself with this visualisation of London’s energy consumption they showed!

London's Energy consumption

 

Cassandra at Hailo Cabs

by Chris Hoolihan, infrastructure engineer at Hailo

At Hailo Cabs they use Amazon AWS as the infrastructure supporting Cassandra; in particular they use:

  • m1.xlarge instances in development systems
  • c3.2xlarge instances in production systems.
  • striped ephemeral disks
  • 3 availability zones per DC

Again, one of the most interesting parts was the monitoring. They showed several really interesting tools, some of them developed by themselves!

  • Grafana
  • CTOP (top for Cassandra).
  • The Cassandra metrics graphite plugin.

And GoCassa, a Go wrapper for the Go Cassandra driver that they developed themselves, basically to encourage best practices.

Finally he gave one last piece of advice: don’t put in too much data!!

Antipatterns

By @CHBATEY, Apache Cassandra evangelist at DataStax

This talk was simply awesome. It had been a long time since I last had to take notes so fast and concentrate so hard to avoid missing a word, and here they are!

Make sure every operation hits ONLY ONE SINGLE NODE.

Easy to explain, right? The more nodes involved, the more connections and therefore the more time spent resolving your query.

Use Cassandra Cluster Manager.

This is a development tool for creating local Cassandra clusters. It can be found here.

Use query TRACING.

It is the best way to profile how your queries perform.

  • Good queries trace small.
  • Bad queries trace long.

Cassandra cannot join or aggregate, so denormalise.

You have to find the balance between denormalisation and too much duplication. Also bear in mind that User Defined Types are very useful when denormalising.

‘Bucketing’ is good for time series.

It can help you distribute load among the different nodes and also achieve the first principle here: “Make sure every operation hits only one single node”.

It is better to have several asynchronous ‘gets’ hitting only one node each than a single ‘get’ query that hits several nodes. Both ideas are sketched below.
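
A small sketch of both ideas together with the DataStax Python driver (hypothetical schema): the day bucket is part of the partition key, so each query below hits a single partition, and the reads are issued asynchronously, one per bucket.

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('metrics')  # hypothetical keyspace

# Assumed schema:
# CREATE TABLE readings (
#   sensor_id text, day text, ts timestamp, value double,
#   PRIMARY KEY ((sensor_id, day), ts)
# );
select = session.prepare(
    "SELECT ts, value FROM readings WHERE sensor_id = ? AND day = ?")

days = ['2015-06-01', '2015-06-02', '2015-06-03']

# One asynchronous query per bucket: each one is a single-partition read,
# and the driver spreads them across coordinators and waits in parallel.
futures = [session.execute_async(select, ('sensor-42', day)) for day in days]
rows = [row for future in futures for row in future.result()]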

Unlogged batches

Beware that these batches do not guarantee completion.

Unlogged batches can save on network hops, but the coordinator is going to be very busy processing the batch while the other nodes stay mostly idle. It’s better to run individual queries and let the driver load-balance and manage the responses. Only if all parts of the batch are to be executed on the same partition is a batch a good choice.

Logged batches

These guarantee completion by first saving the batch to a dedicated batch log.

Logged batches are much slower than their unlogged counterparts (~30%), so only use them if consistency is ABSOLUTELY mandatory.

Shared mutable data is dangerous in Cassandra too.

This always reminds me of this tweet with a very descriptive explanation of how dangerous it is 😉

There are two main ways to avoid it:

  • Upserting (explained above)
  • Event sourcing: Basically just appending new data as it comes.
    • As this doesn’t scale, it’s good to combine it with some snapshotting technique (e.g. taking a snapshot every night in a batch job).

Cassandra does not rollback

So it’s pointless to retry failed inserts unless the failure happened before reaching the coordinator, because if the write reached the coordinator it will store a hint and retry it later.

Don’t use Cassandra as a queue!!

Cassandra doesn’t delete data immediately; instead it marks it as deleted (with tombstones), and those records stay around for a while, which will affect reads.

TTLs also generate tombstones, so beware!! (unless you use DateTieredCompactionStrategy)

Secondary Indexes

As Cassandra doesn’t know the cardinality, it saves the index in local tables.

These local tables exist on every node and only contain references to data that can be found on the corresponding node.

Therefore, to use them, a query has to run on all the nodes.

You can see slides of this last talk here: http://www.slideshare.net/chbatey/webinar-cassandra-antipatterns-45996021

And that was it!! Amazing, huh?