piątek, 30 marca 2018

Apache ZooKeeper oraz Apache Kafka w OpenShifcie

Jeżeli nie posiadamy wykupionej lub darmowej usługi OpenShift (Starter lub Pro) to możemy postawić sobie lokalnie własny klaster OpenShift (opis powstał w oparciu o system Ubuntu).

Mając już zainstalowanego Dockera (warto dodać użytkownika do grupy "docker" aby nie trzeba było wykonywać komend dockerowych z prawami roota) sprawdzamy za pomocą komendy (jako root)
sysctl net.ipv4.ip_forward
czy wartością zwracaną jest 1.

Następnie konfigurujemy Dockera aby można było korzystać z niezaufanego Registry lokalnego OpenShifta poprzez edycję pliku "/etc/docker/daemon.json" i dodanie i (lub) aktualizację jak następuje:
{
   "insecure-registries": [
      "172.30.0.0/16"
   ]
}
Po wszystkim wywołujemy (jako root)
systemctl daemon-reload
i
systemctl restart docker
Pobieramy archiwum z narzędziem do zarządzania OpenShiftem:
wget https://github.com/openshift/origin/releases/download/v3.9.0/openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz
Rozpakowujemy:
tar -zxvf openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz
Kopiujemy (jako root):
cp ./openshift-origin-client-tools-v3.9.0-alpha.3-78ddc10-linux-64bit/oc /usr/bin/
Ustalamy prawa (jako root):
chmod 755 /usr/bin/oc
Stawianie i wyłączanie klastra odbywa się odpowiednio poprzez komendy
oc cluster up
i
oc cluster down
Po postawieniu klastra dostajemy zwrotkę mówiącą pod jakim adresem mamy interfejs oraz dane logowania:
Starting OpenShift using registry.access.redhat.com/openshift3/ose:v3.7.23 ...
OpenShift server started.

The server is accessible via web console at:
    https://127.0.0.1:8443

You are logged in as:
    User:     developer
    Password: <any value>

To login as administrator:
    oc login -u system:admin
Aby mieć możliwość pełnej administracji (m.in. dostęp do wszystkich projektów i zarządzanie różnego rodzaju politykami) musimy (jako root) najpierw zalogować się do OpenShifta:
oc login -u admin -p admin
Następnie (jako root):
oc adm policy add-cluster-role-to-user cluster-admin system --config=/var/lib/origin/openshift.local.config/master/admin.kubeconfig
Wychodzimy z konta roota i wchodzimy na stronę z OpenShiftem. W moim przypadku jest to "https://127.0.0.1:8443/". Logujemy się jako "system" i hasło "admin".

Zalogować się do naszej platformy możemy również za pomocą tokena. Na stronie OpenShifta wprowadzamy login i hasło, a następnie po pomyślnym uwierzytelnieniu prawym górnym rogu klikamy na ikonę użytkownika i wybieramy "Copy Login Command" i zawartość schowka wklejamy do terminala aby się zalogować jak np.:
oc login https://127.0.0.1:8443 --token=6TmPUCncEyzqXglh3goioYJSBasTGb7thS-vyaXIoVU
Możemy się również zalogować wpisując
oc login -u system -p admin
Sprawdźmy jakie mamy projekty:
oc get project
Następnie używamy konkretnego projektu:
oc project nazwa_projektu
lub tworzymy nowy projekt:
oc new-project kafka
Warto zaznaczyć aby na każdym etapie tworzenia poszczególnych zasobów w OpenShifcie sprawdzać czy nie wystąpiły jakieś błędy.

Nazwa projektu musi się zgadzać z opcją "namespace" w plikach konfiguracyjnych jeżeli takowa występuje.

Przechowywanie danych w klastrze jest obsługiwane przez tworzenie ze źródeł obiektów PersistentVolume. Dostęp do nich możesz uzyskać rozszcząc sobie prawa do zasobu poprzez wysłanie żądania (ze specjalnymi atrybutami jak np. rozmiar magazynu z danymi) przy użyciu obiektu PersistentVolumeClaim. Pomiędzymi tymi dwoma obiektami istnieje proces, który dopasowuje żądanie z dostępnym wolumenem i wiąże je ze sobą.

PersistentVolume reprezentuje część istniejącego sieciowego miejsca do przechowywania danych w klastrze, który został postawiony przez administratora. Z kolei PersistentVolumeClaim reprezentuje żądanie użytkownika o przydzielenia miejsca. Innymi słowy tak jak strąk (z ang. "pod") konsumuje zasoby węzła tak PersistenceVolumeClaim konsumuje zasoby PersistentVolume.

Tworzymy plik "kafka_pvc.yml" z definicją obiektów PersistentVolumeClaim dla Kafki:
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-kafka-0
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-kafka-1
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-kafka-2
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-kafka-3
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-kafka-4
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
Wprowadzamy komendę, która utworzy nam magazyny danych:
oc create -f kafka_pvc.yml
Następnie plik "zookeeper_pvc.yml" dla ZooKeepera:
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-zoo-0
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-zoo-1
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-zoo-2
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
Odpowiednio podajemy komendę:
oc create -f zookeeper_pvc.yml
Aby sprawdzić jakie mamy dostępne wolumeny wpisujemy:
oc get pvc
Przechodzimy do wdrażania naszych usług, które w Kubernetesie obsługiwane są jako wewnętrzne równoważenie obciążenia (z ang. "internal load balancer"). Można zdefiniować zestaw zreplikowanych strąków ("pods") i wtedy pośredniczyć w przekazywaniu do nich połączeń. Usługi mają przypisany adres IP oraz port, za pomocą którego przekazuje się połączenia do odpowiedniego strąka.

Rekomendowana maksymalna liczba strąków na węzeł OpenShifta to 110.

Tworzymy plik "zookeeper_services.yml" o zawartości:
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  name: zk
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: peer
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
Odpalamy tworzenie usług jako:
oc create -f zookeeper_services.yml
StatefulSet jest obiektem używanym do zarządzania aplikacjami stanowymi; wdrażaniem i skalowaniem zestawem strąków.

Kolejnym punktem jest wpisanie do pliku "zookeeper_cluster.yml" następujących danych:
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zoo
spec:
  serviceName: "zk"
  replicas: 3
  template:
    metadata:
      labels:
        app: zk
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"
        pod.alpha.kubernetes.io/init-containers: '[
            {
                "name": "install",
                "image": "gcr.io/google_containers/zookeeper-install:0.1",
                "imagePullPolicy": "Always",
                "args": ["--version=3.5.2-alpha", "--install-into=/opt", "--work-dir=/work-dir"],
                "volumeMounts": [
                    {
                        "name": "opt",
                        "mountPath": "/opt/"
                    },
                    {
                        "name": "workdir",
                        "mountPath": "/work-dir"
                    }
                ]
            },
            {
                "name": "bootstrap",
                "image": "java:openjdk-8-jre",
                "command": ["/work-dir/peer-finder"],
                "args": ["-on-start=\"/work-dir/on-start.sh\"", "-service=zk"],
                "env": [
                  {
                      "name": "POD_NAMESPACE",
                      "valueFrom": {
                          "fieldRef": {
                              "apiVersion": "v1",
                              "fieldPath": "metadata.namespace"
                          }
                      }
                   }
                ],
                "volumeMounts": [
                    {
                        "name": "opt",
                        "mountPath": "/opt/"
                    },
                    {
                        "name": "workdir",
                        "mountPath": "/work-dir"
                    },
                    {
                        "name": "datadir",
                        "mountPath": "/tmp/zookeeper"
                    }
                ]
            }
        ]'
    spec:
      containers:
      - name: zk
        image: java:openjdk-8-jre
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: peer
        - containerPort: 3888
          name: leader-election
        command:
        - /opt/zookeeper/bin/zkServer.sh
        args:
        - start-foreground
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/zookeeper/bin/zkCli.sh ls /"
          initialDelaySeconds: 15
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /tmp/zookeeper
        - name: opt
          mountPath: /opt/
      volumes:
      - name: opt
        emptyDir: {}
      - name: workdir
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.alpha.kubernetes.io/storage-class: anything
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 20Gi
Odpalamy jako aby stworzyć StatefulSet "zoo":
oc create -f zookeeper_cluster.yml
Przechodzimy do wdrażania Kafki. Zaczniemy od wprowadzenia poniższych danych do pliku "kafka_broker_service.yml":
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  name: broker
spec:
  ports:
  - port: 9092
  clusterIP: None
  selector:
    app: kafka
Odpalamy jako:
oc create -f kafka_broker_service.yml
Każdy z brokerów, do których możemy się podłączyć aby wysyłać wiadomości będzie dostępny pod nazwą hosta według wzoru "kafka-NUMER_WĘZŁA.broker.kafka.svc.cluster.local".

Kolejną rzeczą jest następna usługa, której definicję wrzucamy do pliku "kafka_service.yml":
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  ports:
  - port: 9092
  selector:
    app: kafka
 
Wprowadzamy komendę:
oc create -f kafka_service.yml
Na koniec definicja klastra Kafki w pliku "kafka_cluster.yml":
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: "broker"
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"
        pod.alpha.kubernetes.io/init-containers: '[
        ]'
    spec:
      containers:
      - name: broker
        image: solsson/kafka:0.10.0.1
        ports:
        - containerPort: 9092
        command:
        - sh
        - -c
        - "./bin/kafka-server-start.sh config/server.properties --override broker.id=$(hostname | awk -F'-' '{print $2}')"
        volumeMounts:
        - name: datadir
          mountPath: /opt/kafka/data
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.alpha.kubernetes.io/storage-class: anything
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 100Mi
Ostatecznie wprowadzamy:
oc create -f kafka_cluster.yml
Teraz musimy sobie przetestować nasze wdrożenia. W tym celu tworzymy nowy strąk ("pod") definiując go w pliku "kafka_client_pod.yml":
apiVersion: v1
kind: Pod
metadata:
  name: testclient
spec:
  containers:
  - name: kafka
    image: solsson/kafka:0.10.0.1
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"
I odpalamy:
oc create -f kafka_client_pod.yml
Na dwóch oddzielnych terminalach odpalamy komendy umożliwiające nad zalogowanie się do osobnych sesji w kontenerach do testów:
oc rsh testclient
W pierwszym terminalu tworzymy temat w ZooKeeperze:
./bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 1 --replication-factor 3 --topic test
W tym samym terminalu podłączamy się do ZooKeepera aby konsumować wiadomości:
./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic test --from-beginning
Na drugim terminalu wpisujemy:
./bin/kafka-console-producer.sh --broker-list kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 --topic test
i wprowadzamy jakikolwiek ciąg znaków, który powinien pojawić się nam w pierwszym terminalu.

Aby usunąć wszystkie strąki ("pody"), usługi i klastry to wpisujemy:
oc delete all --all
Dodatkowo usuwamy wszystkie wolumeny:
oc delete pvc --all