00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 """Tests for the objectid module."""
00016
00017 import datetime
00018 import warnings
00019 import unittest
00020 import sys
00021 import time
00022 sys.path[0:0] = [""]
00023
00024 from nose.plugins.skip import SkipTest
00025
00026 from bson.errors import InvalidId
00027 from bson.objectid import ObjectId
00028 from bson.tz_util import (FixedOffset,
00029 utc)
00030
00031 def oid(x):
00032 return ObjectId()
00033
00034
00035 class TestObjectId(unittest.TestCase):
00036
00037 def setUp(self):
00038 pass
00039
00040 def test_creation(self):
00041 self.assertRaises(TypeError, ObjectId, 4)
00042 self.assertRaises(TypeError, ObjectId, 175.0)
00043 self.assertRaises(TypeError, ObjectId, {"test": 4})
00044 self.assertRaises(TypeError, ObjectId, ["something"])
00045 self.assertRaises(InvalidId, ObjectId, "")
00046 self.assertRaises(InvalidId, ObjectId, "12345678901")
00047 self.assertRaises(InvalidId, ObjectId, "1234567890123")
00048 self.assert_(ObjectId())
00049 self.assert_(ObjectId("123456789012"))
00050 a = ObjectId()
00051 self.assert_(ObjectId(a))
00052
00053 def test_unicode(self):
00054 a = ObjectId()
00055 self.assertEqual(a, ObjectId(unicode(a)))
00056 self.assertEqual(ObjectId("123456789012123456789012"),
00057 ObjectId(u"123456789012123456789012"))
00058 self.assertRaises(InvalidId, ObjectId, u"hello")
00059
00060 def test_from_hex(self):
00061 ObjectId("123456789012123456789012")
00062 self.assertRaises(InvalidId, ObjectId, "123456789012123456789G12")
00063 self.assertRaises(InvalidId, ObjectId, u"123456789012123456789G12")
00064
00065 def test_repr_str(self):
00066 self.assertEqual(repr(ObjectId("1234567890abcdef12345678")),
00067 "ObjectId('1234567890abcdef12345678')")
00068 self.assertEqual(str(ObjectId("1234567890abcdef12345678")), "1234567890abcdef12345678")
00069 self.assertEqual(str(ObjectId("123456789012")), "313233343536373839303132")
00070 self.assertEqual(ObjectId("1234567890abcdef12345678").binary, '\x124Vx\x90\xab\xcd\xef\x124Vx')
00071 self.assertEqual(str(ObjectId('\x124Vx\x90\xab\xcd\xef\x124Vx')), "1234567890abcdef12345678")
00072
00073 def test_cmp(self):
00074 a = ObjectId()
00075 self.assertEqual(a, ObjectId(a))
00076 self.assertEqual(ObjectId("123456789012"), ObjectId("123456789012"))
00077 self.assertNotEqual(ObjectId(), ObjectId())
00078 self.assertNotEqual(ObjectId("123456789012"), "123456789012")
00079
00080 def test_binary_str_equivalence(self):
00081 a = ObjectId()
00082 self.assertEqual(a, ObjectId(a.binary))
00083 self.assertEqual(a, ObjectId(str(a)))
00084
00085 def test_multiprocessing(self):
00086
00087
00088 if sys.platform == "win32":
00089 raise SkipTest()
00090
00091 try:
00092 import multiprocessing
00093 except ImportError:
00094 raise SkipTest()
00095
00096 pool = multiprocessing.Pool(2)
00097 ids = pool.map(oid, range(20))
00098 pool.close()
00099 pool.join()
00100
00101 map = {}
00102
00103 for id in ids:
00104 self.assert_(id not in map)
00105 map[id] = True
00106
00107 def test_generation_time(self):
00108 d1 = datetime.datetime.utcnow()
00109 d2 = ObjectId().generation_time
00110
00111 self.assertEqual(utc, d2.tzinfo)
00112 d2 = d2.replace(tzinfo=None)
00113 self.assert_(d2 - d1 < datetime.timedelta(seconds = 2))
00114
00115 def test_from_datetime(self):
00116 d = datetime.datetime.utcnow()
00117 d = d - datetime.timedelta(microseconds=d.microsecond)
00118 oid = ObjectId.from_datetime(d)
00119 self.assertEqual(d, oid.generation_time.replace(tzinfo=None))
00120 self.assertEqual("0" * 16, str(oid)[8:])
00121
00122 aware = datetime.datetime(1993, 4, 4, 2, tzinfo=FixedOffset(555, "SomeZone"))
00123 as_utc = (aware - aware.utcoffset()).replace(tzinfo=utc)
00124 oid = ObjectId.from_datetime(aware)
00125 self.assertEqual(as_utc, oid.generation_time)
00126
00127 if __name__ == "__main__":
00128 unittest.main()