Django 通过k8s的api接口实现查看以及远程ssh容器
基本思路,django编写页面,通过api调用获取kubernetes的部署情况,列出pod信息,然后根据pod信息埋点url,点击跳转到ssh docker的页面,通过嵌套 https://github.com/kubernetes-ui/container-terminal 实现登录到docker。
具体代码片段
django model 设计:
from django.db import models
# Create your models here.
class K8sHost(models.Model):
k8s_api = models.CharField(max_length=64, verbose_name=u"k8s连接地址",unique=True,help_text="例如 https://10.255.56.250:6443")
k8s_api_token = models.TextField(verbose_name=u'k8s连接token',help_text="参考 apiserver token")
k8s_ws = models.CharField(max_length=64, verbose_name=u"k8s连接地址",unique=True,help_text="例如 ws://10.255.56.250:8080")
k8s_ws_token = models.CharField(max_length=255,verbose_name=u'k8s websoket连接token',help_text="参考 ws token")
k8s_name = models.CharField(max_length=255,verbose_name='k8s集群名称',default='default',help_text="给k8s起个名")
def __str__(self):
return self.k8s_name
class Meta:
verbose_name = "k8s配置"
verbose_name_plural = verbose_name
class K8sexec(K8sHost):
class Meta:
verbose_name = "k8s管理"
verbose_name_plural = verbose_name
proxy = True
Django view 设计:
from django.shortcuts import render,render_to_response
from django.contrib.auth.decorators import login_required
# Create your views here.
import json
from django.http import HttpResponse
from k8sapp.utils import K8sApi
from .models import K8sHost
def getnamespacelist(request):
if request.method == 'GET':
k8s_apiport = str(request.GET.get('k8s_apiport',None)).strip()
message = []
if k8s_apiport:
k8s_client = K8sApi(confid=int(k8s_apiport))
namespaces_list = k8s_client.get_namespacelist()
for item in namespaces_list.items:
message.append(item.metadata.name)
return HttpResponse (json.dumps({'message': message}, ensure_ascii=False),
content_type="application/json,")
else:
return HttpResponse (json.dumps ({'message': "不允许POST请求"}, ensure_ascii=False),
content_type="application/json,charset=utf-8")
def getpodlist(request):
if request.method == "POST":
messages = ''
k8s_apiport = str(request.POST.get('k8s_apiport',None)).strip()
namespace = str(request.POST.get('k8s_namespaces',None)).strip()
if k8s_apiport and namespace:
k8s_client = K8sApi(confid=int(k8s_apiport))
pods_list = k8s_client.get_podlist(namespace=namespace)
for items in pods_list.items:
pod_name = items.metadata.name
pod_namespace = items.metadata.namespace
pod_creatime = items.metadata.creation_timestamp
host_ip = items.status.host_ip
pod_ip = items.status.pod_ip
messages += '''
%s
%s
%s
%s
%s
''' %(pod_namespace,k8s_apiport,pod_name,pod_name,pod_ip,host_ip,pod_namespace,pod_creatime)
return HttpResponse(json.dumps({'message': messages}, ensure_ascii=False),
content_type="application/json,charset=utf-8")
else:
return HttpResponse(json.dumps({'message': "不允许GET请求"}, ensure_ascii=False),
content_type="application/json,charset=utf-8")
@login_required()
def connectpod(request):
if request.method == "GET":
namespace = str(request.GET.get ('k8s_namespaces', None)).strip()
k8s_apiport = str(request.GET.get('k8s_apiport', None)).strip()
pod_name = str(request.GET.get ('k8s_pod', None)).strip()
token = str(K8sHost.objects.values ('k8s_ws_token').get(id=k8s_apiport)['k8s_ws_token']).strip()
k8s_url = K8sHost.objects.values('k8s_ws').get(id=k8s_apiport)['k8s_ws']
return render_to_response ('xtem_pod.html', {'status': 'ok', 'namespace': namespace, 'k8s_url': k8s_url,
'pod_name': pod_name,'token':token})
else:
return render_to_response ('xtem_pod.html', {'status': 'error'})
def podexec(request):
if request.method == "POST":
namespace = str(request.POST.get('k8s_namespaces', None)).strip()
k8s_apiport = str(request.POST.get('k8s_apiport', None)).strip()
pod_name = str(request.POST.get('k8s_pod',None)).strip()
command = str(request.POST.get('command', None)).strip ()
k8s_client = K8sApi(confid=int(k8s_apiport))
rest = k8s_client.get_pods_exec(podname=pod_name,namespace=namespace,command=command)
else:
rest = '不允许除POST之外的任何访问.'
return HttpResponse(json.dumps({'message': rest}, ensure_ascii=False),content_type="application/json,charset=utf-8")
django k8s接口调用插件:
from kubernetes import client, config
from kubernetes.stream import stream
from .models import K8sHost
# Create a configuration object
class K8sApi:
def __init__(self,confid):
self.confid = confid
def get_client(self):
baseurl = K8sHost.objects.values('k8s_api').get(id=self.confid)['k8s_api']
token = str(K8sHost.objects.values('k8s_api_token').get(id=self.confid)['k8s_api_token']).strip()
aConfiguration = client.Configuration()
aConfiguration.host = baseurl
aConfiguration.verify_ssl = False
aConfiguration.api_key = {"authorization": "Bearer " + token}
aApiClient = client.ApiClient (aConfiguration)
v1 = client.CoreV1Api(aApiClient)
return v1
def get_podlist(self,namespace):
client_v1 = self.get_client()
ret_pod = client_v1.list_namespaced_pod(namespace=namespace)
return ret_pod
def get_namespacelist(self):
client_v1 = self.get_client()
ret_namespace = client_v1.list_namespace()
return ret_namespace
def test_pods_connect(self,podname,namespace,command,container=None):
client_v1 = self.get_client()
if stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
container=container,
stderr=True, stdin=False,
stdout=True, tty=False):
return True
else:
return False
def get_pods_exec(self,podname,namespace,command,container=None):
client_v1 = self.get_client()
if container:
rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
container=container,
stderr=True, stdin=False,
stdout=True, tty=False)
else:
rest = stream(client_v1.connect_get_namespaced_pod_exec, podname, namespace, command=command,
stderr=True, stdin=False,
stdout=True, tty=False)
return rest
容器连接页面设计:
{{ namespace }} - {{ pod_name }}
* {
box-sizing: border-box;
}
body {
margin: 20px !important;
font-family: sans-serif;
}
label {
display: block !important;
}
label span {
float: left;
width: 100px;
margin-top: 2px;
}
label .form-control {
display: inline !important;
width: 300px;
}
body > div {
margin-top: 15px;
}
kubernetes-container-terminal {
}
命名空间
: {{ namespace }} 连接容器名: {{ pod_name }}
angular.module('exampleApp', ['kubernetesUI'])
.config(function (kubernetesContainerSocketProvider) {
kubernetesContainerSocketProvider.WebSocketFactory = "CustomWebSockets";
})
.run(function ($rootScope) {
$rootScope.baseUrl = "{{ k8s_url }}";
$rootScope.selfLink = "/api/v1/namespaces/{{ namespace }}/pods/{{ pod_name }}";
$rootScope.containerName = "";
$rootScope.accessToken = "{{ token }}";
$rootScope.preventSocket = true;
})
/* Our custom WebSocket factory adapts the url */
.factory("CustomWebSockets", function ($rootScope) {
return function CustomWebSocket(url, protocols) {
url = $rootScope.baseUrl + url;
if ($rootScope.accessToken)
url += "&access_token=" + $rootScope.accessToken;
return new WebSocket(url, protocols);
};
});
实现效果:


