Showing posts with label Hive.

Thursday, April 10, 2014

Install Shark 0.9.1 on CDH 5.0.0 GA (hadoop 2.3.0) + Spark Configuration on CDH5


How to install Shark in CDH 5.0.0 GA

Requirements for Shark

1. CDH5
2. Spark
Spark should already be installed with CDH 5, under /opt/cloudera/parcels/CDH/lib/spark

Following these steps, you will install Shark 0.9.1 into /var/lib/spark on CDH 5.0.0 GA with Hadoop 2.3.0.

/var/lib/spark is the default home directory of the spark user in CDH 5.0.0 GA.

Run these commands as root or as the spark user (you need to change the spark user's shell to /bin/bash, e.g. with usermod -s /bin/bash spark, since by default it is nologin).

1. Download Shark source code

export SPARK_USER_HOME=/var/lib/spark

cd $SPARK_USER_HOME

wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz

tar zxf scala-2.10.3.tgz

wget https://github.com/amplab/shark/archive/v0.9.1.tar.gz

tar zxf v0.9.1.tar.gz

Alternatively, you can download my shark-0.9.1 build, which is compiled against the CDH 5.0.0 packages:

http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz

2. Configure Shark

We can use the Hive 0.12 that ships with CDH 5, so we do not need to download the Spark/Shark build of the Hive 0.11 binaries.

Set the following in $SPARK_USER_HOME/shark-0.9.1/conf/shark-env.sh :
export SPARK_USER_HOME=/var/lib/spark
export SPARK_MEM=2g
export SHARK_MASTER_MEM=1g
export SCALA_HOME="$SPARK_USER_HOME/scala-2.10.3"
export HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive"
export HIVE_CONF_DIR="$HIVE_HOME/conf"
export HADOOP_HOME="/opt/cloudera/parcels/CDH/lib/hadoop"
export SPARK_HOME="/opt/cloudera/parcels/CDH/lib/spark"
export MASTER="spark://test01:7077"

SPARK_JAVA_OPTS=" -Dspark.local.dir=/tmp "
SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=10 "
SPARK_JAVA_OPTS+="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps "
export SPARK_JAVA_OPTS

(change the hostname test01 in MASTER="spark://test01:7077" to your Spark master host)

3. Build Shark with Hadoop 2.3.0-cdh5.0.0

If you downloaded my shark-0.9.1 build above (http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz), you do not need to build anything and can jump to step 5. Otherwise, compile shark-0.9.1 against hadoop 2.3.0-cdh5:

cd $SPARK_USER_HOME/shark-0.9.1/
SHARK_HADOOP_VERSION=2.3.0-cdh5.0.0 ./sbt/sbt package

The build takes a long time and depends heavily on your network connection; it is normally quite slow, so at this point you may prefer to download the pre-built shark-0.9.1 package for CDH 5.0.0 GA instead.

Again, it is here:
http://user.cs.tu-berlin.de/~tqiu/fxlive/dataset/shark-0.9.1-cdh-5.0.0.tar.gz

4. Parquet support

wget http://repo1.maven.org/maven2/com/twitter/parquet-hive/1.2.8/parquet-hive-1.2.8.jar -O $SPARK_USER_HOME/shark-0.9.1/lib/parquet-hive-1.2.8.jar

ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-hadoop.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-common.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-encoding.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-format.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-avro.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-column.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-thrift.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-generator.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-cascading.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-hadoop-bundle.jar $SPARK_USER_HOME/shark-0.9.1/lib/
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/parquet-scrooge.jar $SPARK_USER_HOME/shark-0.9.1/lib/

I am not sure whether all of these JARs are needed, but Parquet support works with this set.

If you enable Parquet support, you also need to set SPARK_MEM in $SPARK_USER_HOME/shark-0.9.1/conf/shark-env.sh to at least 2 GB.

5. Deploy shark to all the worker nodes

#MASTER

cd $SPARK_USER_HOME
tar zcf shark.tgz shark-0.9.1

scp this archive to each worker, or fetch it from each worker as follows:

#WORKER

sudo ln -s /usr/bin/java /bin/java

export SPARK_USER_HOME=/var/lib/spark
cd $SPARK_USER_HOME
scp shark@test01:$SPARK_USER_HOME/shark.tgz $SPARK_USER_HOME/
tar zxf shark.tgz

6. Configure Spark

If the Spark service cannot be started in CM5 (Cloudera Manager 5), you may need to remove the "noexec" option from the /var or /var/run mount point, using the command:
mount -o remount,exec /var/run

For a permanent fix, change the mount options in the /var or /var/run line of /lib/init/fstab.
You may then need to go back to #MASTER and add the workers to /etc/spark/conf/slaves.

For example, with two worker nodes:
echo "test02" >> /etc/spark/conf/slaves
echo "test03" >> /etc/spark/conf/slaves

Also, in /etc/spark/conf/spark-env.sh you may need to change
export STANDALONE_SPARK_MASTER_HOST=`hostname`
to
export STANDALONE_SPARK_MASTER_HOST=`hostname -f`

7. Run it!

Finally, you should be able to run the Shark shell!

Go back to #MASTER:
$SPARK_USER_HOME/shark-0.9.1/bin/shark-withinfo -skipRddReload

The -skipRddReload flag is only needed when you have tables with Hive/HBase mappings, because of issues with PassthroughOutputFormat in the Hive HBase handler.

Otherwise the error message looks something like:
"Property value must not be null"
or
"java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat"

8. Issues

ref.: http://bigdataanalyze.blogspot.de/2014/03/installing-shark-in-cdh5-beta2.html

It is a good guide for installing Shark on CDH5 beta2.

The author has also collected some common issues with Shark on CDH5 beta2: http://bigdataanalyze.blogspot.de/2014/03/issues-on-shark-with-cdh5-beta2-1.html

Friday, February 28, 2014

HiveServer2 does not return ResultSets in UTF-8 encoding (fixing garbled UTF-8 results over HiveServer2 JDBC)

Add the following environment variables to the Hive startup script ($HIVE_HOME/bin/hive):

export LANG=en_US.UTF-8
export HADOOP_OPTS="$HADOOP_OPTS -Dfile.encoding=UTF-8"

MapR cluster: /opt/mapr/hive/hive-0.11/bin/hive
Cloudera cluster: /opt/cloudera/parcels/CDH/lib/hive/bin/hive
other Hadoop distribution: /usr/lib/hive/bin/hive (maybe...)

Make sure your data in HDFS is encoded in UTF-8; if not, set the LANG variable and the file.encoding in HADOOP_OPTS to match the encoding you used for the files in HDFS.

When we read data through Hive JDBC and it contains non-ASCII characters, for example Chinese or other CJK text, by default it is very likely not returned correctly: you get either garbled characters or question marks.

This can be fixed by adding the two environment variables above to the Hive startup script; the script's location differs between Hadoop distributions, as listed above.

The key point is that the encoding of the files you store in HDFS must match the environment of the services you start (hive, hiveserver2, sqoop and so on), which you can ensure by setting LANG in the startup script and specifying file.encoding in HADOOP_OPTS.

If the files you store in HDFS are UTF-8, use the settings above; if they are GBK or similar, use:

export LANG="zh_CN.GBK"
export HADOOP_OPTS="$HADOOP_OPTS -Dfile.encoding=GBK"

Sunday, December 1, 2013

Apache Oozie workflow scheduling system: introduction and tips (a back-end data analysis workhorse after the Double 11 shopping festival)

The Apache Oozie workflow scheduling system can schedule the various kinds of tasks (actions) in Hadoop in the form of workflows.

A workflow file defines, in XML, the execution order of a series of tasks in a DAG (Directed Acyclic Graph), for example Map/Reduce, Hive, Sqoop, HDFS DistCp, Pig, Java and SSH (for running shell scripts).

The scheduler also offers parallel execution through fork-join.

A workflow can be placed inside a coordinator so that it runs repeatedly on a schedule, and the coordinator can pass in parameters such as the current time, so that each run of the workflow starts with different parameters.

I will not go on about the background; there are plenty of other articles on it, and you can also read the English Oozie post on this site: http://www.abcn.net/2013/11/apache-oozie-workflow-scheduling-system.html

Now for the practical part, the things you can actually use.

After the Double 11 shopping spree that just ended, many people face the task of analyzing massive amounts of data. This analysis does not need to be real-time; batch processing is enough. But the data usually comes from many different data sources, and each source needs its own processing before everything can be analyzed together, much like the ETL process of a data warehouse. Finally, the analysis results have to be delivered as a report to the relevant analysts.

To summarize, the pipeline is:
collect/extract -> clean up -> normalize -> analyze -> build reports -> publish results
If such a data processing flow also has to run repeatedly at a certain frequency, then in the big data world, relying only on map/reduce, HBase, Hive and the like starts to feel inadequate.

Put it all in crontab?
Your cluster easily has dozens, perhaps hundreds or thousands of machines; running just a few cron jobs on a single node is a waste.

The advantage of using Oozie:
every action is wrapped as a MapReduce job without a reduce phase and thrown onto the Hadoop cluster to run, so the cluster resources can be fully used.

For example, previously:
cron job A runs on host hdp01 at 15 minutes past every hour
cron job B runs on host hdp05 at 20 minutes past every hour
cron job C runs on host hdp11 at 50 minutes past every hour, reads the results of A and B, and then processes them

With Oozie, we can use a fork to start A and B at the same time and then join into C; moreover, A, B and C can be launched by any slave in the hdp** cluster, so the load is no longer always concentrated on the three machines 01, 05 and 11.
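A minimal sketch of such a fork/join workflow (the action names and the simple fs actions are placeholders for the real sqoop/hive/java actions, and ${nameNode} would come from job.properties):

<workflow-app name="parallel-demo" xmlns="uri:oozie:workflow:0.4">
    <start to="fork-AB"/>
    <fork name="fork-AB">
        <path start="A"/>
        <path start="B"/>
    </fork>
    <!-- A and B run in parallel on whichever slaves the cluster picks -->
    <action name="A">
        <fs><mkdir path="${nameNode}/tmp/demo/out-a"/></fs>
        <ok to="join-AB"/>
        <error to="fail"/>
    </action>
    <action name="B">
        <fs><mkdir path="${nameNode}/tmp/demo/out-b"/></fs>
        <ok to="join-AB"/>
        <error to="fail"/>
    </action>
    <join name="join-AB" to="C"/>
    <!-- C only starts after both A and B have finished -->
    <action name="C">
        <fs><mkdir path="${nameNode}/tmp/demo/out-c"/></fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>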


Suppose we now want to implement the processing flow above; with Oozie it might break down into the following parts:

Collect/extract:
sqoop action, to extract data from SQL databases
distcp action, to copy data over from another HDFS
java action, to read data with your own program
shell action, to use a script, for example FTP
All of the above can be put inside a fork/join and run in parallel.

Clean up:
java action, a Java program you write yourself to scan the data for corruption; depending on the application, this part can look completely different from case to case.

Normalize:
mapreduce action, reading with different InputFormats and writing everything out through one common OutputFormat
java action, or a Java program that loads everything into HBase

Analyze:
hive action, for the analysis itself; rather than writing map/reduce by hand, you should seriously consider how to use Hive here
java action, in our real deployment we also used HBase endpoint coprocessors to do some pre-processing for the analysis, such as simple aggregations. You could put these coprocessor clients into a fork-join as well, but think carefully about whether the whole HBase cluster can cope with several endpoint coprocessors running against the same table at once.

Build reports:
java action, shell script, whatever; with a shell action, R, Python, anything works.

Publish results:
mail action, nothing much to say here, except that Oozie's own email action apparently cannot send mail with attachments, so in practice we still wrote a shell script to send the email. Your cluster may also restrict outbound network access so that the slaves cannot send mail at all; in that case you need the ssh action: ssh to the master, download the report data from HDFS, build the corresponding files, and send them out as attachments.

A few small points to watch out for

First of all, these points supplement the tips in http://www.abcn.net/2013/11/apache-oozie-workflow-scheduling-system.html; I strongly recommend going through the tips there first, since they cover the problems you will hit most often (they are written in English, for a bit of SEO :p). The points below came to mind later and I was too lazy to write them up in English; I will fold them into the original post when I find time.

1) There is no HBase action in Oozie. In other words, Oozie cannot operate on HBase as simply as writing a Hive script; to work with HBase you either use Oozie's hive action together with the HBase-Hive mapping, or you write your own Java client. And if you need to call an HBase endpoint coprocessor, you have no choice but to write a Java client and run it in the workflow as a java action.

2) Oozie variable naming rules: the XML schema restricts variable names, so symbols such as "-" or "_" are not allowed, which is rather annoying. A variable you would like to call input-dir has to become inputDir, and you end up with ugly names like errorHandlingWorkflowPath. The really maddening part is that you must not confuse oozie variable names with oozie action names: action names may contain underscores, hyphens (minus signs) and the like.

3) The JARs in the lib directory are copied to every slave (TaskTracker), on every single run, all of the JARs in lib! In practice this produces a huge number of tmp files, on every TaskTracker that gets assigned a task. This is quite dangerous and has to be cleaned up manually :(

4) As mentioned in the English post (http://www.abcn.net/2013/11/apache-oozie-workflow-scheduling-system.html), we need to set a suitable maximum output data size ourselves. In practice this value also interacts strongly with the JVM heap size you give Oozie: if the heap is small but max.output.data is set rather large, the workflow can sometimes appear to hang, i.e. the whole workflow just stops and sits there; it produces no error, so nothing is reported and it never transitions to the workflow's kill node.

Friday, November 22, 2013

Apache Oozie workflow scheduling system - Introduction and Tips

Oozie is a workflow scheduling system for managing Hadoop jobs and other executable components such as Java, shell scripts, Hive, Sqoop, Pig, SSH, MapReduce and Hadoop FS, and it provides various kinds of EL functions.

Using an Oozie coordinator plus an Oozie workflow we can schedule our data processing tasks; it can also be used as a monitoring and error handling system.

Background

To introduce the Oozie framework in detail we first need some background on Oozie.

Oozie (http://oozie.apache.org/) is a workflow scheduler system for managing Apache Hadoop jobs. Workflows and their schedules (in Oozie, a schedule is a coordinator) are defined in XML files. Oozie provides a management CLI and a web interface that shows the running workflows, and you can also talk to Oozie from a Java program using the Oozie Java client library.

XML Schema
There are two types of XML files: a workflow, which defines the payload of an Oozie job, and a coordinator, which defines the scheduling information of an Oozie job.

You can use the following command to check whether an XML file is a valid workflow or coordinator definition for the Oozie framework:

$ oozie validate workflow.xml

Now let's look at these two XML schemas in detail.

Workflow
The workflow schema requires exactly one start element and one end element, and between them zero or more actions, decisions and other nodes. Some of the action and decision elements will be explained in the example chapter.
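As a minimal sketch, a workflow with one start, one action and one end could look like this (the fs action and the path are placeholders, and ${nameNode} would come from job.properties):

<workflow-app name="minimal-wf" xmlns="uri:oozie:workflow:0.4">
    <start to="cleanup"/>
    <action name="cleanup">
        <fs><delete path="${nameNode}/tmp/minimal-wf-out"/></fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>cleanup failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>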

Coordinator
The coordinator schema allows exactly one workflow inside the coordinator's action element.
That means one coordinator can only control one workflow.
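For example, a coordinator that runs a single workflow once a day could look like this (the name, dates, app path and the passed parameter are placeholders):

<coordinator-app name="daily-coord" frequency="${coord:days(1)}"
                 start="2013-11-22T01:00Z" end="2014-11-22T01:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <!-- exactly one workflow per coordinator -->
            <app-path>${nameNode}/user/oozie/apps/my-workflow</app-path>
            <configuration>
                <property>
                    <name>runDay</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>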

A workflow, however, can have any number of sub-workflows.

Management Interface
Oozie provides three kinds of management interface. Everything you want to do with Oozie can be done through the command line interface (CLI). There is also a web interface reachable via the so-called Oozie URL, but it is read-only: you can inspect information, but you cannot manage the Oozie server or its workflows from the browser. The third option is also powerful: with the Oozie Java client library you can do everything from your own Java program.

CLI
Some useful commands:

Start an Oozie workflow (or coordinator):
$ oozie job -oozie http://fxlive.de:11000/oozie -config /some/where/job.properties -run

Get information about an Oozie job by its ID (such as 0000001-130104191423486-oozie-oozi-W):
$ oozie job -oozie http://fxlive.de:11000/oozie -info 0000001-130104191423486-oozie-oozi-W

Get the task log of an Oozie job:
$ oozie job -oozie http://fxlive.de:11000/oozie -log 0000001-130104191423486-oozie-oozi-W

Stop (kill) an Oozie job:
$ oozie job -oozie http://fxlive.de:11000/oozie -kill 0000003-130104191423486-oozie-oozi-W

The "-oozie" option takes the so-called Oozie URL; you have to pass this URL explicitly with every command.

Web
The web interface is reachable at the same "Oozie URL"; in this case it is: http://fxlive.de:11000/oozie

Using this URL you can view all the information about running jobs and configurations in your browser.

Java Client 
There is also a Java client library for Oozie.



=== Tips ===


1) Deploy Oozie ShareLib in HDFS

http://blog.cloudera.com/blog/2012/12/how-to-use-the-sharelib-in-apache-oozie/

https://ccp.cloudera.com/display/CDH4DOC/Oozie+Installation#OozieInstallation-InstallingtheOozieShareLibinHadoopHDFS

$ mkdir /tmp/ooziesharelib
$ cd /tmp/ooziesharelib
$ tar zxf /usr/lib/oozie/oozie-sharelib.tar.gz
$ sudo -u oozie hadoop fs -put share /user/oozie/share

2) Oozie Sqoop Action arguments from properties

The Oozie sqoop action does not handle a multi-line sqoop command from a property file very well; instead, use <arg> tags to pass the sqoop command to the action argument by argument, as in the sketch below.
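For example, an import could be written like this (connection string, credentials, table, target directory and transition names are placeholders):

<action name="sqoop-import">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <!-- one <arg> per command-line token instead of a single <command> line -->
        <arg>import</arg>
        <arg>--connect</arg>
        <arg>jdbc:mysql://dbhost/mydb</arg>
        <arg>--username</arg>
        <arg>dbuser</arg>
        <arg>--password</arg>
        <arg>dbpass</arg>
        <arg>--table</arg>
        <arg>orders</arg>
        <arg>--target-dir</arg>
        <arg>${nameNode}/user/oozie/import/orders</arg>
    </sqoop>
    <ok to="next-action"/>
    <error to="fail"/>
</action>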


3) ZooKeeper connection problem when importing into HBase with the sqoop action

Problem: the mapper of the sqoop action tries to access ZooKeeper on localhost instead of the cluster's ZooKeeper.
Solution:
  1. Go to the Cloudera Manager of the corresponding cluster
  2. Go to the ZooKeeper service and get the hbase-site.xml
  3. Copy hbase-site.xml into HDFS under /tmp/ooziesharelib/share/lib/sqoop/

4) ZooKeeper connection problem with the HBase Java client

Just like the ZooKeeper problem with the sqoop action, we can put hbase-site.xml into the Oozie common sharelib; or, if we want to load the HBase ZooKeeper configuration manually in Java, we can package hbase-site.xml into the JAR and then:

// org.apache.hadoop.conf.Configuration; hbase-site.xml must be on the classpath (e.g. inside the job JAR)
Configuration conf = new Configuration();
conf.addResource("hbase-site.xml");   // picks up the HBase/ZooKeeper quorum settings
conf.reloadConfiguration();

5) Hive action throws NestedThrowables: JDOFatalInternalException and InvocationTargetException

Put the MySQL Java connector into the sharelib or into the Hive workflow root.

If it still doesn't work, then take a look at http://cloudfront.blogspot.de/2012/06/failed-error-in-metadata.html

In short form:
hadoop fs  -chmod g+w  /tmp
hadoop fs  -chmod 777  /tmp
hadoop fs  -chmod g+w  /user/hive/warehouse
hadoop fs  -chmod 777  /user/hive/warehouse

6) Fork-join actions with error transitions to the same node

It is fixed in Oozie version 3.3.2 (https://issues.apache.org/jira/browse/OOZIE-1035)

A temporary workaround for older Oozie versions is shown here: https://issues.apache.org/jira/browse/OOZIE-1142

In short:

In oozie-site.xml, set oozie.validate.ForkJoin to false and restart Oozie.
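That is, add a property like this to oozie-site.xml:

<property>
    <name>oozie.validate.ForkJoin</name>
    <value>false</value>
</property>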


7) Default maximum output data size is only 2 KB

Sometimes you will get this error:

Failing Oozie Launcher, Output data size [4 321] exceeds maximum [2 048]
Failing Oozie Launcher, Main class [com.myactions.action.InitAction], exception invoking main(), null
org.apache.oozie.action.hadoop.LauncherException 
 at org.apache.oozie.action.hadoop.LauncherMapper.failLauncher(LauncherMapper.java:571)
Yep, it will happen sooner or later, because the default maximum output data size is only 2 KB.

If you want to change this limit, set the property oozie.action.max.output.data to a larger value in oozie-site.xml, for example:
<property>
    <name>oozie.action.max.output.data</name>
    <value>1048576</value>
</property>
This sets the maximum output size to 1024 KB.


8) SSH tunnel to bypass the firewall and reach the web interface

Port 11000 may be blocked by default by some firewalls, so if you want to use the Oozie web interface you may need to set up an SSH tunnel that forwards localhost:11000 to port 11000 on the Oozie server (for example ssh -N -L 11000:your-oozie-host:11000 user@gateway, where the host names are placeholders).

Then you can reach the web interface at: http://localhost:11000/


9) sendmail-action after a decision-action

A Java action (with capture-output enabled) sets a status property and a message property; a decision action then checks whether status equals 0 and, if so, transitions to the email action. A rough sketch is shown below.
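A sketch of that part of the workflow (the main class is reused from the error example in tip 7; the email address and node names are placeholders, and the email action needs an SMTP server configured for Oozie):

<action name="init">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <main-class>com.myactions.action.InitAction</main-class>
        <!-- the Java main writes status/message as a Properties file to the path
             given by the oozie.action.output.properties system property -->
        <capture-output/>
    </java>
    <ok to="check-status"/>
    <error to="fail"/>
</action>
<decision name="check-status">
    <switch>
        <case to="send-report">${wf:actionData('init')['status'] eq '0'}</case>
        <default to="fail"/>
    </switch>
</decision>
<action name="send-report">
    <email xmlns="uri:oozie:email-action:0.1">
        <to>analyst@example.com</to>
        <subject>Report ready</subject>
        <body>${wf:actionData('init')['message']}</body>
    </email>
    <ok to="end"/>
    <error to="fail"/>
</action>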


Tuesday, November 19, 2013

Integration of Hive and HBase: Hive MAP column mapped to an entire column family with binary value (byte value)

I plan to gradually write up some of the problems we have run into at work and post them here.

Let's start with the deadliest one.

In our earlier work we ran into a situation where we needed to map an HBase column family into a Hive table, but this column family stores only byte values, and its column qualifiers are unpredictable, timestamp-like numbers, so they cannot be listed explicitly in the column mapping; the entire column family has to be mapped into Hive.

In other words:
a Hive MAP column mapped to an entire column family with binary values

Solution
Use the following mapping type specification:
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:#s:b")
Here cf:#s:b means: cf is the column family name, and #s:b is the type specification for this entire HBase column family; the column names (column qualifiers) become the keys of the Hive MAP with datatype string (s), and the column values become the Hive MAP values with datatype binary (b, byte values).

I searched through all kinds of documentation and none of it mentions this case. The official documentation
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration#HBaseIntegration-Examplewithbinarycolumns
only covers the case where one specific column is a binary column; using hbase.table.default.storage.type does not apply when you need to map an entire column family.

Following the format given in the documentation for mapping a specific column:
column-family-name:[column-name][#(binary|string)]
we can write the mapping of an entire column family with binary values in the following general form:
column-family-name:[#(binary|string):(binary|string)]

An example

Customer journey analytics is a good HBase + Hive use case in big data analytics. Assume all customer events are tracked into one HBase table whose row key is the customer id, a long value; to keep the keys fixed-length, the long values are stored as bytes via Bytes.toBytes(). One column family stores all of the customer's events, such as view, click and buy; the column qualifier is the event's timestamp, and the value is the event id, again a long stored as bytes.

So the table is:
create 'customer_journey', 'events'
entries look like:
hbase(main):011:0> scan 'customer_journey'
ROW                               COLUMN+CELL
 \x00\x00\x00\x00\x00\x00\x00\x01 column=events:1354824339000, timestamp=1354824339000, value=\x00\x00\x00\x00\x00\x00 \x08
 \x00\x00\x00\x00\x00\x00\x00\x01 column=events:1354824340000, timestamp=1354824340000, value=\x00\x00\x00\x00\x00\x00'\x9E
 \x00\x00\x00\x00\x00\x00\x00\x01 column=events:1354824350000, timestamp=1354824350000, value=\x00\x00\x00\x00\x00\x00\x00\x0F
 \x00\x00\x00\x00\x00\x00\x00\x10 column=events:1354824350000, timestamp=1354824350000, value=\x00\x00\x00\x00\x00\x00\xF0\x08
 \x00\x00\x00\x00\x00\x00\x00\x10 column=events:1354824359000, timestamp=1354824359000, value=\x00\x00\x00\x00\x00\x00\x10\xD8
The Hive external table should be created like this:
hive> CREATE EXTERNAL TABLE customer_journey (customer_id bigint, events map<string, bigint>)
    > STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    > WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key#b, events:#s:b")
    > TBLPROPERTIES ("hbase.table.name" = "customer_journey");
Then you will get:
hive> select * from customer_journey;
OK
1 {"1354824339000":8200,"1354824340000":10142,"1354824350000":15}
16 {"1354824350000":61448,"1354824359000":4312}
Time taken: ...

© Chutium / Teng Qiu @ ABC Netz Group