summaryrefslogtreecommitdiffstats
path: root/test/functional_auth/swiftkerbauth/test_swkrbath_active.py
blob: 86c79efb6e8a0a88c27569245b695334899dd145 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/python

# Copyright (c) 2010-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import unittest
from nose import SkipTest
import commands
import os
from test import get_config
from swift.common.bufferedhttp import http_connect_raw as http_connect

config = get_config('func_test')

class Utils:
    @classmethod
    def SwiftKerbAuthPrep(self,
            user=config['username'],domain=config['domain_name'],\
                passwd=config['password']):
        username = '%s@%s' % (user, domain)
        return commands.getstatusoutput('kinit %s <<< %s' % (username, passwd))

    @classmethod
    def SwiftKerbAuthCleanAll(self):
        return commands.getstatusoutput('kdestroy')


class TestSwKrbAthActive(unittest.TestCase):
    def setUp(self):
        #Perform kinit in active mode.
        (status, output) = Utils.SwiftKerbAuthPrep()
        self.assertEqual(status, 0, \
                'swkrbauth prep failed with valid credentials'+output)
        self.auth_host = config['auth_host']
        self.auth_port = int(config['auth_port'])
        self.auth_prefix = config.get('auth_prefix', '/auth/')
        self.auth_version = str(config.get('auth_version', '1'))
        self.account_name = config['account']
        self.username = config['username']
        self.password = config['password']
        self.auth_scheme = config['auth_scheme']

        #Prepare auth_url. e.g. http://client.rhelbox.com:8080/auth/v1.0
        if self.auth_version == "1":
            self.auth_path = '%sv1.0' % (self.auth_prefix)
        else:
            self.auth_path = self.auth_prefix
        self.auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
        auth_url = self.auth_scheme + self.auth_netloc + self.auth_path

        #Obtain the X-Auth-Token from kerberos server to use it in furhter
        #testing
        self.auth_token = None
        (status, output) = commands.getstatusoutput('curl -v -u : --negotiate\
                --location-trusted %s' % (auth_url))
        self.assertEqual(status, 0, 'Token negotiation failed:' +output)
        match = re.search('X-Auth-Token: AUTH.*', output)
        if match:
            self.auth_token = match.group(0).split(':')[1].strip()
        else:
            self.fail('No X-Auth-Token found, failed')

    def tearDown(self):
        Utils.SwiftKerbAuthCleanAll()


    def _get_auth_token(self):
        return {'X-Auth-Token' : self.auth_token}

    def testGetAccounts(self):
        #TODO: The test case is to perform GET on the account mentioned via
        #configuration file. This is a sample test case. The whole test
        #suite can be enhanced further to have further complicated test cases.
        path = '/v1/AUTH_%s' % (config['account'])

        headers = self._get_auth_token()
        conn = http_connect(config['auth_host'], config['auth_port'], 'GET',
                path, headers)
        resp = conn.getresponse()
        self.assertTrue(resp.status == 204)